From 4a60ced80b215fcb2e2d4664b20cd744313ccc10 Mon Sep 17 00:00:00 2001
From: zhuguifei <zhuguifei@zhuguifeideiMac.local>
Date: 星期五, 25 七月 2025 15:08:07 +0800
Subject: [PATCH] 接收mqtt数据高并发处理

---
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java |  152 ++++++++++++++++++------------
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java                 |  112 +++++++++++++++------
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java                         |    4 
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java             |   15 ++
 jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java                     |    5 
 jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java                                         |   12 +
 6 files changed, 199 insertions(+), 101 deletions(-)

diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
index 6264a3b..2b95f9f 100644
--- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
+++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -46,7 +46,7 @@
   //鏈嶅姟绔笅琛屾寚浠ゅ墠缂�锛堣繑鍥炵粰绉诲姩绔級
   String SERVICE_DOWN_PREFIX = "service/down/res";
   //杩斿洖绉诲姩绔煡璇㈣澶囩姸鎬�
-  String SERVICE_RES_EQU_STATU = SERVICE_DOWN_PREFIX + "/%s/statu";
+  String SERVICE_RES_EQU_STATU = SERVICE_DOWN_PREFIX + "/equ/statu/%s";
   //杩斿洖绉诲姩绔繙绋嬭姹傛寚浠�
   String SERVICE_RES_EQU_CMD = SERVICE_DOWN_PREFIX + "/%s/cmd";
 
@@ -63,6 +63,11 @@
   String  SERVICE_BROADCAST_TENANT_REAL_FAULT = SERVICE_BROADCAST_PREFIX + "/real/fault/%s";
   //鏈嶅姟绔悜绉诲姩绔洖澶嶄竴娆¤澶囧疄鏃舵晠闅滃憡璀�
   String  SERVICE_ONECE_TENANT_REAL_FAULT = "service/onece" + "/real/fault/%s";
+  // 鐩戞祴瀹㈡埛绔繛鎺ユ垨鏂紑锛屾帹閫佹秷鎭埌绉熸埛鍐呮墍鏈夌Щ鍔ㄧ鎻愰啋鏇存柊骞茬嚗璁惧杩炴帴淇℃伅锛�%s-绉熸埛锛�
+  String  SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU = SERVICE_BROADCAST_PREFIX + "/update/equ/statu/%s";
+
+  //鏈嶅姟绔悜绉诲姩绔彂閫佸共鐕ヨ澶囧疄鏃舵暟鎹紙%s-绉熸埛锛�
+  String  SERVICE_BROADCAST_TENANT_REAL_DATA = SERVICE_BROADCAST_PREFIX + "/real/data/%s";
 
 
 
@@ -77,7 +82,7 @@
 
   String TENANT_UP_PREFIX = "tenant/up";
   String TENANT_UP_PREFIX_REALTIME_DATA = TENANT_UP_PREFIX + "/realTime/data";
-  String TENANT_UP_PREFIX_REALTIME_DATA_EQP = TENANT_UP_PREFIX + "/realTime/data/eqp";
+  String TENANT_UP_PREFIX_REALTIME_DATA_EQP = TENANT_UP_PREFIX + "/realTime/data/eqp/test";
   String TENANT_UP_PREFIX_FAULT_DATA = TENANT_UP_PREFIX + "/fault/data";
   String TENANT_UP_PREFIX_REAL_FAULT_DATA = TENANT_UP_PREFIX + "/real/fault/data";
 
@@ -96,12 +101,13 @@
 
   /**************************start*******************************/
   /**************************end*******************************/
-  //redis缂撳瓨
+
   //鎵�鏈夌鎴风殑瀹炴椂鎶ヨ锛�%s锛氱鎴穒d锛�
   String MQTT_REAL_FAULT = "mqtt:real:fault:%s";
 
 
 
+
   //service(cloud)
   //鍦ㄧ嚎瀹㈡埛绔�
   String MQTT_ONLINE_CLIENT = "mqtt:online:client:%s";
diff --git a/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java b/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java
index b74fdf1..c82fea9 100644
--- a/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java
+++ b/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java
@@ -17,6 +17,8 @@
     private Integer eCount;
     //璁惧鍚嶇О
     private String equName;
+    //璁惧缂栫爜
+    private String equCode;
     //绉熸埛鍚嶇О
     private String tenantName;
     //鏁呴殰鏃堕棿
