From b38019aae593a66c16f7e75d6e37d14eb8d2c42e Mon Sep 17 00:00:00 2001 From: zhuguifei <zhuguifei@zhuguifeideiMac.local> Date: 星期二, 22 七月 2025 08:55:15 +0800 Subject: [PATCH] 修改接收实时数据接口-故障处理 --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java | 59 +++++++++++++++++++++++++++++++++++++++++++++-------------- 1 files changed, 45 insertions(+), 14 deletions(-) diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java old mode 100644 new mode 100755 index b55069f..a490a0a --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java @@ -15,6 +15,8 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.data.redis.core.RedisTemplate; import java.util.*; @@ -35,6 +37,8 @@ private String mqttClientId; @Value(value = "${jeecg.mqtt.role}") private String role; + @Value(value = "${jeecg.mqtt.enable}") + private boolean enable; @Autowired private MqttSampleCallback mqttSampleCallback; @@ -44,6 +48,8 @@ private RedisUtil redisUtil; @Autowired private EmqxApi emqxApi; + @Autowired + private RedisTemplate redisTemplate; @Bean @@ -56,6 +62,7 @@ * mqtt杩炴帴閰嶇疆 */ private void conn() { + if (!isEnable()) return; MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions mqttConnOpt = new MqttConnectOptions(); mqttConnOpt.setUserName(mqttName); @@ -95,8 +102,15 @@ mqttClient.subscribe(MqttConstant.MOBILE_UP); System.err.println("admin璁㈤槄" + MqttConstant.MOBILE_UP); // 璁㈤槄绉熸埛瀹炴椂鏁版嵁 - mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA); - System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA); + mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP); + System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP); + // 璁㈤槄绉熸埛鎶ヨ鏁版嵁 + mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA); + System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA); + mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA); + System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_FAULT_DATA); + mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_EQU); + System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_EQU); initClients(); break; @@ -104,6 +118,7 @@ case "user": //鏅�氬鎴风鍙渶璁㈤槄鑷韩鐩稿叧娑堟伅 mqttClient.subscribe(MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#"); + mqttClient.subscribe(MqttConstant.SERVICE_REQ_PREFIX); System.err.println("user璁㈤槄" + MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#"); break; @@ -121,13 +136,14 @@ //閲嶈繛 private void reconn() { + if (!isEnable()) return; Timer timer = new Timer(); TimerTask task = new TimerTask() { @Override public void run() { // 鍦ㄨ繖閲岀紪鍐欏畾鏃舵墽琛岀殑浠诲姟閫昏緫 - System.out.println("瀹氭椂浠诲姟鎵ц锛�" + new java.util.Date()); + //System.out.println("瀹氭椂浠诲姟鎵ц锛�" + new java.util.Date()); if (mqttUtil.getMqttClient() == null || !mqttUtil.getMqttClient().isConnected()) { try { conn(); @@ -147,7 +163,14 @@ * 鏈嶅姟绔紙admin瑙掕壊锛夊惎鍔ㄦ椂鏌ヨ鎵�鏈夎澶囧苟缂撳瓨鍒皉edis */ private void initClients() { - redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT); + //鍒濆鍖栨椂鍏堝垹闄ゆ墍鏈夊湪绾胯澶� + Set keys = redisTemplate.keys( String.format(MqttConstant.MQTT_ONLINE_CLIENT,"*")); + if (keys != null && !keys.isEmpty()) { + keys.forEach(key -> System.out.println("鍒濆鍖栧垹闄ゅ湪绾胯澶�: " + key)); + redisTemplate.delete(keys); + } else { + System.out.println("鍒濆鍖栨棤鍦ㄧ嚎璁惧: " + MqttConstant.MQTT_ONLINE_CLIENT); + } JSONObject clients = emqxApi.queryEmqx(EmqxApi.CMD_CLIENTS); //TODO 鏍规嵁emqx杩斿洖缂栧啓瀹炰綋绫� @@ -156,29 +179,37 @@ for (int i = 0; i < data.size(); i++) { JSONObject obj = data.getJSONObject(i); JSONObject item = new JSONObject(); + //clientid + String clientid = obj.getString("clientid"); + item.put("clientid", clientid); + //TODO 鏍¢獙绉熸埛id鏄惁瀛樺湪 + if(!clientid.matches("^[^-]+-[^-]+-[^-]+$")) continue; //username item.put("username", obj.get("username")); //杩炴帴鏃堕棿 String st = obj.getString("connected_at"); String upTime = DateUtils.zone2Str(st); item.put("connectedAt", upTime); - //clientid - String clientid = obj.getString("clientid"); - item.put("clientid", clientid); //鏄惁杩炴帴 Boolean connected = obj.getBoolean("connected"); item.put("connected", connected); - // - String[] info = clientid.split("-"); - item.put("type", info[0]); - item.put("tenantId", info[1]); - item.put("code", info[2]); + //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡 渚嬶細client-1000) + try { + String[] info = clientid.split("-"); + item.put("type", info[0]); + item.put("tenantId", info[1]); + item.put("code", info[2]); - if (connected) { - redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item); + if (connected) { + redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId").toString()) , clientid, item); + } + }catch (Exception e){ + e.printStackTrace(); } + + } } } -- Gitblit v1.9.3