From 7c585586e9bea943161676bd9d127e81123891c3 Mon Sep 17 00:00:00 2001 From: baoshiwei <baoshiwei@shlanbao.cn> Date: 星期三, 11 十二月 2024 11:01:35 +0800 Subject: [PATCH] Merge branch 'refs/heads/master' into herb --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java | 28 +++++++++++++++++++++++----- 1 files changed, 23 insertions(+), 5 deletions(-) diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java old mode 100644 new mode 100755 index 41d3db4..f669d79 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java @@ -15,6 +15,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.core.RedisTemplate; import java.util.*; @@ -35,6 +36,8 @@ private String mqttClientId; @Value(value = "${jeecg.mqtt.role}") private String role; + @Value(value = "${jeecg.mqtt.enable}") + private boolean enable; @Autowired private MqttSampleCallback mqttSampleCallback; @@ -44,6 +47,8 @@ private RedisUtil redisUtil; @Autowired private EmqxApi emqxApi; + @Autowired + private RedisTemplate redisTemplate; @Bean @@ -56,6 +61,7 @@ * mqtt杩炴帴閰嶇疆 */ private void conn() { + if (!isEnable()) return; MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions mqttConnOpt = new MqttConnectOptions(); mqttConnOpt.setUserName(mqttName); @@ -98,6 +104,8 @@ mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA); System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA); // 璁㈤槄绉熸埛鎶ヨ鏁版嵁 + mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA); + System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA); mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA); System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_FAULT_DATA); mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_EQU); @@ -127,13 +135,14 @@ //閲嶈繛 private void reconn() { + if (!isEnable()) return; Timer timer = new Timer(); TimerTask task = new TimerTask() { @Override public void run() { // 鍦ㄨ繖閲岀紪鍐欏畾鏃舵墽琛岀殑浠诲姟閫昏緫 - System.out.println("瀹氭椂浠诲姟鎵ц锛�" + new java.util.Date()); + //System.out.println("瀹氭椂浠诲姟鎵ц锛�" + new java.util.Date()); if (mqttUtil.getMqttClient() == null || !mqttUtil.getMqttClient().isConnected()) { try { conn(); @@ -153,7 +162,14 @@ * 鏈嶅姟绔紙admin瑙掕壊锛夊惎鍔ㄦ椂鏌ヨ鎵�鏈夎澶囧苟缂撳瓨鍒皉edis */ private void initClients() { - redisUtil.removeAll(MqttConstant.MQTT_ONLINE_CLIENT); + //鍒濆鍖栨椂鍏堝垹闄ゆ墍鏈夊湪绾胯澶� + Set keys = redisTemplate.keys( String.format(MqttConstant.MQTT_ONLINE_CLIENT,"*")); + if (keys != null && !keys.isEmpty()) { + keys.forEach(key -> System.out.println("鍒濆鍖栧垹闄ゅ湪绾胯澶�: " + key)); + redisTemplate.delete(keys); + } else { + System.out.println("鍒濆鍖栨棤鍦ㄧ嚎璁惧: " + MqttConstant.MQTT_ONLINE_CLIENT); + } JSONObject clients = emqxApi.queryEmqx(EmqxApi.CMD_CLIENTS); //TODO 鏍规嵁emqx杩斿洖缂栧啓瀹炰綋绫� @@ -180,13 +196,15 @@ item.put("type", info[0]); item.put("tenantId", info[1]); //item.put("code", info[2]); + + if (connected) { + redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId").toString()) , clientid, item); + } }catch (Exception e){ e.printStackTrace(); } - if (connected) { - redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item); - } + } -- Gitblit v1.9.3