From b38019aae593a66c16f7e75d6e37d14eb8d2c42e Mon Sep 17 00:00:00 2001
From: zhuguifei <zhuguifei@zhuguifeideiMac.local>
Date: 星期二, 22 七月 2025 08:55:15 +0800
Subject: [PATCH] 修改接收实时数据接口-故障处理

---
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java |  371 +++++++++++++++++++--------------
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java                 |   91 ++++---
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java                         |   15 
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java    |  135 +++++-------
 jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java                                         |    7 
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java                           |   29 ++
 6 files changed, 366 insertions(+), 282 deletions(-)

diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
index 6aa48fe..6264a3b 100644
--- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
+++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -97,16 +97,15 @@
   /**************************start*******************************/
   /**************************end*******************************/
   //redis缂撳瓨
-  //client
-  String MQTT_REAL_FAULT = "mqtt:real:fault";
+  //鎵�鏈夌鎴风殑瀹炴椂鎶ヨ锛�%s锛氱鎴穒d锛�
+  String MQTT_REAL_FAULT = "mqtt:real:fault:%s";
 
 
 
   //service(cloud)
   //鍦ㄧ嚎瀹㈡埛绔�
   String MQTT_ONLINE_CLIENT = "mqtt:online:client:%s";
-  //鎵�鏈夌鎴风殑瀹炴椂鎶ヨ锛�%s锛氱鎴穒d锛�
-  String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s";
+//  String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s";
 
 
 
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java
index 3344466..0f1b58f 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java
@@ -70,25 +70,24 @@
     private IDryEquipmentService dryEquipmentService;
 
 
-
-    @ApiOperation(value="娴嬭瘯", notes="杩斿洖Hello")
+    @ApiOperation(value = "娴嬭瘯", notes = "杩斿洖Hello")
     @GetMapping("/hello")
     public Result<?> sayHello() {
         return Result.ok("Hello");
     }
 
-    @ApiOperation(value="鎺ユ敹瀹炴椂鏁版嵁Json", notes="璁惧瀹炴椂鏁版嵁涓婁紶")
+    @ApiOperation(value = "鎺ユ敹瀹炴椂鏁版嵁Json", notes = "璁惧瀹炴椂鏁版嵁涓婁紶")
     @PostMapping("/sendRealTimeDataJson")
