From f027c87f28485988b3d25f41b0cdf6e5bd42cd42 Mon Sep 17 00:00:00 2001
From: zhuguifei <zhuguifei@zhuguifeideiMac.local>
Date: 星期三, 30 七月 2025 16:50:37 +0800
Subject: [PATCH] redis工单保存故障数据

---
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java |  209 +++++++++++++++++++++++++++++++--------------------
 1 files changed, 126 insertions(+), 83 deletions(-)

diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
index 9178cb4..7812901 100755
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -36,6 +36,8 @@
 
 import java.text.DecimalFormat;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -79,6 +81,13 @@
 
     @Value(value = "${jeecg.mqtt.enable}")
     private boolean mqttEnable;
+
+    private static final ConcurrentHashMap<String, ReentrantLock> tenantLocks = new ConcurrentHashMap<>();
+
+    private ReentrantLock getLock(String tenantId, String type) {
+        String lockKey = tenantId + ":" + type;
+        return tenantLocks.computeIfAbsent(lockKey, k -> new ReentrantLock());
+    }
 
     public String getTemporaryToken() {
         if (token == null) {
@@ -171,11 +180,12 @@
             DryOrderVo orderVo = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(),
                     realTimeDataParentVo.getTenantid() + "_" + realTimeDataParentVo.getMachineid());
             // 1.2 濡傛灉鏈夌紦瀛樿褰�
-            if (orderVo != null && orderVo.getCode().equals(realTimeDataParentVo.getWorkorder())) {
+            if (orderVo != null && orderVo.getCode() != null && orderVo.getCode().equals(realTimeDataParentVo.getWorkorder())) {
 
                 // 1.3 娌℃湁缂撳瓨璁板綍鍐嶆煡璇㈡暟鎹簱
             } else {
                 // 鏍规嵁绉熸埛id鍜屽伐鍗曞彿鏌ヨ鏁版嵁搴撴槸鍚︽湁璁板綍锛屾湁鍒欒繑鍥烇紝娌℃湁鍒欐柊澧炰竴鏉″啀杩斿洖
+                realTimeDataVo.setWorkorder(realTimeDataParentVo.getWorkorder());
                 orderVo = getOrSaveDryOrderVoDB(realTimeDataVo);
             }
             if (orderVo == null) {
@@ -206,8 +216,12 @@
             orderVo.setOrderStatus(realTimeDataVo.getWorkorder_status());
             orderVo.setEqp_status(realTimeDataVo.getEqp_status());
 //        orderVo.setEqp_state(realTimeDataVo.getEqp_state());
-            orderVo.setWarning(realTimeDataVo.getEqp_warning());
-            orderVo.setFault(realTimeDataVo.getEqp_fault());
+            if(realTimeDataParentVo.getFault()!=null && StringUtils.isNotEmpty(realTimeDataParentVo.getFault().getWarning())){
+                orderVo.setWarning(realTimeDataParentVo.getFault().getWarning());
+            }
+            if(realTimeDataParentVo.getFault()!=null && StringUtils.isNotEmpty(realTimeDataParentVo.getFault().getError())){
+                orderVo.setFault(realTimeDataParentVo.getFault().getError());
+            }
             orderVo.setLevel(realTimeDataVo.getLevel());
             DryOrderTrendVo trendVo = new DryOrderTrendVo(realTimeDataVo);
             // 2.2 淇濆瓨宸ュ崟鍚按鐜囧彉鍖� 鎴� 閲嶉噺鍙樺寲
@@ -226,7 +240,16 @@
         }
 
         if (realTimeDataParentVo.getFault() != null) {
-            fitFaultRecord(realTimeDataParentVo);
+            ReentrantLock faultLock = getLock(realTimeDataParentVo.getTenantid() + "", "fault");
+            faultLock.lock();
+            try {
+                fitFaultRecord(realTimeDataParentVo);
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                faultLock.unlock();
+            }
+
         }
         return Result.ok();
     }
@@ -291,7 +314,7 @@
      * @return
      */
     private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) {
-        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
+        TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
         DryOrderVo orderVo;
         LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>();
         queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder());
@@ -329,7 +352,7 @@
      * @return
      */
     private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) {
-        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
+        TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
         DryOrderVo orderVo;
 
         // 鏌ヨ璁惧
@@ -357,16 +380,26 @@
                     new LambdaQueryWrapper<DryEqpType>()
                             .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid())
             );
-            if(eqpType == null){
-                log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid() );
+            if (eqpType == null) {
+                log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid());
                 return null;
             }
 
             Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId()));
 
