From 5775cd2c00bd3138a5c639a4d8aab91d287f9064 Mon Sep 17 00:00:00 2001
From: bsw215583320 <baoshiwei121@163.com>
Date: 星期五, 08 十一月 2024 11:27:59 +0800
Subject: [PATCH] 云平台mqtt接收设备信息后处理逻辑

---
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java |  128 +++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 123 insertions(+), 5 deletions(-)

diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
index 55c571f..b51e204 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -1,21 +1,34 @@
 package org.jeecg.modules.dry.mqtt;
 
 import cn.hutool.core.thread.ThreadUtil;
+import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.jeecg.common.config.TenantContext;
 import org.jeecg.common.constant.CommonConstant;
 import org.jeecg.common.constant.MqttConstant;
 import org.jeecg.common.util.DateUtils;
 import org.jeecg.common.util.RedisUtil;
 import org.jeecg.modules.base.service.BaseCommonService;
 import org.jeecg.modules.dry.api.EmqxApi;
+import org.jeecg.modules.dry.entity.DryEqpType;
+import org.jeecg.modules.dry.entity.DryEquipment;
+import org.jeecg.modules.dry.entity.DryShop;
+import org.jeecg.modules.dry.service.IDryEqpTypeService;
+import org.jeecg.modules.dry.service.IDryEquipmentService;
+import org.jeecg.modules.dry.service.IDryRealTimeDataService;
+import org.jeecg.modules.dry.service.IDryShopService;
+import org.jeecg.modules.dry.vo.DryEquipmentVo;
+import org.jeecg.modules.dry.vo.RealTimeDataVo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 
+@Slf4j
 @Component
 @Scope("prototype")
 public class MqttSampleCallback implements MqttCallback {
@@ -29,6 +42,21 @@
   private BaseCommonService baseCommonService;
   @Autowired
   private RedisUtil redisUtil;
+
+  @Autowired
+  private IDryRealTimeDataService realTimeDataService;
+
+
+  @Autowired
+  private IDryEquipmentService equipmentService;
+
+  @Autowired
+  private IDryEqpTypeService eqpTypeService;
+
+  @Autowired
+  private IDryShopService dryShopService;
+
+
 
 
   @Override
@@ -74,9 +102,14 @@
 
         }
         if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
-          String clientid = messageJson.getString("clientid");
-          redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid);
-          System.err.println(String.format("璁惧: %s涓嬬嚎", clientid));
+          try {
+            String clientid = messageJson.getString("clientid");
+            redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid);
+            System.err.println(String.format("璁惧: %s涓嬬嚎", clientid));
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+
         }
         parseAdminCommand(topic, mqttMessage);
 
@@ -84,7 +117,12 @@
       // 鏅�氱敤鎴�
       case "user":
         System.err.println("user");
-        parseUserCommand(topic, mqttMessage);
+        try {
+          parseUserCommand(topic, mqttMessage);
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+
         break;
 
     }
@@ -111,7 +149,11 @@
     if (messageJson.containsKey("timestamp")) {
       messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
     }
-    baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
+    // 瀹炴椂鏁版嵁涓婁紶澶绻佷笖鏁版嵁鍐呭瓒呰繃瀛楁澶у皬涓嶈褰曟棩蹇�
+    if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA)){
+      baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
+    }
+
     switch (topic) {
       // 鏌ヨ璁惧鍦ㄧ嚎
       case MqttConstant.MOBILE_QUERY_EQU_STATU:
@@ -148,6 +190,46 @@
           }
         });
         break;
+
+        // 鎺ユ敹璁惧瀹炴椂鏁版嵁
+      case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
+        ThreadUtil.execute(() -> {
+          try {
+            RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
+            realTimeDataService.realTimeDataHandle(vo);
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        });
+
+        break;
+      case MqttConstant.TENANT_UP_PREFIX_EQU:
+        ThreadUtil.execute(() -> {
+          try {
+            Object equObj = messageJson.get("equipment");
+            DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class);
+            TenantContext.setTenant(equipment.getTenantId()+"");
+            DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode());
+            if (dryEquipment == null) {
+              equipmentService.save(equipment);
+            }
+            Object typeObj = messageJson.get("eqpType");
+            DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class);
+            DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId());
+            if (dryEqpType == null) {
+              eqpTypeService.save(eqpType);
+            }
+            // 鑾峰彇璁惧鎵�灞炶溅闂�
+            Object shopObj = messageJson.get("shop");
+            DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class);
+            DryShop dryShop = dryShopService.getById(shop.getId());
+            if (dryShop == null) {
+              dryShopService.save(shop);
+            }
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        });
 
     }
 
@@ -192,6 +274,42 @@
 
 
         break;
+      case MqttConstant.SERVICE_REQ_PREFIX:
+        log.debug("鏀跺埌璁惧璇︾粏淇℃伅鏌ヨ璇锋眰");
+        ThreadUtil.execute(() -> {
+          String tenantId = messageJson.getString("tenantId");
+          String clientId = mqttUtil.getMqttClient().getClientId();
+          String tenant = clientId.substring(clientId.lastIndexOf("_")+1);
+          if (tenantId!=null && tenantId.equals(tenant)) {
+            // 鏍规嵁璁惧缂栫爜鏌ヨ璁惧淇℃伅
+            String code = messageJson.getString("code");
+            DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId,code);
+            // 鏍规嵁璁惧杞﹂棿id鏌ヨ杞﹂棿淇℃伅
+            DryShop shop = dryShopService.selectByTenantIdShopId(tenantId, equipmentVo.getShopId());
+            // 鏍规嵁璁惧绫诲瀷ID鏌ヨ璁惧绫诲瀷淇℃伅
+            DryEqpType eqpType = eqpTypeService.selectByTenantIdTypeId(tenantId, equipmentVo.getType());
+
+            JSONObject res = new JSONObject();
+
+            res.put("tenant", clientId);
+            res.put("equipment", equipmentVo);
+            res.put("shop", shop);
+            res.put("eqpType", eqpType);
+            try {
+              MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
+              sendMessage.setQos(0);
+              mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage);
+              // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
+            } catch (Exception e) {
+              e.printStackTrace();
+            }
+          }
+        });
+
+
+
+        break;
+
     }
 
   }

--
Gitblit v1.9.3