@@ -28,10 +30,11 @@
         super(record.getOrderId(),record.getTenantId(),record.getFaultName(),record.getFaultType(),record.getStartTime(),record.getEndTime());
         this.eCount = count;
     }
-    public DryFaultRecordVo(String orderId, Integer tenantId, String faultName, Integer faultType, Date startTime, Date endTime, Integer eCount, String equName, String tenantName) {
+    public DryFaultRecordVo(String orderId, Integer tenantId, String faultName, Integer faultType, Date startTime, Date endTime, Integer eCount,String equCode, String equName, String tenantName) {
         super(orderId, tenantId, faultName, faultType, startTime, endTime);
         this.eCount = eCount;
         this.equName = equName;
+        this.equCode = equCode;
         this.tenantName = tenantName;
     }
 }
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
index 4fcdc30..4ec431f 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
@@ -167,12 +167,23 @@
                 String st = client.getString("connectedAt");
                 vo.setUpTime(st);
                 vo.setClientId(clientid);
+            }else{
+                vo.setClientId(clientid);
+                vo.setOnline(false);
             }
             return vo;
         }).collect(Collectors.toList());
         //鎺掑簭
-        collect.sort(Comparator.comparing(obj -> obj.getCode(), Comparator.nullsLast(Comparator.naturalOrder())));
-        collect.sort(Comparator.comparing(obj -> obj.getOnline(), Comparator.nullsLast(Comparator.naturalOrder())));
+        collect.sort(
+                Comparator.comparing(
+                                MoEquVo::getOnline,
+                                Comparator.nullsLast(Comparator.reverseOrder())  // true 鍦ㄥ墠锛宖alse 鍦ㄥ悗锛宯ull 鏈�鍚�
+                        )
+                        .thenComparing(
+                                DryEquipment::getCode,
+                                Comparator.nullsLast(Comparator.naturalOrder())  // code 鍗囧簭锛宯ull 鏈�鍚�
+                        )
+        );
         BeanUtils.copyProperties(pageList, page);
         page.setRecords(collect);
     }
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
index a490a0a..65d41d6 100755
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
@@ -73,7 +73,7 @@
     mqttConnOpt.setAutomaticReconnect(false);//璁剧疆鏄惁鑷姩閲嶈繛
 
     //閬楀槺娑堟伅 TODO qos2闇�瑕佸湪璁惧涓婄嚎鏃跺仛娓呴櫎娑堟伅鎿嶄綔
-    mqttConnOpt.setWill("downline", ("鎴戞槸" + mqttName + "_" + mqttClientId + "锛屾垜涓嬬嚎浜�").getBytes(), 2, false);
+    //mqttConnOpt.setWill("downline", ("鎴戞槸" + mqttName + "_" + mqttClientId + "锛屾垜涓嬬嚎浜�").getBytes(), 2, false);
     try {
 
       MqttClient mqttClient = new MqttClient(broker, mqttClientId, persistence);
@@ -183,6 +183,8 @@
         String clientid = obj.getString("clientid");
         item.put("clientid", clientid);
         //TODO 鏍¢獙绉熸埛id鏄惁瀛樺湪
+
+
         if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  continue;
         //username
         item.put("username", obj.get("username"));
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
index 4558603..6fcfa4b 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -6,6 +6,7 @@
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.TypeReference;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.poi.ss.formula.functions.T;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
@@ -42,7 +43,6 @@
 public class MqttSampleCallback implements MqttCallback {
     @Value(value = "${jeecg.mqtt.role}")
     private String role;
-
 
 
     @Autowired
@@ -88,7 +88,7 @@
                 String message = new String(mqttMessage.getPayload());
                 JSONObject messageJson = JSONObject.parseObject(message);
 
-                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
+                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected") && !topic.endsWith("disconnected")) {
                     JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
                     if (client == null) {
                         JSONObject item = new JSONObject();
@@ -102,7 +102,7 @@
                         String clientid = messageJson.getString("clientid");
                         item.put("clientid", clientid);
                         // 涓嶇鍚堢殑璁惧涓嶈繘琛岀鐞�
-                        if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  return;
+                        if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return;
                         //鏄惁杩炴帴
                         item.put("connected", true);
                         //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡  渚嬶細client-1000)
@@ -113,6 +113,12 @@
                             item.put("code", info[2]);
                             redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                             System.err.println(String.format("璁惧: %s涓婄嚎", clientid));
+
+                            // 鎺ㄩ�佸埌绉诲姩绔�
+                            String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, item.get("tenantId"));
+                            MqMessage<JSONObject> mqMessage = new MqMessage<>(item, item.get("tenantId").toString(), recTopic);
+                            sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1);
+
                         } catch (Exception e) {
                             e.printStackTrace();
                         }
