From 4a60ced80b215fcb2e2d4664b20cd744313ccc10 Mon Sep 17 00:00:00 2001
From: zhuguifei <zhuguifei@zhuguifeideiMac.local>
Date: 星期五, 25 七月 2025 15:08:07 +0800
Subject: [PATCH] 接收mqtt数据高并发处理

---
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java |  152 ++++++++++++++++++++++++++++++--------------------
 1 files changed, 92 insertions(+), 60 deletions(-)

diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
index 9178cb4..8e48a0b 100755
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -36,6 +36,8 @@
 
 import java.text.DecimalFormat;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -79,6 +81,13 @@
 
     @Value(value = "${jeecg.mqtt.enable}")
     private boolean mqttEnable;
+
+    private static final ConcurrentHashMap<String, ReentrantLock> tenantLocks = new ConcurrentHashMap<>();
+
+    private ReentrantLock getLock(String tenantId, String type) {
+        String lockKey = tenantId + ":" + type;
+        return tenantLocks.computeIfAbsent(lockKey, k -> new ReentrantLock());
+    }
 
     public String getTemporaryToken() {
         if (token == null) {
@@ -226,7 +235,16 @@
         }
 
         if (realTimeDataParentVo.getFault() != null) {
-            fitFaultRecord(realTimeDataParentVo);
+            ReentrantLock faultLock = getLock(realTimeDataParentVo.getTenantid() + "", "fault");
+            faultLock.lock();
+            try {
+                fitFaultRecord(realTimeDataParentVo);
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                faultLock.unlock();
+            }
+
         }
         return Result.ok();
     }
@@ -291,7 +309,7 @@
      * @return
      */
     private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) {
-        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
+        TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
         DryOrderVo orderVo;
         LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>();
         queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder());
@@ -329,7 +347,7 @@
      * @return
      */
     private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) {
-        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
+        TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
         DryOrderVo orderVo;
 
         // 鏌ヨ璁惧
@@ -357,16 +375,26 @@
                     new LambdaQueryWrapper<DryEqpType>()
                             .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid())
             );
-            if(eqpType == null){
-                log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid() );
+            if (eqpType == null) {
+                log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid());
                 return null;
             }
 
             Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId()));
 
-            if (!equipmentService.save(addEqu)) {
-                log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu);
-                return null;
+            // 璁惧鏂板
+            ReentrantLock equipmentLock = getLock(realTimeDataVo.getTenantid() + "", "equipment");
+
+            equipmentLock.lock();
+            try {
+                if (!equipmentService.save(addEqu)) {
+                    log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu);
+                    return null;
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                equipmentLock.unlock();
             }
             equ = addEqu;
 
@@ -380,14 +408,26 @@
             log.error("鏈壘鍒拌嵂鏉愶細" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",鏈哄彴锛�" + realTimeDataVo.getMachineid());
             return null;
         }
-        // 鍒涘缓鏂板伐鍗�
-        orderVo = new DryOrderVo(realTimeDataVo);
 
-        orderVo.setHerbId(herbFormula.getId());
-        orderVo.setEquId(equ.getId());
-        DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class);
-        boolean save = dryOrderService.save(dryOrder);
-        return orderVo;
+        // 宸ュ崟鏂板
+        ReentrantLock orderLock = getLock(realTimeDataVo.getTenantid() + "", "order");
+        orderLock.lock();
+        try {
+            // 鍒涘缓鏂板伐鍗�
+            orderVo = new DryOrderVo(realTimeDataVo);
+            orderVo.setHerbId(herbFormula.getId());
+            orderVo.setEquId(equ.getId());
+            DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class);
+            boolean save = dryOrderService.save(dryOrder);
+            return orderVo;
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            orderLock.unlock();
+        }
+
+
+        return null;
     }
 
 
