From b38019aae593a66c16f7e75d6e37d14eb8d2c42e Mon Sep 17 00:00:00 2001
From: zhuguifei <zhuguifei@zhuguifeideiMac.local>
Date: 星期二, 22 七月 2025 08:55:15 +0800
Subject: [PATCH] 修改接收实时数据接口-故障处理

---
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java |   60 ++++++++++++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 48 insertions(+), 12 deletions(-)

diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
old mode 100644
new mode 100755
index 395d41d..a490a0a
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
@@ -3,6 +3,7 @@
 import cn.hutool.core.thread.ThreadUtil;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.*;
 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
@@ -14,12 +15,15 @@
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.data.redis.core.RedisTemplate;
 
 import java.util.*;
 
 /**
  * mqtt
  */
+@Data
 @Slf4j
 @Configuration
 public class MqttConfig {
@@ -33,6 +37,8 @@
   private String mqttClientId;
   @Value(value = "${jeecg.mqtt.role}")
   private String role;
+  @Value(value = "${jeecg.mqtt.enable}")
+  private boolean enable;
 
   @Autowired
   private MqttSampleCallback mqttSampleCallback;
@@ -42,6 +48,8 @@
   private RedisUtil redisUtil;
   @Autowired
   private EmqxApi emqxApi;
+  @Autowired
+  private RedisTemplate redisTemplate;
 
 
   @Bean
@@ -54,6 +62,7 @@
    * mqtt杩炴帴閰嶇疆
    */
   private void conn() {
+    if (!isEnable()) return;
     MemoryPersistence persistence = new MemoryPersistence();
     MqttConnectOptions mqttConnOpt = new MqttConnectOptions();
     mqttConnOpt.setUserName(mqttName);
@@ -92,6 +101,16 @@
             //璁㈤槄绉诲姩绔笂琛屾寚浠�
             mqttClient.subscribe(MqttConstant.MOBILE_UP);
             System.err.println("admin璁㈤槄" + MqttConstant.MOBILE_UP);
+            // 璁㈤槄绉熸埛瀹炴椂鏁版嵁
+            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP);
+            System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP);
+            // 璁㈤槄绉熸埛鎶ヨ鏁版嵁
+            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
+            System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
+            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA);
+            System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_FAULT_DATA);
+            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_EQU);
+            System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_EQU);
             initClients();
 
             break;
@@ -99,6 +118,7 @@
           case "user":
             //鏅�氬鎴风鍙渶璁㈤槄鑷韩鐩稿叧娑堟伅
             mqttClient.subscribe(MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#");
+            mqttClient.subscribe(MqttConstant.SERVICE_REQ_PREFIX);
             System.err.println("user璁㈤槄" + MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#");
             break;
 
@@ -116,13 +136,14 @@
 
   //閲嶈繛
   private void reconn() {
+    if (!isEnable()) return;
     Timer timer = new Timer();
 
     TimerTask task = new TimerTask() {
       @Override
       public void run() {
         // 鍦ㄨ繖閲岀紪鍐欏畾鏃舵墽琛岀殑浠诲姟閫昏緫
-        System.out.println("瀹氭椂浠诲姟鎵ц锛�" + new java.util.Date());
+        //System.out.println("瀹氭椂浠诲姟鎵ц锛�" + new java.util.Date());
         if (mqttUtil.getMqttClient() == null || !mqttUtil.getMqttClient().isConnected()) {
           try {
             conn();
@@ -142,7 +163,14 @@
    * 鏈嶅姟绔紙admin瑙掕壊锛夊惎鍔ㄦ椂鏌ヨ鎵�鏈夎澶囧苟缂撳瓨鍒皉edis
    */
   private void initClients() {
-    redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT);
+    //鍒濆鍖栨椂鍏堝垹闄ゆ墍鏈夊湪绾胯澶�
+    Set keys = redisTemplate.keys( String.format(MqttConstant.MQTT_ONLINE_CLIENT,"*"));
+    if (keys != null && !keys.isEmpty()) {
+      keys.forEach(key -> System.out.println("鍒濆鍖栧垹闄ゅ湪绾胯澶�: " + key));
+      redisTemplate.delete(keys);
+    } else {
+      System.out.println("鍒濆鍖栨棤鍦ㄧ嚎璁惧: " + MqttConstant.MQTT_ONLINE_CLIENT);
+    }
 
     JSONObject clients = emqxApi.queryEmqx(EmqxApi.CMD_CLIENTS);
     //TODO 鏍规嵁emqx杩斿洖缂栧啓瀹炰綋绫�
@@ -151,29 +179,37 @@
       for (int i = 0; i < data.size(); i++) {
         JSONObject obj = data.getJSONObject(i);
         JSONObject item = new JSONObject();
+        //clientid
+        String clientid = obj.getString("clientid");
+        item.put("clientid", clientid);
+        //TODO 鏍¢獙绉熸埛id鏄惁瀛樺湪
+        if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  continue;
         //username
         item.put("username", obj.get("username"));
         //杩炴帴鏃堕棿
         String st = obj.getString("connected_at");
         String upTime = DateUtils.zone2Str(st);
         item.put("connectedAt", upTime);
-        //clientid
-        String clientid = obj.getString("clientid");
-        item.put("clientid", clientid);
         //鏄惁杩炴帴
         Boolean connected = obj.getBoolean("connected");
         item.put("connected", connected);
-        //
-        String[] info = clientid.split("-");
-        item.put("type", info[0]);
-        item.put("tenantId", info[1]);
-        item.put("code", info[2]);
+        //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡  渚嬶細client-1000)
+        try {
+          String[] info = clientid.split("-");
+          item.put("type", info[0]);
+          item.put("tenantId", info[1]);
+          item.put("code", info[2]);
 
-        if (connected) {
-          redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item);
+          if (connected) {
+            redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId").toString()) , clientid, item);
+          }
+        }catch (Exception e){
+          e.printStackTrace();
         }
 
 
+
+
       }
     }
   }

--
Gitblit v1.9.3