-            if (!equipmentService.save(addEqu)) {
-                log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu);
-                return null;
+            // 璁惧鏂板
+            ReentrantLock equipmentLock = getLock(realTimeDataVo.getTenantid() + "", "equipment");
+
+            equipmentLock.lock();
+            try {
+                if (!equipmentService.save(addEqu)) {
+                    log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu);
+                    return null;
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                equipmentLock.unlock();
             }
             equ = addEqu;
 
@@ -380,14 +413,26 @@
             log.error("鏈壘鍒拌嵂鏉愶細" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",鏈哄彴锛�" + realTimeDataVo.getMachineid());
             return null;
         }
-        // 鍒涘缓鏂板伐鍗�
-        orderVo = new DryOrderVo(realTimeDataVo);
 
-        orderVo.setHerbId(herbFormula.getId());
-        orderVo.setEquId(equ.getId());
-        DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class);
-        boolean save = dryOrderService.save(dryOrder);
-        return orderVo;
+        // 宸ュ崟鏂板
+        ReentrantLock orderLock = getLock(realTimeDataVo.getTenantid() + "", "order");
+        orderLock.lock();
+        try {
+            // 鍒涘缓鏂板伐鍗�
+            orderVo = new DryOrderVo(realTimeDataVo);
+            orderVo.setHerbId(herbFormula.getId());
+            orderVo.setEquId(equ.getId());
+            DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class);
+            boolean save = dryOrderService.save(dryOrder);
+            return orderVo;
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            orderLock.unlock();
+        }
+
+
+        return null;
     }
 
 
@@ -414,9 +459,9 @@
                 object.put("tenantId", realTimeDataVo.getTenantid());
                 mqttMessage.setPayload(object.toJSONString().getBytes());
                 try {
-                   if(mqttEnable){
-                       mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage);
-                   }
+                    if (mqttEnable) {
+                        mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage);
+                    }
                 } catch (MqttException e) {
                     e.printStackTrace();
                 }
@@ -441,7 +486,7 @@
         if (one == null) {
             one = new DryHerbFormula(realTimeDataVo);
             DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid());
-            if (dryEquipment!=null&&dryEquipment.getType()!=null) {
+            if (dryEquipment != null && dryEquipment.getType() != null) {
                 one.setEqpType(dryEquipment.getType());
             }
 
@@ -507,7 +552,7 @@
                     // 鑾峰彇宸ュ崟
                     DryOrderVo order = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid() + "_" + item.getCode());
                     list.add(item.getName().substring(0, item.getName().indexOf('#') + 1));
-                    if (order != null) {
+                    if (order != null &&  order.getDetailList()!=null && !order.getDetailList().isEmpty()) {
                         // 璁$畻骞茬嚗鏁堢巼锛岀敤浜庡姣�
                         DryOrderTrendVo dryOrderTrendVo = order.getDetailList().get(order.getDetailList().size() - 1);
                         double v = order.getOriginWeight() - dryOrderTrendVo.getWeight();
@@ -657,51 +702,44 @@
     @Override
     public void fitFaultRecord(RealTimeDataParentVo vo) {
         TenantContext.setTenant(vo.getTenantid() + "");
-        ThreadUtil.execute(() -> {
-            try {
-                //瑙f瀽瀛樺偍鎶ヨ鏁版嵁
-                List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
-                List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
-                if(!errorList.isEmpty()){
-                   log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString());
-                }
-                if(!warnList.isEmpty()){
-                    log.error("淇濆瓨鍛婅锛歿}", warnList.toString());
-                }
+        //瑙f瀽瀛樺偍鎶ヨ鏁版嵁
+        List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
+        List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
+        if (!errorList.isEmpty()) {
+            log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString());
+        }
+        if (!warnList.isEmpty()) {
+            log.error("淇濆瓨鍛婅锛歿}", warnList.toString());
+        }
 
-                //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊
-                if(!mqttEnable)return;
+        //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊
+        if (!mqttEnable) return;
 
 
-
-                //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒  key = tenantId + machineId + eqpFault
-                Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
+        //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒  key = tenantId + machineId + eqpFault
+        Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
 
 
-                Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream()
-                        .collect(Collectors.toMap(
-                                entry -> entry.getKey().toString(),
-                                entry -> (DryFaultRecordVo)entry.getValue()
-                        ));
+        Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream()
+                .collect(Collectors.toMap(
+                        entry -> entry.getKey().toString(),
+                        entry -> (DryFaultRecordVo) entry.getValue()
+                ));
 
-                String tenantId = vo.getTenantid() +"";
+        String tenantId = vo.getTenantid() + "";
 
-                //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
-                if (dryFaultMap.isEmpty()) {
-                    return;
-                }
-                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
-                //鏁版嵁杞崲
-                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
-                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
-                //鍙戦�佸箍鎾�
-                log.error("骞挎挱缁欙細{}" , recTopic);
-                mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
+        //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
+        if (dryFaultMap.isEmpty()) {
+            return;
+        }
+        String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
+        //鏁版嵁杞崲
+        List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
+        MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
+        //鍙戦�佸箍鎾�
+        log.error("骞挎挱缁欙細{}", recTopic);
+        mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
 
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        });
 
     }
 
@@ -759,12 +797,11 @@
 //                addFauMap.put(redisKey,faultRecord);
                 Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId);
                 String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + "");