-    public Result<?> realTimeDataJson(@RequestBody RealTimeDataVo realTimeDataVo)  {
+    public Result<?> realTimeDataJson(@RequestBody RealTimeDataVo realTimeDataVo) {
         try {
-            if (mqttConfig.isEnable() && "user".equals(mqttConfig.getRole())){
+            if (mqttConfig.isEnable() && "user".equals(mqttConfig.getRole())) {
                 MqttMessage mqttMessage = new MqttMessage();
                 mqttMessage.setQos(0);
                 mqttMessage.setPayload(JSONObject.toJSONString(realTimeDataVo).getBytes());
-                mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA,mqttMessage);
+                mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA, mqttMessage);
             }
 
-            if ("user".equals(mqttConfig.getRole())){
+            if ("user".equals(mqttConfig.getRole())) {
                 //澶勭悊鏁呴殰淇℃伅
                 dryRealTimeDataService.fitFaultRecord(realTimeDataVo);
             }
@@ -100,51 +99,32 @@
         return dryRealTimeDataService.realTimeDataHandle(realTimeDataVo);
     }
 
-    @ApiOperation(value="鎺ユ敹瀹炴椂鏁版嵁Json", notes="璁惧瀹炴椂鏁版嵁涓婁紶")
+    @ApiOperation(value = "鎺ユ敹瀹炴椂鏁版嵁Json", notes = "璁惧瀹炴椂鏁版嵁涓婁紶")
     @PostMapping("/sendRealTimeDataJson2")
-    public Result<?> realTimeDataJson2(@RequestBody RealTimeDataParentVo realTimeDataParentVo)  {
-        try {
-            if (mqttConfig.isEnable() && "user".equals(mqttConfig.getRole())){
-                MqttMessage mqttMessage = new MqttMessage();
-                mqttMessage.setQos(0);
-                mqttMessage.setPayload(JSONObject.toJSONString(realTimeDataParentVo).getBytes());
-                mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA,mqttMessage);
-                //澶勭悊鏁呴殰淇℃伅
-                dryRealTimeDataService.fitFaultRecord(realTimeDataParentVo);
-            }
-
-            if ("user".equals(mqttConfig.getRole()) && realTimeDataParentVo.getFault() != null){
-                //澶勭悊鏁呴殰淇℃伅
-                dryRealTimeDataService.fitFaultRecord(realTimeDataParentVo);
-            }
-
-        } catch (MqttException e) {
-            e.printStackTrace();
-        }
-
+    public Result<?> realTimeDataJson2(@RequestBody RealTimeDataParentVo realTimeDataParentVo) {
         return dryRealTimeDataService.realTimeDataHandle(realTimeDataParentVo);
+
     }
 
 
-
-    @ApiOperation(value="鑾峰彇璁惧瀹炴椂鏁版嵁", notes="閫氳繃绉熸埛ID鍜岃澶囩紪鐮佽幏鍙栧疄鏃舵暟鎹�")
+    @ApiOperation(value = "鑾峰彇璁惧瀹炴椂鏁版嵁", notes = "閫氳繃绉熸埛ID鍜岃澶囩紪鐮佽幏鍙栧疄鏃舵暟鎹�")
     @GetMapping("/getRealTimeData")
     public Result<?> queryMachineRealTimeData(RealTimeDataVo realTimeDataVo) {
         return dryRealTimeDataService.queryMachineRealTImeData(realTimeDataVo);
     }
 
-    @ApiOperation(value="鑾峰彇杞﹂棿缁熻鏁版嵁", notes="閫氳繃绉熸埛ID鑾峰彇杞﹂棿缁熻鏁版嵁")
+    @ApiOperation(value = "鑾峰彇杞﹂棿缁熻鏁版嵁", notes = "閫氳繃绉熸埛ID鑾峰彇杞﹂棿缁熻鏁版嵁")
     @GetMapping("/workshopStatistics")
     public Result<?> workshopStatistics(RealTimeDataVo realTimeDataVo) {
         return dryRealTimeDataService.queryWorkshopStatistics(realTimeDataVo);
     }
 
 
-    @ApiOperation(value="鑾峰彇鎵�鏈夋満鍙�", notes="閫氳繃绉熸埛ID鑾峰彇鎵�鏈夋満鍙版暟鎹�")
+    @ApiOperation(value = "鑾峰彇鎵�鏈夋満鍙�", notes = "閫氳繃绉熸埛ID鑾峰彇鎵�鏈夋満鍙版暟鎹�")
     @GetMapping("/queryAllEqps")
     public Result<?> queryAllEqps(DryEquipment equipment) {
         List<DryEquipment> dryEquipments = dryEquipmentService.queryEqusByTenantId(equipment);
-        return  Result.OK(dryEquipments);
+        return Result.OK(dryEquipments);
     }
 
 
@@ -157,7 +137,7 @@
      * 1013 鐑鍚姩    1014 寮�闂ㄨ瀵�
      * 1015 鍑烘枡鎸夐挳
      */
-    @ApiOperation(value="鍙戦�佹帶鍒舵寚浠�", notes="鍚戞湇鍔$鍙戦�佹帶鍒舵寚浠わ紝鐢辨湇鍔$閫氳繃socket杞彂缁欐帶鍒舵ā鍧�")
+    @ApiOperation(value = "鍙戦�佹帶鍒舵寚浠�", notes = "鍚戞湇鍔$鍙戦�佹帶鍒舵寚浠わ紝鐢辨湇鍔$閫氳繃socket杞彂缁欐帶鍒舵ā鍧�")
     @PostMapping("/sendCommand")
     public Result<?> sendCommand(@RequestBody CommandMessageVo msgVo) {
         return dryRealTimeDataService.sendSocketMsg(msgVo);
@@ -204,19 +184,20 @@
 
     /**
      * 鏍规嵁璁惧鍜岀鎴锋煡璇㈣璁惧绫诲瀷鐨勫共鐕ラ厤鏂癸紝灏嗛厤鏂硅浆鎴恱ml鏍煎紡锛屼互瀛楃涓叉柟寮忚繑鍥�
+     *
      * @param tenantId
      * @param eqpCode
      * @return
      * @throws JAXBException
      */
-    @ApiOperation(value="骞茬嚗閰嶆柟鑾峰彇", notes="骞茬嚗閰嶆柟涓嬪彂")
+    @ApiOperation(value = "骞茬嚗閰嶆柟鑾峰彇", notes = "骞茬嚗閰嶆柟涓嬪彂")
     @GetMapping(value = "/queryFormula")
     public Result<String> queryFormulaByEqpType(Integer tenantId, String eqpCode) throws JAXBException {
         //鑾峰彇request
         HttpServletRequest request = SpringContextUtils.getHttpServletRequest();
         // 鑾峰彇璇锋眰涓绘満鐨処P鍦板潃
         String ip = IpUtils.getIpAddr(request);
-        DryEquipment dryEquipment = dryEquipmentService.selectByTenantIdEquipmentId(tenantId+ "", eqpCode);
+        DryEquipment dryEquipment = dryEquipmentService.selectByTenantIdEquipmentId(tenantId + "", eqpCode);
         if (dryEquipment != null) {
             if (dryEquipment.getIp().equals(ip)) {
             } else {
@@ -225,46 +206,46 @@
         } else {
             return Result.error("璁惧涓嶅瓨鍦�");
         }
-            LambdaQueryWrapper<DryHerbFormula> queryWrapper = new LambdaQueryWrapper<DryHerbFormula>();
-            queryWrapper.eq(DryHerbFormula::getEqpType, dryEquipment.getType())
-                    .eq(DryHerbFormula::getTenantId, tenantId);
-            List<DryHerbFormula> list = dryHerbFormulaService.list(queryWrapper);
-            Formulas formulas = new Formulas();
-            list.forEach(item -> {
-                DryHerbInfo byId = dryHerbInfoService.getById(item.getHerbId());
-                if (byId!=null) {
-                    item.setPinyin(byId.getPinyin());
-                    item.setName(byId.getName());
-                }
-                Formula formula = new Formula();
-                BaseParam baseParam = new BaseParam();
-                WaterParam waterParam = new WaterParam();
-                TypeParam typeParam = new TypeParam();
-                OffsetParam offsetParam = new OffsetParam();
-                baseParam.setCode(item.getCode());
-                baseParam.setIndex(item.getCode());
-                baseParam.setName(item.getName());
-                baseParam.setAb(item.getName());
-                baseParam.setTyp(item.getCategory());
-                waterParam.setDelay(Double.valueOf(item.getDelay()));
-                waterParam.setMoisture3(item.getTarget());
-                waterParam.setWeight1(Double.valueOf(item.getFeed()));
-                waterParam.setTimes(item.getEt());
-                waterParam.setTemp1(item.getWindTemp());
-                waterParam.setTemp2(item.getEnvTemp());
-                waterParam.setTemp3(item.getEnvHum());
-                waterParam.setTurntime(item.getTurn());
-                typeParam.setMtype(Integer.valueOf(item.getCategory()));
-                offsetParam.setMoisoffset(item.getMoisOffset());
-                offsetParam.setColdwind(Double.valueOf(item.getCoolingDuration()));
+        LambdaQueryWrapper<DryHerbFormula> queryWrapper = new LambdaQueryWrapper<DryHerbFormula>();
+        queryWrapper.eq(DryHerbFormula::getEqpType, dryEquipment.getType())
+                .eq(DryHerbFormula::getTenantId, tenantId);
+        List<DryHerbFormula> list = dryHerbFormulaService.list(queryWrapper);
+        Formulas formulas = new Formulas();
+        list.forEach(item -> {
+            DryHerbInfo byId = dryHerbInfoService.getById(item.getHerbId());
+            if (byId != null) {
+                item.setPinyin(byId.getPinyin());
+                item.setName(byId.getName());
+            }
+            Formula formula = new Formula();
+            BaseParam baseParam = new BaseParam();
+            WaterParam waterParam = new WaterParam();
+            TypeParam typeParam = new TypeParam();
+            OffsetParam offsetParam = new OffsetParam();
+            baseParam.setCode(item.getCode());
+            baseParam.setIndex(item.getCode());
+            baseParam.setName(item.getName());
+            baseParam.setAb(item.getName());
+            baseParam.setTyp(item.getCategory());
+            waterParam.setDelay(Double.valueOf(item.getDelay()));
+            waterParam.setMoisture3(item.getTarget());
+            waterParam.setWeight1(Double.valueOf(item.getFeed()));
+            waterParam.setTimes(item.getEt());
+            waterParam.setTemp1(item.getWindTemp());
+            waterParam.setTemp2(item.getEnvTemp());
+            waterParam.setTemp3(item.getEnvHum());
+            waterParam.setTurntime(item.getTurn());
+            typeParam.setMtype(Integer.valueOf(item.getCategory()));
+            offsetParam.setMoisoffset(item.getMoisOffset());
+            offsetParam.setColdwind(Double.valueOf(item.getCoolingDuration()));
 
-                formula.setBaseParam(baseParam);
-                formula.setWaterParam(waterParam);
-                formula.setTypeParam(typeParam);
-                formula.setOffsetParam(offsetParam);
-                formulas.getDryFormulaList().add(formula);
-            });
-            // 鎶妚os杞崲鎴恱ml
+            formula.setBaseParam(baseParam);
+            formula.setWaterParam(waterParam);
+            formula.setTypeParam(typeParam);
+            formula.setOffsetParam(offsetParam);
+            formulas.getDryFormulaList().add(formula);
+        });
+        // 鎶妚os杞崲鎴恱ml
         // 鍒涘缓JAXBContext瀹炰緥
         JAXBContext jaxbContext = JAXBContext.newInstance(Formulas.class);
 
@@ -278,12 +259,12 @@
         StringWriter writer = new StringWriter();
         marshaller.marshal(formulas, writer);
 
-            return Result.OK("璇锋眰鎴愬姛",writer.toString());
+        return Result.OK("璇锋眰鎴愬姛", writer.toString());
 
 
     }
 
-    @ApiOperation(value="骞茬嚗閰嶆柟涓婃姤", notes="骞茬嚗閰嶆柟璁板綍涓婃姤")
+    @ApiOperation(value = "骞茬嚗閰嶆柟涓婃姤", notes = "骞茬嚗閰嶆柟璁板綍涓婃姤")
     @PostMapping(value = "/sendFormulaHistory")
     public Result<?> sendFormulaHistory(DryHerbFormulaHisVo hisVo) {
         //鑾峰彇request
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
index f669d79..a490a0a 100755
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
@@ -15,6 +15,7 @@
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
 import org.springframework.data.redis.core.RedisTemplate;
 
 import java.util.*;
@@ -101,8 +102,8 @@
             mqttClient.subscribe(MqttConstant.MOBILE_UP);
             System.err.println("admin璁㈤槄" + MqttConstant.MOBILE_UP);
             // 璁㈤槄绉熸埛瀹炴椂鏁版嵁
-            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
-            System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
+            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP);
+            System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP);
             // 璁㈤槄绉熸埛鎶ヨ鏁版嵁
             mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
             System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
@@ -178,15 +179,17 @@
       for (int i = 0; i < data.size(); i++) {
         JSONObject obj = data.getJSONObject(i);
         JSONObject item = new JSONObject();
+        //clientid
+        String clientid = obj.getString("clientid");
+        item.put("clientid", clientid);
+        //TODO 鏍¢獙绉熸埛id鏄惁瀛樺湪
+        if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  continue;
         //username
         item.put("username", obj.get("username"));
         //杩炴帴鏃堕棿
         String st = obj.getString("connected_at");
         String upTime = DateUtils.zone2Str(st);
         item.put("connectedAt", upTime);
-        //clientid
-        String clientid = obj.getString("clientid");
-        item.put("clientid", clientid);
         //鏄惁杩炴帴
         Boolean connected = obj.getBoolean("connected");
         item.put("connected", connected);
@@ -195,7 +198,7 @@
           String[] info = clientid.split("-");
           item.put("type", info[0]);
           item.put("tenantId", info[1]);
-          //item.put("code", info[2]);
+          item.put("code", info[2]);
 
           if (connected) {
             redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId").toString()) , clientid, item);
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
index b381df7..4558603 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -42,6 +42,9 @@
 public class MqttSampleCallback implements MqttCallback {
     @Value(value = "${jeecg.mqtt.role}")
     private String role;
+
+
+
     @Autowired
     private MqttUtil mqttUtil;
     @Autowired
@@ -76,8 +79,8 @@
 
     @Override
     public void messageArrived(String topic, MqttMessage mqttMessage) {
-        System.out.println("鏀跺埌娑堟伅: \n  topic锛�" + topic + "\n  Qos锛�" + mqttMessage.getQos() + "\n  payload锛�"
-                + new String(mqttMessage.getPayload()));
+//        System.out.println("鏀跺埌娑堟伅: \n  topic锛�" + topic + "\n  Qos锛�" + mqttMessage.getQos() + "\n  payload锛�"
+//                + new String(mqttMessage.getPayload()));
 
         switch (role) {
             // 绠$悊鍛�
@@ -98,6 +101,8 @@
                         //clientid
                         String clientid = messageJson.getString("clientid");
                         item.put("clientid", clientid);
+                        // 涓嶇鍚堢殑璁惧涓嶈繘琛岀鐞�
+                        if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  return;
                         //鏄惁杩炴帴
                         item.put("connected", true);
                         //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡  渚嬶細client-1000)
@@ -105,12 +110,14 @@
                             String[] info = clientid.split("-");
                             item.put("type", info[0]);
                             item.put("tenantId", info[1]);
-                            //item.put("code", info[2]);
+                            item.put("code", info[2]);
+                            redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
+                            System.err.println(String.format("璁惧: %s涓婄嚎", clientid));
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
-                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
-                        System.err.println(String.format("璁惧: %s涓婄嚎", clientid));
+
+
                     }
 
                 }
@@ -204,16 +211,16 @@
                 });
                 break;
 
-            // 鎺ユ敹璁惧瀹炴椂鏁版嵁
+            // 鎺ユ敹璁惧瀹炴椂鏁版嵁 TODO 20250718鏆備笉浣跨敤锛屼娇鐢═ENANT_UP_PREFIX_REALTIME_DATA_EQP
             case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
-                ThreadUtil.execute(() -> {
-                    try {
-                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
-                        realTimeDataService.realTimeDataHandle(vo);
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                });
+//                ThreadUtil.execute(() -> {
+//                    try {
+//                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
+//                        realTimeDataService.realTimeDataHandle(vo);
+//                    } catch (Exception e) {
+//                        e.printStackTrace();
+//                    }
+//                });
 
                 break;
             // 鎺ユ敹璁惧瀹炴椂鏁版嵁-鏈哄彴
@@ -221,40 +228,42 @@
                 ThreadUtil.execute(() -> {
                     try {
                         RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class);
-                        realTimeDataService.realTimeDataHandle(vo);
+                        synchronized (realTimeDataService) {
+                            realTimeDataService.realTimeDataHandle(vo);
+                        }
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
                 });
                 break;
-            //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁
+            //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁 TODO 20250721鏆備笉浣跨敤锛岀粺涓�浣跨敤TENANT_UP_PREFIX_REALTIME_DATA_EQP
             case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: {
 
-                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
-                });
-                //鏁呴殰鏁版嵁
-                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
-                //绉熸埛id
-                String tenantId = realFaultMessage.getTentId();
+//                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
+//                });
+//                //鏁呴殰鏁版嵁
+//                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
+//                //绉熸埛id
+//                String tenantId = realFaultMessage.getTentId();
                 //鏀跺埌绉熸埛瀹炴椂鎶ヨ鏁版嵁瀛樺叆redis
-                //杞崲涓� Map<String, Object>
-                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
-                        .collect(Collectors.toMap(
-                                Map.Entry::getKey,
-                                entry -> (Object) entry.getValue()
-                        ));
-                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
-                //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
-                if (dryFaultMap.isEmpty()) {
-                    return;
-                }
-                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
-                //鏁版嵁杞崲
-                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
-                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
-                //鍙戦�佸箍鎾�
-                System.err.println("骞挎挱缁欙細" + recTopic);
-                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
+//                //杞崲涓� Map<String, Object>
+//                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
+//                        .collect(Collectors.toMap(
+//                                Map.Entry::getKey,
+//                                entry -> (Object) entry.getValue()
+//                        ));
+//                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
+//                //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
+//                if (dryFaultMap.isEmpty()) {
+//                    return;
+//                }
+//                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
+//                //鏁版嵁杞崲
+//                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
+//                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
+//                //鍙戦�佸箍鎾�
+//                System.err.println("骞挎挱缁欙細" + recTopic);
+//                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
             }
             break;
             //绉诲姩绔富鍔ㄨ姹傝澶囧疄鏃舵晠闅滄暟鎹紙鐢ㄤ簬椤甸潰鍒氭墦寮�鏃舵媺鍙栦竴娆℃暟鎹級
@@ -263,7 +272,7 @@
                 if (req.toString().isEmpty() || tenantId == null) {
                     return;
                 }
-                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId));
+                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId));
                 //杞崲涓� Map<String, DryFaultRecordVo>
                 Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream()
                         .collect(Collectors.toMap(
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java
index b744404..bdfb9f8 100755
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java
@@ -1,7 +1,10 @@
 package org.jeecg.modules.dry.mqtt;
 
+import cn.hutool.core.thread.ThreadUtil;
+import com.alibaba.fastjson.JSONObject;
 import lombok.Data;
 import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.springframework.stereotype.Component;
 
 @Component
@@ -9,4 +12,30 @@
 public class MqttUtil {
   public   MqttClient mqttClient;
 
+  /**
+   * 鍙戦�佹秷鎭�
+   *
+   * @param topic     璁㈤槄
+   * @param mqMessage 娑堟伅浣�
+   * @param type      1-鍙戦�佺粰绉熸埛   2-鍙戦�佺粰鍥哄畾id
+   */
+  public void sendMqttMessage(String topic, MqMessage mqMessage, Integer type) {
+    ThreadUtil.execute(() -> {
+      try {
+        if (type == 1) {
+          MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
+          sendMessage.setQos(0);
+          mqttClient.publish(String.format(topic, mqMessage.getTentId()), sendMessage);
+        } else if (type == 2) {
+          MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
+          sendMessage.setQos(0);
+          mqttClient.publish(topic, sendMessage);
+        }
+
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    });
+  }
+
 }
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
index 58b10ca..9178cb4 100755
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -6,6 +6,7 @@
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.mina.core.session.IoSession;
@@ -23,6 +24,7 @@
 import org.jeecg.modules.dry.common.CacheConstants;
 import org.jeecg.modules.dry.entity.*;
 import org.jeecg.modules.dry.mqtt.MqMessage;
+import org.jeecg.modules.dry.mqtt.MqttConfig;
 import org.jeecg.modules.dry.mqtt.MqttUtil;
 import org.jeecg.modules.dry.service.*;
 import org.jeecg.modules.dry.socket.ServerHandler;
@@ -54,6 +56,9 @@
     private IDryEquipmentService equipmentService;
 
     @Autowired
+    private IDryEqpTypeService dryEqpTypeService;
+
+    @Autowired
     private RedisUtil redisUtil;
 
     @Autowired
@@ -67,6 +72,7 @@
 
     @Value(value = "${jeecg.mqtt.role}")
     private String role;
+
 
     @Autowired
     private MqttUtil mqttUtil;
@@ -90,24 +96,24 @@
     @Override
     @Transactional
     public Result<?> realTimeDataHandle(RealTimeDataVo realTimeDataVo) {
-        TenantContext.setTenant(realTimeDataVo.getTenantid()+"");
-        log.info("瀹炴椂鏁版嵁锛�"+realTimeDataVo.toString());
+        TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
+        log.info("瀹炴椂鏁版嵁锛�" + realTimeDataVo.toString());
 
 
         // 1 鏌ヨ鎴栧垱寤哄伐鍗�
         // 1.1 浠巖edis鍙栧嚭宸ュ崟缂撳瓨
         DryOrderVo orderVo = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(),
-                realTimeDataVo.getTenantid()+"_"+realTimeDataVo.getMachineid());
+                realTimeDataVo.getTenantid() + "_" + realTimeDataVo.getMachineid());
         // 1.2 濡傛灉鏈夌紦瀛樿褰�
-        if(orderVo != null && orderVo.getCode().equals(realTimeDataVo.getWorkorder())) {
+        if (orderVo != null && orderVo.getCode().equals(realTimeDataVo.getWorkorder())) {
 
-        // 1.3 娌℃湁缂撳瓨璁板綍鍐嶆煡璇㈡暟鎹簱
+            // 1.3 娌℃湁缂撳瓨璁板綍鍐嶆煡璇㈡暟鎹簱
         } else {
             // 鏍规嵁绉熸埛id鍜屽伐鍗曞彿鏌ヨ鏁版嵁搴撴槸鍚︽湁璁板綍锛屾湁鍒欒繑鍥烇紝娌℃湁鍒欐柊澧炰竴鏉″啀杩斿洖
             orderVo = getOrSaveDryOrderVoDB(realTimeDataVo);
         }
         if (orderVo == null) {
-            log.error("宸ュ崟涓嶅瓨鍦紝宸ュ崟鍙凤細"+realTimeDataVo.getWorkorder()+",璁惧锛�" + realTimeDataVo.getMachineid() +",鑽潗锛�" + realTimeDataVo.getName());
+            log.error("宸ュ崟涓嶅瓨鍦紝宸ュ崟鍙凤細" + realTimeDataVo.getWorkorder() + ",璁惧锛�" + realTimeDataVo.getMachineid() + ",鑽潗锛�" + realTimeDataVo.getName());
             return Result.error("宸ュ崟涓嶅瓨鍦�");
         }
 
@@ -147,27 +153,25 @@
         orderVo.getBellowsTemp().put(realTimeDataVo.getTime3(), realTimeDataVo.getTemp2());
         // 2.3 鏇存柊鍒皉edis缂撳瓨
         redisUtil.hset(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(),
-                realTimeDataVo.getTenantid()+"_"+realTimeDataVo.getMachineid(),orderVo, 60*60);
+                realTimeDataVo.getTenantid() + "_" + realTimeDataVo.getMachineid(), orderVo, 60 * 60);
         return Result.ok();
     }
-
-
 
 
     @Override
     @Transactional
     public Result<?> realTimeDataHandle(RealTimeDataParentVo realTimeDataParentVo) {
-        TenantContext.setTenant(realTimeDataParentVo.getTenantid()+"");
-        log.info("瀹炴椂鏁版嵁锛�"+realTimeDataParentVo.toString());
+        TenantContext.setTenant(realTimeDataParentVo.getTenantid() + "");
+        log.info("瀹炴椂鏁版嵁锛�" + realTimeDataParentVo.toString());
         if (realTimeDataParentVo.getRealTime() != null) {
             RealTimeDataVo realTimeDataVo = realTimeDataParentVo.getRealTime();
 
             // 1 鏌ヨ鎴栧垱寤哄伐鍗�
             // 1.1 浠巖edis鍙栧嚭宸ュ崟缂撳瓨
             DryOrderVo orderVo = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(),
-                    realTimeDataParentVo.getTenantid()+"_"+realTimeDataParentVo.getMachineid());
+                    realTimeDataParentVo.getTenantid() + "_" + realTimeDataParentVo.getMachineid());
             // 1.2 濡傛灉鏈夌紦瀛樿褰�
-            if(orderVo != null && orderVo.getCode().equals(realTimeDataParentVo.getWorkorder())) {
+            if (orderVo != null && orderVo.getCode().equals(realTimeDataParentVo.getWorkorder())) {
 
                 // 1.3 娌℃湁缂撳瓨璁板綍鍐嶆煡璇㈡暟鎹簱
             } else {
@@ -175,7 +179,7 @@
                 orderVo = getOrSaveDryOrderVoDB(realTimeDataVo);
             }
             if (orderVo == null) {
-                log.error("宸ュ崟涓嶅瓨鍦紝宸ュ崟鍙凤細"+realTimeDataParentVo.getWorkorder()+",璁惧锛�" + realTimeDataParentVo.getMachineid() +",鑽潗锛�" + realTimeDataVo.getName());
+                log.error("宸ュ崟涓嶅瓨鍦紝宸ュ崟鍙凤細" + realTimeDataParentVo.getWorkorder() + ",璁惧锛�" + realTimeDataParentVo.getMachineid() + ",鑽潗锛�" + realTimeDataVo.getName());
                 return Result.error("宸ュ崟涓嶅瓨鍦�");
             }
 
@@ -214,18 +218,22 @@
 
             // 2.3 鏇存柊鍒皉edis缂撳瓨
             redisUtil.hset(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(),
-                    realTimeDataVo.getTenantid()+"_"+realTimeDataVo.getMachineid(),orderVo, 60*60);
+                    realTimeDataVo.getTenantid() + "_" + realTimeDataVo.getMachineid(), orderVo, 60 * 60);
         }
 
         if (realTimeDataParentVo.getReport() != null) {
             saveReport(realTimeDataParentVo);
+        }
+
+        if (realTimeDataParentVo.getFault() != null) {
+            fitFaultRecord(realTimeDataParentVo);
         }
         return Result.ok();
     }
 
     private void saveReport(RealTimeDataParentVo realTimeDataParentVo) {
         RealTimeReportVo report = realTimeDataParentVo.getReport();
-        if(report.getReport_flag()) {
+        if (report.getReport_flag()) {
             DryProdRecord prodRecord = new DryProdRecord();
             prodRecord.setReportHeadName(report.getReport_head_name());
             prodRecord.setReportHeadBatch(report.getReport_head_batch());
@@ -236,24 +244,24 @@
             prodRecord.setReportHeadLeader(report.getReport_head_leader());
             prodRecord.setReportHeadTecher(report.getReport_head_techer());
 
-            prodRecord.setReportCheckField(report.getReport_check_field()?1:0);
-            prodRecord.setReportCheckFile(report.getReport_check_file()?1:0);
-            prodRecord.setReportCheckTag(report.getReport_check_tag()?1:0);
-            prodRecord.setReportCheckTool(report.getReport_check_tool()?1:0);
+            prodRecord.setReportCheckField(report.getReport_check_field() ? 1 : 0);
+            prodRecord.setReportCheckFile(report.getReport_check_file() ? 1 : 0);
+            prodRecord.setReportCheckTag(report.getReport_check_tag() ? 1 : 0);
+            prodRecord.setReportCheckTool(report.getReport_check_tool() ? 1 : 0);
             prodRecord.setReportCheckMan(report.getReport_check_man());
-            prodRecord.setReportCheckStatus(report.getReport_check_status()?1:0);
+            prodRecord.setReportCheckStatus(report.getReport_check_status() ? 1 : 0);
             prodRecord.setReportCheckQa(report.getReport_check_qa());
             prodRecord.setReportCheckRecord(report.getReport_check_record());
 
-            prodRecord.setReportProductView(report.getReport_product_view()?1:0);
-            prodRecord.setReportProductWind(report.getReport_product_wind()?1:0);
-            prodRecord.setReportProductSun(report.getReport_product_sun()?1:0);
-            prodRecord.setReportProductLowDry(report.getReport_product_low_dry()?1:0);
-            prodRecord.setReportProductDry(report.getReport_product_dry()?1:0);
+            prodRecord.setReportProductView(report.getReport_product_view() ? 1 : 0);
+            prodRecord.setReportProductWind(report.getReport_product_wind() ? 1 : 0);
+            prodRecord.setReportProductSun(report.getReport_product_sun() ? 1 : 0);
+            prodRecord.setReportProductLowDry(report.getReport_product_low_dry() ? 1 : 0);
+            prodRecord.setReportProductDry(report.getReport_product_dry() ? 1 : 0);
             prodRecord.setReportProductStart(report.getReport_product_start());
             prodRecord.setReportProductEnd(report.getReport_product_end());
             prodRecord.setReportProductTotal(report.getReport_product_total());
-            prodRecord.setReportProductCheck(report.getReport_product_check()?1:0);
+            prodRecord.setReportProductCheck(report.getReport_product_check() ? 1 : 0);
             prodRecord.setReportProductMan1(report.getReport_product_man1());
             prodRecord.setReportProductMan2(report.getReport_product_man2());
             prodRecord.setReportProductWeight(report.getReport_product_weight());
@@ -261,15 +269,15 @@
             prodRecord.setReportProductUse(report.getReport_product_use());
             prodRecord.setReportProductQa(report.getReport_product_qa());
 
-            prodRecord.setReportCleanMachine(report.getReport_clean_machine()?1:0);
-            prodRecord.setReportCleanWaste(report.getReport_clean_waste()?1:0);
-            prodRecord.setReportCleanTool(report.getReport_clean_tool()?1:0);
-            prodRecord.setReportCleanDoor(report.getReport_clean_door()?1:0);
-            prodRecord.setReportCleanBox(report.getReport_clean_box()?1:0);
-            prodRecord.setReportCleanRecord(report.getReport_clean_record()?1:0);
+            prodRecord.setReportCleanMachine(report.getReport_clean_machine() ? 1 : 0);
+            prodRecord.setReportCleanWaste(report.getReport_clean_waste() ? 1 : 0);
+            prodRecord.setReportCleanTool(report.getReport_clean_tool() ? 1 : 0);
+            prodRecord.setReportCleanDoor(report.getReport_clean_door() ? 1 : 0);
+            prodRecord.setReportCleanBox(report.getReport_clean_box() ? 1 : 0);
+            prodRecord.setReportCleanRecord(report.getReport_clean_record() ? 1 : 0);
             prodRecord.setReportCleanDate(report.getReport_clean_date());
             prodRecord.setReportCleanMan(report.getReport_clean_man());
-            prodRecord.setReportCleanConfirm(report.getReport_clean_confirm()?1:0);
+            prodRecord.setReportCleanConfirm(report.getReport_clean_confirm() ? 1 : 0);
             prodRecord.setReportCleanQa(report.getReport_clean_qa());
             prodRecordService.save(prodRecord);
         }
@@ -278,10 +286,12 @@
 
     /**
      * 鏍规嵁绉熸埛id鍜屽伐鍗曞彿鏌ヨ鏁版嵁搴撴槸鍚︽湁璁板綍锛屾湁鍒欒繑鍥烇紝娌℃湁鍒欐柊澧炰竴鏉�
+     *
      * @param realTimeDataVo
      * @return
      */
     private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) {
+        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
         DryOrderVo orderVo;
         LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>();
         queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder());
@@ -293,7 +303,8 @@
             // 杞崲涓虹紦瀛樻暟鎹粨鏋�
             orderVo = BeanUtil.toBean(one, DryOrderVo.class);
             if (one.getTemps() != null) {
-                Map map = JSONObject.parseObject(one.getTemps(), new TypeReference<Map<Integer,Double>>(){});
+                Map map = JSONObject.parseObject(one.getTemps(), new TypeReference<Map<Integer, Double>>() {
+                });
                 orderVo.setBellowsTemp(map);
             }
             // 鏌ヨ绉伴噸璁板綍锛屾坊鍔犲埌缂撳瓨鏁版嵁缁撴瀯
@@ -303,7 +314,7 @@
                 orderVo.setTrendVo(oldVo);
                 orderVo.setDetailList(trendVos);
             }
