From 320c0c10a90140627b10a6fcf498e79d09785da6 Mon Sep 17 00:00:00 2001
From: zhuguifei <312353457@qq.com>
Date: 星期三, 27 十一月 2024 13:42:58 +0800
Subject: [PATCH] 添加mqtt数据接口

---
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java              |   15 +
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java     |  625 ++++++++++++++++++++++++++++-----------------------
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java             |   22 +
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java |   23 +
 jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java                             |   28 +
 5 files changed, 412 insertions(+), 301 deletions(-)

diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
index 54530d6..f7867b5 100644
--- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
+++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -31,10 +31,9 @@
   String MOBILE_QUERY_EQU_STATU = MOBILE_UP_PREFIX + "/query/equ/statu";
   //绉诲姩绔繙绋嬭姹傛寚浠�
   String MOBILE_REQ_EQU_CMD = MOBILE_UP_PREFIX + "/req/equ/cmd";
+
+
   /**************************绉诲姩绔悜鏈嶅姟绔姹傛寚浠nd*******************************/
-
-
-
 
 
 
@@ -47,7 +46,26 @@
   String SERVICE_RES_EQU_STATU = SERVICE_DOWN_PREFIX + "/%s/statu";
   //杩斿洖绉诲姩绔繙绋嬭姹傛寚浠�
   String SERVICE_RES_EQU_CMD = SERVICE_DOWN_PREFIX + "/%s/cmd";
+
+
+
   /**************************鏈嶅姟绔悜绉诲姩绔搷搴旀寚浠nd*******************************/
+
+
+  /**************************鏈嶅姟绔悜绉诲姩绔彂閫佸箍鎾璼tart*******************************/
+  //骞挎挱绫诲瀷鎺ㄩ�佹棤鍏崇Щ鍔ㄧ璁惧id锛屽悜鎵�鏈夊湪绾跨Щ鍔ㄧ鍙戦��
+  String SERVICE_BROADCAST_PREFIX = "service/broadcast";
+
+  //鏈嶅姟绔悜鍚勭鎴峰鎴风鍙戦�佸疄鏃舵晠闅滃箍鎾�
+  String  SERVICE_BROADCAST_TENANT_REAL_FAULT = SERVICE_BROADCAST_PREFIX + "/real/fault/%s"  ;
+
+
+
+  /**************************鏈嶅姟绔悜绉诲姩绔彂閫佸箍鎾璭nd*******************************/
+
+
+
+
 
   /**************************绉熸埛绔悜鏈嶅姟绔彂閫佹暟鎹畇tart*******************************/
 
@@ -80,9 +98,9 @@
 
   //service(cloud)
   //鍦ㄧ嚎瀹㈡埛绔�
-  String MQTT_ONLINE_CLIENT = "mqtt:online:client::";
+  String MQTT_ONLINE_CLIENT = "mqtt:online:client:%s";
   //鎵�鏈夌鎴风殑瀹炴椂鎶ヨ锛�%s锛氱鎴穒d锛�
-  String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s:";
+  String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s";
 
 
 
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
index d67b9d5..feea7b1 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
@@ -18,7 +18,9 @@
 import org.jeecg.config.mybatis.MybatisPlusSaasConfig;
 import org.jeecg.modules.dry.api.EmqxApi;
 import org.jeecg.modules.dry.entity.DryEquipment;
+import org.jeecg.modules.dry.entity.DryFaultRecord;
 import org.jeecg.modules.dry.service.IDryEquipmentService;
+import org.jeecg.modules.dry.service.IDryFaultRecordService;
 import org.jeecg.modules.dry.vo.MoEquVo;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -40,6 +42,8 @@
 public class MobileController {
   @Autowired
   private IDryEquipmentService dryEquipmentService;
+  @Autowired
+  private IDryFaultRecordService faultRecordService;
   @Autowired
   private RedisUtil redisUtil;
 
@@ -64,18 +68,27 @@
     return Result.OK(voPage);
   }
 
