From b38019aae593a66c16f7e75d6e37d14eb8d2c42e Mon Sep 17 00:00:00 2001 From: zhuguifei <zhuguifei@zhuguifeideiMac.local> Date: 星期二, 22 七月 2025 08:55:15 +0800 Subject: [PATCH] 修改接收实时数据接口-故障处理 --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java | 91 +++++++++++++++++++++++++-------------------- 1 files changed, 50 insertions(+), 41 deletions(-) diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java index b381df7..4558603 100644 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java @@ -42,6 +42,9 @@ public class MqttSampleCallback implements MqttCallback { @Value(value = "${jeecg.mqtt.role}") private String role; + + + @Autowired private MqttUtil mqttUtil; @Autowired @@ -76,8 +79,8 @@ @Override public void messageArrived(String topic, MqttMessage mqttMessage) { - System.out.println("鏀跺埌娑堟伅: \n topic锛�" + topic + "\n Qos锛�" + mqttMessage.getQos() + "\n payload锛�" - + new String(mqttMessage.getPayload())); +// System.out.println("鏀跺埌娑堟伅: \n topic锛�" + topic + "\n Qos锛�" + mqttMessage.getQos() + "\n payload锛�" +// + new String(mqttMessage.getPayload())); switch (role) { // 绠$悊鍛� @@ -98,6 +101,8 @@ //clientid String clientid = messageJson.getString("clientid"); item.put("clientid", clientid); + // 涓嶇鍚堢殑璁惧涓嶈繘琛岀鐞� + if(!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return; //鏄惁杩炴帴 item.put("connected", true); //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡 渚嬶細client-1000) @@ -105,12 +110,14 @@ String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); - //item.put("code", info[2]); + item.put("code", info[2]); + redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); + System.err.println(String.format("璁惧: %s涓婄嚎", clientid)); } catch (Exception e) { e.printStackTrace(); } - redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); - System.err.println(String.format("璁惧: %s涓婄嚎", clientid)); + + } } @@ -204,16 +211,16 @@ }); break; - // 鎺ユ敹璁惧瀹炴椂鏁版嵁 + // 鎺ユ敹璁惧瀹炴椂鏁版嵁 TODO 20250718鏆備笉浣跨敤锛屼娇鐢═ENANT_UP_PREFIX_REALTIME_DATA_EQP case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: - ThreadUtil.execute(() -> { - try { - RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); - realTimeDataService.realTimeDataHandle(vo); - } catch (Exception e) { - e.printStackTrace(); - } - }); +// ThreadUtil.execute(() -> { +// try { +// RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); +// realTimeDataService.realTimeDataHandle(vo); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// }); break; // 鎺ユ敹璁惧瀹炴椂鏁版嵁-鏈哄彴 @@ -221,40 +228,42 @@ ThreadUtil.execute(() -> { try { RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class); - realTimeDataService.realTimeDataHandle(vo); + synchronized (realTimeDataService) { + realTimeDataService.realTimeDataHandle(vo); + } } catch (Exception e) { e.printStackTrace(); } }); break; - //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁 + //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁 TODO 20250721鏆備笉浣跨敤锛岀粺涓�浣跨敤TENANT_UP_PREFIX_REALTIME_DATA_EQP case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: { - MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { - }); - //鏁呴殰鏁版嵁 - Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); - //绉熸埛id - String tenantId = realFaultMessage.getTentId(); +// MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { +// }); +// //鏁呴殰鏁版嵁 +// Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); +// //绉熸埛id +// String tenantId = realFaultMessage.getTentId(); //鏀跺埌绉熸埛瀹炴椂鎶ヨ鏁版嵁瀛樺叆redis - //杞崲涓� Map<String, Object> - Map<String, Object> objectMap = dryFaultMap.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> (Object) entry.getValue() - )); - redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap); - //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 - if (dryFaultMap.isEmpty()) { - return; - } - String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); - //鏁版嵁杞崲 - List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); - MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); - //鍙戦�佸箍鎾� - System.err.println("骞挎挱缁欙細" + recTopic); - sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); +// //杞崲涓� Map<String, Object> +// Map<String, Object> objectMap = dryFaultMap.entrySet().stream() +// .collect(Collectors.toMap( +// Map.Entry::getKey, +// entry -> (Object) entry.getValue() +// )); +// redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap); +// //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 +// if (dryFaultMap.isEmpty()) { +// return; +// } +// String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); +// //鏁版嵁杞崲 +// List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); +// MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); +// //鍙戦�佸箍鎾� +// System.err.println("骞挎挱缁欙細" + recTopic); +// sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); } break; //绉诲姩绔富鍔ㄨ姹傝澶囧疄鏃舵晠闅滄暟鎹紙鐢ㄤ簬椤甸潰鍒氭墦寮�鏃舵媺鍙栦竴娆℃暟鎹級 @@ -263,7 +272,7 @@ if (req.toString().isEmpty() || tenantId == null) { return; } - Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId)); + Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId)); //杞崲涓� Map<String, DryFaultRecordVo> Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream() .collect(Collectors.toMap( -- Gitblit v1.9.3