package org.jeecg.modules.dry.mqtt;
|
|
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jeecg.common.config.TenantContext;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.MqttConstant;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.service.BaseCommonService;
import org.jeecg.modules.dry.api.EmqxApi;
import org.jeecg.modules.dry.entity.DryEquipment;
import org.jeecg.modules.dry.entity.DryShop;
import org.jeecg.modules.dry.service.IDryEquipmentService;
import org.jeecg.modules.dry.service.IDryRealTimeDataService;
import org.jeecg.modules.dry.service.IDryShopService;
import org.jeecg.modules.dry.vo.DryEquipmentVo;
import org.jeecg.modules.dry.vo.RealTimeDataVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
|
|
@Slf4j
|
@Component
|
@Scope("prototype")
|
public class MqttSampleCallback implements MqttCallback {
|
@Value(value = "${jeecg.mqtt.role}")
|
private String role;
|
@Autowired
|
private MqttUtil mqttUtil;
|
@Autowired
|
private EmqxApi emqxApi;
|
@Autowired
|
private BaseCommonService baseCommonService;
|
@Autowired
|
private RedisUtil redisUtil;
|
|
@Autowired
|
private IDryRealTimeDataService realTimeDataService;
|
|
|
@Autowired
|
private IDryEquipmentService equipmentService;
|
|
@Autowired
|
private IDryShopService dryShopService;
|
|
|
|
|
@Override
|
public void connectionLost(Throwable throwable) {
|
System.err.println("连接断开::掉线");
|
}
|
|
@Override
|
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
|
System.out.println("收到消息: \n topic:" + topic + "\n Qos:" + mqttMessage.getQos() + "\n payload:"
|
+ new String(mqttMessage.getPayload()));
|
|
switch (role) {
|
// 管理员
|
case "admin":
|
String message = new String(mqttMessage.getPayload());
|
JSONObject messageJson = JSONObject.parseObject(message);
|
|
if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
|
JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + messageJson.get("clientid"));
|
if (client == null) {
|
JSONObject item = new JSONObject();
|
//username
|
item.put("username", messageJson.get("username"));
|
//连接时间
|
Long st = messageJson.getLong("connected_at");
|
String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get());
|
item.put("connectedAt", upTime);
|
//clientid
|
String clientid = messageJson.getString("clientid");
|
item.put("clientid", clientid);
|
//是否连接
|
item.put("connected", true);
|
//
|
String[] info = clientid.split("-");
|
item.put("type", info[0]);
|
item.put("tenantId", info[1]);
|
item.put("code", info[2]);
|
|
redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item);
|
System.err.println(String.format("设备: %s上线", clientid));
|
}
|
|
}
|
if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
|
String clientid = messageJson.getString("clientid");
|
redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid);
|
System.err.println(String.format("设备: %s下线", clientid));
|
}
|
parseAdminCommand(topic, mqttMessage);
|
|
break;
|
// 普通用户
|
case "user":
|
System.err.println("user");
|
try {
|
parseUserCommand(topic, mqttMessage);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
@Override
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
System.err.println("消息传递成功");
|
}
|
|
// 解析admin角色指令
|
private void parseAdminCommand(String topic, MqttMessage mqttMessage) {
|
String message = new String(mqttMessage.getPayload());
|
JSONObject messageJson = JSONObject.parseObject(message);
|
|
//请求的客户端(服务端只推送数据到请求的客户端)
|
StringBuilder req = new StringBuilder();
|
if (messageJson.containsKey("req")) {
|
req.append(messageJson.get("req"));
|
}
|
//前端传参时间戳转换
|
if (messageJson.containsKey("timestamp")) {
|
messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
|
}
|
// 实时数据上传太频繁且数据内容超过字段大小不记录日志
|
if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA)){
|
baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
|
}
|
|
switch (topic) {
|
// 查询设备在线
|
case MqttConstant.MOBILE_QUERY_EQU_STATU:
|
System.err.println("admin收到" + topic);
|
// 根据设备id查询设备mqtt在线状态
|
String clientId = messageJson.getString("clientId");
|
JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientId);
|
|
ThreadUtil.execute(() -> {
|
|
if (client == null || client.isEmpty()) {
|
JSONObject res = new JSONObject();
|
res.put("success", false);
|
res.put("msg", "查询失败");
|
try {
|
MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
|
sendMessage.setQos(0);
|
mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
return;
|
}
|
|
client.put("success", true);
|
client.put("msg", "查询成功");
|
try {
|
MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes());
|
sendMessage.setQos(0);
|
mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
|
baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
});
|
break;
|
|
// 接收设备实时数据
|
case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
|
try {
|
RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
|
realTimeDataService.realTimeDataHandle(vo);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
|
break;
|
case MqttConstant.TENANT_UP_PREFIX_EQU:
|
try {
|
DryEquipment equipment = (DryEquipment) messageJson.get("equipment");
|
|
DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode());
|
|
if (dryEquipment == null) {
|
equipmentService.save(equipment);
|
}
|
|
|
|
// 获取设备所属车间
|
DryShop shop = (DryShop) messageJson.get("shop");
|
shop.setTenantId(equipment.getTenantId());
|
dryShopService.save(shop);
|
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
}
|
|
|
// 解析user角色指令
|
private void parseUserCommand(String topic, MqttMessage mqttMessage) {
|
|
String message = new String(mqttMessage.getPayload());
|
JSONObject messageJson = JSONObject.parseObject(message);
|
|
//请求的客户端(服务端只推送数据到请求的客户端)
|
StringBuilder req = new StringBuilder();
|
if (messageJson.containsKey("req")) {
|
req.append(messageJson.get("req"));
|
}
|
//前端传参时间戳转换
|
if (messageJson.containsKey("timestamp")) {
|
messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
|
}
|
|
switch (topic) {
|
case MqttConstant.MOBILE_REQ_EQU_CMD:
|
System.err.println("user收到" + topic);
|
System.err.println(message);
|
ThreadUtil.execute(() -> {
|
//TODO 向PLC发送开关机操作,并返回信息
|
JSONObject res = new JSONObject();
|
res.put("success", true);
|
res.put("msg", "操作成功");
|
try {
|
MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes());
|
sendMessage.setQos(0);
|
mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage);
|
baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
|
});
|
|
|
break;
|
case MqttConstant.SERVICE_REQ_PREFIX:
|
log.debug("收到设备详细信息查询请求");
|
String tenantId = messageJson.getString("tenantId");
|
if (tenantId!=null && tenantId.equals(TenantContext.getTenant())) {
|
// 根据设备编码查询设备信息
|
String code = messageJson.getString("code");
|
DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId,code);
|
// 根据设备车间id查询车间信息
|
DryShop shop = dryShopService.getById(equipmentVo.getShopId());
|
|
JSONObject res = new JSONObject();
|
|
res.put("tenant", TenantContext.getTenant());
|
res.put("equipment", equipmentVo);
|
res.put("shop", shop);
|
try {
|
MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
|
sendMessage.setQos(0);
|
mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage);
|
// baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
|
break;
|
|
}
|
|
}
|
|
}
|