package org.jeecg.modules.dry.mqtt; import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.jeecg.common.constant.MqttConstant; import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.dry.api.EmqxApi; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.data.redis.core.RedisTemplate; import java.util.*; /** * mqtt */ @Data @Slf4j @Configuration public class MqttConfig { @Value(value = "${jeecg.mqtt.broker}") private String broker; @Value(value = "${jeecg.mqtt.mqtt_name}") private String mqttName; @Value(value = "${jeecg.mqtt.mqtt_pass}") private String mqttPass; @Value(value = "${jeecg.mqtt.mqtt_client_id}") private String mqttClientId; @Value(value = "${jeecg.mqtt.role}") private String role; @Value(value = "${jeecg.mqtt.enable}") private boolean enable; @Autowired private MqttSampleCallback mqttSampleCallback; @Autowired private MqttUtil mqttUtil; @Autowired private RedisUtil redisUtil; @Autowired private EmqxApi emqxApi; @Autowired private RedisTemplate redisTemplate; @Bean public void initMqtt() { conn(); reconn(); } /** * mqtt连接配置 */ private void conn() { if (!isEnable()) return; MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions mqttConnOpt = new MqttConnectOptions(); mqttConnOpt.setUserName(mqttName); mqttConnOpt.setPassword(mqttPass.toCharArray()); mqttConnOpt.setKeepAliveInterval(10);//设置心跳间隔 mqttConnOpt.setConnectionTimeout(10);//设置连接超时时间 mqttConnOpt.setCleanSession(false);//设置是否清除会话 mqttConnOpt.setAutomaticReconnect(false);//设置是否自动重连 //遗嘱消息 TODO qos2需要在设备上线时做清除消息操作 mqttConnOpt.setWill("downline", ("我是" + mqttName + "_" + mqttClientId + ",我下线了").getBytes(), 2, false); try { MqttClient mqttClient = new MqttClient(broker, mqttClientId, persistence); mqttClient.connect(mqttConnOpt); if (mqttClient.isConnected()) { System.err.println("连接成功"); // 创建消息并设置 QoS String content = "我是" + mqttName + "_" + mqttClientId + ",我上线了"; MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(0); // 测试发布消息 mqttClient.publish("topline", message); mqttClient.subscribe("message", 0); mqttClient.setCallback(mqttSampleCallback); mqttUtil.setMqttClient(mqttClient); //不同的角色需要不同的订阅 switch (role) { //管理员 case "admin": //订阅系统级设备上下线通知 mqttClient.subscribe(MqttConstant.MQTT_TOPIC_ONLINE); mqttClient.subscribe(MqttConstant.MQTT_TOPIC_OFFLINE); //订阅移动端上行指令 mqttClient.subscribe(MqttConstant.MOBILE_UP); System.err.println("admin订阅" + MqttConstant.MOBILE_UP); // 订阅租户实时数据 mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP); System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP); // 订阅租户报警数据 mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA); System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA); mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA); System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_FAULT_DATA); mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_EQU); System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_EQU); initClients(); break; //普通用户 case "user": //普通客户端只需订阅自身相关消息 mqttClient.subscribe(MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#"); mqttClient.subscribe(MqttConstant.SERVICE_REQ_PREFIX); System.err.println("user订阅" + MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#"); break; } } else { System.err.println("连接失败"); } } catch (MqttException e) { e.printStackTrace(); } } //重连 private void reconn() { if (!isEnable()) return; Timer timer = new Timer(); TimerTask task = new TimerTask() { @Override public void run() { // 在这里编写定时执行的任务逻辑 //System.out.println("定时任务执行:" + new java.util.Date()); if (mqttUtil.getMqttClient() == null || !mqttUtil.getMqttClient().isConnected()) { try { conn(); } catch (Exception e) { e.printStackTrace(); } } } }; // 设定定时任务,延迟0毫秒后开始执行,每隔1000毫秒(1秒)执行一次 timer.schedule(task, 0, 10000); } /** * 服务端(admin角色)启动时查询所有设备并缓存到redis */ private void initClients() { //初始化时先删除所有在线设备 Set keys = redisTemplate.keys( String.format(MqttConstant.MQTT_ONLINE_CLIENT,"*")); if (keys != null && !keys.isEmpty()) { keys.forEach(key -> System.out.println("初始化删除在线设备: " + key)); redisTemplate.delete(keys); } else { System.out.println("初始化无在线设备: " + MqttConstant.MQTT_ONLINE_CLIENT); } JSONObject clients = emqxApi.queryEmqx(EmqxApi.CMD_CLIENTS); //TODO 根据emqx返回编写实体类 if (clients != null && clients.containsKey("data")) { JSONArray data = clients.getJSONArray("data"); for (int i = 0; i < data.size(); i++) { JSONObject obj = data.getJSONObject(i); JSONObject item = new JSONObject(); //clientid String clientid = obj.getString("clientid"); item.put("clientid", clientid); //TODO 校验租户id是否存在 if(!clientid.matches("^[^-]+-[^-]+-[^-]+$")) continue; //username item.put("username", obj.get("username")); //连接时间 String st = obj.getString("connected_at"); String upTime = DateUtils.zone2Str(st); item.put("connectedAt", upTime); //是否连接 Boolean connected = obj.getBoolean("connected"); item.put("connected", connected); //根据clientid解析(注意配置文件中clientid格式 例:client-1000) try { String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); item.put("code", info[2]); if (connected) { redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId").toString()) , clientid, item); } }catch (Exception e){ e.printStackTrace(); } } } } }