package org.jeecg.modules.dry.mqtt; import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.jeecg.common.config.TenantContext; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.MqttConstant; import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.service.BaseCommonService; import org.jeecg.modules.dry.api.EmqxApi; import org.jeecg.modules.dry.entity.DryEqpType; import org.jeecg.modules.dry.entity.DryEquipment; import org.jeecg.modules.dry.entity.DryFaultRecord; import org.jeecg.modules.dry.entity.DryShop; import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.vo.DryEquipmentVo; import org.jeecg.modules.dry.vo.RealTimeDataVo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.List; @Slf4j @Component @Scope("prototype") public class MqttSampleCallback implements MqttCallback { @Value(value = "${jeecg.mqtt.role}") private String role; @Autowired private MqttUtil mqttUtil; @Autowired private EmqxApi emqxApi; @Autowired private BaseCommonService baseCommonService; @Autowired private RedisUtil redisUtil; @Autowired private IDryRealTimeDataService realTimeDataService; @Autowired private IDryEquipmentService equipmentService; @Autowired private IDryEqpTypeService eqpTypeService; @Autowired private IDryShopService dryShopService; @Autowired private IDryFaultRecordService faultRecordService; @Override public void connectionLost(Throwable throwable) { System.err.println("连接断开::掉线"); System.err.println("连接断开::"+throwable.toString()); } @Override public void messageArrived(String topic, MqttMessage mqttMessage) { System.out.println("收到消息: \n topic:" + topic + "\n Qos:" + mqttMessage.getQos() + "\n payload:" + new String(mqttMessage.getPayload())); switch (role) { // 管理员 case "admin": String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + messageJson.get("clientid")); if (client == null) { JSONObject item = new JSONObject(); //username item.put("username", messageJson.get("username")); //连接时间 Long st = messageJson.getLong("connected_at"); String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get()); item.put("connectedAt", upTime); //clientid String clientid = messageJson.getString("clientid"); item.put("clientid", clientid); //是否连接 item.put("connected", true); //根据clientid解析(注意配置文件中clientid格式 例:client-1000) try { String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); //item.put("code", info[2]); }catch (Exception e){ e.printStackTrace(); } redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item); System.err.println(String.format("设备: %s上线", clientid)); } } if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { try { String clientid = messageJson.getString("clientid"); redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid); System.err.println(String.format("设备: %s下线", clientid)); } catch (Exception e) { e.printStackTrace(); } } parseAdminCommand(topic, mqttMessage); break; // 普通用户 case "user": System.err.println("user"); try { parseUserCommand(topic, mqttMessage); } catch (Exception e) { e.printStackTrace(); } break; } } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.err.println("消息传递成功"); } // 解析admin角色指令 private void parseAdminCommand(String topic, MqttMessage mqttMessage) { String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); //请求的客户端(服务端只推送数据到请求的客户端) StringBuilder req = new StringBuilder(); if (messageJson.containsKey("req")) { req.append(messageJson.get("req")); } //前端传参时间戳转换 if (messageJson.containsKey("timestamp")) { messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); } // 实时数据上传太频繁且数据内容超过字段大小不记录日志 if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)){ baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); } switch (topic) { // 查询设备在线 case MqttConstant.MOBILE_QUERY_EQU_STATU: System.err.println("admin收到" + topic); // 根据设备id查询设备mqtt在线状态 String clientId = messageJson.getString("clientId"); JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientId); ThreadUtil.execute(() -> { if (client == null || client.isEmpty()) { JSONObject res = new JSONObject(); res.put("success", false); res.put("msg", "查询失败"); try { MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); } catch (Exception e) { e.printStackTrace(); } return; } client.put("success", true); client.put("msg", "查询成功"); try { MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); } catch (Exception e) { e.printStackTrace(); } }); break; // 接收设备实时数据 case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: ThreadUtil.execute(() -> { try { RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); realTimeDataService.realTimeDataHandle(vo); } catch (Exception e) { e.printStackTrace(); } }); break; // 接收设备报警数据 case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA: ThreadUtil.execute(() -> { try { MqMessage> listMqMessage = JSON.parseObject(message, new TypeReference>>() { }); // List faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class); System.err.println(listMqMessage.toString()); faultRecordService.saveBatch(listMqMessage.getData()); } catch (Exception e) { e.printStackTrace(); } }); break; case MqttConstant.TENANT_UP_PREFIX_EQU: ThreadUtil.execute(() -> { try { Object equObj = messageJson.get("equipment"); DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class); TenantContext.setTenant(equipment.getTenantId()+""); DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode()); if (dryEquipment == null) { equipmentService.save(equipment); } Object typeObj = messageJson.get("eqpType"); DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class); DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId()); if (dryEqpType == null) { eqpTypeService.save(eqpType); } // 获取设备所属车间 Object shopObj = messageJson.get("shop"); DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class); DryShop dryShop = dryShopService.getById(shop.getId()); if (dryShop == null) { dryShopService.save(shop); } } catch (Exception e) { e.printStackTrace(); } }); } } // 解析user角色指令 private void parseUserCommand(String topic, MqttMessage mqttMessage) { String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); //请求的客户端(服务端只推送数据到请求的客户端) StringBuilder req = new StringBuilder(); if (messageJson.containsKey("req")) { req.append(messageJson.get("req")); } //前端传参时间戳转换 if (messageJson.containsKey("timestamp")) { messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); } switch (topic) { case MqttConstant.MOBILE_REQ_EQU_CMD: System.err.println("user收到" + topic); System.err.println(message); ThreadUtil.execute(() -> { //TODO 向PLC发送开关机操作,并返回信息 JSONObject res = new JSONObject(); res.put("success", true); res.put("msg", "操作成功"); try { MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage); baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); } catch (Exception e) { e.printStackTrace(); } }); break; case MqttConstant.SERVICE_REQ_PREFIX: log.debug("收到设备详细信息查询请求"); ThreadUtil.execute(() -> { String tenantId = messageJson.getString("tenantId"); String clientId = mqttUtil.getMqttClient().getClientId(); String tenant = clientId.substring(clientId.lastIndexOf("_")+1); if (tenantId!=null && tenantId.equals(tenant)) { TenantContext.setTenant(tenantId); // 根据设备编码查询设备信息 String code = messageJson.getString("code"); DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId,code); // 根据设备车间id查询车间信息 DryShop shop = dryShopService.getById(equipmentVo.getShopId()); // 根据设备类型ID查询设备类型信息 DryEqpType eqpType = eqpTypeService.getById(equipmentVo.getType()); JSONObject res = new JSONObject(); res.put("tenant", tenantId); res.put("equipment", equipmentVo); res.put("shop", shop); res.put("eqpType", eqpType); try { MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage); // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); } catch (Exception e) { e.printStackTrace(); } } }); break; } } }