package org.jeecg.modules.dry.mqtt; import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import lombok.extern.slf4j.Slf4j; import org.apache.poi.ss.formula.functions.T; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.jeecg.common.config.TenantContext; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.MqttConstant; import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.service.BaseCommonService; import org.jeecg.modules.dry.api.EmqxApi; import org.jeecg.modules.dry.entity.DryEqpType; import org.jeecg.modules.dry.entity.DryEquipment; import org.jeecg.modules.dry.entity.DryFaultRecord; import org.jeecg.modules.dry.entity.DryShop; import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.vo.DryEquipmentVo; import org.jeecg.modules.dry.vo.DryFaultRecordVo; import org.jeecg.modules.dry.vo.RealTimeDataParentVo; import org.jeecg.modules.dry.vo.RealTimeDataVo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @Slf4j @Component @Scope("prototype") public class MqttSampleCallback implements MqttCallback { @Value(value = "${jeecg.mqtt.role}") private String role; @Autowired private MqttUtil mqttUtil; @Autowired private EmqxApi emqxApi; @Autowired private BaseCommonService baseCommonService; @Autowired private RedisUtil redisUtil; @Autowired private IDryRealTimeDataService realTimeDataService; @Autowired private IDryEquipmentService equipmentService; @Autowired private IDryEqpTypeService eqpTypeService; @Autowired private IDryShopService dryShopService; @Autowired private IDryFaultRecordService faultRecordService; @Override public void connectionLost(Throwable throwable) { System.err.println("连接断开::掉线"); System.err.println("连接断开::" + throwable.toString()); } @Override public void messageArrived(String topic, MqttMessage mqttMessage) { System.out.println("收到消息: \n topic:" + topic + "\n Qos:" + mqttMessage.getQos() + "\n payload:" + new String(mqttMessage.getPayload())); switch (role) { // 管理员 case "admin": String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid")); if (client == null) { JSONObject item = new JSONObject(); //username item.put("username", messageJson.get("username")); //连接时间 Long st = messageJson.getLong("connected_at"); String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get()); item.put("connectedAt", upTime); //clientid String clientid = messageJson.getString("clientid"); item.put("clientid", clientid); //是否连接 item.put("connected", true); //根据clientid解析(注意配置文件中clientid格式 例:client-1000) try { String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); //item.put("code", info[2]); } catch (Exception e) { e.printStackTrace(); } redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); System.err.println(String.format("设备: %s上线", clientid)); } } if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { try { String clientid = messageJson.getString("clientid"); redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid); System.err.println(String.format("设备: %s下线", clientid)); } catch (Exception e) { e.printStackTrace(); } } parseAdminCommand(topic, mqttMessage); break; // 普通用户 case "user": System.err.println("user"); try { parseUserCommand(topic, mqttMessage); } catch (Exception e) { e.printStackTrace(); } break; } } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.err.println("消息传递成功"); } // 解析admin角色指令 private void parseAdminCommand(String topic, MqttMessage mqttMessage) { String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); //请求的客户端(服务端只推送数据到请求的客户端) StringBuilder req = new StringBuilder(); if (messageJson.containsKey("req")) { req.append(messageJson.get("req")); } //前端传参时间戳转换 if (messageJson.containsKey("timestamp")) { messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); } // 实时数据上传太频繁且数据内容超过字段大小不记录日志 if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)) { // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); } switch (topic) { // 查询设备在线 case MqttConstant.MOBILE_QUERY_EQU_STATU: System.err.println("admin收到" + topic); // 根据设备id查询设备mqtt在线状态 String clientId = messageJson.getString("clientId"); JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId); ThreadUtil.execute(() -> { if (client == null || client.isEmpty()) { JSONObject res = new JSONObject(); res.put("success", false); res.put("msg", "查询失败"); try { MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); } catch (Exception e) { e.printStackTrace(); } return; } client.put("success", true); client.put("msg", "查询成功"); try { MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); } catch (Exception e) { e.printStackTrace(); } }); break; // 接收设备实时数据 case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: ThreadUtil.execute(() -> { try { RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); realTimeDataService.realTimeDataHandle(vo); } catch (Exception e) { e.printStackTrace(); } }); break; // 接收设备实时数据-机台 case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP: ThreadUtil.execute(() -> { try { RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class); realTimeDataService.realTimeDataHandle(vo); } catch (Exception e) { e.printStackTrace(); } }); break; //各租户上传的实时报警数据 case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: { MqMessage> realFaultMessage = JSON.parseObject(message, new TypeReference>>() { }); //故障数据 Map dryFaultMap = realFaultMessage.getData(); //租户id String tenantId = realFaultMessage.getTentId(); //收到租户实时报警数据存入redis //转换为 Map Map objectMap = dryFaultMap.entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, entry -> (Object) entry.getValue() )); redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap); //广播发送给各租户下移动设备 if (dryFaultMap.isEmpty()) { return; } String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); //数据转换 List faultList = new ArrayList((dryFaultMap.values())); MqMessage> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); //发送广播 System.err.println("广播给:" + recTopic); sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); } break; //移动端主动请求设备实时故障数据(用于页面刚打开时拉取一次数据) case MqttConstant.MOBILE_REQ_EQU_REAL_FAULT: { String tenantId = (String) messageJson.get("tenantId"); if (req.toString().isEmpty() || tenantId == null) { return; } Map objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId)); //转换为 Map Map dryFaultMap = objFaultMap.entrySet().stream() .collect(Collectors.toMap( entry -> entry.getKey().toString(), entry -> (DryFaultRecordVo) entry.getValue() )); if (dryFaultMap.isEmpty()) { return; } String resTopic = String.format(MqttConstant.SERVICE_ONECE_TENANT_REAL_FAULT, req); //数据转换 List faultList = new ArrayList((dryFaultMap.values())); MqMessage> mqMessage = new MqMessage<>(faultList, tenantId, resTopic); //发送请求设备 System.err.println("发送给:" + resTopic); sendMqttMessage(resTopic, mqMessage, 2); } break; // 接收设备报警历史数据 case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA: ThreadUtil.execute(() -> { try { MqMessage> faultMessage = JSON.parseObject(message, new TypeReference>>() { }); // List faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class); System.err.println(faultMessage.toString()); faultRecordService.saveBatch(faultMessage.getData()); } catch (Exception e) { e.printStackTrace(); } }); break; case MqttConstant.TENANT_UP_PREFIX_EQU: ThreadUtil.execute(() -> { try { Object equObj = messageJson.get("equipment"); DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class); TenantContext.setTenant(equipment.getTenantId() + ""); DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode()); if (dryEquipment == null) { equipmentService.save(equipment); } Object typeObj = messageJson.get("eqpType"); DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class); DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId()); if (dryEqpType == null) { eqpTypeService.save(eqpType); } // 获取设备所属车间 Object shopObj = messageJson.get("shop"); DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class); DryShop dryShop = dryShopService.getById(shop.getId()); if (dryShop == null) { dryShopService.save(shop); } } catch (Exception e) { e.printStackTrace(); } }); } } // 解析user角色指令 private void parseUserCommand(String topic, MqttMessage mqttMessage) { String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); //请求的客户端(服务端只推送数据到请求的客户端) StringBuilder req = new StringBuilder(); if (messageJson.containsKey("req")) { req.append(messageJson.get("req")); } //前端传参时间戳转换 if (messageJson.containsKey("timestamp")) { messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); } switch (topic) { case MqttConstant.MOBILE_REQ_EQU_CMD: System.err.println("user收到" + topic); System.err.println(message); ThreadUtil.execute(() -> { //TODO 向PLC发送开关机操作,并返回信息 JSONObject res = new JSONObject(); res.put("success", true); res.put("msg", "操作成功"); try { MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage); baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); } catch (Exception e) { e.printStackTrace(); } }); break; case MqttConstant.SERVICE_REQ_PREFIX: log.debug("收到设备详细信息查询请求"); ThreadUtil.execute(() -> { String tenantId = messageJson.getString("tenantId"); String clientId = mqttUtil.getMqttClient().getClientId(); String tenant = clientId.substring(clientId.lastIndexOf("_") + 1); if (tenantId != null && tenantId.equals(tenant)) { TenantContext.setTenant(tenantId); // 根据设备编码查询设备信息 String code = messageJson.getString("code"); DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId, code); // 根据设备车间id查询车间信息 DryShop shop = dryShopService.getById(equipmentVo.getShopId()); // 根据设备类型ID查询设备类型信息 DryEqpType eqpType = eqpTypeService.getById(equipmentVo.getType()); JSONObject res = new JSONObject(); res.put("tenant", tenantId); res.put("equipment", equipmentVo); res.put("shop", shop); res.put("eqpType", eqpType); try { MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage); // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); } catch (Exception e) { e.printStackTrace(); } } }); break; } } /** * 发送消息 * * @param topic 订阅 * @param mqMessage 消息体 * @param type 1-发送给租户 2-发送给固定id */ private void sendMqttMessage(String topic, MqMessage mqMessage, Integer type) { ThreadUtil.execute(() -> { try { if (type == 1) { MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage); } else if (type == 2) { MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(topic, sendMessage); } } catch (Exception e) { e.printStackTrace(); } }); } }