-                DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, equipmentMap.get(machineId).getName(), tenantName);
+                DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, machineId, equipmentMap.get(machineId).getName(), tenantName);
                 addFauMap.put(redisKey, vo);
             } else {
-                //TODO 鐗规畩鎯呭喌锛屽鏋渞edis鐨勬晠闅滃拰鏂�
-                
-                
+
+
                 //濡傛灉鏁版嵁宸插瓨鍦紝涓旇鏁板ぇ浜�1灏遍噸缃鏁帮紙璁℃暟3娆″悗鍒ゅ畾鏁呴殰缁撴潫锛�3娆′箣鍓嶉噸鏂颁笂鎶ユ晠闅滆鏄庢晠闅滆繕鍦ㄦ寔缁� 闇�瑕侀噸鏂拌鏁帮級
                 if (rFault.getECount() != null && rFault.getECount() > 1) {
                     rFault.setECount(1);
@@ -787,27 +824,33 @@
         //2妫�娴嬪凡缁撴潫鐨勬晠闅�
         //2.1濡傛灉瀹炴椂鏁版嵁涓嶅瓨鍦╮edis瀛樺湪鍒欎唬琛ㄦ晠闅滅粨鏉燂紝瀛樺叆鏁版嵁搴�
         Map<Object, Object> curFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId));
-        curFauMap.keySet().stream()
+        Map<Object, DryFaultRecordVo> collect = curFauMap.keySet().stream()
+                .filter(key -> {
+                    String[] split = key.toString().split("_");
+                    return split[0].equals(tenantId + "") && split[1].equals(machineId);
+                })
                 //鐗瑰埆娉ㄦ剰锛屽涓姤璀︾被鍨嬪叡鐢ㄦ柟娉曢渶瑕佸尯鍒嗙被鍨�
-                .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo) curFauMap.get(key)).getFaultType() == faultType)
-                .forEach(key -> {
-                    DryFaultRecordVo vo = (DryFaultRecordVo) redFauMap.get(key);
-                    vo.setECount(vo.getECount() + 1);
-                    if (redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString()) != null) {
-                        //鏇存柊娆℃暟
-                        redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString(), vo);
-                        System.err.println("鎶ヨ娆℃暟鏇存柊锛宬ey-" + key);
-                    }
+                .filter(key -> !realFauMap.containsKey(key) && (Objects.equals(((DryFaultRecordVo) curFauMap.get(key)).getFaultType(), faultType)))
+                .collect(Collectors.toMap(key -> key,  // 淇濈暀鍘熼敭
+                        key -> (DryFaultRecordVo) curFauMap.get(key)));
+        collect.keySet().forEach(key -> {
+            DryFaultRecordVo vo = (DryFaultRecordVo) redFauMap.get(key);
+            vo.setECount(vo.getECount() + 1);
+            if (redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString()) != null) {
+                //鏇存柊娆℃暟
+                redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString(), vo);
+                System.err.println("鎶ヨ娆℃暟鏇存柊锛宬ey-" + key);
+            }
 
-                    if (vo.getECount() >= 3) {
-                        vo.setEndTime(new Date());
-                        //TODO 缁撴潫瓒呰繃鏌愪釜鏃堕棿鍖洪棿鍒ゅ畾涓洪敊璇暟鎹�
-                        faultRecordService.save(vo);
-                        redisUtil.hdel(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key);
-                        result.add(vo);
-                        System.err.println((faultType == 1 ? "绫诲瀷锛氭晠闅�" : "绫诲瀷锛氭姤璀�") + DateUtils.formatDateTime() + "瀛樺叆鏁版嵁搴�");
-                    }
-                });
+            if (vo.getECount() >= 3) {
+                vo.setEndTime(new Date());
+                //TODO 缁撴潫瓒呰繃鏌愪釜鏃堕棿鍖洪棿鍒ゅ畾涓洪敊璇暟鎹�
+                faultRecordService.save(vo);
+                redisUtil.hdel(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key);
+                result.add(vo);
+                System.err.println((faultType == 1 ? "绫诲瀷锛氭晠闅�" : "绫诲瀷锛氭姤璀�") + DateUtils.formatDateTime() + "瀛樺叆鏁版嵁搴�");
+            }
+        });
 
 
         return result;

--
Gitblit v1.9.3