From 9a906b9d83ccd8a638a407eb16a44a44025225c0 Mon Sep 17 00:00:00 2001 From: bsw215583320 <baoshiwei121@163.com> Date: 星期四, 07 十一月 2024 09:03:11 +0800 Subject: [PATCH] 增加设备信息获取mqtt主题和订阅,租户上传设备实时数据时,若服务端无相关设备信息,可通过此接口获取 --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java | 46 +++++++++++++-- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java | 60 ++++++++++++++++++++ jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java | 1 jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java | 34 +++++++++-- 4 files changed, 128 insertions(+), 13 deletions(-) diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java index 0ad3087..55a1cfa 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java @@ -5,6 +5,13 @@ */ public interface MqttConstant { + /**************************** MQTT瑙掕壊 start *************************************/ + String ROLE_ADMIN = "admin"; + String ROLE_USER = "user"; + + + /**************************** MQTT瑙掕壊 end *************************************/ + /**************************绯荤粺璁㈤槄strat*******************************/ //涓婄嚎璁㈤槄 String MQTT_TOPIC_ONLINE = "$SYS/brokers/+/clients/+/connected"; @@ -28,14 +35,7 @@ - /**************************绉熸埛绔悜鏈嶅姟绔彂閫佹暟鎹畇tart*******************************/ - - String TENANT_UP_PREFIX = "tenant/up"; - String TENANT_UP_PREFIX_REALTIME_DATA = TENANT_UP_PREFIX + "/realTime/data"; - - - /**************************绉熸埛绔悜鏈嶅姟绔彂閫佹暟鎹甧nd*******************************/ @@ -49,7 +49,27 @@ String SERVICE_RES_EQU_CMD = SERVICE_DOWN_PREFIX + "/%s/cmd"; /**************************鏈嶅姟绔悜绉诲姩绔搷搴旀寚浠nd*******************************/ + /**************************绉熸埛绔悜鏈嶅姟绔彂閫佹暟鎹畇tart*******************************/ + + String TENANT_UP_PREFIX = "tenant/up"; + String TENANT_UP_PREFIX_REALTIME_DATA = TENANT_UP_PREFIX + "/realTime/data"; + + String TENANT_UP_PREFIX_EQU = TENANT_UP_PREFIX + "/equipment"; + + + /**************************绉熸埛绔悜鏈嶅姟绔彂閫佹暟鎹甧nd*******************************/ + + /**************************鏈嶅姟绔悜绉熸埛绔姹傛暟鎹畇tart*******************************/ + + + String SERVICE_REQ_PREFIX = "service/req"; + String SERVICE_REQ_EQUIPMENT = SERVICE_REQ_PREFIX + "/%s/equipment"; + + String SERVICE_REQ_EQU_TOPIC = SERVICE_REQ_PREFIX + "/+/equipment"; + + + /**************************鏈嶅姟绔悜绉熸埛绔姹傛暟鎹甧nd*******************************/ //redis缂撳瓨 String MQTT_ONLINE_CLIENT = "mqtt:online:client::"; diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java index b55069f..4871e16 100644 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java @@ -104,6 +104,7 @@ case "user": //鏅�氬鎴风鍙渶璁㈤槄鑷韩鐩稿叧娑堟伅 mqttClient.subscribe(MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#"); + mqttClient.subscribe(MqttConstant.SERVICE_REQ_PREFIX + "/" + mqttClientId.substring(mqttClientId.lastIndexOf("_")+1) + "/#"); System.err.println("user璁㈤槄" + MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#"); break; diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java index e1ccd32..cb5861d 100644 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java @@ -3,22 +3,30 @@ import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.jeecg.common.config.TenantContext; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.MqttConstant; import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.service.BaseCommonService; import org.jeecg.modules.dry.api.EmqxApi; +import org.jeecg.modules.dry.entity.DryEquipment; +import org.jeecg.modules.dry.entity.DryShop; +import org.jeecg.modules.dry.service.IDryEquipmentService; import org.jeecg.modules.dry.service.IDryRealTimeDataService; +import org.jeecg.modules.dry.service.IDryShopService; +import org.jeecg.modules.dry.vo.DryEquipmentVo; import org.jeecg.modules.dry.vo.RealTimeDataVo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +@Slf4j @Component @Scope("prototype") public class MqttSampleCallback implements MqttCallback { @@ -35,6 +43,15 @@ @Autowired private IDryRealTimeDataService realTimeDataService; + + + @Autowired + private IDryEquipmentService equipmentService; + + @Autowired + private IDryShopService dryShopService; + + @Override @@ -174,7 +191,26 @@ } break; + case MqttConstant.TENANT_UP_PREFIX_EQU: + try { + DryEquipment equipment = (DryEquipment) messageJson.get("equipment"); + DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode()); + + if (dryEquipment == null) { + equipmentService.save(equipment); + } + + + + // 鑾峰彇璁惧鎵�灞炶溅闂� + DryShop shop = (DryShop) messageJson.get("shop"); + shop.setTenantId(equipment.getTenantId()); + dryShopService.save(shop); + + } catch (Exception e) { + e.printStackTrace(); + } } } @@ -218,6 +254,30 @@ break; + case MqttConstant.SERVICE_REQ_EQU_TOPIC: + log.debug("鏀跺埌璁惧璇︾粏淇℃伅鏌ヨ璇锋眰"); + // 鏍规嵁璁惧缂栫爜鏌ヨ璁惧淇℃伅 + String code = messageJson.getString("code"); + DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(TenantContext.getTenant(),code); + // 鏍规嵁璁惧杞﹂棿id鏌ヨ杞﹂棿淇℃伅 + DryShop shop = dryShopService.getById(equipmentVo.getShopId()); + + JSONObject res = new JSONObject(); + + res.put("tenant", TenantContext.getTenant()); + res.put("equipment", equipmentVo); + res.put("shop", shop); + try { + MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage); + // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); + } catch (Exception e) { + e.printStackTrace(); + } + + break; + } } diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java index 5c6d723..1c222f0 100644 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java @@ -8,16 +8,23 @@ import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.service.IoHandler; import org.apache.mina.core.session.IoSession; +import org.apache.shiro.SecurityUtils; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; import org.jeecg.common.api.CommonAPI; import org.jeecg.common.api.vo.Result; import org.jeecg.common.config.TenantContext; import org.jeecg.common.config.mqtoken.UserTokenContext; import org.jeecg.common.constant.CommonConstant; +import org.jeecg.common.constant.MqttConstant; import org.jeecg.common.system.util.JwtUtil; +import org.jeecg.common.system.vo.LoginUser; import org.jeecg.common.util.RedisUtil; import org.jeecg.common.util.SpringContextUtils; import org.jeecg.modules.dry.common.CacheConstants; import org.jeecg.modules.dry.entity.*; +import org.jeecg.modules.dry.mqtt.MqttUtil; import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.socket.ServerHandler; import org.jeecg.modules.dry.socket.SocketServerConfig; @@ -25,9 +32,11 @@ import org.jeecg.modules.dry.vo.*; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import javax.security.auth.login.LoginContext; import java.io.IOException; import java.io.ObjectOutputStream; import java.net.Socket; @@ -63,7 +72,11 @@ private String token; + @Value(value = "${jeecg.mqtt.role}") + private String role; + @Autowired + private MqttUtil mqttUtil; public String getTemporaryToken() { if (token == null) { @@ -238,17 +251,23 @@ */ private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) { DryOrderVo orderVo; - // 鏌ヨ鑽潗 - DryHerbFormula herbFormula = queryHerbByIndexTenant(realTimeDataVo); + // 鏌ヨ璁惧 DryEquipment equ = queryEquipmentByCodeTenant(realTimeDataVo); + if (equ == null) { + log.error("鏈壘鍒拌澶囷細"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",鏈哄彴锛�" + realTimeDataVo.getMachineid()); + return null; + } + // 鏌ヨ鑽潗 + DryHerbFormula herbFormula = queryHerbByIndexTenant(realTimeDataVo); - // 鍒涘缓鏂板伐鍗� - orderVo = new DryOrderVo(realTimeDataVo); if (herbFormula == null) { log.error("鏈壘鍒拌嵂鏉愶細"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",鏈哄彴锛�" + realTimeDataVo.getMachineid()); return null; } + // 鍒涘缓鏂板伐鍗� + orderVo = new DryOrderVo(realTimeDataVo); + orderVo.setHerbId(herbFormula.getId()); orderVo.setEquId(equ.getId()); DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class); @@ -268,8 +287,23 @@ queryWrapper.eq(DryEquipment::getCode, realTimeDataVo.getMachineid()); DryEquipment one = equipmentService.getOne(queryWrapper); if (one == null) { - one = new DryEquipment(realTimeDataVo); - equipmentService.save(one); + log.error(role+"淇濆瓨瀹炴椂鏁版嵁锛屾湭鎵惧埌璁惧锛�"+realTimeDataVo.getMachineid()); +// one = new DryEquipment(realTimeDataVo); +// equipmentService.save(one); + if (MqttConstant.ROLE_ADMIN.equals(role)) { + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setQos(0); + JSONObject object = new JSONObject(); + object.put("code", realTimeDataVo.getMachineid()); + mqttMessage.setPayload(object.toJSONString().getBytes()); + try { + mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_REQ_EQUIPMENT, TenantContext.getTenant()) ,mqttMessage); + }catch (MqttException e) { + e.printStackTrace(); + } + + } + return null; } return one; } -- Gitblit v1.9.3