+  @ApiOperation(value = "璁惧鎶ヨ鏁版嵁", notes = "璁惧鎶ヨ鏁版嵁鍒楄〃鏌ヨ")
+  @GetMapping(value = "/fault/list")
+  public Result<IPage<DryFaultRecord>> queryFaultList(DryFaultRecord faultRecord, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, HttpServletRequest req){
+    int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0);
+    QueryWrapper<DryFaultRecord> queryWrapper = QueryGenerator.initQueryWrapper(faultRecord, req.getParameterMap());
+    Page<DryFaultRecord> page = new Page<DryFaultRecord>(pageNo, pageSize);
+    IPage<DryFaultRecord> pageList = faultRecordService.page(page, queryWrapper);
+    return Result.OK(pageList);
+  }
+
+
+
   private void comp(IPage<DryEquipment> pageList, Page<MoEquVo> page) {
-
-
     //褰撳墠绉熸埛id
     int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0);
-
-
     List<MoEquVo> collect = pageList.getRecords().stream().map(item -> {
       MoEquVo vo = new MoEquVo();
       BeanUtils.copyProperties(item, vo);
       String clientid = "client-" + tenantId + "-" + item.getCode();
-      JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientid);
+      //JSONObject client = (JSONObject) redisUtil.hget(MqttConstant.MQTT_ONLINE_CLIENT ,tenantId);
+      JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,tenantId),clientid);
       //缁勮鐘舵�佹暟鎹�
       if (client != null) {
         vo.setOnline(true);
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java
index 892bf70..685b416 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java
@@ -10,6 +10,7 @@
 public class MqMessage<T> {
     private T data;
     private String tentId;
+    private String topic;
 
 
     public MqMessage() {
@@ -19,4 +20,18 @@
         this.data = data;
         this.tentId = tentId;
     }
+    public MqMessage(T data, String tentId,String topic) {
+        this.data = data;
+        this.tentId = tentId;
+        this.topic = topic;
+    }
+
+    @Override
+    public String toString() {
+        return "MqMessage{" +
+                "data=" + data +
+                ", tentId='" + tentId + '\'' +
+                ", topic='" + topic + '\'' +
+                '}';
+    }
 }
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
index 41d3db4..43c838f 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
@@ -15,6 +15,7 @@
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.core.RedisTemplate;
 
 import java.util.*;
 
@@ -44,6 +45,8 @@
   private RedisUtil redisUtil;
   @Autowired
   private EmqxApi emqxApi;
+  @Autowired
+  private RedisTemplate redisTemplate;
 
 
   @Bean
@@ -98,6 +101,8 @@
             mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
             System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
             // 璁㈤槄绉熸埛鎶ヨ鏁版嵁
+            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
+            System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
             mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA);
             System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_FAULT_DATA);
             mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_EQU);
@@ -153,7 +158,14 @@
    * 鏈嶅姟绔紙admin瑙掕壊锛夊惎鍔ㄦ椂鏌ヨ鎵�鏈夎澶囧苟缂撳瓨鍒皉edis
    */
   private void initClients() {
-    redisUtil.removeAll(MqttConstant.MQTT_ONLINE_CLIENT);
+    //鍒濆鍖栨椂鍏堝垹闄ゆ墍鏈夊湪绾胯澶�
+    Set keys = redisTemplate.keys( String.format(MqttConstant.MQTT_ONLINE_CLIENT,"*"));
+    if (keys != null && !keys.isEmpty()) {
+      keys.forEach(key -> System.out.println("鍒濆鍖栧垹闄ゅ湪绾胯澶�: " + key));
+      redisTemplate.delete(keys);
+    } else {
+      System.out.println("鍒濆鍖栨棤鍦ㄧ嚎璁惧: " + MqttConstant.MQTT_ONLINE_CLIENT);
+    }
 
     JSONObject clients = emqxApi.queryEmqx(EmqxApi.CMD_CLIENTS);
     //TODO 鏍规嵁emqx杩斿洖缂栧啓瀹炰綋绫�
@@ -180,13 +192,15 @@
           item.put("type", info[0]);
           item.put("tenantId", info[1]);
           //item.put("code", info[2]);
+
+          if (connected) {
+            redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId").toString()) , clientid, item);
+          }
         }catch (Exception e){
           e.printStackTrace();
         }
 
