From 4a60ced80b215fcb2e2d4664b20cd744313ccc10 Mon Sep 17 00:00:00 2001 From: zhuguifei <zhuguifei@zhuguifeideiMac.local> Date: 星期五, 25 七月 2025 15:08:07 +0800 Subject: [PATCH] 接收mqtt数据高并发处理 --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java | 152 ++++++++++++++++++++++++++++++-------------------- 1 files changed, 92 insertions(+), 60 deletions(-) diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java index 9178cb4..8e48a0b 100755 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java @@ -36,6 +36,8 @@ import java.text.DecimalFormat; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @Slf4j @@ -79,6 +81,13 @@ @Value(value = "${jeecg.mqtt.enable}") private boolean mqttEnable; + + private static final ConcurrentHashMap<String, ReentrantLock> tenantLocks = new ConcurrentHashMap<>(); + + private ReentrantLock getLock(String tenantId, String type) { + String lockKey = tenantId + ":" + type; + return tenantLocks.computeIfAbsent(lockKey, k -> new ReentrantLock()); + } public String getTemporaryToken() { if (token == null) { @@ -226,7 +235,16 @@ } if (realTimeDataParentVo.getFault() != null) { - fitFaultRecord(realTimeDataParentVo); + ReentrantLock faultLock = getLock(realTimeDataParentVo.getTenantid() + "", "fault"); + faultLock.lock(); + try { + fitFaultRecord(realTimeDataParentVo); + } catch (Exception e) { + e.printStackTrace(); + } finally { + faultLock.unlock(); + } + } return Result.ok(); } @@ -291,7 +309,7 @@ * @return */ private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) { - TenantContext.setTenant(realTimeDataVo.getTenantid() +""); + TenantContext.setTenant(realTimeDataVo.getTenantid() + ""); DryOrderVo orderVo; LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder()); @@ -329,7 +347,7 @@ * @return */ private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) { - TenantContext.setTenant(realTimeDataVo.getTenantid() +""); + TenantContext.setTenant(realTimeDataVo.getTenantid() + ""); DryOrderVo orderVo; // 鏌ヨ璁惧 @@ -357,16 +375,26 @@ new LambdaQueryWrapper<DryEqpType>() .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid()) ); - if(eqpType == null){ - log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid() ); + if (eqpType == null) { + log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid()); return null; } Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId())); - if (!equipmentService.save(addEqu)) { - log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu); - return null; + // 璁惧鏂板 + ReentrantLock equipmentLock = getLock(realTimeDataVo.getTenantid() + "", "equipment"); + + equipmentLock.lock(); + try { + if (!equipmentService.save(addEqu)) { + log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu); + return null; + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + equipmentLock.unlock(); } equ = addEqu; @@ -380,14 +408,26 @@ log.error("鏈壘鍒拌嵂鏉愶細" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",鏈哄彴锛�" + realTimeDataVo.getMachineid()); return null; } - // 鍒涘缓鏂板伐鍗� - orderVo = new DryOrderVo(realTimeDataVo); - orderVo.setHerbId(herbFormula.getId()); - orderVo.setEquId(equ.getId()); - DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class); - boolean save = dryOrderService.save(dryOrder); - return orderVo; + // 宸ュ崟鏂板 + ReentrantLock orderLock = getLock(realTimeDataVo.getTenantid() + "", "order"); + orderLock.lock(); + try { + // 鍒涘缓鏂板伐鍗� + orderVo = new DryOrderVo(realTimeDataVo); + orderVo.setHerbId(herbFormula.getId()); + orderVo.setEquId(equ.getId()); + DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class); + boolean save = dryOrderService.save(dryOrder); + return orderVo; + } catch (Exception e) { + e.printStackTrace(); + } finally { + orderLock.unlock(); + } + + + return null; } @@ -414,9 +454,9 @@ object.put("tenantId", realTimeDataVo.getTenantid()); mqttMessage.setPayload(object.toJSONString().getBytes()); try { - if(mqttEnable){ - mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage); - } + if (mqttEnable) { + mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage); + } } catch (MqttException e) { e.printStackTrace(); } @@ -441,7 +481,7 @@ if (one == null) { one = new DryHerbFormula(realTimeDataVo); DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid()); - if (dryEquipment!=null&&dryEquipment.getType()!=null) { + if (dryEquipment != null && dryEquipment.getType() != null) { one.setEqpType(dryEquipment.getType()); } @@ -657,51 +697,44 @@ @Override public void fitFaultRecord(RealTimeDataParentVo vo) { TenantContext.setTenant(vo.getTenantid() + ""); - ThreadUtil.execute(() -> { - try { - //瑙f瀽瀛樺偍鎶ヨ鏁版嵁 - List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); - List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); - if(!errorList.isEmpty()){ - log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString()); - } - if(!warnList.isEmpty()){ - log.error("淇濆瓨鍛婅锛歿}", warnList.toString()); - } + //瑙f瀽瀛樺偍鎶ヨ鏁版嵁 + List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); + List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); + if (!errorList.isEmpty()) { + log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString()); + } + if (!warnList.isEmpty()) { + log.error("淇濆瓨鍛婅锛歿}", warnList.toString()); + } - //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊 - if(!mqttEnable)return; + //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊 + if (!mqttEnable) return; - - //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒 key = tenantId + machineId + eqpFault - Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); + //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒 key = tenantId + machineId + eqpFault + Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); - Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream() - .collect(Collectors.toMap( - entry -> entry.getKey().toString(), - entry -> (DryFaultRecordVo)entry.getValue() - )); + Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream() + .collect(Collectors.toMap( + entry -> entry.getKey().toString(), + entry -> (DryFaultRecordVo) entry.getValue() + )); - String tenantId = vo.getTenantid() +""; + String tenantId = vo.getTenantid() + ""; - //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 - if (dryFaultMap.isEmpty()) { - return; - } - String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); - //鏁版嵁杞崲 - List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); - MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); - //鍙戦�佸箍鎾� - log.error("骞挎挱缁欙細{}" , recTopic); - mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); + //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 + if (dryFaultMap.isEmpty()) { + return; + } + String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); + //鏁版嵁杞崲 + List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); + MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); + //鍙戦�佸箍鎾� + log.error("骞挎挱缁欙細{}", recTopic); + mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); - } catch (Exception e) { - e.printStackTrace(); - } - }); } @@ -759,12 +792,11 @@ // addFauMap.put(redisKey,faultRecord); Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId); String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + ""); - DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, equipmentMap.get(machineId).getName(), tenantName); + DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, machineId, equipmentMap.get(machineId).getName(), tenantName); addFauMap.put(redisKey, vo); } else { - //TODO 鐗规畩鎯呭喌锛屽鏋渞edis鐨勬晠闅滃拰鏂� - - + + //濡傛灉鏁版嵁宸插瓨鍦紝涓旇鏁板ぇ浜�1灏遍噸缃鏁帮紙璁℃暟3娆″悗鍒ゅ畾鏁呴殰缁撴潫锛�3娆′箣鍓嶉噸鏂颁笂鎶ユ晠闅滆鏄庢晠闅滆繕鍦ㄦ寔缁� 闇�瑕侀噸鏂拌鏁帮級 if (rFault.getECount() != null && rFault.getECount() > 1) { rFault.setECount(1); -- Gitblit v1.9.3