From 4a60ced80b215fcb2e2d4664b20cd744313ccc10 Mon Sep 17 00:00:00 2001 From: zhuguifei <zhuguifei@zhuguifeideiMac.local> Date: 星期五, 25 七月 2025 15:08:07 +0800 Subject: [PATCH] 接收mqtt数据高并发处理 --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java | 152 ++++++++++++++++++------------ jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java | 112 +++++++++++++++------ jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java | 4 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java | 15 ++ jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java | 5 jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java | 12 + 6 files changed, 199 insertions(+), 101 deletions(-) diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java index 6264a3b..2b95f9f 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java @@ -46,7 +46,7 @@ //鏈嶅姟绔笅琛屾寚浠ゅ墠缂�锛堣繑鍥炵粰绉诲姩绔級 String SERVICE_DOWN_PREFIX = "service/down/res"; //杩斿洖绉诲姩绔煡璇㈣澶囩姸鎬� - String SERVICE_RES_EQU_STATU = SERVICE_DOWN_PREFIX + "/%s/statu"; + String SERVICE_RES_EQU_STATU = SERVICE_DOWN_PREFIX + "/equ/statu/%s"; //杩斿洖绉诲姩绔繙绋嬭姹傛寚浠� String SERVICE_RES_EQU_CMD = SERVICE_DOWN_PREFIX + "/%s/cmd"; @@ -63,6 +63,11 @@ String SERVICE_BROADCAST_TENANT_REAL_FAULT = SERVICE_BROADCAST_PREFIX + "/real/fault/%s"; //鏈嶅姟绔悜绉诲姩绔洖澶嶄竴娆¤澶囧疄鏃舵晠闅滃憡璀� String SERVICE_ONECE_TENANT_REAL_FAULT = "service/onece" + "/real/fault/%s"; + // 鐩戞祴瀹㈡埛绔繛鎺ユ垨鏂紑锛屾帹閫佹秷鎭埌绉熸埛鍐呮墍鏈夌Щ鍔ㄧ鎻愰啋鏇存柊骞茬嚗璁惧杩炴帴淇℃伅锛�%s-绉熸埛锛� + String SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU = SERVICE_BROADCAST_PREFIX + "/update/equ/statu/%s"; + + //鏈嶅姟绔悜绉诲姩绔彂閫佸共鐕ヨ澶囧疄鏃舵暟鎹紙%s-绉熸埛锛� + String SERVICE_BROADCAST_TENANT_REAL_DATA = SERVICE_BROADCAST_PREFIX + "/real/data/%s"; @@ -77,7 +82,7 @@ String TENANT_UP_PREFIX = "tenant/up"; String TENANT_UP_PREFIX_REALTIME_DATA = TENANT_UP_PREFIX + "/realTime/data"; - String TENANT_UP_PREFIX_REALTIME_DATA_EQP = TENANT_UP_PREFIX + "/realTime/data/eqp"; + String TENANT_UP_PREFIX_REALTIME_DATA_EQP = TENANT_UP_PREFIX + "/realTime/data/eqp/test"; String TENANT_UP_PREFIX_FAULT_DATA = TENANT_UP_PREFIX + "/fault/data"; String TENANT_UP_PREFIX_REAL_FAULT_DATA = TENANT_UP_PREFIX + "/real/fault/data"; @@ -96,12 +101,13 @@ /**************************start*******************************/ /**************************end*******************************/ - //redis缂撳瓨 + //鎵�鏈夌鎴风殑瀹炴椂鎶ヨ锛�%s锛氱鎴穒d锛� String MQTT_REAL_FAULT = "mqtt:real:fault:%s"; + //service(cloud) //鍦ㄧ嚎瀹㈡埛绔� String MQTT_ONLINE_CLIENT = "mqtt:online:client:%s"; diff --git a/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java b/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java index b74fdf1..c82fea9 100644 --- a/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java +++ b/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java @@ -17,6 +17,8 @@ private Integer eCount; //璁惧鍚嶇О private String equName; + //璁惧缂栫爜 + private String equCode; //绉熸埛鍚嶇О private String tenantName; //鏁呴殰鏃堕棿 @@ -28,10 +30,11 @@ super(record.getOrderId(),record.getTenantId(),record.getFaultName(),record.getFaultType(),record.getStartTime(),record.getEndTime()); this.eCount = count; } - public DryFaultRecordVo(String orderId, Integer tenantId, String faultName, Integer faultType, Date startTime, Date endTime, Integer eCount, String equName, String tenantName) { + public DryFaultRecordVo(String orderId, Integer tenantId, String faultName, Integer faultType, Date startTime, Date endTime, Integer eCount,String equCode, String equName, String tenantName) { super(orderId, tenantId, faultName, faultType, startTime, endTime); this.eCount = eCount; this.equName = equName; + this.equCode = equCode; this.tenantName = tenantName; } } diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java index 4fcdc30..4ec431f 100644 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java @@ -167,12 +167,23 @@ String st = client.getString("connectedAt"); vo.setUpTime(st); vo.setClientId(clientid); + }else{ + vo.setClientId(clientid); + vo.setOnline(false); } return vo; }).collect(Collectors.toList()); //鎺掑簭 - collect.sort(Comparator.comparing(obj -> obj.getCode(), Comparator.nullsLast(Comparator.naturalOrder()))); - collect.sort(Comparator.comparing(obj -> obj.getOnline(), Comparator.nullsLast(Comparator.naturalOrder()))); + collect.sort( + Comparator.comparing( + MoEquVo::getOnline, + Comparator.nullsLast(Comparator.reverseOrder()) // true 鍦ㄥ墠锛宖alse 鍦ㄥ悗锛宯ull 鏈�鍚� + ) + .thenComparing( + DryEquipment::getCode, + Comparator.nullsLast(Comparator.naturalOrder()) // code 鍗囧簭锛宯ull 鏈�鍚� + ) + ); BeanUtils.copyProperties(pageList, page); page.setRecords(collect); } diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java index a490a0a..65d41d6 100755 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java @@ -73,7 +73,7 @@ mqttConnOpt.setAutomaticReconnect(false);//璁剧疆鏄惁鑷姩閲嶈繛 //閬楀槺娑堟伅 TODO qos2闇�瑕佸湪璁惧涓婄嚎鏃跺仛娓呴櫎娑堟伅鎿嶄綔 - mqttConnOpt.setWill("downline", ("鎴戞槸" + mqttName + "_" + mqttClientId + "锛屾垜涓嬬嚎浜�").getBytes(), 2, false); + //mqttConnOpt.setWill("downline", ("鎴戞槸" + mqttName + "_" + mqttClientId + "锛屾垜涓嬬嚎浜�").getBytes(), 2, false); try { MqttClient mqttClient = new MqttClient(broker, mqttClientId, persistence); @@ -183,6 +183,8 @@ String clientid = obj.getString("clientid"); item.put("clientid", clientid); //TODO 鏍¢獙绉熸埛id鏄惁瀛樺湪 + + if(!clientid.matches("^[^-]+-[^-]+-[^-]+$")) continue; //username item.put("username", obj.get("username")); diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java index 4558603..6fcfa4b 100644 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java @@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.poi.ss.formula.functions.T; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; @@ -42,7 +43,6 @@ public class MqttSampleCallback implements MqttCallback { @Value(value = "${jeecg.mqtt.role}") private String role; - @Autowired @@ -88,7 +88,7 @@ String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); - if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { + if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected") && !topic.endsWith("disconnected")) { JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid")); if (client == null) { JSONObject item = new JSONObject(); @@ -102,7 +102,7 @@ String clientid = messageJson.getString("clientid"); item.put("clientid", clientid); // 涓嶇鍚堢殑璁惧涓嶈繘琛岀鐞� - if(!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return; + if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return; //鏄惁杩炴帴 item.put("connected", true); //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡 渚嬶細client-1000) @@ -113,6 +113,12 @@ item.put("code", info[2]); redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); System.err.println(String.format("璁惧: %s涓婄嚎", clientid)); + + // 鎺ㄩ�佸埌绉诲姩绔� + String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, item.get("tenantId")); + MqMessage<JSONObject> mqMessage = new MqMessage<>(item, item.get("tenantId").toString(), recTopic); + sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1); + } catch (Exception e) { e.printStackTrace(); } @@ -124,8 +130,23 @@ if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { try { String clientid = messageJson.getString("clientid"); - redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid); + // 涓嶇鍚堢殑璁惧涓嶈繘琛岀鐞� + if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return; + String tenantId = clientid.split("-")[1]; + redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, tenantId), clientid); System.err.println(String.format("璁惧: %s涓嬬嚎", clientid)); + + //鎺ㄩ�佸埌绉诲姩绔� + JSONObject item = new JSONObject(); + String[] info = clientid.split("-"); + item.put("type", info[0]); + item.put("tenantId", info[1]); + item.put("code", info[2]); + item.put("connected", false); + String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, tenantId); + MqMessage<JSONObject> mqMessage = new MqMessage<>(item, tenantId, recTopic); + sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1); + } catch (Exception e) { e.printStackTrace(); } @@ -177,38 +198,55 @@ switch (topic) { // 鏌ヨ璁惧鍦ㄧ嚎 case MqttConstant.MOBILE_QUERY_EQU_STATU: - System.err.println("admin鏀跺埌" + topic); - // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵�� - String clientId = messageJson.getString("clientId"); - JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId); + log.info("admin鏀跺埌MQTT璇锋眰锛宼opic: {}", topic); // 鏀圭敤鏇磋鑼冪殑鏃ュ織璁板綍 - ThreadUtil.execute(() -> { + try { - if (client == null || client.isEmpty()) { - JSONObject res = new JSONObject(); - res.put("success", false); - res.put("msg", "鏌ヨ澶辫触"); - try { - MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); - sendMessage.setQos(0); - mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); - } catch (Exception e) { - e.printStackTrace(); - } + // 1. 鍙傛暟鎻愬彇 + String clientId = messageJson.getString("clientId"); + if (StringUtils.isEmpty(clientId)) { return; } + String deviceKey = clientId.split("-")[1]; // 鎻愬彇璁惧鏍囪瘑 - client.put("success", true); - client.put("msg", "鏌ヨ鎴愬姛"); - try { - MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes()); - sendMessage.setQos(0); - mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); - baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); - } catch (Exception e) { - e.printStackTrace(); - } - }); + // 2. 鏌ヨ璁惧鐘舵�� + String redisKey = String.format(MqttConstant.MQTT_ONLINE_CLIENT, deviceKey); + JSONObject client = (JSONObject) redisUtil.hget(redisKey, clientId); + + // 3. 寮傛澶勭悊鍝嶅簲 + ThreadUtil.execute(() -> { + JSONObject response = new JSONObject(); + + // 3.1 澶勭悊鏌ヨ缁撴灉 + if (client == null || client.isEmpty()) { + response.put("success", false); + response.put("msg", "鏌ヨ澶辫触锛岃澶囦笉瀛樺湪鎴栫绾�"); + } else { + response = client; // 澶嶇敤鏌ヨ缁撴灉 + response.put("success", true); + response.put("msg", "鏌ヨ鎴愬姛"); + } + + // 3.2 鍙戦�丮QTT鍝嶅簲 + try { + String resTopic = String.format(MqttConstant.SERVICE_RES_EQU_STATU, req); + MqMessage<JSONObject> mqMessage = new MqMessage<>( + response, + response.getString("tenantId"), + resTopic + ); + + sendMqttMessage(resTopic, mqMessage, 2); + log.debug("璁惧鐘舵�佸搷搴斿彂閫佹垚鍔�: {}", response); + + } catch (Exception e) { + log.error("MQTT鍝嶅簲鍙戦�佸け璐�", e); + } + }); + + } catch (Exception e) { + log.error("澶勭悊璁惧鐘舵�佹煡璇㈠紓甯�", e); + } break; // 鎺ユ敹璁惧瀹炴椂鏁版嵁 TODO 20250718鏆備笉浣跨敤锛屼娇鐢═ENANT_UP_PREFIX_REALTIME_DATA_EQP @@ -227,10 +265,16 @@ case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP: ThreadUtil.execute(() -> { try { + RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class); - synchronized (realTimeDataService) { - realTimeDataService.realTimeDataHandle(vo); - } + // 鍚戝悇绉熸埛绉诲姩绔彂閫佹暟鎹� + String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, vo.getTenantid()); + MqMessage<RealTimeDataVo> mqMessage = new MqMessage<>(vo.getRealTime(), vo.getTenantid() + "", recTopic); + sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, mqMessage, 1); + + + realTimeDataService.realTimeDataHandle(vo); + } catch (Exception e) { e.printStackTrace(); } diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java index 9178cb4..8e48a0b 100755 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java @@ -36,6 +36,8 @@ import java.text.DecimalFormat; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @Slf4j @@ -79,6 +81,13 @@ @Value(value = "${jeecg.mqtt.enable}") private boolean mqttEnable; + + private static final ConcurrentHashMap<String, ReentrantLock> tenantLocks = new ConcurrentHashMap<>(); + + private ReentrantLock getLock(String tenantId, String type) { + String lockKey = tenantId + ":" + type; + return tenantLocks.computeIfAbsent(lockKey, k -> new ReentrantLock()); + } public String getTemporaryToken() { if (token == null) { @@ -226,7 +235,16 @@ } if (realTimeDataParentVo.getFault() != null) { - fitFaultRecord(realTimeDataParentVo); + ReentrantLock faultLock = getLock(realTimeDataParentVo.getTenantid() + "", "fault"); + faultLock.lock(); + try { + fitFaultRecord(realTimeDataParentVo); + } catch (Exception e) { + e.printStackTrace(); + } finally { + faultLock.unlock(); + } + } return Result.ok(); } @@ -291,7 +309,7 @@ * @return */ private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) { - TenantContext.setTenant(realTimeDataVo.getTenantid() +""); + TenantContext.setTenant(realTimeDataVo.getTenantid() + ""); DryOrderVo orderVo; LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder()); @@ -329,7 +347,7 @@ * @return */ private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) { - TenantContext.setTenant(realTimeDataVo.getTenantid() +""); + TenantContext.setTenant(realTimeDataVo.getTenantid() + ""); DryOrderVo orderVo; // 鏌ヨ璁惧 @@ -357,16 +375,26 @@ new LambdaQueryWrapper<DryEqpType>() .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid()) ); - if(eqpType == null){ - log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid() ); + if (eqpType == null) { + log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid()); return null; } Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId())); - if (!equipmentService.save(addEqu)) { - log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu); - return null; + // 璁惧鏂板 + ReentrantLock equipmentLock = getLock(realTimeDataVo.getTenantid() + "", "equipment"); + + equipmentLock.lock(); + try { + if (!equipmentService.save(addEqu)) { + log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu); + return null; + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + equipmentLock.unlock(); } equ = addEqu; @@ -380,14 +408,26 @@ log.error("鏈壘鍒拌嵂鏉愶細" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",鏈哄彴锛�" + realTimeDataVo.getMachineid()); return null; } - // 鍒涘缓鏂板伐鍗� - orderVo = new DryOrderVo(realTimeDataVo); - orderVo.setHerbId(herbFormula.getId()); - orderVo.setEquId(equ.getId()); - DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class); - boolean save = dryOrderService.save(dryOrder); - return orderVo; + // 宸ュ崟鏂板 + ReentrantLock orderLock = getLock(realTimeDataVo.getTenantid() + "", "order"); + orderLock.lock(); + try { + // 鍒涘缓鏂板伐鍗� + orderVo = new DryOrderVo(realTimeDataVo); + orderVo.setHerbId(herbFormula.getId()); + orderVo.setEquId(equ.getId()); + DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class); + boolean save = dryOrderService.save(dryOrder); + return orderVo; + } catch (Exception e) { + e.printStackTrace(); + } finally { + orderLock.unlock(); + } + + + return null; } @@ -414,9 +454,9 @@ object.put("tenantId", realTimeDataVo.getTenantid()); mqttMessage.setPayload(object.toJSONString().getBytes()); try { - if(mqttEnable){ - mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage); - } + if (mqttEnable) { + mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage); + } } catch (MqttException e) { e.printStackTrace(); } @@ -441,7 +481,7 @@ if (one == null) { one = new DryHerbFormula(realTimeDataVo); DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid()); - if (dryEquipment!=null&&dryEquipment.getType()!=null) { + if (dryEquipment != null && dryEquipment.getType() != null) { one.setEqpType(dryEquipment.getType()); } @@ -657,51 +697,44 @@ @Override public void fitFaultRecord(RealTimeDataParentVo vo) { TenantContext.setTenant(vo.getTenantid() + ""); - ThreadUtil.execute(() -> { - try { - //瑙f瀽瀛樺偍鎶ヨ鏁版嵁 - List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); - List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); - if(!errorList.isEmpty()){ - log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString()); - } - if(!warnList.isEmpty()){ - log.error("淇濆瓨鍛婅锛歿}", warnList.toString()); - } + //瑙f瀽瀛樺偍鎶ヨ鏁版嵁 + List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); + List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); + if (!errorList.isEmpty()) { + log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString()); + } + if (!warnList.isEmpty()) { + log.error("淇濆瓨鍛婅锛歿}", warnList.toString()); + } - //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊 - if(!mqttEnable)return; + //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊 + if (!mqttEnable) return; - - //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒 key = tenantId + machineId + eqpFault - Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); + //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒 key = tenantId + machineId + eqpFault + Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); - Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream() - .collect(Collectors.toMap( - entry -> entry.getKey().toString(), - entry -> (DryFaultRecordVo)entry.getValue() - )); + Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream() + .collect(Collectors.toMap( + entry -> entry.getKey().toString(), + entry -> (DryFaultRecordVo) entry.getValue() + )); - String tenantId = vo.getTenantid() +""; + String tenantId = vo.getTenantid() + ""; - //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 - if (dryFaultMap.isEmpty()) { - return; - } - String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); - //鏁版嵁杞崲 - List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); - MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); - //鍙戦�佸箍鎾� - log.error("骞挎挱缁欙細{}" , recTopic); - mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); + //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 + if (dryFaultMap.isEmpty()) { + return; + } + String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); + //鏁版嵁杞崲 + List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); + MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); + //鍙戦�佸箍鎾� + log.error("骞挎挱缁欙細{}", recTopic); + mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); - } catch (Exception e) { - e.printStackTrace(); - } - }); } @@ -759,12 +792,11 @@ // addFauMap.put(redisKey,faultRecord); Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId); String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + ""); - DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, equipmentMap.get(machineId).getName(), tenantName); + DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, machineId, equipmentMap.get(machineId).getName(), tenantName); addFauMap.put(redisKey, vo); } else { - //TODO 鐗规畩鎯呭喌锛屽鏋渞edis鐨勬晠闅滃拰鏂� - - + + //濡傛灉鏁版嵁宸插瓨鍦紝涓旇鏁板ぇ浜�1灏遍噸缃鏁帮紙璁℃暟3娆″悗鍒ゅ畾鏁呴殰缁撴潫锛�3娆′箣鍓嶉噸鏂颁笂鎶ユ晠闅滆鏄庢晠闅滆繕鍦ㄦ寔缁� 闇�瑕侀噸鏂拌鏁帮級 if (rFault.getECount() != null && rFault.getECount() > 1) { rFault.setECount(1); -- Gitblit v1.9.3