From b38019aae593a66c16f7e75d6e37d14eb8d2c42e Mon Sep 17 00:00:00 2001 From: zhuguifei <zhuguifei@zhuguifeideiMac.local> Date: 星期二, 22 七月 2025 08:55:15 +0800 Subject: [PATCH] 修改接收实时数据接口-故障处理 --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java | 146 +++++++++++++++++++++++++++++++++--------------- 1 files changed, 101 insertions(+), 45 deletions(-) diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java old mode 100755 new mode 100644 index 068d415..4558603 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java @@ -24,6 +24,7 @@ import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.vo.DryEquipmentVo; import org.jeecg.modules.dry.vo.DryFaultRecordVo; +import org.jeecg.modules.dry.vo.RealTimeDataParentVo; import org.jeecg.modules.dry.vo.RealTimeDataVo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -41,6 +42,9 @@ public class MqttSampleCallback implements MqttCallback { @Value(value = "${jeecg.mqtt.role}") private String role; + + + @Autowired private MqttUtil mqttUtil; @Autowired @@ -75,8 +79,8 @@ @Override public void messageArrived(String topic, MqttMessage mqttMessage) { - System.out.println("鏀跺埌娑堟伅: \n topic锛�" + topic + "\n Qos锛�" + mqttMessage.getQos() + "\n payload锛�" - + new String(mqttMessage.getPayload())); +// System.out.println("鏀跺埌娑堟伅: \n topic锛�" + topic + "\n Qos锛�" + mqttMessage.getQos() + "\n payload锛�" +// + new String(mqttMessage.getPayload())); switch (role) { // 绠$悊鍛� @@ -85,7 +89,7 @@ JSONObject messageJson = JSONObject.parseObject(message); if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { - JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT,messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid")); + JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid")); if (client == null) { JSONObject item = new JSONObject(); //username @@ -97,6 +101,8 @@ //clientid String clientid = messageJson.getString("clientid"); item.put("clientid", clientid); + // 涓嶇鍚堢殑璁惧涓嶈繘琛岀鐞� + if(!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return; //鏄惁杩炴帴 item.put("connected", true); //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡 渚嬶細client-1000) @@ -104,19 +110,21 @@ String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); - //item.put("code", info[2]); + item.put("code", info[2]); + redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); + System.err.println(String.format("璁惧: %s涓婄嚎", clientid)); } catch (Exception e) { e.printStackTrace(); } - redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId") ), clientid, item); - System.err.println(String.format("璁惧: %s涓婄嚎", clientid)); + + } } if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { try { String clientid = messageJson.getString("clientid"); - redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientid.split("-")[1]), clientid); + redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid); System.err.println(String.format("璁惧: %s涓嬬嚎", clientid)); } catch (Exception e) { e.printStackTrace(); @@ -163,7 +171,7 @@ } // 瀹炴椂鏁版嵁涓婁紶澶绻佷笖鏁版嵁鍐呭瓒呰繃瀛楁澶у皬涓嶈褰曟棩蹇� if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)) { - // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); + // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); } switch (topic) { @@ -172,7 +180,7 @@ System.err.println("admin鏀跺埌" + topic); // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵�� String clientId = messageJson.getString("clientId"); - JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientId.split("-")[1]) , clientId); + JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId); ThreadUtil.execute(() -> { @@ -203,48 +211,88 @@ }); break; - // 鎺ユ敹璁惧瀹炴椂鏁版嵁 + // 鎺ユ敹璁惧瀹炴椂鏁版嵁 TODO 20250718鏆備笉浣跨敤锛屼娇鐢═ENANT_UP_PREFIX_REALTIME_DATA_EQP case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: +// ThreadUtil.execute(() -> { +// try { +// RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); +// realTimeDataService.realTimeDataHandle(vo); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// }); + + break; + // 鎺ユ敹璁惧瀹炴椂鏁版嵁-鏈哄彴 + case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP: ThreadUtil.execute(() -> { try { - RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); - realTimeDataService.realTimeDataHandle(vo); + RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class); + synchronized (realTimeDataService) { + realTimeDataService.realTimeDataHandle(vo); + } } catch (Exception e) { e.printStackTrace(); } }); - break; - //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁 - case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: - MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { - }); - //鏁呴殰鏁版嵁 - Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); - //绉熸埛id - String tentId = realFaultMessage.getTentId(); + //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁 TODO 20250721鏆備笉浣跨敤锛岀粺涓�浣跨敤TENANT_UP_PREFIX_REALTIME_DATA_EQP + case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: { + +// MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { +// }); +// //鏁呴殰鏁版嵁 +// Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); +// //绉熸埛id +// String tenantId = realFaultMessage.getTentId(); //鏀跺埌绉熸埛瀹炴椂鎶ヨ鏁版嵁瀛樺叆redis - //杞崲涓� Map<String, Object> - Map<String, Object> objectMap = dryFaultMap.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> (Object) entry.getValue() - )); - redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT,realFaultMessage.getTentId()), objectMap); - //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 - if(dryFaultMap.isEmpty()){ +// //杞崲涓� Map<String, Object> +// Map<String, Object> objectMap = dryFaultMap.entrySet().stream() +// .collect(Collectors.toMap( +// Map.Entry::getKey, +// entry -> (Object) entry.getValue() +// )); +// redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap); +// //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 +// if (dryFaultMap.isEmpty()) { +// return; +// } +// String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); +// //鏁版嵁杞崲 +// List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); +// MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); +// //鍙戦�佸箍鎾� +// System.err.println("骞挎挱缁欙細" + recTopic); +// sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); + } + break; + //绉诲姩绔富鍔ㄨ姹傝澶囧疄鏃舵晠闅滄暟鎹紙鐢ㄤ簬椤甸潰鍒氭墦寮�鏃舵媺鍙栦竴娆℃暟鎹級 + case MqttConstant.MOBILE_REQ_EQU_REAL_FAULT: { + String tenantId = (String) messageJson.get("tenantId"); + if (req.toString().isEmpty() || tenantId == null) { return; } - String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tentId); + Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId)); + //杞崲涓� Map<String, DryFaultRecordVo> + Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream() + .collect(Collectors.toMap( + entry -> entry.getKey().toString(), + entry -> (DryFaultRecordVo) entry.getValue() + )); + if (dryFaultMap.isEmpty()) { + return; + } + String resTopic = String.format(MqttConstant.SERVICE_ONECE_TENANT_REAL_FAULT, req); //鏁版嵁杞崲 List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); - MqMessage< List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList,tentId,recTopic); - //鍙戦�佸箍鎾� - System.err.println("骞挎挱缁欙細" + recTopic); - sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT,mqMessage); + MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, resTopic); + //鍙戦�佽姹傝澶� + System.err.println("鍙戦�佺粰锛�" + resTopic); + sendMqttMessage(resTopic, mqMessage, 2); - break; - // 鎺ユ敹璁惧鎶ヨ鏁版嵁 + } + break; + // 鎺ユ敹璁惧鎶ヨ鍘嗗彶鏁版嵁 case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA: ThreadUtil.execute(() -> { try { @@ -372,21 +420,29 @@ } - } /** * 鍙戦�佹秷鎭� - * @param topic 璁㈤槄 - * @param mqMessage 娑堟伅浣� + * + * @param topic 璁㈤槄 + * @param mqMessage 娑堟伅浣� + * @param type 1-鍙戦�佺粰绉熸埛 2-鍙戦�佺粰鍥哄畾id */ - private void sendMqttMessage(String topic, MqMessage mqMessage){ + private void sendMqttMessage(String topic, MqMessage mqMessage, Integer type) { ThreadUtil.execute(() -> { try { - MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); - sendMessage.setQos(0); - mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage); - }catch (Exception e){ + if (type == 1) { + MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage); + } else if (type == 2) { + MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(topic, sendMessage); + } + + } catch (Exception e) { e.printStackTrace(); } }); -- Gitblit v1.9.3