干燥机配套车间生产管理系统/云平台服务端
zhuguifei
2 天以前 b38019aae593a66c16f7e75d6e37d14eb8d2c42e
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
old mode 100755 new mode 100644
@@ -24,6 +24,7 @@
import org.jeecg.modules.dry.service.*;
import org.jeecg.modules.dry.vo.DryEquipmentVo;
import org.jeecg.modules.dry.vo.DryFaultRecordVo;
import org.jeecg.modules.dry.vo.RealTimeDataParentVo;
import org.jeecg.modules.dry.vo.RealTimeDataVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -41,6 +42,9 @@
public class MqttSampleCallback implements MqttCallback {
    @Value(value = "${jeecg.mqtt.role}")
    private String role;
    @Autowired
    private MqttUtil mqttUtil;
    @Autowired
@@ -75,8 +79,8 @@
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        System.out.println("收到消息: \n  topic:" + topic + "\n  Qos:" + mqttMessage.getQos() + "\n  payload:"
                + new String(mqttMessage.getPayload()));
//        System.out.println("收到消息: \n  topic:" + topic + "\n  Qos:" + mqttMessage.getQos() + "\n  payload:"
//                + new String(mqttMessage.getPayload()));
        switch (role) {
            // 管理员
@@ -97,6 +101,8 @@
                        //clientid
                        String clientid = messageJson.getString("clientid");
                        item.put("clientid", clientid);
                        // 不符合的设备不进行管理
                        if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  return;
                        //是否连接
                        item.put("connected", true);
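                        // Parse the clientid. It must have three dash-separated parts, type-tenantId-code (e.g. client-1000-01); ids in any other shape are rejected by the format check above.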
@@ -104,12 +110,14 @@
                            String[] info = clientid.split("-");
                            item.put("type", info[0]);
                            item.put("tenantId", info[1]);
                            //item.put("code", info[2]);
                            item.put("code", info[2]);
                            redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                            System.err.println(String.format("设备: %s上线", clientid));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                        System.err.println(String.format("设备: %s上线", clientid));
                    }
                }
@@ -203,48 +211,59 @@
                });
                break;
            // 接收设备实时数据
            // 接收设备实时数据 TODO 20250718暂不使用,使用TENANT_UP_PREFIX_REALTIME_DATA_EQP
            case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
//                ThreadUtil.execute(() -> {
//                    try {
//                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
//                        realTimeDataService.realTimeDataHandle(vo);
//                    } catch (Exception e) {
//                        e.printStackTrace();
//                    }
//                });
                break;
            // 接收设备实时数据-机台
            case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP:
                ThreadUtil.execute(() -> {
                    try {
                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
                        realTimeDataService.realTimeDataHandle(vo);
                        RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class);
                        synchronized (realTimeDataService) {
                            realTimeDataService.realTimeDataHandle(vo);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                break;
            //各租户上传的实时报警数据
            //各租户上传的实时报警数据 TODO 20250721暂不使用,统一使用TENANT_UP_PREFIX_REALTIME_DATA_EQP
            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: {
                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
                });
                //故障数据
                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
                //租户id
                String tenantId = realFaultMessage.getTentId();
//                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
//                });
//                //故障数据
//                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
//                //租户id
//                String tenantId = realFaultMessage.getTentId();
                //收到租户实时报警数据存入redis
                //转换为 Map<String, Object>
                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> (Object) entry.getValue()
                        ));
                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
                //广播发送给各租户下移动设备
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
                //发送广播
                System.err.println("广播给:" + recTopic);
                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
//                //转换为 Map<String, Object>
//                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
//                        .collect(Collectors.toMap(
//                                Map.Entry::getKey,
//                                entry -> (Object) entry.getValue()
//                        ));
//                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
//                //广播发送给各租户下移动设备
//                if (dryFaultMap.isEmpty()) {
//                    return;
//                }
//                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
//                //数据转换
//                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
//                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
//                //发送广播
//                System.err.println("广播给:" + recTopic);
//                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
            }
            break;
            //移动端主动请求设备实时故障数据(用于页面刚打开时拉取一次数据)
@@ -253,15 +272,13 @@
                if (req.toString().isEmpty() || tenantId == null) {
                    return;
                }
                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId));
                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId));
                //转换为 Map<String, DryFaultRecordVo>
                Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                entry -> entry.getKey().toString(),
                                entry -> (DryFaultRecordVo) entry.getValue()
                        ));
                if (dryFaultMap.isEmpty()) {
                    return;
                }