干燥机配套车间生产管理系统/云平台服务端
zhuguifei
2 天以前 b38019aae593a66c16f7e75d6e37d14eb8d2c42e
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -24,6 +24,7 @@
import org.jeecg.modules.dry.service.*;
import org.jeecg.modules.dry.vo.DryEquipmentVo;
import org.jeecg.modules.dry.vo.DryFaultRecordVo;
import org.jeecg.modules.dry.vo.RealTimeDataParentVo;
import org.jeecg.modules.dry.vo.RealTimeDataVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -41,6 +42,9 @@
public class MqttSampleCallback implements MqttCallback {
    @Value(value = "${jeecg.mqtt.role}")
    private String role;
    @Autowired
    private MqttUtil mqttUtil;
    @Autowired
@@ -75,8 +79,8 @@
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        System.out.println("收到消息: \n  topic:" + topic + "\n  Qos:" + mqttMessage.getQos() + "\n  payload:"
                + new String(mqttMessage.getPayload()));
//        System.out.println("收到消息: \n  topic:" + topic + "\n  Qos:" + mqttMessage.getQos() + "\n  payload:"
//                + new String(mqttMessage.getPayload()));
        switch (role) {
            // 管理员
@@ -85,7 +89,7 @@
                JSONObject messageJson = JSONObject.parseObject(message);
                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
                    JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT,messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
                    JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
                    if (client == null) {
                        JSONObject item = new JSONObject();
                        //username
@@ -97,6 +101,8 @@
                        //clientid
                        String clientid = messageJson.getString("clientid");
                        item.put("clientid", clientid);
                        // 不符合的设备不进行管理
                        if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  return;
                        //是否连接
                        item.put("connected", true);
                        //根据clientid解析(注意配置文件中clientid格式  例:client-1000)
@@ -104,19 +110,21 @@
                            String[] info = clientid.split("-");
                            item.put("type", info[0]);
                            item.put("tenantId", info[1]);
                            //item.put("code", info[2]);
                            item.put("code", info[2]);
                            redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                            System.err.println(String.format("设备: %s上线", clientid));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId") ), clientid, item);
                        System.err.println(String.format("设备: %s上线", clientid));
                    }
                }
                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
                    try {
                        String clientid = messageJson.getString("clientid");
                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientid.split("-")[1]),  clientid);
                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid);
                        System.err.println(String.format("设备: %s下线", clientid));
                    } catch (Exception e) {
                        e.printStackTrace();
@@ -163,7 +171,7 @@
        }
        // 实时数据上传太频繁且数据内容超过字段大小不记录日志
        if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)) {
           // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
            // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
        }
        switch (topic) {
@@ -172,7 +180,7 @@
                System.err.println("admin收到" + topic);
                // 根据设备id查询设备mqtt在线状态
                String clientId = messageJson.getString("clientId");
                JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientId.split("-")[1]) , clientId);
                JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId);
                ThreadUtil.execute(() -> {
@@ -203,48 +211,88 @@
                });
                break;
            // 接收设备实时数据
            // 接收设备实时数据 TODO 20250718暂不使用,使用TENANT_UP_PREFIX_REALTIME_DATA_EQP
            case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
//                ThreadUtil.execute(() -> {
//                    try {
//                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
//                        realTimeDataService.realTimeDataHandle(vo);
//                    } catch (Exception e) {
//                        e.printStackTrace();
//                    }
//                });
                break;
            // 接收设备实时数据-机台
            case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP:
                ThreadUtil.execute(() -> {
                    try {
                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
                        realTimeDataService.realTimeDataHandle(vo);
                        RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class);
                        synchronized (realTimeDataService) {
                            realTimeDataService.realTimeDataHandle(vo);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                break;
            //各租户上传的实时报警数据
            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA:
                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
                });
                //故障数据
                Map<String, DryFaultRecordVo> dryFaultMap =  realFaultMessage.getData();
                //租户id
                String tentId = realFaultMessage.getTentId();
            //各租户上传的实时报警数据 TODO 20250721暂不使用,统一使用TENANT_UP_PREFIX_REALTIME_DATA_EQP
            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: {
//                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
//                });
//                //故障数据
//                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
//                //租户id
//                String tenantId = realFaultMessage.getTentId();
                //收到租户实时报警数据存入redis
                //转换为 Map<String, Object>
                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> (Object) entry.getValue()
                        ));
                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT,realFaultMessage.getTentId()), objectMap);
                //广播发送给各租户下移动设备
                if(dryFaultMap.isEmpty()){
//                //转换为 Map<String, Object>
//                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
//                        .collect(Collectors.toMap(
//                                Map.Entry::getKey,
//                                entry -> (Object) entry.getValue()
//                        ));
//                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
//                //广播发送给各租户下移动设备
//                if (dryFaultMap.isEmpty()) {
//                    return;
//                }
//                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
//                //数据转换
//                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
//                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
//                //发送广播
//                System.err.println("广播给:" + recTopic);
//                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
            }
            break;
            //移动端主动请求设备实时故障数据(用于页面刚打开时拉取一次数据)
            case MqttConstant.MOBILE_REQ_EQU_REAL_FAULT: {
                String tenantId = (String) messageJson.get("tenantId");
                if (req.toString().isEmpty() || tenantId == null) {
                    return;
                }
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tentId);
                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId));
                //转换为 Map<String, DryFaultRecordVo>
                Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                entry -> entry.getKey().toString(),
                                entry -> (DryFaultRecordVo) entry.getValue()
                        ));
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String resTopic = String.format(MqttConstant.SERVICE_ONECE_TENANT_REAL_FAULT, req);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage< List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList,tentId,recTopic);
                //发送广播
                System.err.println("广播给:" + recTopic);
                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT,mqMessage);
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, resTopic);
                //发送请求设备
                System.err.println("发送给:" + resTopic);
                sendMqttMessage(resTopic, mqMessage, 2);
                break;
            // 接收设备报警数据
            }
            break;
            // 接收设备报警历史数据
            case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA:
                ThreadUtil.execute(() -> {
                    try {
@@ -372,21 +420,29 @@
        }
    }
    /**
     * 发送消息
     * @param topic       订阅
     * @param mqMessage   消息体
     *
     * @param topic     订阅
     * @param mqMessage 消息体
     * @param type      1-发送给租户   2-发送给固定id
     */
    private void sendMqttMessage(String topic, MqMessage mqMessage){
    private void sendMqttMessage(String topic, MqMessage mqMessage, Integer type) {
        ThreadUtil.execute(() -> {
            try {
                MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
                sendMessage.setQos(0);
                mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage);
            }catch (Exception e){
                if (type == 1) {
                    MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
                    sendMessage.setQos(0);
                    mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage);
                } else if (type == 2) {
                    MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
                    sendMessage.setQos(0);
                    mqttUtil.getMqttClient().publish(topic, sendMessage);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });