From 4a60ced80b215fcb2e2d4664b20cd744313ccc10 Mon Sep 17 00:00:00 2001
From: zhuguifei <zhuguifei@zhuguifeideiMac.local>
Date: 星期五, 25 七月 2025 15:08:07 +0800
Subject: [PATCH] 接收mqtt数据高并发处理

---
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java |  112 +++++++++++++++++++++++++++++++++++++++-----------------
 1 files changed, 78 insertions(+), 34 deletions(-)

diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
index 4558603..6fcfa4b 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -6,6 +6,7 @@
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.poi.ss.formula.functions.T;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
@@ -42,7 +43,6 @@
 public class MqttSampleCallback implements MqttCallback {
     @Value(value = "${jeecg.mqtt.role}")
     private String role;
-
 
 
     @Autowired
@@ -88,7 +88,7 @@
                 String message = new String(mqttMessage.getPayload());
                 JSONObject messageJson = JSONObject.parseObject(message);
 
-                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
+                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected") && !topic.endsWith("disconnected")) {
                     JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
                     if (client == null) {
                         JSONObject item = new JSONObject();
@@ -102,7 +102,7 @@
                         String clientid = messageJson.getString("clientid");
                         item.put("clientid", clientid);
                         // 涓嶇鍚堢殑璁惧涓嶈繘琛岀鐞�
-                        if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  return;
+                        if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return;
                         //鏄惁杩炴帴
                         item.put("connected", true);
                         //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡  渚嬶細client-1000)
@@ -113,6 +113,12 @@
                             item.put("code", info[2]);
                             redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                             System.err.println(String.format("璁惧: %s涓婄嚎", clientid));
+
+                            // 鎺ㄩ�佸埌绉诲姩绔�
+                            String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, item.get("tenantId"));
+                            MqMessage<JSONObject> mqMessage = new MqMessage<>(item, item.get("tenantId").toString(), recTopic);
+                            sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1);
+
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
@@ -124,8 +130,23 @@
                 if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
                     try {
                         String clientid = messageJson.getString("clientid");
-                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid);
+                        // 涓嶇鍚堢殑璁惧涓嶈繘琛岀鐞�
+                        if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return;
+                        String tenantId = clientid.split("-")[1];
+                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, tenantId), clientid);
                         System.err.println(String.format("璁惧: %s涓嬬嚎", clientid));
+
+                        //鎺ㄩ�佸埌绉诲姩绔�
+                        JSONObject item = new JSONObject();
+                        String[] info = clientid.split("-");
+                        item.put("type", info[0]);
+                        item.put("tenantId", info[1]);
+                        item.put("code", info[2]);
+                        item.put("connected", false);
+                        String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, tenantId);
+                        MqMessage<JSONObject> mqMessage = new MqMessage<>(item, tenantId, recTopic);
+                        sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1);
+
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
@@ -177,38 +198,55 @@
         switch (topic) {
             // 鏌ヨ璁惧鍦ㄧ嚎
             case MqttConstant.MOBILE_QUERY_EQU_STATU:
-                System.err.println("admin鏀跺埌" + topic);
-                // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵��
-                String clientId = messageJson.getString("clientId");
-                JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId);
+                log.info("admin鏀跺埌MQTT璇锋眰锛宼opic: {}", topic);  // 鏀圭敤鏇磋鑼冪殑鏃ュ織璁板綍
 
-                ThreadUtil.execute(() -> {
+                try {
 
-                    if (client == null || client.isEmpty()) {
-                        JSONObject res = new JSONObject();
-                        res.put("success", false);
-                        res.put("msg", "鏌ヨ澶辫触");
-                        try {
-                            MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
-                            sendMessage.setQos(0);
-                            mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                        }
+                    // 1. 鍙傛暟鎻愬彇
+                    String clientId = messageJson.getString("clientId");
+                    if (StringUtils.isEmpty(clientId)) {
                         return;
                     }
+                    String deviceKey = clientId.split("-")[1];  // 鎻愬彇璁惧鏍囪瘑
 
-                    client.put("success", true);
-                    client.put("msg", "鏌ヨ鎴愬姛");
-                    try {
-                        MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes());
-                        sendMessage.setQos(0);
-                        mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
-                        baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                });
+                    // 2. 鏌ヨ璁惧鐘舵��
+                    String redisKey = String.format(MqttConstant.MQTT_ONLINE_CLIENT, deviceKey);
+                    JSONObject client = (JSONObject) redisUtil.hget(redisKey, clientId);
+
+                    // 3. 寮傛澶勭悊鍝嶅簲
+                    ThreadUtil.execute(() -> {
+                        JSONObject response = new JSONObject();
+
+                        // 3.1 澶勭悊鏌ヨ缁撴灉
+                        if (client == null || client.isEmpty()) {
+                            response.put("success", false);
+                            response.put("msg", "鏌ヨ澶辫触锛岃澶囦笉瀛樺湪鎴栫绾�");
+                        } else {
+                            response = client;  // 澶嶇敤鏌ヨ缁撴灉
+                            response.put("success", true);
+                            response.put("msg", "鏌ヨ鎴愬姛");
+                        }
+
+                        // 3.2 鍙戦�丮QTT鍝嶅簲
+                        try {
+                            String resTopic = String.format(MqttConstant.SERVICE_RES_EQU_STATU, req);
+                            MqMessage<JSONObject> mqMessage = new MqMessage<>(
+                                    response,
+                                    response.getString("tenantId"),
+                                    resTopic
+                            );
+
+                            sendMqttMessage(resTopic, mqMessage, 2);
+                            log.debug("璁惧鐘舵�佸搷搴斿彂閫佹垚鍔�: {}", response);
+
+                        } catch (Exception e) {
+                            log.error("MQTT鍝嶅簲鍙戦�佸け璐�", e);
+                        }
+                    });
+
+                } catch (Exception e) {
+                    log.error("澶勭悊璁惧鐘舵�佹煡璇㈠紓甯�", e);
+                }
                 break;
 
             // 鎺ユ敹璁惧瀹炴椂鏁版嵁 TODO 20250718鏆備笉浣跨敤锛屼娇鐢═ENANT_UP_PREFIX_REALTIME_DATA_EQP
@@ -227,10 +265,16 @@
             case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP:
                 ThreadUtil.execute(() -> {
                     try {
+
                         RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class);
-                        synchronized (realTimeDataService) {
-                            realTimeDataService.realTimeDataHandle(vo);
-                        }
+                        // 鍚戝悇绉熸埛绉诲姩绔彂閫佹暟鎹�
+                        String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, vo.getTenantid());
+                        MqMessage<RealTimeDataVo> mqMessage = new MqMessage<>(vo.getRealTime(), vo.getTenantid() + "", recTopic);
+                        sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, mqMessage, 1);
+
+
+                         realTimeDataService.realTimeDataHandle(vo);
+
                     } catch (Exception e) {
                         e.printStackTrace();
                     }

--
Gitblit v1.9.3