-        if (connected) {
-          redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item);
-        }
+
 
 
       }
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
index 1143056..068d415 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -6,6 +6,7 @@
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.poi.ss.formula.functions.T;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
@@ -22,324 +23,374 @@
 import org.jeecg.modules.dry.entity.DryShop;
 import org.jeecg.modules.dry.service.*;
 import org.jeecg.modules.dry.vo.DryEquipmentVo;
+import org.jeecg.modules.dry.vo.DryFaultRecordVo;
 import org.jeecg.modules.dry.vo.RealTimeDataVo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 @Slf4j
 @Component
 @Scope("prototype")
 public class MqttSampleCallback implements MqttCallback {
-  @Value(value = "${jeecg.mqtt.role}")
-  private String role;
-  @Autowired
-  private MqttUtil mqttUtil;
-  @Autowired
-  private EmqxApi emqxApi;
-  @Autowired
-  private BaseCommonService baseCommonService;
-  @Autowired
-  private RedisUtil redisUtil;
+    @Value(value = "${jeecg.mqtt.role}")
+    private String role;
+    @Autowired
+    private MqttUtil mqttUtil;
+    @Autowired
+    private EmqxApi emqxApi;
+    @Autowired
+    private BaseCommonService baseCommonService;
+    @Autowired
+    private RedisUtil redisUtil;
 
-  @Autowired
-  private IDryRealTimeDataService realTimeDataService;
+    @Autowired
+    private IDryRealTimeDataService realTimeDataService;
 
 
-  @Autowired
-  private IDryEquipmentService equipmentService;
+    @Autowired
+    private IDryEquipmentService equipmentService;
 
-  @Autowired
-  private IDryEqpTypeService eqpTypeService;
+    @Autowired
+    private IDryEqpTypeService eqpTypeService;
 
-  @Autowired
-  private IDryShopService dryShopService;
+    @Autowired
+    private IDryShopService dryShopService;
 
-  @Autowired
-  private IDryFaultRecordService faultRecordService;
+    @Autowired
+    private IDryFaultRecordService faultRecordService;
 
 
+    @Override
+    public void connectionLost(Throwable throwable) {
+        System.err.println("杩炴帴鏂紑锛氾細鎺夌嚎");
+        System.err.println("杩炴帴鏂紑锛氾細" + throwable.toString());
+    }
+
+    @Override
+    public void messageArrived(String topic, MqttMessage mqttMessage) {
+        System.out.println("鏀跺埌娑堟伅: \n  topic锛�" + topic + "\n  Qos锛�" + mqttMessage.getQos() + "\n  payload锛�"
+                + new String(mqttMessage.getPayload()));
+
+        switch (role) {
+            // 绠$悊鍛�
+            case "admin":
+                String message = new String(mqttMessage.getPayload());
+                JSONObject messageJson = JSONObject.parseObject(message);
+
+                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
+                    JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT,messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
+                    if (client == null) {
+                        JSONObject item = new JSONObject();
+                        //username
+                        item.put("username", messageJson.get("username"));
+                        //杩炴帴鏃堕棿
+                        Long st = messageJson.getLong("connected_at");
+                        String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get());
+                        item.put("connectedAt", upTime);
+                        //clientid
+                        String clientid = messageJson.getString("clientid");
+                        item.put("clientid", clientid);
+                        //鏄惁杩炴帴
+                        item.put("connected", true);
+                        //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡  渚嬶細client-1000)
+                        try {
+                            String[] info = clientid.split("-");
+                            item.put("type", info[0]);
+                            item.put("tenantId", info[1]);
+                            //item.put("code", info[2]);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId") ), clientid, item);
+                        System.err.println(String.format("璁惧: %s涓婄嚎", clientid));
+                    }
+
+                }
+                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
+                    try {
+                        String clientid = messageJson.getString("clientid");
+                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientid.split("-")[1]),  clientid);
+                        System.err.println(String.format("璁惧: %s涓嬬嚎", clientid));
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+
+                }
+                parseAdminCommand(topic, mqttMessage);
+
+                break;
+            // 鏅�氱敤鎴�
+            case "user":
+                System.err.println("user");
+                try {
+                    parseUserCommand(topic, mqttMessage);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+
+                break;
+
+        }
+
+    }
 
 
-  @Override
-  public void connectionLost(Throwable throwable) {
-    System.err.println("杩炴帴鏂紑锛氾細鎺夌嚎");
-    System.err.println("杩炴帴鏂紑锛氾細"+throwable.toString());
-  }
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+        System.err.println("娑堟伅浼犻�掓垚鍔�");
+    }
 
