From 46e512321221b9ae01a513f04c80a8f73f20e0a0 Mon Sep 17 00:00:00 2001
From: bsw215583320 <baoshiwei121@163.com>
Date: 星期四, 07 十一月 2024 08:48:55 +0800
Subject: [PATCH] 增加设备信息获取mqtt主题和订阅,租户上传设备实时数据时,若服务端无相关设备信息,可通过此接口获取
---
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java | 46 +++++++++++++--
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java | 60 ++++++++++++++++++++
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java | 1
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java | 34 +++++++++--
4 files changed, 128 insertions(+), 13 deletions(-)
diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
index 0ad3087..55a1cfa 100644
--- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
+++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -5,6 +5,13 @@
*/
public interface MqttConstant {
+ /**************************** MQTT瑙掕壊 start *************************************/
+ String ROLE_ADMIN = "admin";
+ String ROLE_USER = "user";
+
+
+ /**************************** MQTT瑙掕壊 end *************************************/
+
/**************************绯荤粺璁㈤槄strat*******************************/
//涓婄嚎璁㈤槄
String MQTT_TOPIC_ONLINE = "$SYS/brokers/+/clients/+/connected";
@@ -28,14 +35,7 @@
- /**************************绉熸埛绔悜鏈嶅姟绔彂閫佹暟鎹畇tart*******************************/
-
- String TENANT_UP_PREFIX = "tenant/up";
- String TENANT_UP_PREFIX_REALTIME_DATA = TENANT_UP_PREFIX + "/realTime/data";
-
-
- /**************************绉熸埛绔悜鏈嶅姟绔彂閫佹暟鎹甧nd*******************************/
@@ -49,7 +49,27 @@
String SERVICE_RES_EQU_CMD = SERVICE_DOWN_PREFIX + "/%s/cmd";
/**************************鏈嶅姟绔悜绉诲姩绔搷搴旀寚浠nd*******************************/
+ /**************************绉熸埛绔悜鏈嶅姟绔彂閫佹暟鎹畇tart*******************************/
+
+ String TENANT_UP_PREFIX = "tenant/up";
+ String TENANT_UP_PREFIX_REALTIME_DATA = TENANT_UP_PREFIX + "/realTime/data";
+
+ String TENANT_UP_PREFIX_EQU = TENANT_UP_PREFIX + "/equipment";
+
+
+ /**************************绉熸埛绔悜鏈嶅姟绔彂閫佹暟鎹甧nd*******************************/
+
+ /**************************鏈嶅姟绔悜绉熸埛绔姹傛暟鎹畇tart*******************************/
+
+
+ String SERVICE_REQ_PREFIX = "service/req";
+ String SERVICE_REQ_EQUIPMENT = SERVICE_REQ_PREFIX + "/%s/equipment";
+
+ String SERVICE_REQ_EQU_TOPIC = SERVICE_REQ_PREFIX + "/+/equipment";
+
+
+ /**************************鏈嶅姟绔悜绉熸埛绔姹傛暟鎹甧nd*******************************/
//redis缂撳瓨
String MQTT_ONLINE_CLIENT = "mqtt:online:client::";
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
index b55069f..4871e16 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
@@ -104,6 +104,7 @@
case "user":
//鏅�氬鎴风鍙渶璁㈤槄鑷韩鐩稿叧娑堟伅
mqttClient.subscribe(MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#");
+ mqttClient.subscribe(MqttConstant.SERVICE_REQ_PREFIX + "/" + mqttClientId.substring(mqttClientId.lastIndexOf("_")+1) + "/#");
System.err.println("user璁㈤槄" + MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#");
break;
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
index e1ccd32..cb5861d 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -3,22 +3,30 @@
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
+import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.jeecg.common.config.TenantContext;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.MqttConstant;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.service.BaseCommonService;
import org.jeecg.modules.dry.api.EmqxApi;
+import org.jeecg.modules.dry.entity.DryEquipment;
+import org.jeecg.modules.dry.entity.DryShop;
+import org.jeecg.modules.dry.service.IDryEquipmentService;
import org.jeecg.modules.dry.service.IDryRealTimeDataService;
+import org.jeecg.modules.dry.service.IDryShopService;
+import org.jeecg.modules.dry.vo.DryEquipmentVo;
import org.jeecg.modules.dry.vo.RealTimeDataVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
+@Slf4j
@Component
@Scope("prototype")
public class MqttSampleCallback implements MqttCallback {
@@ -35,6 +43,15 @@
@Autowired
private IDryRealTimeDataService realTimeDataService;
+
+
+ @Autowired
+ private IDryEquipmentService equipmentService;
+
+ @Autowired
+ private IDryShopService dryShopService;
+
+
@Override
@@ -174,7 +191,26 @@
}
break;
+ case MqttConstant.TENANT_UP_PREFIX_EQU:
+ try {
+ DryEquipment equipment = (DryEquipment) messageJson.get("equipment");
+ DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode());
+
+ if (dryEquipment == null) {
+ equipmentService.save(equipment);
+ }
+
+
+
+ // 鑾峰彇璁惧鎵�灞炶溅闂�
+ DryShop shop = (DryShop) messageJson.get("shop");
+ shop.setTenantId(equipment.getTenantId());
+ dryShopService.save(shop);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
}
}
@@ -218,6 +254,30 @@
break;
+ case MqttConstant.SERVICE_REQ_EQU_TOPIC:
+ log.debug("鏀跺埌璁惧璇︾粏淇℃伅鏌ヨ璇锋眰");
+ // 鏍规嵁璁惧缂栫爜鏌ヨ璁惧淇℃伅
+ String code = messageJson.getString("code");
+ DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(TenantContext.getTenant(),code);
+ // 鏍规嵁璁惧杞﹂棿id鏌ヨ杞﹂棿淇℃伅
+ DryShop shop = dryShopService.getById(equipmentVo.getShopId());
+
+ JSONObject res = new JSONObject();
+
+ res.put("tenant", TenantContext.getTenant());
+ res.put("equipment", equipmentVo);
+ res.put("shop", shop);
+ try {
+ MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
+ sendMessage.setQos(0);
+ mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage);
+ // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ break;
+
}
}
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
index 5c6d723..1c222f0 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -8,16 +8,23 @@
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IoSession;
+import org.apache.shiro.SecurityUtils;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jeecg.common.api.CommonAPI;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.config.TenantContext;
import org.jeecg.common.config.mqtoken.UserTokenContext;
import org.jeecg.common.constant.CommonConstant;
+import org.jeecg.common.constant.MqttConstant;
import org.jeecg.common.system.util.JwtUtil;
+import org.jeecg.common.system.vo.LoginUser;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.dry.common.CacheConstants;
import org.jeecg.modules.dry.entity.*;
+import org.jeecg.modules.dry.mqtt.MqttUtil;
import org.jeecg.modules.dry.service.*;
import org.jeecg.modules.dry.socket.ServerHandler;
import org.jeecg.modules.dry.socket.SocketServerConfig;
@@ -25,9 +32,11 @@
import org.jeecg.modules.dry.vo.*;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
+import javax.security.auth.login.LoginContext;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.Socket;
@@ -63,7 +72,11 @@
private String token;
+ @Value(value = "${jeecg.mqtt.role}")
+ private String role;
+ @Autowired
+ private MqttUtil mqttUtil;
public String getTemporaryToken() {
if (token == null) {
@@ -238,17 +251,23 @@
*/
private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) {
DryOrderVo orderVo;
- // 鏌ヨ鑽潗
- DryHerbFormula herbFormula = queryHerbByIndexTenant(realTimeDataVo);
+
// 鏌ヨ璁惧
DryEquipment equ = queryEquipmentByCodeTenant(realTimeDataVo);
+ if (equ == null) {
+ log.error("鏈壘鍒拌澶囷細"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",鏈哄彴锛�" + realTimeDataVo.getMachineid());
+ return null;
+ }
+ // 鏌ヨ鑽潗
+ DryHerbFormula herbFormula = queryHerbByIndexTenant(realTimeDataVo);
- // 鍒涘缓鏂板伐鍗�
- orderVo = new DryOrderVo(realTimeDataVo);
if (herbFormula == null) {
log.error("鏈壘鍒拌嵂鏉愶細"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",鏈哄彴锛�" + realTimeDataVo.getMachineid());
return null;
}
+ // 鍒涘缓鏂板伐鍗�
+ orderVo = new DryOrderVo(realTimeDataVo);
+
orderVo.setHerbId(herbFormula.getId());
orderVo.setEquId(equ.getId());
DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class);
@@ -268,8 +287,23 @@
queryWrapper.eq(DryEquipment::getCode, realTimeDataVo.getMachineid());
DryEquipment one = equipmentService.getOne(queryWrapper);
if (one == null) {
- one = new DryEquipment(realTimeDataVo);
- equipmentService.save(one);
+ log.error(role+"淇濆瓨瀹炴椂鏁版嵁锛屾湭鎵惧埌璁惧锛�"+realTimeDataVo.getMachineid());
+// one = new DryEquipment(realTimeDataVo);
+// equipmentService.save(one);
+ if (MqttConstant.ROLE_ADMIN.equals(role)) {
+ MqttMessage mqttMessage = new MqttMessage();
+ mqttMessage.setQos(0);
+ JSONObject object = new JSONObject();
+ object.put("code", realTimeDataVo.getMachineid());
+ mqttMessage.setPayload(object.toJSONString().getBytes());
+ try {
+ mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_REQ_EQUIPMENT, TenantContext.getTenant()) ,mqttMessage);
+ }catch (MqttException e) {
+ e.printStackTrace();
+ }
+
+ }
+ return null;
}
return one;
}
--
Gitblit v1.9.3