-        // 3 鏁版嵁搴撴病鏈夊垯鏂板涓�鏉℃暟鎹�
+            // 3 鏁版嵁搴撴病鏈夊垯鏂板涓�鏉℃暟鎹�
         } else {
 
             orderVo = saveNewOrder(realTimeDataVo);
@@ -313,23 +324,60 @@
 
     /**
      * 淇濆瓨鏂板伐鍗�
+     *
      * @param realTimeDataVo
      * @return
      */
     private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) {
+        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
         DryOrderVo orderVo;
 
         // 鏌ヨ璁惧
         DryEquipment equ = queryEquipmentByCodeTenant(realTimeDataVo);
         if (equ == null) {
-            log.error("鏈壘鍒拌澶囷細"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",鏈哄彴锛�" + realTimeDataVo.getMachineid());
-            return null;
+            log.error("鏈壘鍒拌澶囷細" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",鏈哄彴锛�" + realTimeDataVo.getMachineid());
+            log.error("鏂板璁惧锛�");
+            if (realTimeDataVo.getMachineid() == null || realTimeDataVo.getTenantid() == null) {
+                log.error("鏂板璁惧澶辫触锛氳澶嘔D鎴栫鎴稩D涓虹┖锛乵achineid={}, tenantid={}",
+                        realTimeDataVo.getMachineid(), realTimeDataVo.getTenantid());
+                return null;
+            }
+
+            DryEquipment addEqu = new DryEquipment(realTimeDataVo);
+
+            try {
+                String digits = StringUtils.getDigits(realTimeDataVo.getMachineid());
+                addEqu.setName(Integer.parseInt(digits) + "#骞茬嚗璁惧");
+            } catch (NumberFormatException e) {
+                log.error("璁惧ID鏍煎紡閿欒锛屾棤娉曟彁鍙栨暟瀛楅儴鍒嗭細machineid={}", realTimeDataVo.getMachineid(), e);
+                return null;
+            }
+
+            DryEqpType eqpType = dryEqpTypeService.getOne(
+                    new LambdaQueryWrapper<DryEqpType>()
+                            .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid())
+            );
+            if(eqpType == null){
+                log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid() );
+                return null;
+            }
+
+            Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId()));
+
+            if (!equipmentService.save(addEqu)) {
+                log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu);
+                return null;
+            }
+            equ = addEqu;
+
+            log.info("鏂板璁惧鎴愬姛锛歟quipmentId={}", addEqu.getId());
+
         }
         // 鏌ヨ鑽潗