-  @Override
-  public void messageArrived(String topic, MqttMessage mqttMessage) {
-    System.out.println("鏀跺埌娑堟伅: \n  topic锛�" + topic + "\n  Qos锛�" + mqttMessage.getQos() + "\n  payload锛�"
-      + new String(mqttMessage.getPayload()));
-
-    switch (role) {
-      // 绠$悊鍛�
-      case "admin":
+    // 瑙f瀽admin瑙掕壊鎸囦护
+    private void parseAdminCommand(String topic, MqttMessage mqttMessage) {
         String message = new String(mqttMessage.getPayload());
         JSONObject messageJson = JSONObject.parseObject(message);
 
-        if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
-          JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + messageJson.get("clientid"));
-          if (client == null) {
-            JSONObject item = new JSONObject();
-            //username
-            item.put("username", messageJson.get("username"));
-            //杩炴帴鏃堕棿
-            Long st = messageJson.getLong("connected_at");
-            String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get());
-            item.put("connectedAt", upTime);
-            //clientid
-            String clientid = messageJson.getString("clientid");
-            item.put("clientid", clientid);
-            //鏄惁杩炴帴
-            item.put("connected", true);
-            //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡  渚嬶細client-1000)
+        //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风)
+        StringBuilder req = new StringBuilder();
+        if (messageJson.containsKey("req")) {
+            req.append(messageJson.get("req"));
+        }
+        //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹�
+        if (messageJson.containsKey("timestamp")) {
+            messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
+        }
+        // 瀹炴椂鏁版嵁涓婁紶澶绻佷笖鏁版嵁鍐呭瓒呰繃瀛楁澶у皬涓嶈褰曟棩蹇�
+        if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)) {
+           // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
+        }
+
+        switch (topic) {
+            // 鏌ヨ璁惧鍦ㄧ嚎
+            case MqttConstant.MOBILE_QUERY_EQU_STATU:
+                System.err.println("admin鏀跺埌" + topic);
+                // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵��
+                String clientId = messageJson.getString("clientId");
+                JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientId.split("-")[1]) , clientId);
+
+                ThreadUtil.execute(() -> {
+
+                    if (client == null || client.isEmpty()) {
+                        JSONObject res = new JSONObject();
+                        res.put("success", false);
+                        res.put("msg", "鏌ヨ澶辫触");
+                        try {
+                            MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
+                            sendMessage.setQos(0);
+                            mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                        return;
+                    }
+
+                    client.put("success", true);
+                    client.put("msg", "鏌ヨ鎴愬姛");
+                    try {
+                        MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes());
+                        sendMessage.setQos(0);
+                        mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
+                        baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                });
+                break;
+
+            // 鎺ユ敹璁惧瀹炴椂鏁版嵁
+            case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
+                ThreadUtil.execute(() -> {
+                    try {
+                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
+                        realTimeDataService.realTimeDataHandle(vo);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                });
+
+                break;
+            //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁
+            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA:
+                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
+                });
+                //鏁呴殰鏁版嵁
+                Map<String, DryFaultRecordVo> dryFaultMap =  realFaultMessage.getData();
+                //绉熸埛id
+                String tentId = realFaultMessage.getTentId();
+                //鏀跺埌绉熸埛瀹炴椂鎶ヨ鏁版嵁瀛樺叆redis
+                //杞崲涓� Map<String, Object>
+                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
+                        .collect(Collectors.toMap(
+                                Map.Entry::getKey,
+                                entry -> (Object) entry.getValue()
+                        ));
+                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT,realFaultMessage.getTentId()), objectMap);
+                //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
+                if(dryFaultMap.isEmpty()){
+                    return;
+                }
+                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tentId);
+                //鏁版嵁杞崲
+                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
+                MqMessage< List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList,tentId,recTopic);
+                //鍙戦�佸箍鎾�
+                System.err.println("骞挎挱缁欙細" + recTopic);
+                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT,mqMessage);
+
+                break;
+            // 鎺ユ敹璁惧鎶ヨ鏁版嵁
+            case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA:
+                ThreadUtil.execute(() -> {
+                    try {
+                        MqMessage<List<DryFaultRecord>> faultMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() {
+                        });
+                        //   List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class);
+                        System.err.println(faultMessage.toString());
+                        faultRecordService.saveBatch(faultMessage.getData());
+
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                });
+
+                break;
+
+
+            case MqttConstant.TENANT_UP_PREFIX_EQU:
+                ThreadUtil.execute(() -> {
+                    try {
+                        Object equObj = messageJson.get("equipment");
+                        DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class);
+                        TenantContext.setTenant(equipment.getTenantId() + "");
+                        DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode());
+                        if (dryEquipment == null) {
+                            equipmentService.save(equipment);
+                        }
+                        Object typeObj = messageJson.get("eqpType");
+                        DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class);
+                        DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId());
+                        if (dryEqpType == null) {
+                            eqpTypeService.save(eqpType);
+                        }
+                        // 鑾峰彇璁惧鎵�灞炶溅闂�
+                        Object shopObj = messageJson.get("shop");
+                        DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class);
+                        DryShop dryShop = dryShopService.getById(shop.getId());
+                        if (dryShop == null) {
+                            dryShopService.save(shop);
+                        }
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                });
+
+        }
+
+    }
+
+
+    // 瑙f瀽user瑙掕壊鎸囦护
+    private void parseUserCommand(String topic, MqttMessage mqttMessage) {
+
+        String message = new String(mqttMessage.getPayload());
+        JSONObject messageJson = JSONObject.parseObject(message);
+
+        //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风)
+        StringBuilder req = new StringBuilder();
+        if (messageJson.containsKey("req")) {
+            req.append(messageJson.get("req"));
+        }
+        //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹�
+        if (messageJson.containsKey("timestamp")) {
+            messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
+        }
+
+        switch (topic) {
+            case MqttConstant.MOBILE_REQ_EQU_CMD:
+                System.err.println("user鏀跺埌" + topic);
+                System.err.println(message);
+                ThreadUtil.execute(() -> {
+                    //TODO 鍚慞LC鍙戦�佸紑鍏虫満鎿嶄綔锛屽苟杩斿洖淇℃伅
+                    JSONObject res = new JSONObject();
+                    res.put("success", true);
+                    res.put("msg", "鎿嶄綔鎴愬姛");
+                    try {
+                        MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes());
+                        sendMessage.setQos(0);
+                        mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage);
+                        baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+
+                });
+
+
+                break;
+            case MqttConstant.SERVICE_REQ_PREFIX:
+                log.debug("鏀跺埌璁惧璇︾粏淇℃伅鏌ヨ璇锋眰");
+                ThreadUtil.execute(() -> {
+                    String tenantId = messageJson.getString("tenantId");
+                    String clientId = mqttUtil.getMqttClient().getClientId();
+                    String tenant = clientId.substring(clientId.lastIndexOf("_") + 1);
+                    if (tenantId != null && tenantId.equals(tenant)) {
+                        TenantContext.setTenant(tenantId);
+                        // 鏍规嵁璁惧缂栫爜鏌ヨ璁惧淇℃伅
+                        String code = messageJson.getString("code");
+                        DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId, code);
+                        // 鏍规嵁璁惧杞﹂棿id鏌ヨ杞﹂棿淇℃伅
+                        DryShop shop = dryShopService.getById(equipmentVo.getShopId());
+                        // 鏍规嵁璁惧绫诲瀷ID鏌ヨ璁惧绫诲瀷淇℃伅
+                        DryEqpType eqpType = eqpTypeService.getById(equipmentVo.getType());
+
+                        JSONObject res = new JSONObject();
+
+                        res.put("tenant", tenantId);
+                        res.put("equipment", equipmentVo);
+                        res.put("shop", shop);
+                        res.put("eqpType", eqpType);
+                        try {
+                            MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
+                            sendMessage.setQos(0);
+                            mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage);
+                            // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
+                        } catch (Exception e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
+
+
+                break;
+
+        }
+
+
+
+    }
+
+    /**
+     * 鍙戦�佹秷鎭�
+     * @param topic       璁㈤槄
+     * @param mqMessage   娑堟伅浣�
+     */
+    private void sendMqttMessage(String topic, MqMessage mqMessage){
+        ThreadUtil.execute(() -> {
             try {
-              String[] info = clientid.split("-");
-              item.put("type", info[0]);
-              item.put("tenantId", info[1]);
-              //item.put("code", info[2]);
+                MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
+                sendMessage.setQos(0);
+                mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage);
             }catch (Exception e){
-              e.printStackTrace();
+                e.printStackTrace();
             }
-            redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item);
-            System.err.println(String.format("璁惧: %s涓婄嚎", clientid));
-          }
-
-        }
-        if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
-          try {
-            String clientid = messageJson.getString("clientid");
-            redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid);
-            System.err.println(String.format("璁惧: %s涓嬬嚎", clientid));
-          } catch (Exception e) {
-            e.printStackTrace();
-          }
-
-        }
-        parseAdminCommand(topic, mqttMessage);
-
-        break;
-      // 鏅�氱敤鎴�
-      case "user":
-        System.err.println("user");
-        try {
-          parseUserCommand(topic, mqttMessage);
-        } catch (Exception e) {
-          e.printStackTrace();
-        }
-
-        break;
-
-    }
-
-  }
-
-
-  @Override
-  public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
-    System.err.println("娑堟伅浼犻�掓垚鍔�");
-  }
-
-  // 瑙f瀽admin瑙掕壊鎸囦护
-  private void parseAdminCommand(String topic, MqttMessage mqttMessage) {
-    String message = new String(mqttMessage.getPayload());
-    JSONObject messageJson = JSONObject.parseObject(message);
-
-    //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风)
-    StringBuilder req = new StringBuilder();
-    if (messageJson.containsKey("req")) {
-      req.append(messageJson.get("req"));
-    }
-    //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹�
-    if (messageJson.containsKey("timestamp")) {
-      messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
-    }
-    // 瀹炴椂鏁版嵁涓婁紶澶绻佷笖鏁版嵁鍐呭瓒呰繃瀛楁澶у皬涓嶈褰曟棩蹇�
-    if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)){
-      baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
-    }
-
-    switch (topic) {
-      // 鏌ヨ璁惧鍦ㄧ嚎
-      case MqttConstant.MOBILE_QUERY_EQU_STATU:
-        System.err.println("admin鏀跺埌" + topic);
-        // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵��
-        String clientId = messageJson.getString("clientId");
-        JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientId);
-
-        ThreadUtil.execute(() -> {
-
-          if (client == null || client.isEmpty()) {
-            JSONObject res = new JSONObject();
-            res.put("success", false);
-            res.put("msg", "鏌ヨ澶辫触");
-            try {
-              MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
-              sendMessage.setQos(0);
-              mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
-            } catch (Exception e) {
-              e.printStackTrace();
-            }
-            return;
-          }
-
-          client.put("success", true);
-          client.put("msg", "鏌ヨ鎴愬姛");
-          try {
-            MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes());
-            sendMessage.setQos(0);
-            mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
-            baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
-          } catch (Exception e) {
-            e.printStackTrace();
-          }
         });
