package org.jeecg.modules.dry.mqtt;
|
|
import cn.hutool.core.thread.ThreadUtil;
|
import com.alibaba.fastjson.JSONObject;
|
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
import org.jeecg.common.constant.CommonConstant;
|
import org.jeecg.common.constant.MqttConstant;
|
import org.jeecg.common.util.DateUtils;
|
import org.jeecg.common.util.RedisUtil;
|
import org.jeecg.modules.base.service.BaseCommonService;
|
import org.jeecg.modules.dry.api.EmqxApi;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.context.annotation.Scope;
|
import org.springframework.stereotype.Component;
|
|
@Component
|
@Scope("prototype")
|
public class MqttSampleCallback implements MqttCallback {
|
@Value(value = "${jeecg.mqtt.role}")
|
private String role;
|
@Autowired
|
private MqttUtil mqttUtil;
|
@Autowired
|
private EmqxApi emqxApi;
|
@Autowired
|
private BaseCommonService baseCommonService;
|
@Autowired
|
private RedisUtil redisUtil;
|
|
|
@Override
|
public void connectionLost(Throwable throwable) {
|
System.err.println("连接断开::掉线");
|
}
|
|
@Override
|
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
|
System.out.println("收到消息: \n topic:" + topic + "\n Qos:" + mqttMessage.getQos() + "\n payload:"
|
+ new String(mqttMessage.getPayload()));
|
|
switch (role) {
|
// 管理员
|
case "admin":
|
String message = new String(mqttMessage.getPayload());
|
JSONObject messageJson = JSONObject.parseObject(message);
|
|
if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
|
JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + messageJson.get("clientid"));
|
if (client == null) {
|
JSONObject item = new JSONObject();
|
//username
|
item.put("username", messageJson.get("username"));
|
//连接时间
|
Long st = messageJson.getLong("connected_at");
|
String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get());
|
item.put("connectedAt", upTime);
|
//clientid
|
String clientid = messageJson.getString("clientid");
|
item.put("clientid", clientid);
|
//是否连接
|
item.put("connected", true);
|
//
|
String[] info = clientid.split("-");
|
item.put("type", info[0]);
|
item.put("tenantId", info[1]);
|
item.put("code", info[2]);
|
|
redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item);
|
System.err.println(String.format("设备: %s上线", clientid));
|
}
|
|
}
|
if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
|
String clientid = messageJson.getString("clientid");
|
redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid);
|
System.err.println(String.format("设备: %s下线", clientid));
|
}
|
parseAdminCommand(topic, mqttMessage);
|
|
break;
|
// 普通用户
|
case "user":
|
System.err.println("user");
|
parseUserCommand(topic, mqttMessage);
|
break;
|
|
}
|
|
}
|
|
|
@Override
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
System.err.println("消息传递成功");
|
}
|
|
// 解析admin角色指令
|
private void parseAdminCommand(String topic, MqttMessage mqttMessage) {
|
String message = new String(mqttMessage.getPayload());
|
JSONObject messageJson = JSONObject.parseObject(message);
|
|
//请求的客户端(服务端只推送数据到请求的客户端)
|
StringBuilder req = new StringBuilder();
|
if (messageJson.containsKey("req")) {
|
req.append(messageJson.get("req"));
|
}
|
//前端传参时间戳转换
|
if (messageJson.containsKey("timestamp")) {
|
messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
|
}
|
baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
|
switch (topic) {
|
// 查询设备在线
|
case MqttConstant.MOBILE_QUERY_EQU_STATU:
|
System.err.println("admin收到" + topic);
|
// 根据设备id查询设备mqtt在线状态
|
String clientId = messageJson.getString("clientId");
|
JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientId);
|
|
ThreadUtil.execute(() -> {
|
|
if (client == null || client.isEmpty()) {
|
JSONObject res = new JSONObject();
|
res.put("success", false);
|
res.put("msg", "查询失败");
|
try {
|
MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
|
sendMessage.setQos(0);
|
mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
return;
|
}
|
|
client.put("success", true);
|
client.put("msg", "查询成功");
|
try {
|
MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes());
|
sendMessage.setQos(0);
|
mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
|
baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
});
|
break;
|
|
}
|
|
}
|
|
|
// 解析user角色指令
|
private void parseUserCommand(String topic, MqttMessage mqttMessage) {
|
|
String message = new String(mqttMessage.getPayload());
|
JSONObject messageJson = JSONObject.parseObject(message);
|
|
//请求的客户端(服务端只推送数据到请求的客户端)
|
StringBuilder req = new StringBuilder();
|
if (messageJson.containsKey("req")) {
|
req.append(messageJson.get("req"));
|
}
|
//前端传参时间戳转换
|
if (messageJson.containsKey("timestamp")) {
|
messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
|
}
|
|
switch (topic) {
|
case MqttConstant.MOBILE_REQ_EQU_CMD:
|
System.err.println("user收到" + topic);
|
System.err.println(message);
|
ThreadUtil.execute(() -> {
|
//TODO 向PLC发送开关机操作,并返回信息
|
JSONObject res = new JSONObject();
|
res.put("success", true);
|
res.put("msg", "操作成功");
|
try {
|
MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes());
|
sendMessage.setQos(0);
|
mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage);
|
baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
|
});
|
|
|
break;
|
}
|
|
}
|
|
}
|