From b38019aae593a66c16f7e75d6e37d14eb8d2c42e Mon Sep 17 00:00:00 2001
From: zhuguifei <zhuguifei@zhuguifeideiMac.local>
Date: 星期二, 22 七月 2025 08:55:15 +0800
Subject: [PATCH] 修改接收实时数据接口-故障处理

---
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java |   91 +++++++++++++++++++++++++--------------------
 1 files changed, 50 insertions(+), 41 deletions(-)

diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
index b381df7..4558603 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -42,6 +42,9 @@
 public class MqttSampleCallback implements MqttCallback {
     @Value(value = "${jeecg.mqtt.role}")
     private String role;
+
+
+
     @Autowired
     private MqttUtil mqttUtil;
     @Autowired
@@ -76,8 +79,8 @@
 
     @Override
     public void messageArrived(String topic, MqttMessage mqttMessage) {
-        System.out.println("鏀跺埌娑堟伅: \n  topic锛�" + topic + "\n  Qos锛�" + mqttMessage.getQos() + "\n  payload锛�"
-                + new String(mqttMessage.getPayload()));
+//        System.out.println("鏀跺埌娑堟伅: \n  topic锛�" + topic + "\n  Qos锛�" + mqttMessage.getQos() + "\n  payload锛�"
+//                + new String(mqttMessage.getPayload()));
 
         switch (role) {
             // 绠$悊鍛�
@@ -98,6 +101,8 @@
                         //clientid
                         String clientid = messageJson.getString("clientid");
                         item.put("clientid", clientid);
+                        // 涓嶇鍚堢殑璁惧涓嶈繘琛岀鐞�
+                        if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  return;
                         //鏄惁杩炴帴
                         item.put("connected", true);
                         //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡  渚嬶細client-1000)
@@ -105,12 +110,14 @@
                             String[] info = clientid.split("-");
                             item.put("type", info[0]);
                             item.put("tenantId", info[1]);
-                            //item.put("code", info[2]);
+                            item.put("code", info[2]);
+                            redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
+                            System.err.println(String.format("璁惧: %s涓婄嚎", clientid));
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
-                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
-                        System.err.println(String.format("璁惧: %s涓婄嚎", clientid));
+
+
                     }
 
                 }
@@ -204,16 +211,16 @@
                 });
                 break;
 
-            // 鎺ユ敹璁惧瀹炴椂鏁版嵁
+            // 鎺ユ敹璁惧瀹炴椂鏁版嵁 TODO 20250718鏆備笉浣跨敤锛屼娇鐢═ENANT_UP_PREFIX_REALTIME_DATA_EQP
             case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
-                ThreadUtil.execute(() -> {
-                    try {
-                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
-                        realTimeDataService.realTimeDataHandle(vo);
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                });
+//                ThreadUtil.execute(() -> {
+//                    try {
+//                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
+//                        realTimeDataService.realTimeDataHandle(vo);
+//                    } catch (Exception e) {
+//                        e.printStackTrace();
+//                    }
+//                });
 
                 break;
             // 鎺ユ敹璁惧瀹炴椂鏁版嵁-鏈哄彴
@@ -221,40 +228,42 @@
                 ThreadUtil.execute(() -> {
                     try {
                         RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class);
-                        realTimeDataService.realTimeDataHandle(vo);
+                        synchronized (realTimeDataService) {
+                            realTimeDataService.realTimeDataHandle(vo);
+                        }
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
                 });
                 break;
-            //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁
+            //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁 TODO 20250721鏆備笉浣跨敤锛岀粺涓�浣跨敤TENANT_UP_PREFIX_REALTIME_DATA_EQP
             case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: {
 
-                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
-                });
-                //鏁呴殰鏁版嵁
-                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
-                //绉熸埛id
-                String tenantId = realFaultMessage.getTentId();
+//                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
+//                });
+//                //鏁呴殰鏁版嵁
+//                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
+//                //绉熸埛id
+//                String tenantId = realFaultMessage.getTentId();
                 //鏀跺埌绉熸埛瀹炴椂鎶ヨ鏁版嵁瀛樺叆redis
-                //杞崲涓� Map<String, Object>
-                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
-                        .collect(Collectors.toMap(
-                                Map.Entry::getKey,
-                                entry -> (Object) entry.getValue()
-                        ));
-                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
-                //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
-                if (dryFaultMap.isEmpty()) {
-                    return;
-                }
-                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
-                //鏁版嵁杞崲
-                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
-                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
-                //鍙戦�佸箍鎾�
-                System.err.println("骞挎挱缁欙細" + recTopic);
-                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
+//                //杞崲涓� Map<String, Object>
+//                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
+//                        .collect(Collectors.toMap(
+//                                Map.Entry::getKey,
+//                                entry -> (Object) entry.getValue()
+//                        ));
+//                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
+//                //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
+//                if (dryFaultMap.isEmpty()) {
+//                    return;
+//                }
+//                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
+//                //鏁版嵁杞崲
+//                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
+//                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
+//                //鍙戦�佸箍鎾�
+//                System.err.println("骞挎挱缁欙細" + recTopic);
+//                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
             }
             break;
             //绉诲姩绔富鍔ㄨ姹傝澶囧疄鏃舵晠闅滄暟鎹紙鐢ㄤ簬椤甸潰鍒氭墦寮�鏃舵媺鍙栦竴娆℃暟鎹級
@@ -263,7 +272,7 @@
                 if (req.toString().isEmpty() || tenantId == null) {
                     return;
                 }
-                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId));
+                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId));
                 //杞崲涓� Map<String, DryFaultRecordVo>
                 Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream()
                         .collect(Collectors.toMap(

--
Gitblit v1.9.3