-        break;
-
-        // 鎺ユ敹璁惧瀹炴椂鏁版嵁
-      case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
-        ThreadUtil.execute(() -> {
-          try {
-            RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
-            realTimeDataService.realTimeDataHandle(vo);
-          } catch (Exception e) {
-            e.printStackTrace();
-          }
-        });
-
-        break;
-      // 鎺ユ敹璁惧鎶ヨ鏁版嵁
-      case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA:
-        ThreadUtil.execute(() -> {
-          try {
-            MqMessage<List<DryFaultRecord>> listMqMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() {
-            });
-         //   List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class);
-            System.err.println(listMqMessage.toString());
-            faultRecordService.saveBatch(listMqMessage.getData());
-
-          } catch (Exception e) {
-            e.printStackTrace();
-          }
-        });
-
-        break;
-
-
-      case MqttConstant.TENANT_UP_PREFIX_EQU:
-        ThreadUtil.execute(() -> {
-          try {
-            Object equObj = messageJson.get("equipment");
-            DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class);
-            TenantContext.setTenant(equipment.getTenantId()+"");
-            DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode());
-            if (dryEquipment == null) {
-              equipmentService.save(equipment);
-            }
-            Object typeObj = messageJson.get("eqpType");
-            DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class);
-            DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId());
-            if (dryEqpType == null) {
-              eqpTypeService.save(eqpType);
-            }
-            // 鑾峰彇璁惧鎵�灞炶溅闂�
-            Object shopObj = messageJson.get("shop");
-            DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class);
-            DryShop dryShop = dryShopService.getById(shop.getId());
-            if (dryShop == null) {
-              dryShopService.save(shop);
-            }
-          } catch (Exception e) {
-            e.printStackTrace();
-          }
-        });
-
     }
 
