package org.jeecg.modules.dry.mqtt;
|
|
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.jeecg.common.constant.MqttConstant;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.dry.api.EmqxApi;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;

import java.nio.charset.StandardCharsets;
import java.util.*;
|
|
/**
|
* mqtt
|
*/
|
@Data
|
@Slf4j
|
@Configuration
|
public class MqttConfig {
|
@Value(value = "${jeecg.mqtt.broker}")
|
private String broker;
|
@Value(value = "${jeecg.mqtt.mqtt_name}")
|
private String mqttName;
|
@Value(value = "${jeecg.mqtt.mqtt_pass}")
|
private String mqttPass;
|
@Value(value = "${jeecg.mqtt.mqtt_client_id}")
|
private String mqttClientId;
|
@Value(value = "${jeecg.mqtt.role}")
|
private String role;
|
|
@Autowired
|
private MqttSampleCallback mqttSampleCallback;
|
@Autowired
|
private MqttUtil mqttUtil;
|
@Autowired
|
private RedisUtil redisUtil;
|
@Autowired
|
private EmqxApi emqxApi;
|
@Autowired
|
private RedisTemplate redisTemplate;
|
|
|
@Bean
|
public void initMqtt() {
|
conn();
|
reconn();
|
}
|
|
/**
|
* mqtt连接配置
|
*/
|
private void conn() {
|
MemoryPersistence persistence = new MemoryPersistence();
|
MqttConnectOptions mqttConnOpt = new MqttConnectOptions();
|
mqttConnOpt.setUserName(mqttName);
|
mqttConnOpt.setPassword(mqttPass.toCharArray());
|
mqttConnOpt.setKeepAliveInterval(10);//设置心跳间隔
|
mqttConnOpt.setConnectionTimeout(10);//设置连接超时时间
|
mqttConnOpt.setCleanSession(false);//设置是否清除会话
|
mqttConnOpt.setAutomaticReconnect(false);//设置是否自动重连
|
|
//遗嘱消息 TODO qos2需要在设备上线时做清除消息操作
|
mqttConnOpt.setWill("downline", ("我是" + mqttName + "_" + mqttClientId + ",我下线了").getBytes(), 2, false);
|
try {
|
|
MqttClient mqttClient = new MqttClient(broker, mqttClientId, persistence);
|
mqttClient.connect(mqttConnOpt);
|
if (mqttClient.isConnected()) {
|
System.err.println("连接成功");
|
// 创建消息并设置 QoS
|
String content = "我是" + mqttName + "_" + mqttClientId + ",我上线了";
|
MqttMessage message = new MqttMessage(content.getBytes());
|
message.setQos(0);
|
// 测试发布消息
|
mqttClient.publish("topline", message);
|
mqttClient.subscribe("message", 0);
|
|
mqttClient.setCallback(mqttSampleCallback);
|
mqttUtil.setMqttClient(mqttClient);
|
//不同的角色需要不同的订阅
|
switch (role) {
|
//管理员
|
case "admin":
|
//订阅系统级设备上下线通知
|
mqttClient.subscribe(MqttConstant.MQTT_TOPIC_ONLINE);
|
mqttClient.subscribe(MqttConstant.MQTT_TOPIC_OFFLINE);
|
|
//订阅移动端上行指令
|
mqttClient.subscribe(MqttConstant.MOBILE_UP);
|
System.err.println("admin订阅" + MqttConstant.MOBILE_UP);
|
// 订阅租户实时数据
|
mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
|
System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
|
// 订阅租户报警数据
|
mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
|
System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
|
mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA);
|
System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_FAULT_DATA);
|
mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_EQU);
|
System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_EQU);
|
initClients();
|
|
break;
|
//普通用户
|
case "user":
|
//普通客户端只需订阅自身相关消息
|
mqttClient.subscribe(MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#");
|
mqttClient.subscribe(MqttConstant.SERVICE_REQ_PREFIX);
|
System.err.println("user订阅" + MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#");
|
break;
|
|
}
|
|
} else {
|
System.err.println("连接失败");
|
}
|
|
} catch (MqttException e) {
|
e.printStackTrace();
|
}
|
|
}
|
|
//重连
|
private void reconn() {
|
Timer timer = new Timer();
|
|
TimerTask task = new TimerTask() {
|
@Override
|
public void run() {
|
// 在这里编写定时执行的任务逻辑
|
System.out.println("定时任务执行:" + new java.util.Date());
|
if (mqttUtil.getMqttClient() == null || !mqttUtil.getMqttClient().isConnected()) {
|
try {
|
conn();
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
}
|
};
|
|
// 设定定时任务,延迟0毫秒后开始执行,每隔1000毫秒(1秒)执行一次
|
timer.schedule(task, 0, 10000);
|
|
}
|
|
/**
|
* 服务端(admin角色)启动时查询所有设备并缓存到redis
|
*/
|
private void initClients() {
|
//初始化时先删除所有在线设备
|
Set keys = redisTemplate.keys( String.format(MqttConstant.MQTT_ONLINE_CLIENT,"*"));
|
if (keys != null && !keys.isEmpty()) {
|
keys.forEach(key -> System.out.println("初始化删除在线设备: " + key));
|
redisTemplate.delete(keys);
|
} else {
|
System.out.println("初始化无在线设备: " + MqttConstant.MQTT_ONLINE_CLIENT);
|
}
|
|
JSONObject clients = emqxApi.queryEmqx(EmqxApi.CMD_CLIENTS);
|
//TODO 根据emqx返回编写实体类
|
if (clients != null && clients.containsKey("data")) {
|
JSONArray data = clients.getJSONArray("data");
|
for (int i = 0; i < data.size(); i++) {
|
JSONObject obj = data.getJSONObject(i);
|
JSONObject item = new JSONObject();
|
//username
|
item.put("username", obj.get("username"));
|
//连接时间
|
String st = obj.getString("connected_at");
|
String upTime = DateUtils.zone2Str(st);
|
item.put("connectedAt", upTime);
|
//clientid
|
String clientid = obj.getString("clientid");
|
item.put("clientid", clientid);
|
//是否连接
|
Boolean connected = obj.getBoolean("connected");
|
item.put("connected", connected);
|
//根据clientid解析(注意配置文件中clientid格式 例:client-1000)
|
try {
|
String[] info = clientid.split("-");
|
item.put("type", info[0]);
|
item.put("tenantId", info[1]);
|
//item.put("code", info[2]);
|
|
if (connected) {
|
redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId").toString()) , clientid, item);
|
}
|
}catch (Exception e){
|
e.printStackTrace();
|
}
|
|
|
|
|
}
|
}
|
}
|
|
|
}
|