@@ -124,8 +130,23 @@
                 if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
                     try {
                         String clientid = messageJson.getString("clientid");
-                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid);
+                        // 涓嶇鍚堢殑璁惧涓嶈繘琛岀鐞�
+                        if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return;
+                        String tenantId = clientid.split("-")[1];
+                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, tenantId), clientid);
                         System.err.println(String.format("璁惧: %s涓嬬嚎", clientid));
+
+                        //鎺ㄩ�佸埌绉诲姩绔�
+                        JSONObject item = new JSONObject();
+                        String[] info = clientid.split("-");
+                        item.put("type", info[0]);
+                        item.put("tenantId", info[1]);
+                        item.put("code", info[2]);
+                        item.put("connected", false);
+                        String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, tenantId);
+                        MqMessage<JSONObject> mqMessage = new MqMessage<>(item, tenantId, recTopic);
+                        sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1);
+
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
@@ -177,38 +198,55 @@
         switch (topic) {
             // 鏌ヨ璁惧鍦ㄧ嚎
             case MqttConstant.MOBILE_QUERY_EQU_STATU:
-                System.err.println("admin鏀跺埌" + topic);
-                // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵��
-                String clientId = messageJson.getString("clientId");
-                JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId);
+                log.info("admin鏀跺埌MQTT璇锋眰锛宼opic: {}", topic);  // 鏀圭敤鏇磋鑼冪殑鏃ュ織璁板綍
 
-                ThreadUtil.execute(() -> {
+                try {
 
-                    if (client == null || client.isEmpty()) {
-                        JSONObject res = new JSONObject();
-                        res.put("success", false);
-                        res.put("msg", "鏌ヨ澶辫触");
-                        try {
-                            MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
-                            sendMessage.setQos(0);
-                            mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
-                        } catch (Exception e) {
-                            e.printStackTrace();
-                        }
+                    // 1. 鍙傛暟鎻愬彇
+                    String clientId = messageJson.getString("clientId");
+                    if (StringUtils.isEmpty(clientId)) {
                         return;
                     }
+                    String deviceKey = clientId.split("-")[1];  // 鎻愬彇璁惧鏍囪瘑
 
-                    client.put("success", true);
-                    client.put("msg", "鏌ヨ鎴愬姛");
-                    try {
-                        MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes());
-                        sendMessage.setQos(0);
-                        mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
-                        baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
-                    } catch (Exception e) {
-                        e.printStackTrace();
-                    }
-                });
+                    // 2. 鏌ヨ璁惧鐘舵��
+                    String redisKey = String.format(MqttConstant.MQTT_ONLINE_CLIENT, deviceKey);
+                    JSONObject client = (JSONObject) redisUtil.hget(redisKey, clientId);
+
+                    // 3. 寮傛澶勭悊鍝嶅簲
+                    ThreadUtil.execute(() -> {
+                        JSONObject response = new JSONObject();
+
+                        // 3.1 澶勭悊鏌ヨ缁撴灉
+                        if (client == null || client.isEmpty()) {
+                            response.put("success", false);
+                            response.put("msg", "鏌ヨ澶辫触锛岃澶囦笉瀛樺湪鎴栫绾�");
+                        } else {
+                            response = client;  // 澶嶇敤鏌ヨ缁撴灉
+                            response.put("success", true);
+                            response.put("msg", "鏌ヨ鎴愬姛");
+                        }
+
+                        // 3.2 鍙戦�丮QTT鍝嶅簲
+                        try {
+                            String resTopic = String.format(MqttConstant.SERVICE_RES_EQU_STATU, req);
+                            MqMessage<JSONObject> mqMessage = new MqMessage<>(
+                                    response,
+                                    response.getString("tenantId"),
+                                    resTopic
+                            );
+
+                            sendMqttMessage(resTopic, mqMessage, 2);
+                            log.debug("璁惧鐘舵�佸搷搴斿彂閫佹垚鍔�: {}", response);
+
+                        } catch (Exception e) {
+                            log.error("MQTT鍝嶅簲鍙戦�佸け璐�", e);
+                        }
+                    });
+
+                } catch (Exception e) {
+                    log.error("澶勭悊璁惧鐘舵�佹煡璇㈠紓甯�", e);
+                }
                 break;
 
             // 鎺ユ敹璁惧瀹炴椂鏁版嵁 TODO 20250718鏆備笉浣跨敤锛屼娇鐢═ENANT_UP_PREFIX_REALTIME_DATA_EQP