-        DryHerbFormula herbFormula =  queryHerbByIndexTenant(realTimeDataVo);
+        DryHerbFormula herbFormula = queryHerbByIndexTenant(realTimeDataVo);
 
         if (herbFormula == null) {
-            log.error("鏈壘鍒拌嵂鏉愶細"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",鏈哄彴锛�" + realTimeDataVo.getMachineid());
+            log.error("鏈壘鍒拌嵂鏉愶細" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",鏈哄彴锛�" + realTimeDataVo.getMachineid());
             return null;
         }
         // 鍒涘缓鏂板伐鍗�
@@ -345,6 +393,7 @@
 
     /**
      * 鏌ヨ璁惧锛屾柊璁惧鍒欐坊鍔犲埌璁惧涓绘暟鎹�
+     *
      * @param realTimeDataVo
      * @return
      */
@@ -354,7 +403,7 @@
         queryWrapper.eq(DryEquipment::getCode, realTimeDataVo.getMachineid());
         DryEquipment one = equipmentService.getOne(queryWrapper);
         if (one == null) {
-            log.error(role+"淇濆瓨瀹炴椂鏁版嵁锛屾湭鎵惧埌璁惧锛�"+realTimeDataVo.getMachineid());
+            log.error(role + "淇濆瓨瀹炴椂鏁版嵁锛屾湭鎵惧埌璁惧锛�" + realTimeDataVo.getMachineid());
 //            one = new DryEquipment(realTimeDataVo);
 //            equipmentService.save(one);
             if (MqttConstant.ROLE_ADMIN.equals(role)) {
@@ -365,8 +414,10 @@
                 object.put("tenantId", realTimeDataVo.getTenantid());
                 mqttMessage.setPayload(object.toJSONString().getBytes());
                 try {
-                    mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX ,mqttMessage);
-                }catch (MqttException e) {
+                   if(mqttEnable){
+                       mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage);
+                   }
+                } catch (MqttException e) {
                     e.printStackTrace();
                 }
 
