package org.jeecg.modules.dry.mqtt; import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.jeecg.common.config.TenantContext; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.MqttConstant; import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.service.BaseCommonService; import org.jeecg.modules.dry.api.EmqxApi; import org.jeecg.modules.dry.entity.DryEquipment; import org.jeecg.modules.dry.entity.DryShop; import org.jeecg.modules.dry.service.IDryEquipmentService; import org.jeecg.modules.dry.service.IDryRealTimeDataService; import org.jeecg.modules.dry.service.IDryShopService; import org.jeecg.modules.dry.vo.DryEquipmentVo; import org.jeecg.modules.dry.vo.RealTimeDataVo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @Slf4j @Component @Scope("prototype") public class MqttSampleCallback implements MqttCallback { @Value(value = "${jeecg.mqtt.role}") private String role; @Autowired private MqttUtil mqttUtil; @Autowired private EmqxApi emqxApi; @Autowired private BaseCommonService baseCommonService; @Autowired private RedisUtil redisUtil; @Autowired private IDryRealTimeDataService realTimeDataService; @Autowired private IDryEquipmentService equipmentService; @Autowired private IDryShopService dryShopService; @Override public void connectionLost(Throwable throwable) { System.err.println("连接断开::掉线"); } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { System.out.println("收到消息: \n topic:" + topic + "\n Qos:" + mqttMessage.getQos() + "\n payload:" + new String(mqttMessage.getPayload())); switch (role) { // 管理员 case "admin": String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + messageJson.get("clientid")); if (client == null) { JSONObject item = new JSONObject(); //username item.put("username", messageJson.get("username")); //连接时间 Long st = messageJson.getLong("connected_at"); String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get()); item.put("connectedAt", upTime); //clientid String clientid = messageJson.getString("clientid"); item.put("clientid", clientid); //是否连接 item.put("connected", true); // String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); item.put("code", info[2]); redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item); System.err.println(String.format("设备: %s上线", clientid)); } } if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { String clientid = messageJson.getString("clientid"); redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid); System.err.println(String.format("设备: %s下线", clientid)); } parseAdminCommand(topic, mqttMessage); break; // 普通用户 case "user": System.err.println("user"); try { parseUserCommand(topic, mqttMessage); } catch (Exception e) { e.printStackTrace(); } break; } } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.err.println("消息传递成功"); } // 解析admin角色指令 private void parseAdminCommand(String topic, MqttMessage mqttMessage) { String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); //请求的客户端(服务端只推送数据到请求的客户端) StringBuilder req = new StringBuilder(); if (messageJson.containsKey("req")) { req.append(messageJson.get("req")); } //前端传参时间戳转换 if (messageJson.containsKey("timestamp")) { messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); } // 实时数据上传太频繁且数据内容超过字段大小不记录日志 if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA)){ baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); } switch (topic) { // 查询设备在线 case MqttConstant.MOBILE_QUERY_EQU_STATU: System.err.println("admin收到" + topic); // 根据设备id查询设备mqtt在线状态 String clientId = messageJson.getString("clientId"); JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientId); ThreadUtil.execute(() -> { if (client == null || client.isEmpty()) { JSONObject res = new JSONObject(); res.put("success", false); res.put("msg", "查询失败"); try { MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); } catch (Exception e) { e.printStackTrace(); } return; } client.put("success", true); client.put("msg", "查询成功"); try { MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); } catch (Exception e) { e.printStackTrace(); } }); break; // 接收设备实时数据 case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: try { RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); realTimeDataService.realTimeDataHandle(vo); } catch (Exception e) { e.printStackTrace(); } break; case MqttConstant.TENANT_UP_PREFIX_EQU: try { DryEquipment equipment = (DryEquipment) messageJson.get("equipment"); DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode()); if (dryEquipment == null) { equipmentService.save(equipment); } // 获取设备所属车间 DryShop shop = (DryShop) messageJson.get("shop"); shop.setTenantId(equipment.getTenantId()); dryShopService.save(shop); } catch (Exception e) { e.printStackTrace(); } } } // 解析user角色指令 private void parseUserCommand(String topic, MqttMessage mqttMessage) { String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); //请求的客户端(服务端只推送数据到请求的客户端) StringBuilder req = new StringBuilder(); if (messageJson.containsKey("req")) { req.append(messageJson.get("req")); } //前端传参时间戳转换 if (messageJson.containsKey("timestamp")) { messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); } switch (topic) { case MqttConstant.MOBILE_REQ_EQU_CMD: System.err.println("user收到" + topic); System.err.println(message); ThreadUtil.execute(() -> { //TODO 向PLC发送开关机操作,并返回信息 JSONObject res = new JSONObject(); res.put("success", true); res.put("msg", "操作成功"); try { MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage); baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); } catch (Exception e) { e.printStackTrace(); } }); break; case MqttConstant.SERVICE_REQ_EQU_TOPIC: log.debug("收到设备详细信息查询请求"); // 根据设备编码查询设备信息 String code = messageJson.getString("code"); DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(TenantContext.getTenant(),code); // 根据设备车间id查询车间信息 DryShop shop = dryShopService.getById(equipmentVo.getShopId()); JSONObject res = new JSONObject(); res.put("tenant", TenantContext.getTenant()); res.put("equipment", equipmentVo); res.put("shop", shop); try { MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage); // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); } catch (Exception e) { e.printStackTrace(); } break; } } }