@@ -227,10 +265,16 @@
             case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP:
                 ThreadUtil.execute(() -> {
                     try {
+
                         RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class);
-                        synchronized (realTimeDataService) {
-                            realTimeDataService.realTimeDataHandle(vo);
-                        }
+                        // 鍚戝悇绉熸埛绉诲姩绔彂閫佹暟鎹�
+                        String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, vo.getTenantid());
+                        MqMessage<RealTimeDataVo> mqMessage = new MqMessage<>(vo.getRealTime(), vo.getTenantid() + "", recTopic);
+                        sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, mqMessage, 1);
+
+
+                         realTimeDataService.realTimeDataHandle(vo);
+
                     } catch (Exception e) {
                         e.printStackTrace();
                     }
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
index 9178cb4..8e48a0b 100755
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -36,6 +36,8 @@
 
 import java.text.DecimalFormat;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -79,6 +81,13 @@
 
     @Value(value = "${jeecg.mqtt.enable}")
     private boolean mqttEnable;
+
+    private static final ConcurrentHashMap<String, ReentrantLock> tenantLocks = new ConcurrentHashMap<>();
+
+    private ReentrantLock getLock(String tenantId, String type) {
+        String lockKey = tenantId + ":" + type;
+        return tenantLocks.computeIfAbsent(lockKey, k -> new ReentrantLock());
+    }
 
     public String getTemporaryToken() {
         if (token == null) {
@@ -226,7 +235,16 @@
         }
 
         if (realTimeDataParentVo.getFault() != null) {
-            fitFaultRecord(realTimeDataParentVo);
+            ReentrantLock faultLock = getLock(realTimeDataParentVo.getTenantid() + "", "fault");
+            faultLock.lock();
+            try {
+                fitFaultRecord(realTimeDataParentVo);
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                faultLock.unlock();
+            }
+
         }
         return Result.ok();
     }
@@ -291,7 +309,7 @@
      * @return
      */
     private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) {
-        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
+        TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
         DryOrderVo orderVo;
         LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>();
         queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder());
@@ -329,7 +347,7 @@
      * @return
      */
     private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) {
-        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
+        TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
         DryOrderVo orderVo;
 
         // 鏌ヨ璁惧
@@ -357,16 +375,26 @@
                     new LambdaQueryWrapper<DryEqpType>()
                             .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid())
             );
-            if(eqpType == null){
-                log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid() );
+            if (eqpType == null) {
+                log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid());
                 return null;
             }
 
             Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId()));
 
-            if (!equipmentService.save(addEqu)) {
-                log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu);
-                return null;
+            // 璁惧鏂板
+            ReentrantLock equipmentLock = getLock(realTimeDataVo.getTenantid() + "", "equipment");
+
+            equipmentLock.lock();
+            try {
+                if (!equipmentService.save(addEqu)) {
+                    log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu);
+                    return null;
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                equipmentLock.unlock();
             }
             equ = addEqu;
 
@@ -380,14 +408,26 @@
             log.error("鏈壘鍒拌嵂鏉愶細" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",鏈哄彴锛�" + realTimeDataVo.getMachineid());
             return null;
         }
-        // 鍒涘缓鏂板伐鍗�
-        orderVo = new DryOrderVo(realTimeDataVo);
 
-        orderVo.setHerbId(herbFormula.getId());
-        orderVo.setEquId(equ.getId());
-        DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class);
-        boolean save = dryOrderService.save(dryOrder);
-        return orderVo;
+        // 宸ュ崟鏂板
+        ReentrantLock orderLock = getLock(realTimeDataVo.getTenantid() + "", "order");
+        orderLock.lock();
+        try {
+            // 鍒涘缓鏂板伐鍗�
+            orderVo = new DryOrderVo(realTimeDataVo);
+            orderVo.setHerbId(herbFormula.getId());
+            orderVo.setEquId(equ.getId());
+            DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class);
+            boolean save = dryOrderService.save(dryOrder);
+            return orderVo;
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            orderLock.unlock();
+        }
+
+
+        return null;
     }
 
 