@@ -378,6 +429,7 @@
 
     /**
      * 鏌ヨ鑽潗锛屾柊鑽潗娣诲姞鍒版暟鎹簱
+     *
      * @param realTimeDataVo
      * @return
      */
@@ -389,7 +441,10 @@
         if (one == null) {
             one = new DryHerbFormula(realTimeDataVo);
             DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid());
-            one.setEqpType(dryEquipment.getType());
+            if (dryEquipment!=null&&dryEquipment.getType()!=null) {
+                one.setEqpType(dryEquipment.getType());
+            }
+
             dryHerbFormulaService.save(one);
         }
         return one;
@@ -397,14 +452,15 @@
 
     /**
      * 淇濆瓨鍚按鐜囧彉鍖栬褰�
+     *
      * @param trendVo
      * @param orderVo
      */
     private void saveOrderTrendVo(DryOrderTrendVo trendVo, DryOrderVo orderVo) {
         //鍒ゆ柇 瀹炴椂鍚按鐜� 鎴� 瀹炴椂閲嶉噺鏈夋病鏈夊彉鍖栵紝鏈夊彉鍖栧垯鏇存柊
-        if(orderVo.getTrendVo() == null && trendVo != null && trendVo.getWeight() > 0
-                || orderVo.getTrendVo()!=null &&  trendVo.getWeight() < orderVo.getTrendVo().getWeight()
-                ) {
+        if (orderVo.getTrendVo() == null && trendVo != null && trendVo.getWeight() > 0
+                || orderVo.getTrendVo() != null && trendVo.getWeight() < orderVo.getTrendVo().getWeight()
+        ) {
             DryOrder byId = dryOrderService.getById(orderVo.getId());
             // 灏嗘渶鏂扮粨鏋滄洿鏂板埌宸ュ崟
             if (byId != null) {
@@ -422,12 +478,13 @@
 
     /**
      * 鏌ヨ鏈哄彴瀹炴椂鏁版嵁
+     *
      * @param realTimeDataVo
      * @return
      */
     @Override
     public Result<?> queryMachineRealTImeData(RealTimeDataVo realTimeDataVo) {
-        TenantContext.setTenant(realTimeDataVo.getTenantid()+"");
+        TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
 
         // 鏌ヨ鎵�鏈夋満鍙�,鏌ヨ璇彞缁勮
         LambdaQueryWrapper<DryEquipment> queryWrapper = new LambdaQueryWrapper<>();
@@ -449,13 +506,13 @@
                 dryEquipments.stream().forEach(item -> {
                     // 鑾峰彇宸ュ崟
                     DryOrderVo order = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid() + "_" + item.getCode());
-                    list.add(item.getName().substring(0, item.getName().indexOf('#')+1));
+                    list.add(item.getName().substring(0, item.getName().indexOf('#') + 1));
                     if (order != null) {
                         // 璁$畻骞茬嚗鏁堢巼锛岀敤浜庡姣�
                         DryOrderTrendVo dryOrderTrendVo = order.getDetailList().get(order.getDetailList().size() - 1);
                         double v = order.getOriginWeight() - dryOrderTrendVo.getWeight();
 
-                        if (v > 0 && dryOrderTrendVo.getTotalTime()>0) {
+                        if (v > 0 && dryOrderTrendVo.getTotalTime() > 0) {
                             DecimalFormat df = new DecimalFormat("#.00");
                             dList.add(Double.valueOf(df.format(v / dryOrderTrendVo.getTotalTime() * 60)));
                         } else {
@@ -477,7 +534,7 @@
                 // 鏌ヨ杩戝崄娆℃晥鐜囧拰鑳借兘鑰楀钩鍧�
                 dryOrderService.queryRecentOrderAvg(orderVo);
             }
-        }catch (Exception e) {
+        } catch (Exception e) {
             e.printStackTrace();
         }
         return Result.ok(orderVo);
@@ -491,27 +548,27 @@
         DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(msgVo.getTenantId() + "", msgVo.getMachineId());
         log.info("鑾峰彇璁惧锛�" + dryEquipment.toString());
 
-       // managedSessions.keySet().forEach(addr -> {
-           // ObjectOutputStream oos = null;
-            try {
+        // managedSessions.keySet().forEach(addr -> {
+        // ObjectOutputStream oos = null;
+        try {
 //                Socket socket = SocketServerConfig.clientMap.get(addr);
-                IoSession session = ServerHandler.clientSocket.get(dryEquipment.getIp());
-                if (session == null) {
-                    return Result.error("鏈幏鍙栧埌session,璇锋鏌ュ鎴风閰嶇疆鎴栬澶噄p閰嶇疆鏄惁姝e父");
-                }
-                SocketMsgVo smv = new SocketMsgVo(msgVo);
-                session.write(JSONObject.toJSONString(smv));
+            IoSession session = ServerHandler.clientSocket.get(dryEquipment.getIp());
+            if (session == null) {
+                return Result.error("鏈幏鍙栧埌session,璇锋鏌ュ鎴风閰嶇疆鎴栬澶噄p閰嶇疆鏄惁姝e父");
+            }
+            SocketMsgVo smv = new SocketMsgVo(msgVo);
+            session.write(JSONObject.toJSONString(smv));
 //                oos = new ObjectOutputStream(socket.getOutputStream());
 //                String s = JSONObject.toJSONString(new SocketMsgVo(msgVo));
 //                oos.writeUTF(s);
 //                oos.flush();
 
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            } finally {
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
 
-            }
-     //   });
+        }
+        //   });
         return Result.OK();
     }
 
@@ -530,15 +587,15 @@
                     orderVo.setEnvHum(order.getEnvHum());
                     orderVo.setEnvTemp(order.getEnvTemp());
                     double watt = order.getWatt() - order.getDetailList().get(0).getWatt();
-                    orderVo.setWatt(orderVo.getWatt()==null? watt : orderVo.getWatt() + watt);
+                    orderVo.setWatt(orderVo.getWatt() == null ? watt : orderVo.getWatt() + watt);
                     double steam = order.getSteam() - order.getDetailList().get(0).getSteam();
-                    orderVo.setSteam(orderVo.getSteam()==null? steam : orderVo.getSteam() + steam);
-                    orderVo.setOriginWeight(orderVo.getOriginWeight()==null? order.getOriginWeight(): orderVo.getOriginWeight() + order.getOriginWeight());
+                    orderVo.setSteam(orderVo.getSteam() == null ? steam : orderVo.getSteam() + steam);
+                    orderVo.setOriginWeight(orderVo.getOriginWeight() == null ? order.getOriginWeight() : orderVo.getOriginWeight() + order.getOriginWeight());
 
-                    double yield = order.getOriginWeight()*(1-(order.getInitial()/100))/(1-(order.getTarget()/100));
-                    orderVo.setYield(orderVo.getYield()==null? yield: orderVo.getYield() + yield);
+                    double yield = order.getOriginWeight() * (1 - (order.getInitial() / 100)) / (1 - (order.getTarget() / 100));
+                    orderVo.setYield(orderVo.getYield() == null ? yield : orderVo.getYield() + yield);
                     double sub = order.getOriginWeight() - order.getYield();
-                    orderVo.setReduce(orderVo.getReduce()==null? sub: orderVo.getReduce() + sub);
+                    orderVo.setReduce(orderVo.getReduce() == null ? sub : orderVo.getReduce() + sub);
 
 
                 }
@@ -555,7 +612,7 @@
 
     @Override
     public Result<?> fitFaultRecord(RealTimeDataVo vo) {
-        TenantContext.setTenant(vo.getTenantid()+"");
+        TenantContext.setTenant(vo.getTenantid() + "");
         ThreadUtil.execute(() -> {
             try {
                 //瑙f瀽瀛樺偍鎶ヨ鏁版嵁
@@ -565,30 +622,28 @@
 
 
                 //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒
-                    Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT);
-                    if(mqttEnable && !toCloudFaultMap.isEmpty()){
-                        MqMessage< Map<Object, Object>> message = new MqMessage<>();
-                        message.setData(toCloudFaultMap);
-                        message.setTentId(vo.getTenantid()+"");
-                        MqttMessage mqttMessage = new MqttMessage();
-                        mqttMessage.setQos(0);
-                        mqttMessage.setPayload(JSON.toJSONString(message).getBytes());
-                        mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA,mqttMessage);
-                    }
-
-
-                //瑕佷繚瀛樼殑鍘嗗彶鏁呴殰
-                if(!faultRecords1.isEmpty()){
-                    MqMessage<List<DryFaultRecord>> message = new MqMessage<>();
-                    message.setData(faultRecords1);
-                    message.setTentId(vo.getTenantid()+"");
+                Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
+                if (mqttEnable && !toCloudFaultMap.isEmpty()) {
+                    MqMessage<Map<Object, Object>> message = new MqMessage<>();
+                    message.setData(toCloudFaultMap);
+                    message.setTentId(vo.getTenantid() + "");
                     MqttMessage mqttMessage = new MqttMessage();
                     mqttMessage.setQos(0);
-                    mqttMessage.setPayload((JSON.toJSONString(message).getBytes()));
-                    mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage);
+                    mqttMessage.setPayload(JSON.toJSONString(message).getBytes());
+                    mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA, mqttMessage);
                 }
 
 
+                //瑕佷繚瀛樼殑鍘嗗彶鏁呴殰
+                if (!faultRecords1.isEmpty()) {
+                    MqMessage<List<DryFaultRecord>> message = new MqMessage<>();
+                    message.setData(faultRecords1);
+                    message.setTentId(vo.getTenantid() + "");
+                    MqttMessage mqttMessage = new MqttMessage();
+                    mqttMessage.setQos(0);
+                    mqttMessage.setPayload((JSON.toJSONString(message).getBytes()));
+                    mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA, mqttMessage);
+                }
 
 
             } catch (Exception e) {
@@ -601,41 +656,47 @@
 
     @Override
     public void fitFaultRecord(RealTimeDataParentVo vo) {
-        TenantContext.setTenant(vo.getTenantid()+"");
+        TenantContext.setTenant(vo.getTenantid() + "");
         ThreadUtil.execute(() -> {
             try {
                 //瑙f瀽瀛樺偍鎶ヨ鏁版嵁
-                List<DryFaultRecord> faultRecords1 = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
-                List<DryFaultRecord> faultRecords2 = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
-                faultRecords1.addAll(faultRecords2);
-
-
-                //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒
-                Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT);
-                if(mqttEnable && !toCloudFaultMap.isEmpty()){
-                    MqMessage< Map<Object, Object>> message = new MqMessage<>();
-                    message.setData(toCloudFaultMap);
-                    message.setTentId(vo.getTenantid()+"");
-                    MqttMessage mqttMessage = new MqttMessage();
-                    mqttMessage.setQos(0);
-                    mqttMessage.setPayload(JSON.toJSONString(message).getBytes());
-                    mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA,mqttMessage);
+                List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
+                List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
+                if(!errorList.isEmpty()){
+                   log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString());
+                }
+                if(!warnList.isEmpty()){
+                    log.error("淇濆瓨鍛婅锛歿}", warnList.toString());
                 }
 
+                //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊
+                if(!mqttEnable)return;
 
-                //瑕佷繚瀛樼殑鍘嗗彶鏁呴殰
-                if(!faultRecords1.isEmpty()){
-                    MqMessage<List<DryFaultRecord>> message = new MqMessage<>();
-                    message.setData(faultRecords1);
-                    message.setTentId(vo.getTenantid()+"");
-                    MqttMessage mqttMessage = new MqttMessage();
-                    mqttMessage.setQos(0);
-                    mqttMessage.setPayload((JSON.toJSONString(message).getBytes()));
-                    mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage);
+
+
+                //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒  key = tenantId + machineId + eqpFault
+                Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
+
+
+                Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream()
+                        .collect(Collectors.toMap(
+                                entry -> entry.getKey().toString(),
+                                entry -> (DryFaultRecordVo)entry.getValue()
+                        ));
+
+                String tenantId = vo.getTenantid() +"";
+
+                //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
+                if (dryFaultMap.isEmpty()) {
+                    return;
                 }
-
-
-
+                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
+                //鏁版嵁杞崲
+                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
+                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
+                //鍙戦�佸箍鎾�
+                log.error("骞挎挱缁欙細{}" , recTopic);
+                mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
 
             } catch (Exception e) {
                 e.printStackTrace();
@@ -648,20 +709,21 @@
     /**
      * 瑙f瀽瀛樺偍鏁呴殰鏁版嵁
      * TODO 淇濊瘉鍘熷瓙鎬�
-     * @param fault 鏁呴殰鏁版嵁
-     * @param orderId 宸ュ崟
-     * @param tenantId 绉熸埛
+     *
+     * @param fault     鏁呴殰鏁版嵁
+     * @param orderId   宸ュ崟
+     * @param tenantId  绉熸埛
      * @param machineId 璁惧
      * @param faultType 鏁呴殰绫诲瀷
      * @return 缁勮濂芥晠闅滄暟鎹�
      */
-    private List<DryFaultRecord> fitFault(String fault, String orderId,Integer tenantId,String machineId,Integer faultType){
+    private List<DryFaultRecord> fitFault(String fault, String orderId, Integer tenantId, String machineId, Integer faultType) {
         List<DryFaultRecord> result = new ArrayList<>();
-        if(StringUtils.isEmpty(fault))return  result;
+        if (StringUtils.isEmpty(fault)) return result;
         //鏁版嵁鏍锋湰锛�"eqp_fault": "婊氱瓛闄嶈秴鏃�-鎶ヨ,椋庢満杩囨祦鎶ヨ,婊氱瓛鍗囪秴鏃�-鎶ヨ,椋庣鍗囨姤璀�",
-        System.err.println((faultType == 1 ? "绫诲瀷锛氭晠闅�" : "绫诲瀷锛氭姤璀�") +  DateUtils.formatDateTime()+"--"+fault);
+        System.err.println((faultType == 1 ? "绫诲瀷锛氭晠闅�" : "绫诲瀷锛氭姤璀�") + DateUtils.formatDateTime() + "--" + fault);
         //redis涓殑鏁呴殰
-        Map<Object, Object> rFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT);
+        Map<Object, Object> rFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId));
         Map<String, Object> redFauMap = rFauMap.entrySet().stream()
                 .collect(Collectors.toMap(
                         entry -> entry.getKey().toString(),  // 閿浆鎹负瀛楃涓�
@@ -669,42 +731,45 @@
                 ));
 
         //娌℃湁鐢熸垚宸ュ崟鐨勬晠闅滄暟鎹笉瀛樺偍
-        if(StringUtils.isEmpty(orderId)){
+        if (StringUtils.isEmpty(orderId)) {
             return result;
         }
 
-        if(StringUtils.isEmpty(fault) && rFauMap.isEmpty()){
+        if (StringUtils.isEmpty(fault) && rFauMap.isEmpty()) {
             return result;
         }
         //1.瑙f瀽鏁版嵁
         String[] eqpFaults = fault.split(",");
-        Map<String,DryFaultRecord> addFauMap = new HashMap<>();
-        Map<String,DryFaultRecord> realFauMap = new HashMap<>();
+        Map<String, DryFaultRecord> addFauMap = new HashMap<>();
+        Map<String, DryFaultRecord> realFauMap = new HashMap<>();
         for (int i = 0; i < eqpFaults.length; i++) {
             String eqpFault = eqpFaults[i];
             //閬垮厤绌哄瓧绗︿覆
-            if(StringUtils.isEmpty(eqpFault.trim())) continue;
+            if (StringUtils.isEmpty(eqpFault.trim())) continue;
             //1.1妫�鏌qtt涓槸鍚﹀凡瀛樺湪杩欎釜鏁呴殰
-            String redisKey = String.format("%s_%s_%s", tenantId, machineId,eqpFault).trim();
+            String redisKey = String.format("%s_%s_%s", tenantId, machineId, eqpFault).trim();
 
 
             realFauMap.put(redisKey, new DryFaultRecord());
-            DryFaultRecordVo  rFault = (DryFaultRecordVo) redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,redisKey);
+            DryFaultRecordVo rFault = (DryFaultRecordVo) redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redisKey);
             //1.2濡傛灉redis涓嶅瓨鍦ㄥ垯瀛樺叆锛堝瓨鏁呴殰寮�濮嬶級
-            if(rFault ==null){
+            if (rFault == null) {
                 //缁勮缂撳瓨鏁版嵁
 //                DryFaultRecord faultRecord = new DryFaultRecord(orderId,tenantId,eqpFault,faultType,new Date(),null);
 //                addFauMap.put(redisKey,faultRecord);
                 Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId);
                 String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + "");
-                DryFaultRecordVo vo = new DryFaultRecordVo(orderId,tenantId,eqpFault,faultType,new Date(),null,1,equipmentMap.get(machineId).getName(),tenantName);
-                addFauMap.put(redisKey,vo);
-            }else {
+                DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, equipmentMap.get(machineId).getName(), tenantName);
+                addFauMap.put(redisKey, vo);
+            } else {
+                //TODO 鐗规畩鎯呭喌锛屽鏋渞edis鐨勬晠闅滃拰鏂�
+                
+                
                 //濡傛灉鏁版嵁宸插瓨鍦紝涓旇鏁板ぇ浜�1灏遍噸缃鏁帮紙璁℃暟3娆″悗鍒ゅ畾鏁呴殰缁撴潫锛�3娆′箣鍓嶉噸鏂颁笂鎶ユ晠闅滆鏄庢晠闅滆繕鍦ㄦ寔缁� 闇�瑕侀噸鏂拌鏁帮級
-                if(rFault.getECount()!=null && rFault.getECount() > 1){
+                if (rFault.getECount() != null && rFault.getECount() > 1) {
                     rFault.setECount(1);
-                    redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,redisKey,rFault);
-                    System.err.println("鎶ヨ娆℃暟閲嶇疆 clear clear 锛宬ey-"+redisKey);
+                    redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redisKey, rFault);
+                    System.err.println("鎶ヨ娆℃暟閲嶇疆 clear clear 锛宬ey-" + redisKey);
                 }
 
             }
@@ -715,40 +780,38 @@
         //鍚堝苟鏁版嵁
         addFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value));
         //娌℃湁鏂版晠闅滄暟鎹笉鐢ㄨ鐩�
-        if(!addFauMap.isEmpty()){
-            redisUtil.hmset(MqttConstant.MQTT_REAL_FAULT,redFauMap);
+        if (!addFauMap.isEmpty()) {
+            redisUtil.hmset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redFauMap);
         }
 
         //2妫�娴嬪凡缁撴潫鐨勬晠闅�
         //2.1濡傛灉瀹炴椂鏁版嵁涓嶅瓨鍦╮edis瀛樺湪鍒欎唬琛ㄦ晠闅滅粨鏉燂紝瀛樺叆鏁版嵁搴�
-        Map<Object, Object> curFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT);
+        Map<Object, Object> curFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId));
         curFauMap.keySet().stream()
                 //鐗瑰埆娉ㄦ剰锛屽涓姤璀︾被鍨嬪叡鐢ㄦ柟娉曢渶瑕佸尯鍒嗙被鍨�
