From f027c87f28485988b3d25f41b0cdf6e5bd42cd42 Mon Sep 17 00:00:00 2001 From: zhuguifei <zhuguifei@zhuguifeideiMac.local> Date: 星期三, 30 七月 2025 16:50:37 +0800 Subject: [PATCH] redis工单保存故障数据 --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java | 209 +++++++++++++++++++++++++++++++-------------------- 1 files changed, 126 insertions(+), 83 deletions(-) diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java index 9178cb4..7812901 100755 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java @@ -36,6 +36,8 @@ import java.text.DecimalFormat; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @Slf4j @@ -79,6 +81,13 @@ @Value(value = "${jeecg.mqtt.enable}") private boolean mqttEnable; + + private static final ConcurrentHashMap<String, ReentrantLock> tenantLocks = new ConcurrentHashMap<>(); + + private ReentrantLock getLock(String tenantId, String type) { + String lockKey = tenantId + ":" + type; + return tenantLocks.computeIfAbsent(lockKey, k -> new ReentrantLock()); + } public String getTemporaryToken() { if (token == null) { @@ -171,11 +180,12 @@ DryOrderVo orderVo = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataParentVo.getTenantid() + "_" + realTimeDataParentVo.getMachineid()); // 1.2 濡傛灉鏈夌紦瀛樿褰� - if (orderVo != null && orderVo.getCode().equals(realTimeDataParentVo.getWorkorder())) { + if (orderVo != null && orderVo.getCode() != null && orderVo.getCode().equals(realTimeDataParentVo.getWorkorder())) { // 1.3 娌℃湁缂撳瓨璁板綍鍐嶆煡璇㈡暟鎹簱 } else { // 鏍规嵁绉熸埛id鍜屽伐鍗曞彿鏌ヨ鏁版嵁搴撴槸鍚︽湁璁板綍锛屾湁鍒欒繑鍥烇紝娌℃湁鍒欐柊澧炰竴鏉″啀杩斿洖 + realTimeDataVo.setWorkorder(realTimeDataParentVo.getWorkorder()); orderVo = getOrSaveDryOrderVoDB(realTimeDataVo); } if (orderVo == null) { @@ -206,8 +216,12 @@ orderVo.setOrderStatus(realTimeDataVo.getWorkorder_status()); orderVo.setEqp_status(realTimeDataVo.getEqp_status()); // orderVo.setEqp_state(realTimeDataVo.getEqp_state()); - orderVo.setWarning(realTimeDataVo.getEqp_warning()); - orderVo.setFault(realTimeDataVo.getEqp_fault()); + if(realTimeDataParentVo.getFault()!=null && StringUtils.isNotEmpty(realTimeDataParentVo.getFault().getWarning())){ + orderVo.setWarning(realTimeDataParentVo.getFault().getWarning()); + } + if(realTimeDataParentVo.getFault()!=null && StringUtils.isNotEmpty(realTimeDataParentVo.getFault().getError())){ + orderVo.setFault(realTimeDataParentVo.getFault().getError()); + } orderVo.setLevel(realTimeDataVo.getLevel()); DryOrderTrendVo trendVo = new DryOrderTrendVo(realTimeDataVo); // 2.2 淇濆瓨宸ュ崟鍚按鐜囧彉鍖� 鎴� 閲嶉噺鍙樺寲 @@ -226,7 +240,16 @@ } if (realTimeDataParentVo.getFault() != null) { - fitFaultRecord(realTimeDataParentVo); + ReentrantLock faultLock = getLock(realTimeDataParentVo.getTenantid() + "", "fault"); + faultLock.lock(); + try { + fitFaultRecord(realTimeDataParentVo); + } catch (Exception e) { + e.printStackTrace(); + } finally { + faultLock.unlock(); + } + } return Result.ok(); } @@ -291,7 +314,7 @@ * @return */ private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) { - TenantContext.setTenant(realTimeDataVo.getTenantid() +""); + TenantContext.setTenant(realTimeDataVo.getTenantid() + ""); DryOrderVo orderVo; LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder()); @@ -329,7 +352,7 @@ * @return */ private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) { - TenantContext.setTenant(realTimeDataVo.getTenantid() +""); + TenantContext.setTenant(realTimeDataVo.getTenantid() + ""); DryOrderVo orderVo; // 鏌ヨ璁惧 @@ -357,16 +380,26 @@ new LambdaQueryWrapper<DryEqpType>() .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid()) ); - if(eqpType == null){ - log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid() ); + if (eqpType == null) { + log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid()); return null; } Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId())); - if (!equipmentService.save(addEqu)) { - log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu); - return null; + // 璁惧鏂板 + ReentrantLock equipmentLock = getLock(realTimeDataVo.getTenantid() + "", "equipment"); + + equipmentLock.lock(); + try { + if (!equipmentService.save(addEqu)) { + log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu); + return null; + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + equipmentLock.unlock(); } equ = addEqu; @@ -380,14 +413,26 @@ log.error("鏈壘鍒拌嵂鏉愶細" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",鏈哄彴锛�" + realTimeDataVo.getMachineid()); return null; } - // 鍒涘缓鏂板伐鍗� - orderVo = new DryOrderVo(realTimeDataVo); - orderVo.setHerbId(herbFormula.getId()); - orderVo.setEquId(equ.getId()); - DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class); - boolean save = dryOrderService.save(dryOrder); - return orderVo; + // 宸ュ崟鏂板 + ReentrantLock orderLock = getLock(realTimeDataVo.getTenantid() + "", "order"); + orderLock.lock(); + try { + // 鍒涘缓鏂板伐鍗� + orderVo = new DryOrderVo(realTimeDataVo); + orderVo.setHerbId(herbFormula.getId()); + orderVo.setEquId(equ.getId()); + DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class); + boolean save = dryOrderService.save(dryOrder); + return orderVo; + } catch (Exception e) { + e.printStackTrace(); + } finally { + orderLock.unlock(); + } + + + return null; } @@ -414,9 +459,9 @@ object.put("tenantId", realTimeDataVo.getTenantid()); mqttMessage.setPayload(object.toJSONString().getBytes()); try { - if(mqttEnable){ - mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage); - } + if (mqttEnable) { + mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage); + } } catch (MqttException e) { e.printStackTrace(); } @@ -441,7 +486,7 @@ if (one == null) { one = new DryHerbFormula(realTimeDataVo); DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid()); - if (dryEquipment!=null&&dryEquipment.getType()!=null) { + if (dryEquipment != null && dryEquipment.getType() != null) { one.setEqpType(dryEquipment.getType()); } @@ -507,7 +552,7 @@ // 鑾峰彇宸ュ崟 DryOrderVo order = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid() + "_" + item.getCode()); list.add(item.getName().substring(0, item.getName().indexOf('#') + 1)); - if (order != null) { + if (order != null && order.getDetailList()!=null && !order.getDetailList().isEmpty()) { // 璁$畻骞茬嚗鏁堢巼锛岀敤浜庡姣� DryOrderTrendVo dryOrderTrendVo = order.getDetailList().get(order.getDetailList().size() - 1); double v = order.getOriginWeight() - dryOrderTrendVo.getWeight(); @@ -657,51 +702,44 @@ @Override public void fitFaultRecord(RealTimeDataParentVo vo) { TenantContext.setTenant(vo.getTenantid() + ""); - ThreadUtil.execute(() -> { - try { - //瑙f瀽瀛樺偍鎶ヨ鏁版嵁 - List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); - List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); - if(!errorList.isEmpty()){ - log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString()); - } - if(!warnList.isEmpty()){ - log.error("淇濆瓨鍛婅锛歿}", warnList.toString()); - } + //瑙f瀽瀛樺偍鎶ヨ鏁版嵁 + List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); + List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); + if (!errorList.isEmpty()) { + log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString()); + } + if (!warnList.isEmpty()) { + log.error("淇濆瓨鍛婅锛歿}", warnList.toString()); + } - //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊 - if(!mqttEnable)return; + //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊 + if (!mqttEnable) return; - - //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒 key = tenantId + machineId + eqpFault - Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); + //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒 key = tenantId + machineId + eqpFault + Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); - Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream() - .collect(Collectors.toMap( - entry -> entry.getKey().toString(), - entry -> (DryFaultRecordVo)entry.getValue() - )); + Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream() + .collect(Collectors.toMap( + entry -> entry.getKey().toString(), + entry -> (DryFaultRecordVo) entry.getValue() + )); - String tenantId = vo.getTenantid() +""; + String tenantId = vo.getTenantid() + ""; - //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 - if (dryFaultMap.isEmpty()) { - return; - } - String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); - //鏁版嵁杞崲 - List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); - MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); - //鍙戦�佸箍鎾� - log.error("骞挎挱缁欙細{}" , recTopic); - mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); + //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 + if (dryFaultMap.isEmpty()) { + return; + } + String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); + //鏁版嵁杞崲 + List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); + MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); + //鍙戦�佸箍鎾� + log.error("骞挎挱缁欙細{}", recTopic); + mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); - } catch (Exception e) { - e.printStackTrace(); - } - }); } @@ -759,12 +797,11 @@ // addFauMap.put(redisKey,faultRecord); Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId); String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + ""); - DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, equipmentMap.get(machineId).getName(), tenantName); + DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, machineId, equipmentMap.get(machineId).getName(), tenantName); addFauMap.put(redisKey, vo); } else { - //TODO 鐗规畩鎯呭喌锛屽鏋渞edis鐨勬晠闅滃拰鏂� - - + + //濡傛灉鏁版嵁宸插瓨鍦紝涓旇鏁板ぇ浜�1灏遍噸缃鏁帮紙璁℃暟3娆″悗鍒ゅ畾鏁呴殰缁撴潫锛�3娆′箣鍓嶉噸鏂颁笂鎶ユ晠闅滆鏄庢晠闅滆繕鍦ㄦ寔缁� 闇�瑕侀噸鏂拌鏁帮級 if (rFault.getECount() != null && rFault.getECount() > 1) { rFault.setECount(1); @@ -787,27 +824,33 @@ //2妫�娴嬪凡缁撴潫鐨勬晠闅� //2.1濡傛灉瀹炴椂鏁版嵁涓嶅瓨鍦╮edis瀛樺湪鍒欎唬琛ㄦ晠闅滅粨鏉燂紝瀛樺叆鏁版嵁搴� Map<Object, Object> curFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId)); - curFauMap.keySet().stream() + Map<Object, DryFaultRecordVo> collect = curFauMap.keySet().stream() + .filter(key -> { + String[] split = key.toString().split("_"); + return split[0].equals(tenantId + "") && split[1].equals(machineId); + }) //鐗瑰埆娉ㄦ剰锛屽涓姤璀︾被鍨嬪叡鐢ㄦ柟娉曢渶瑕佸尯鍒嗙被鍨� - .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo) curFauMap.get(key)).getFaultType() == faultType) - .forEach(key -> { - DryFaultRecordVo vo = (DryFaultRecordVo) redFauMap.get(key); - vo.setECount(vo.getECount() + 1); - if (redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString()) != null) { - //鏇存柊娆℃暟 - redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString(), vo); - System.err.println("鎶ヨ娆℃暟鏇存柊锛宬ey-" + key); - } + .filter(key -> !realFauMap.containsKey(key) && (Objects.equals(((DryFaultRecordVo) curFauMap.get(key)).getFaultType(), faultType))) + .collect(Collectors.toMap(key -> key, // 淇濈暀鍘熼敭 + key -> (DryFaultRecordVo) curFauMap.get(key))); + collect.keySet().forEach(key -> { + DryFaultRecordVo vo = (DryFaultRecordVo) redFauMap.get(key); + vo.setECount(vo.getECount() + 1); + if (redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString()) != null) { + //鏇存柊娆℃暟 + redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString(), vo); + System.err.println("鎶ヨ娆℃暟鏇存柊锛宬ey-" + key); + } - if (vo.getECount() >= 3) { - vo.setEndTime(new Date()); - //TODO 缁撴潫瓒呰繃鏌愪釜鏃堕棿鍖洪棿鍒ゅ畾涓洪敊璇暟鎹� - faultRecordService.save(vo); - redisUtil.hdel(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key); - result.add(vo); - System.err.println((faultType == 1 ? "绫诲瀷锛氭晠闅�" : "绫诲瀷锛氭姤璀�") + DateUtils.formatDateTime() + "瀛樺叆鏁版嵁搴�"); - } - }); + if (vo.getECount() >= 3) { + vo.setEndTime(new Date()); + //TODO 缁撴潫瓒呰繃鏌愪釜鏃堕棿鍖洪棿鍒ゅ畾涓洪敊璇暟鎹� + faultRecordService.save(vo); + redisUtil.hdel(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key); + result.add(vo); + System.err.println((faultType == 1 ? "绫诲瀷锛氭晠闅�" : "绫诲瀷锛氭姤璀�") + DateUtils.formatDateTime() + "瀛樺叆鏁版嵁搴�"); + } + }); return result; -- Gitblit v1.9.3