-  }
-
-
-  // 瑙f瀽user瑙掕壊鎸囦护
-  private void parseUserCommand(String topic, MqttMessage mqttMessage) {
-
-    String message = new String(mqttMessage.getPayload());
-    JSONObject messageJson = JSONObject.parseObject(message);
-
-    //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风)
-    StringBuilder req = new StringBuilder();
-    if (messageJson.containsKey("req")) {
-      req.append(messageJson.get("req"));
-    }
-    //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹�
-    if (messageJson.containsKey("timestamp")) {
-      messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
-    }
-
-    switch (topic) {
-      case MqttConstant.MOBILE_REQ_EQU_CMD:
-        System.err.println("user鏀跺埌" + topic);
-        System.err.println(message);
-        ThreadUtil.execute(() -> {
-          //TODO 鍚慞LC鍙戦�佸紑鍏虫満鎿嶄綔锛屽苟杩斿洖淇℃伅
-          JSONObject res = new JSONObject();
-          res.put("success", true);
-          res.put("msg", "鎿嶄綔鎴愬姛");
-          try {
-            MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes());
-            sendMessage.setQos(0);
-            mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage);
-            baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
-          } catch (Exception e) {
-            e.printStackTrace();
-          }
-
-        });
-
-
-        break;
-      case MqttConstant.SERVICE_REQ_PREFIX:
-        log.debug("鏀跺埌璁惧璇︾粏淇℃伅鏌ヨ璇锋眰");
-        ThreadUtil.execute(() -> {
-          String tenantId = messageJson.getString("tenantId");
-          String clientId = mqttUtil.getMqttClient().getClientId();
-          String tenant = clientId.substring(clientId.lastIndexOf("_")+1);
-          if (tenantId!=null && tenantId.equals(tenant)) {
-            TenantContext.setTenant(tenantId);
-            // 鏍规嵁璁惧缂栫爜鏌ヨ璁惧淇℃伅
-            String code = messageJson.getString("code");
-            DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId,code);
-            // 鏍规嵁璁惧杞﹂棿id鏌ヨ杞﹂棿淇℃伅
-            DryShop shop = dryShopService.getById(equipmentVo.getShopId());
-            // 鏍规嵁璁惧绫诲瀷ID鏌ヨ璁惧绫诲瀷淇℃伅
-            DryEqpType eqpType = eqpTypeService.getById(equipmentVo.getType());
-
-            JSONObject res = new JSONObject();
-
-            res.put("tenant", tenantId);
-            res.put("equipment", equipmentVo);
-            res.put("shop", shop);
-            res.put("eqpType", eqpType);
-            try {
-              MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
-              sendMessage.setQos(0);
-              mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage);
-              // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
-            } catch (Exception e) {
-              e.printStackTrace();
-            }
-          }
-        });
-
-
-
-        break;
-
-    }
-
-  }
 
 }

--
Gitblit v1.9.3