@@ -414,9 +454,9 @@
                 object.put("tenantId", realTimeDataVo.getTenantid());
                 mqttMessage.setPayload(object.toJSONString().getBytes());
                 try {
-                   if(mqttEnable){
-                       mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage);
-                   }
+                    if (mqttEnable) {
+                        mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage);
+                    }
                 } catch (MqttException e) {
                     e.printStackTrace();
                 }
@@ -441,7 +481,7 @@
         if (one == null) {
             one = new DryHerbFormula(realTimeDataVo);
             DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid());
-            if (dryEquipment!=null&&dryEquipment.getType()!=null) {
+            if (dryEquipment != null && dryEquipment.getType() != null) {
                 one.setEqpType(dryEquipment.getType());
             }
 
@@ -657,51 +697,44 @@
     @Override
     public void fitFaultRecord(RealTimeDataParentVo vo) {
         TenantContext.setTenant(vo.getTenantid() + "");
-        ThreadUtil.execute(() -> {
-            try {
-                //瑙f瀽瀛樺偍鎶ヨ鏁版嵁
-                List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
-                List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
-                if(!errorList.isEmpty()){
-                   log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString());
-                }
-                if(!warnList.isEmpty()){
-                    log.error("淇濆瓨鍛婅锛歿}", warnList.toString());
-                }
+        //瑙f瀽瀛樺偍鎶ヨ鏁版嵁
+        List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
+        List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
+        if (!errorList.isEmpty()) {
+            log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString());
+        }
+        if (!warnList.isEmpty()) {
+            log.error("淇濆瓨鍛婅锛歿}", warnList.toString());
+        }
 
-                //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊
-                if(!mqttEnable)return;
+        //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊
+        if (!mqttEnable) return;
 
 
-
-                //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒  key = tenantId + machineId + eqpFault
-                Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
+        //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒  key = tenantId + machineId + eqpFault
+        Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
 
 
-                Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream()
-                        .collect(Collectors.toMap(
-                                entry -> entry.getKey().toString(),
-                                entry -> (DryFaultRecordVo)entry.getValue()
-                        ));
+        Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream()
+                .collect(Collectors.toMap(
+                        entry -> entry.getKey().toString(),
+                        entry -> (DryFaultRecordVo) entry.getValue()
+                ));
 
-                String tenantId = vo.getTenantid() +"";
+        String tenantId = vo.getTenantid() + "";
 
-                //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
-                if (dryFaultMap.isEmpty()) {
-                    return;
-                }
-                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
-                //鏁版嵁杞崲
-                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
-                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
-                //鍙戦�佸箍鎾�
-                log.error("骞挎挱缁欙細{}" , recTopic);
-                mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
+        //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
+        if (dryFaultMap.isEmpty()) {
+            return;
+        }
+        String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
+        //鏁版嵁杞崲
+        List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
+        MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
+        //鍙戦�佸箍鎾�
+        log.error("骞挎挱缁欙細{}", recTopic);
+        mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
 
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        });
 
     }
 
@@ -759,12 +792,11 @@
 //                addFauMap.put(redisKey,faultRecord);
                 Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId);
                 String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + "");
-                DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, equipmentMap.get(machineId).getName(), tenantName);
+                DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, machineId, equipmentMap.get(machineId).getName(), tenantName);
                 addFauMap.put(redisKey, vo);
             } else {
-                //TODO 鐗规畩鎯呭喌锛屽鏋渞edis鐨勬晠闅滃拰鏂�
-                
-                
+
+
                 //濡傛灉鏁版嵁宸插瓨鍦紝涓旇鏁板ぇ浜�1灏遍噸缃鏁帮紙璁℃暟3娆″悗鍒ゅ畾鏁呴殰缁撴潫锛�3娆′箣鍓嶉噸鏂颁笂鎶ユ晠闅滆鏄庢晠闅滆繕鍦ㄦ寔缁� 闇�瑕侀噸鏂拌鏁帮級
                 if (rFault.getECount() != null && rFault.getECount() > 1) {
                     rFault.setECount(1);

--
Gitblit v1.9.3