From 7c585586e9bea943161676bd9d127e81123891c3 Mon Sep 17 00:00:00 2001
From: baoshiwei <baoshiwei@shlanbao.cn>
Date: 星期三, 11 十二月 2024 11:01:35 +0800
Subject: [PATCH] Merge branch 'refs/heads/master' into herb

---
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java |   85 +++++++++++++++++++++++++++++++-----------
 1 files changed, 62 insertions(+), 23 deletions(-)

diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
index 068d415..e835376 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -85,7 +85,7 @@
                 JSONObject messageJson = JSONObject.parseObject(message);
 
                 if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
-                    JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT,messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
+                    JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
                     if (client == null) {
                         JSONObject item = new JSONObject();
                         //username
@@ -108,7 +108,7 @@
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
-                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId") ), clientid, item);
+                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                         System.err.println(String.format("璁惧: %s涓婄嚎", clientid));
                     }
 
@@ -116,7 +116,7 @@
                 if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
                     try {
                         String clientid = messageJson.getString("clientid");
-                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientid.split("-")[1]),  clientid);
+                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid);
                         System.err.println(String.format("璁惧: %s涓嬬嚎", clientid));
                     } catch (Exception e) {
                         e.printStackTrace();
@@ -163,7 +163,7 @@
         }
         // 瀹炴椂鏁版嵁涓婁紶澶绻佷笖鏁版嵁鍐呭瓒呰繃瀛楁澶у皬涓嶈褰曟棩蹇�
         if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)) {
-           // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
+            // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
         }
 
         switch (topic) {
@@ -172,7 +172,7 @@
                 System.err.println("admin鏀跺埌" + topic);
                 // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵��
                 String clientId = messageJson.getString("clientId");
-                JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientId.split("-")[1]) , clientId);
+                JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId);
 
                 ThreadUtil.execute(() -> {
 
@@ -216,13 +216,14 @@
 
                 break;
             //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁
-            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA:
+            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: {
+
                 MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
                 });
                 //鏁呴殰鏁版嵁
-                Map<String, DryFaultRecordVo> dryFaultMap =  realFaultMessage.getData();
+                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
                 //绉熸埛id
-                String tentId = realFaultMessage.getTentId();
+                String tenantId = realFaultMessage.getTentId();
                 //鏀跺埌绉熸埛瀹炴椂鎶ヨ鏁版嵁瀛樺叆redis
                 //杞崲涓� Map<String, Object>
                 Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
@@ -230,21 +231,51 @@
                                 Map.Entry::getKey,
                                 entry -> (Object) entry.getValue()
                         ));
-                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT,realFaultMessage.getTentId()), objectMap);
+                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
                 //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
-                if(dryFaultMap.isEmpty()){
+                if (dryFaultMap.isEmpty()) {
                     return;
                 }
-                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tentId);
+                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
                 //鏁版嵁杞崲
                 List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
-                MqMessage< List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList,tentId,recTopic);
+                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
                 //鍙戦�佸箍鎾�
                 System.err.println("骞挎挱缁欙細" + recTopic);
-                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT,mqMessage);
+                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
 
-                break;
-            // 鎺ユ敹璁惧鎶ヨ鏁版嵁
+
+            }
+            break;
+            //绉诲姩绔富鍔ㄨ姹傝澶囧疄鏃舵晠闅滄暟鎹紙鐢ㄤ簬椤甸潰鍒氭墦寮�鏃舵媺鍙栦竴娆℃暟鎹級
+            case MqttConstant.MOBILE_REQ_EQU_REAL_FAULT: {
+                String tenantId = (String) messageJson.get("tenantId");
+                if (req.toString().isEmpty() || tenantId == null) {
+                    return;
+                }
+                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId));
+                //杞崲涓� Map<String, DryFaultRecordVo>
+                Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream()
+                        .collect(Collectors.toMap(
+                                entry -> entry.getKey().toString(),
+                                entry -> (DryFaultRecordVo) entry.getValue()
+                        ));
+
+
+                if (dryFaultMap.isEmpty()) {
+                    return;
+                }
+                String resTopic = String.format(MqttConstant.SERVICE_ONECE_TENANT_REAL_FAULT, req);
+                //鏁版嵁杞崲
+                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
+                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, resTopic);
+                //鍙戦�佽姹傝澶�
+                System.err.println("鍙戦�佺粰锛�" + resTopic);
+                sendMqttMessage(resTopic, mqMessage, 2);
+
+            }
+            break;
+            // 鎺ユ敹璁惧鎶ヨ鍘嗗彶鏁版嵁
             case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA:
                 ThreadUtil.execute(() -> {
                     try {
@@ -372,21 +403,29 @@
         }
 
 
-
     }
 
     /**
      * 鍙戦�佹秷鎭�
-     * @param topic       璁㈤槄
-     * @param mqMessage   娑堟伅浣�
+     *
+     * @param topic     璁㈤槄
+     * @param mqMessage 娑堟伅浣�
+     * @param type      1-鍙戦�佺粰绉熸埛   2-鍙戦�佺粰鍥哄畾id
      */
-    private void sendMqttMessage(String topic, MqMessage mqMessage){
+    private void sendMqttMessage(String topic, MqMessage mqMessage, Integer type) {
         ThreadUtil.execute(() -> {
             try {
-                MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
-                sendMessage.setQos(0);
-                mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage);
-            }catch (Exception e){
+                if (type == 1) {
+                    MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
+                    sendMessage.setQos(0);
+                    mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage);
+                } else if (type == 2) {
+                    MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
+                    sendMessage.setQos(0);
+                    mqttUtil.getMqttClient().publish(topic, sendMessage);
+                }
+
+            } catch (Exception e) {
                 e.printStackTrace();
             }
         });

--
Gitblit v1.9.3