@@ -414,9 +454,9 @@
                 object.put("tenantId", realTimeDataVo.getTenantid());
                 mqttMessage.setPayload(object.toJSONString().getBytes());
                 try {
-                   if(mqttEnable){
-                       mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage);
-                   }
+                    if (mqttEnable) {
+                        mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage);
+                    }
                 } catch (MqttException e) {
                     e.printStackTrace();
                 }
@@ -441,7 +481,7 @@
         if (one == null) {
             one = new DryHerbFormula(realTimeDataVo);
             DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid());
-            if (dryEquipment!=null&&dryEquipment.getType()!=null) {
+            if (dryEquipment != null && dryEquipment.getType() != null) {
                 one.setEqpType(dryEquipment.getType());
             }
 
@@ -657,51 +697,44 @@
     @Override
     public void fitFaultRecord(RealTimeDataParentVo vo) {
         TenantContext.setTenant(vo.getTenantid() + "");
-        ThreadUtil.execute(() -> {
-            try {
-                //瑙f瀽瀛樺偍鎶ヨ鏁版嵁
-                List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
-                List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
-                if(!errorList.isEmpty()){
-                   log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString());
-                }
-                if(!warnList.isEmpty()){
-                    log.error("淇濆瓨鍛婅锛歿}", warnList.toString());
-                }
+        //瑙f瀽瀛樺偍鎶ヨ鏁版嵁
+        List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
+        List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
+        if (!errorList.isEmpty()) {
+            log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString());
+        }
+        if (!warnList.isEmpty()) {
+            log.error("淇濆瓨鍛婅锛歿}", warnList.toString());
+        }
 
-                //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊
-                if(!mqttEnable)return;
+        //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊
+        if (!mqttEnable) return;
 
 
-
-                //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒  key = tenantId + machineId + eqpFault
-                Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
+        //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒  key = tenantId + machineId + eqpFault
+        Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
 
 
-                Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream()
-                        .collect(Collectors.toMap(
-                                entry -> entry.getKey().toString(),
-                                entry -> (DryFaultRecordVo)entry.getValue()
-                        ));
+        Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream()
+                .collect(Collectors.toMap(
+                        entry -> entry.getKey().toString(),
+                        entry -> (DryFaultRecordVo) entry.getValue()
+                ));
 
-                String tenantId = vo.getTenantid() +"";
+        String tenantId = vo.getTenantid() + "";
 
-                //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
-                if (dryFaultMap.isEmpty()) {
-                    return;
-                }
-                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
-                //鏁版嵁杞崲
-                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
-                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
-                //鍙戦�佸箍鎾�
-                log.error("骞挎挱缁欙細{}" , recTopic);
-                mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
+        //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
+        if (dryFaultMap.isEmpty()) {
+            return;
+        }
+        String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
+        //鏁版嵁杞崲
+        List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
+        MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
+        //鍙戦�佸箍鎾�
+        log.error("骞挎挱缁欙細{}", recTopic);
+        mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
 
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        });
 
     }
 
@@ -759,12 +792,11 @@
 //                addFauMap.put(redisKey,faultRecord);
                 Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId);
                 String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + "");
-                DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, equipmentMap.get(machineId).getName(), tenantName);
+                DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, machineId, equipmentMap.get(machineId).getName(), tenantName);
                 addFauMap.put(redisKey, vo);
             } else {
-                //TODO 鐗规畩鎯呭喌锛屽鏋渞edis鐨勬晠闅滃拰鏂�
-                
-                
+
+
                 //濡傛灉鏁版嵁宸插瓨鍦紝涓旇鏁板ぇ浜�1灏遍噸缃鏁帮紙璁℃暟3娆″悗鍒ゅ畾鏁呴殰缁撴潫锛�3娆′箣鍓嶉噸鏂颁笂鎶ユ晠闅滆鏄庢晠闅滆繕鍦ㄦ寔缁� 闇�瑕侀噸鏂拌鏁帮級
                 if (rFault.getECount() != null && rFault.getECount() > 1) {
                     rFault.setECount(1);

--
Gitblit v1.9.3