干燥机配套车间生产管理系统/云平台服务端
zhuguifei
19 小时以前 a0a030ec98b711e82720be38c6ea9ddf4a07a22b
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -6,6 +6,7 @@
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.poi.ss.formula.functions.T;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
@@ -24,6 +25,7 @@
import org.jeecg.modules.dry.service.*;
import org.jeecg.modules.dry.vo.DryEquipmentVo;
import org.jeecg.modules.dry.vo.DryFaultRecordVo;
import org.jeecg.modules.dry.vo.RealTimeDataParentVo;
import org.jeecg.modules.dry.vo.RealTimeDataVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -41,6 +43,8 @@
public class MqttSampleCallback implements MqttCallback {
    @Value(value = "${jeecg.mqtt.role}")
    private String role;
    @Autowired
    private MqttUtil mqttUtil;
    @Autowired
@@ -75,8 +79,8 @@
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        System.out.println("收到消息: \n  topic:" + topic + "\n  Qos:" + mqttMessage.getQos() + "\n  payload:"
                + new String(mqttMessage.getPayload()));
//        System.out.println("收到消息: \n  topic:" + topic + "\n  Qos:" + mqttMessage.getQos() + "\n  payload:"
//                + new String(mqttMessage.getPayload()));
        switch (role) {
            // 管理员
@@ -84,7 +88,7 @@
                String message = new String(mqttMessage.getPayload());
                JSONObject messageJson = JSONObject.parseObject(message);
                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected") && !topic.endsWith("disconnected")) {
                    JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
                    if (client == null) {
                        JSONObject item = new JSONObject();
@@ -97,6 +101,8 @@
                        //clientid
                        String clientid = messageJson.getString("clientid");
                        item.put("clientid", clientid);
                        // 不符合的设备不进行管理
                        if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return;
                        //是否连接
                        item.put("connected", true);
                        //根据clientid解析(注意配置文件中clientid格式  例:client-1000)
@@ -104,20 +110,43 @@
                            String[] info = clientid.split("-");
                            item.put("type", info[0]);
                            item.put("tenantId", info[1]);
                            //item.put("code", info[2]);
                            item.put("code", info[2]);
                            redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                            System.err.println(String.format("设备: %s上线", clientid));
                            // 推送到移动端
                            String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, item.get("tenantId"));
                            MqMessage<JSONObject> mqMessage = new MqMessage<>(item, item.get("tenantId").toString(), recTopic);
                            sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                        System.err.println(String.format("设备: %s上线", clientid));
                    }
                }
                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
                    try {
                        String clientid = messageJson.getString("clientid");
                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid);
                        // 不符合的设备不进行管理
                        if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return;
                        String tenantId = clientid.split("-")[1];
                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, tenantId), clientid);
                        System.err.println(String.format("设备: %s下线", clientid));
                        //推送到移动端
                        JSONObject item = new JSONObject();
                        String[] info = clientid.split("-");
                        item.put("type", info[0]);
                        item.put("tenantId", info[1]);
                        item.put("code", info[2]);
                        item.put("connected", false);
                        String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, tenantId);
                        MqMessage<JSONObject> mqMessage = new MqMessage<>(item, tenantId, recTopic);
                        sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
