package org.jeecg.modules.dry.mqtt; import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSONObject; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.MqttConstant; import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.service.BaseCommonService; import org.jeecg.modules.dry.api.EmqxApi; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @Component @Scope("prototype") public class MqttSampleCallback implements MqttCallback { @Value(value = "${jeecg.mqtt.role}") private String role; @Autowired private MqttUtil mqttUtil; @Autowired private EmqxApi emqxApi; @Autowired private BaseCommonService baseCommonService; @Autowired private RedisUtil redisUtil; @Override public void connectionLost(Throwable throwable) { System.err.println("连接断开::掉线"); } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { System.out.println("收到消息: \n topic:" + topic + "\n Qos:" + mqttMessage.getQos() + "\n payload:" + new String(mqttMessage.getPayload())); switch (role) { // 管理员 case "admin": String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + messageJson.get("clientid")); if (client == null) { JSONObject item = new JSONObject(); //username item.put("username", messageJson.get("username")); //连接时间 Long st = messageJson.getLong("connected_at"); String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get()); item.put("connectedAt", upTime); //clientid String clientid = messageJson.getString("clientid"); item.put("clientid", clientid); //是否连接 item.put("connected", true); // String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); item.put("code", info[2]); redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item); System.err.println(String.format("设备: %s上线", clientid)); } } if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { String clientid = messageJson.getString("clientid"); redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid); System.err.println(String.format("设备: %s下线", clientid)); } parseAdminCommand(topic, mqttMessage); break; // 普通用户 case "user": System.err.println("user"); parseUserCommand(topic, mqttMessage); break; } } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { System.err.println("消息传递成功"); } // 解析admin角色指令 private void parseAdminCommand(String topic, MqttMessage mqttMessage) { String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); //请求的客户端(服务端只推送数据到请求的客户端) StringBuilder req = new StringBuilder(); if (messageJson.containsKey("req")) { req.append(messageJson.get("req")); } //前端传参时间戳转换 if (messageJson.containsKey("timestamp")) { messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); } baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); switch (topic) { // 查询设备在线 case MqttConstant.MOBILE_QUERY_EQU_STATU: System.err.println("admin收到" + topic); // 根据设备id查询设备mqtt在线状态 String clientId = messageJson.getString("clientId"); JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientId); ThreadUtil.execute(() -> { if (client == null || client.isEmpty()) { JSONObject res = new JSONObject(); res.put("success", false); res.put("msg", "查询失败"); try { MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); } catch (Exception e) { e.printStackTrace(); } return; } client.put("success", true); client.put("msg", "查询成功"); try { MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); } catch (Exception e) { e.printStackTrace(); } }); break; } } // 解析user角色指令 private void parseUserCommand(String topic, MqttMessage mqttMessage) { String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); //请求的客户端(服务端只推送数据到请求的客户端) StringBuilder req = new StringBuilder(); if (messageJson.containsKey("req")) { req.append(messageJson.get("req")); } //前端传参时间戳转换 if (messageJson.containsKey("timestamp")) { messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); } switch (topic) { case MqttConstant.MOBILE_REQ_EQU_CMD: System.err.println("user收到" + topic); System.err.println(message); ThreadUtil.execute(() -> { //TODO 向PLC发送开关机操作,并返回信息 JSONObject res = new JSONObject(); res.put("success", true); res.put("msg", "操作成功"); try { MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage); baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); } catch (Exception e) { e.printStackTrace(); } }); break; } } }