From f027c87f28485988b3d25f41b0cdf6e5bd42cd42 Mon Sep 17 00:00:00 2001 From: zhuguifei <zhuguifei@zhuguifeideiMac.local> Date: 星期三, 30 七月 2025 16:50:37 +0800 Subject: [PATCH] redis工单保存故障数据 --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java | 193 ++++++++++++++++++++++++++++++----------------- 1 files changed, 123 insertions(+), 70 deletions(-) diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java index b381df7..6fcfa4b 100644 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java @@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.poi.ss.formula.functions.T; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; @@ -42,6 +43,8 @@ public class MqttSampleCallback implements MqttCallback { @Value(value = "${jeecg.mqtt.role}") private String role; + + @Autowired private MqttUtil mqttUtil; @Autowired @@ -76,8 +79,8 @@ @Override public void messageArrived(String topic, MqttMessage mqttMessage) { - System.out.println("鏀跺埌娑堟伅: \n topic锛�" + topic + "\n Qos锛�" + mqttMessage.getQos() + "\n payload锛�" - + new String(mqttMessage.getPayload())); +// System.out.println("鏀跺埌娑堟伅: \n topic锛�" + topic + "\n Qos锛�" + mqttMessage.getQos() + "\n payload锛�" +// + new String(mqttMessage.getPayload())); switch (role) { // 绠$悊鍛� @@ -85,7 +88,7 @@ String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); - if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { + if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected") && !topic.endsWith("disconnected")) { JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid")); if (client == null) { JSONObject item = new JSONObject(); @@ -98,6 +101,8 @@ //clientid String clientid = messageJson.getString("clientid"); item.put("clientid", clientid); + // 涓嶇鍚堢殑璁惧涓嶈繘琛岀鐞� + if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return; //鏄惁杩炴帴 item.put("connected", true); //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡 渚嬶細client-1000) @@ -105,20 +110,43 @@ String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); - //item.put("code", info[2]); + item.put("code", info[2]); + redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); + System.err.println(String.format("璁惧: %s涓婄嚎", clientid)); + + // 鎺ㄩ�佸埌绉诲姩绔� + String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, item.get("tenantId")); + MqMessage<JSONObject> mqMessage = new MqMessage<>(item, item.get("tenantId").toString(), recTopic); + sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1); + } catch (Exception e) { e.printStackTrace(); } - redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); - System.err.println(String.format("璁惧: %s涓婄嚎", clientid)); + + } } if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { try { String clientid = messageJson.getString("clientid"); - redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid); + // 涓嶇鍚堢殑璁惧涓嶈繘琛岀鐞� + if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return; + String tenantId = clientid.split("-")[1]; + redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, tenantId), clientid); System.err.println(String.format("璁惧: %s涓嬬嚎", clientid)); + + //鎺ㄩ�佸埌绉诲姩绔� + JSONObject item = new JSONObject(); + String[] info = clientid.split("-"); + item.put("type", info[0]); + item.put("tenantId", info[1]); + item.put("code", info[2]); + item.put("connected", false); + String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, tenantId); + MqMessage<JSONObject> mqMessage = new MqMessage<>(item, tenantId, recTopic); + sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1); + } catch (Exception e) { e.printStackTrace(); } @@ -170,91 +198,116 @@ switch (topic) { // 鏌ヨ璁惧鍦ㄧ嚎 case MqttConstant.MOBILE_QUERY_EQU_STATU: - System.err.println("admin鏀跺埌" + topic); - // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵�� - String clientId = messageJson.getString("clientId"); - JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId); + log.info("admin鏀跺埌MQTT璇锋眰锛宼opic: {}", topic); // 鏀圭敤鏇磋鑼冪殑鏃ュ織璁板綍 - ThreadUtil.execute(() -> { + try { - if (client == null || client.isEmpty()) { - JSONObject res = new JSONObject(); - res.put("success", false); - res.put("msg", "鏌ヨ澶辫触"); - try { - MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); - sendMessage.setQos(0); - mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); - } catch (Exception e) { - e.printStackTrace(); - } + // 1. 鍙傛暟鎻愬彇 + String clientId = messageJson.getString("clientId"); + if (StringUtils.isEmpty(clientId)) { return; } + String deviceKey = clientId.split("-")[1]; // 鎻愬彇璁惧鏍囪瘑 - client.put("success", true); - client.put("msg", "鏌ヨ鎴愬姛"); - try { - MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes()); - sendMessage.setQos(0); - mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); - baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); - } catch (Exception e) { - e.printStackTrace(); - } - }); + // 2. 鏌ヨ璁惧鐘舵�� + String redisKey = String.format(MqttConstant.MQTT_ONLINE_CLIENT, deviceKey); + JSONObject client = (JSONObject) redisUtil.hget(redisKey, clientId); + + // 3. 寮傛澶勭悊鍝嶅簲 + ThreadUtil.execute(() -> { + JSONObject response = new JSONObject(); + + // 3.1 澶勭悊鏌ヨ缁撴灉 + if (client == null || client.isEmpty()) { + response.put("success", false); + response.put("msg", "鏌ヨ澶辫触锛岃澶囦笉瀛樺湪鎴栫绾�"); + } else { + response = client; // 澶嶇敤鏌ヨ缁撴灉 + response.put("success", true); + response.put("msg", "鏌ヨ鎴愬姛"); + } + + // 3.2 鍙戦�丮QTT鍝嶅簲 + try { + String resTopic = String.format(MqttConstant.SERVICE_RES_EQU_STATU, req); + MqMessage<JSONObject> mqMessage = new MqMessage<>( + response, + response.getString("tenantId"), + resTopic + ); + + sendMqttMessage(resTopic, mqMessage, 2); + log.debug("璁惧鐘舵�佸搷搴斿彂閫佹垚鍔�: {}", response); + + } catch (Exception e) { + log.error("MQTT鍝嶅簲鍙戦�佸け璐�", e); + } + }); + + } catch (Exception e) { + log.error("澶勭悊璁惧鐘舵�佹煡璇㈠紓甯�", e); + } break; - // 鎺ユ敹璁惧瀹炴椂鏁版嵁 + // 鎺ユ敹璁惧瀹炴椂鏁版嵁 TODO 20250718鏆備笉浣跨敤锛屼娇鐢═ENANT_UP_PREFIX_REALTIME_DATA_EQP case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: - ThreadUtil.execute(() -> { - try { - RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); - realTimeDataService.realTimeDataHandle(vo); - } catch (Exception e) { - e.printStackTrace(); - } - }); +// ThreadUtil.execute(() -> { +// try { +// RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); +// realTimeDataService.realTimeDataHandle(vo); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// }); break; // 鎺ユ敹璁惧瀹炴椂鏁版嵁-鏈哄彴 case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP: ThreadUtil.execute(() -> { try { + RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class); - realTimeDataService.realTimeDataHandle(vo); + // 鍚戝悇绉熸埛绉诲姩绔彂閫佹暟鎹� + String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, vo.getTenantid()); + MqMessage<RealTimeDataVo> mqMessage = new MqMessage<>(vo.getRealTime(), vo.getTenantid() + "", recTopic); + sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, mqMessage, 1); + + + realTimeDataService.realTimeDataHandle(vo); + } catch (Exception e) { e.printStackTrace(); } }); break; - //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁 + //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁 TODO 20250721鏆備笉浣跨敤锛岀粺涓�浣跨敤TENANT_UP_PREFIX_REALTIME_DATA_EQP case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: { - MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { - }); - //鏁呴殰鏁版嵁 - Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); - //绉熸埛id - String tenantId = realFaultMessage.getTentId(); +// MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { +// }); +// //鏁呴殰鏁版嵁 +// Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); +// //绉熸埛id +// String tenantId = realFaultMessage.getTentId(); //鏀跺埌绉熸埛瀹炴椂鎶ヨ鏁版嵁瀛樺叆redis - //杞崲涓� Map<String, Object> - Map<String, Object> objectMap = dryFaultMap.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> (Object) entry.getValue() - )); - redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap); - //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 - if (dryFaultMap.isEmpty()) { - return; - } - String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); - //鏁版嵁杞崲 - List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); - MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); - //鍙戦�佸箍鎾� - System.err.println("骞挎挱缁欙細" + recTopic); - sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); +// //杞崲涓� Map<String, Object> +// Map<String, Object> objectMap = dryFaultMap.entrySet().stream() +// .collect(Collectors.toMap( +// Map.Entry::getKey, +// entry -> (Object) entry.getValue() +// )); +// redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap); +// //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 +// if (dryFaultMap.isEmpty()) { +// return; +// } +// String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); +// //鏁版嵁杞崲 +// List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); +// MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); +// //鍙戦�佸箍鎾� +// System.err.println("骞挎挱缁欙細" + recTopic); +// sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); } break; //绉诲姩绔富鍔ㄨ姹傝澶囧疄鏃舵晠闅滄暟鎹紙鐢ㄤ簬椤甸潰鍒氭墦寮�鏃舵媺鍙栦竴娆℃暟鎹級 @@ -263,7 +316,7 @@ if (req.toString().isEmpty() || tenantId == null) { return; } - Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId)); + Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId)); //杞崲涓� Map<String, DryFaultRecordVo> Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream() .collect(Collectors.toMap( -- Gitblit v1.9.3