@@ -169,82 +198,116 @@
        switch (topic) {
            // Query whether a device is online.
            // NOTE(review): as shown, this clause declares `clientId` and `client` twice and its braces
            // do not balance; it looks like the removed and the added lines of a diff rendered together.
            // Confirm against the real file which statements are current before changing anything here.
            case MqttConstant.MOBILE_QUERY_EQU_STATU:
                System.err.println("admin收到" + topic);
                // Look up the device's MQTT online state by device id
                String clientId = messageJson.getString("clientId");
                JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId);
                log.info("admin收到MQTT请求,topic: {}", topic);  // use the logger instead, for more standard logging
                ThreadUtil.execute(() -> {
                try {
                    if (client == null || client.isEmpty()) {
                        JSONObject res = new JSONObject();
                        res.put("success", false);
                        res.put("msg", "查询失败");
                        try {
                            MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
                            sendMessage.setQos(0);
                            mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    // 1. Extract parameters
                    String clientId = messageJson.getString("clientId");
                    if (StringUtils.isEmpty(clientId)) {
                        return;
                    }
                    // Second "-"-separated segment of the client id.
                    // NOTE(review): the connect/disconnect handling in this file stores that same segment
                    // as "tenantId" — confirm whether "deviceKey" is really a tenant id.
                    String deviceKey = clientId.split("-")[1];  // extract the device identifier
                    client.put("success", true);
                    client.put("msg", "查询成功");
                    try {
                        MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes());
                        sendMessage.setQos(0);
                        mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
                        baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                    // 2. Query the device status
                    String redisKey = String.format(MqttConstant.MQTT_ONLINE_CLIENT, deviceKey);
                    JSONObject client = (JSONObject) redisUtil.hget(redisKey, clientId);
                    // 3. Handle the response asynchronously
                    ThreadUtil.execute(() -> {
                        JSONObject response = new JSONObject();
                        // 3.1 Process the query result
                        if (client == null || client.isEmpty()) {
                            response.put("success", false);
                            response.put("msg", "查询失败,设备不存在或离线");
                        } else {
                            response = client;  // reuse the query result
                            response.put("success", true);
                            response.put("msg", "查询成功");
                        }
                        // 3.2 Send the MQTT response
                        try {
                            String resTopic = String.format(MqttConstant.SERVICE_RES_EQU_STATU, req);
                            MqMessage<JSONObject> mqMessage = new MqMessage<>(
                                    response,
                                    response.getString("tenantId"),
                                    resTopic
                            );
                            sendMqttMessage(resTopic, mqMessage, 2);
                            log.debug("设备状态响应发送成功: {}", response);
                        } catch (Exception e) {
                            log.error("MQTT响应发送失败", e);
                        }
                    });
                } catch (Exception e) {
                    log.error("处理设备状态查询异常", e);
                }
                break;
            // Receive device real-time data
            // Receive device real-time data. TODO 2025-07-18: not used for now; TENANT_UP_PREFIX_REALTIME_DATA_EQP is used instead
            // The handler body below is entirely commented out, so this case only breaks out of the switch.
            case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
//                ThreadUtil.execute(() -> {
//                    try {
//                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
//                        realTimeDataService.realTimeDataHandle(vo);
//                    } catch (Exception e) {
//                        e.printStackTrace();
//                    }
//                });
                break;
            // Receive device real-time data (per machine).
            // The payload is parsed and handled on a ThreadUtil worker; any exception is caught inside the task.
            // NOTE(review): as shown, `vo` is declared twice in the same scope (RealTimeDataVo and
            // RealTimeDataParentVo) and realTimeDataHandle(vo) is called twice; this looks like the removed
            // and the added lines of a diff rendered together. Confirm which set is current.
            case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP:
                ThreadUtil.execute(() -> {
                    try {
                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
                        realTimeDataService.realTimeDataHandle(vo);
                        RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class);
                        // Send the data to each tenant's mobile clients
                        String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, vo.getTenantid());
                        MqMessage<RealTimeDataVo> mqMessage = new MqMessage<>(vo.getRealTime(), vo.getTenantid() + "", recTopic);
                        sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, mqMessage, 1);
                         realTimeDataService.realTimeDataHandle(vo);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                break;
            // Real-time alarm data uploaded by each tenant
            // Real-time alarm data uploaded by each tenant. TODO 2025-07-21: not used for now; TENANT_UP_PREFIX_REALTIME_DATA_EQP is used uniformly instead
            // NOTE(review): the live statements in this clause are repeated verbatim in the commented-out
            // copies below them; this looks like the removed and the added lines of a diff rendered
            // together. Confirm whether the current file keeps this logic active or commented out.
            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: {
                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
                });
                // Fault data
                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
                // Tenant id
                String tenantId = realFaultMessage.getTentId();
//                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
//                });
//                // Fault data
//                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
//                // Tenant id
//                String tenantId = realFaultMessage.getTentId();
                // Store the received tenant real-time alarm data in redis
                // Convert to Map<String, Object>
                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> (Object) entry.getValue()
                        ));
                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
                // Broadcast to the mobile devices under each tenant
                // Nothing to broadcast when the fault map is empty; this returns from the enclosing method.
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
                // Data conversion: map values -> list
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
                // Send the broadcast
                System.err.println("广播给:" + recTopic);
                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
//                // Convert to Map<String, Object>
//                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
//                        .collect(Collectors.toMap(
//                                Map.Entry::getKey,
//                                entry -> (Object) entry.getValue()
//                        ));
//                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
//                // Broadcast to the mobile devices under each tenant
//                if (dryFaultMap.isEmpty()) {
//                    return;
//                }
//                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
//                // Data conversion
//                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
//                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
//                // Send the broadcast
//                System.err.println("广播给:" + recTopic);
//                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
            }
            break;
            //移动端主动请求设备实时故障数据(用于页面刚打开时拉取一次数据)
@@ -253,15 +316,13 @@
                if (req.toString().isEmpty() || tenantId == null) {
                    return;
                }
                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId));
                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId));
                //转换为 Map<String, DryFaultRecordVo>
                Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                entry -> entry.getKey().toString(),
                                entry -> (DryFaultRecordVo) entry.getValue()
                        ));
                if (dryFaultMap.isEmpty()) {
                    return;
                }