-                .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo)curFauMap.get(key)).getFaultType() == faultType)
+                .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo) curFauMap.get(key)).getFaultType() == faultType)
                 .forEach(key -> {
-                    DryFaultRecordVo vo = (DryFaultRecordVo)redFauMap.get(key);
-                    vo.setECount(vo.getECount()+1);
-                    if(redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,key.toString())!=null){
+                    DryFaultRecordVo vo = (DryFaultRecordVo) redFauMap.get(key);
+                    vo.setECount(vo.getECount() + 1);
+                    if (redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString()) != null) {
                         //鏇存柊娆℃暟
-                        redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,key.toString(),vo);
-                        System.err.println("鎶ヨ娆℃暟鏇存柊锛宬ey-"+key);
+                        redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString(), vo);
+                        System.err.println("鎶ヨ娆℃暟鏇存柊锛宬ey-" + key);
                     }
 
-                    if(vo.getECount()>=3){
+                    if (vo.getECount() >= 3) {
                         vo.setEndTime(new Date());
                         //TODO 缁撴潫瓒呰繃鏌愪釜鏃堕棿鍖洪棿鍒ゅ畾涓洪敊璇暟鎹�
                         faultRecordService.save(vo);
-                        redisUtil.hdel(MqttConstant.MQTT_REAL_FAULT,key);
+                        redisUtil.hdel(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key);
                         result.add(vo);
-                        System.err.println((faultType == 1 ? "绫诲瀷锛氭晠闅�" : "绫诲瀷锛氭姤璀�") +  DateUtils.formatDateTime()+"瀛樺叆鏁版嵁搴�");
+                        System.err.println((faultType == 1 ? "绫诲瀷锛氭晠闅�" : "绫诲瀷锛氭姤璀�") + DateUtils.formatDateTime() + "瀛樺叆鏁版嵁搴�");
                     }
                 });
 
 
-
         return result;
     }
-
 
 
 }

--
Gitblit v1.9.3