干燥机配套车间生产管理系统/云平台服务端
baoshiwei
2024-12-11 7c585586e9bea943161676bd9d127e81123891c3
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -216,13 +216,14 @@
                break;
            //各租户上传的实时报警数据
            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA:
            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: {
                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
                });
                //故障数据
                Map<String, DryFaultRecordVo> dryFaultMap =  realFaultMessage.getData();
                //租户id
                String tentId = realFaultMessage.getTentId();
                String tenantId = realFaultMessage.getTentId();
                //收到租户实时报警数据存入redis
                //转换为 Map<String, Object>
                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
@@ -235,16 +236,46 @@
                if(dryFaultMap.isEmpty()){
                    return;
                }
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tentId);
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage< List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList,tentId,recTopic);
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
                //发送广播
                System.err.println("广播给:" + recTopic);
                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT,mqMessage);
                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
            }
                break;
            // 接收设备报警数据
            //移动端主动请求设备实时故障数据(用于页面刚打开时拉取一次数据)
            case MqttConstant.MOBILE_REQ_EQU_REAL_FAULT: {
                String tenantId = (String) messageJson.get("tenantId");
                if (req.toString().isEmpty() || tenantId == null) {
                    return;
                }
                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId));
                //转换为 Map<String, DryFaultRecordVo>
                Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                entry -> entry.getKey().toString(),
                                entry -> (DryFaultRecordVo) entry.getValue()
                        ));
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String resTopic = String.format(MqttConstant.SERVICE_ONECE_TENANT_REAL_FAULT, req);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, resTopic);
                //发送请求设备
                System.err.println("发送给:" + resTopic);
                sendMqttMessage(resTopic, mqMessage, 2);
            }
            break;
            // 接收设备报警历史数据
            case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA:
                ThreadUtil.execute(() -> {
                    try {
@@ -372,20 +403,28 @@
        }
    }
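    /**
     * Publishes a message over MQTT (QoS 0); the work is handed to {@code ThreadUtil.execute}.
     *
     * @param topic     topic to publish to; when {@code type} is 1 this is a format template
     *                  that is filled in with {@code mqMessage.getTentId()}, when {@code type}
     *                  is 2 it is used as-is
     * @param mqMessage message body; serialized to JSON before publishing
     * @param type      1 - send to a tenant; 2 - send to a fixed id
     */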
    private void sendMqttMessage(String topic, MqMessage mqMessage){
    private void sendMqttMessage(String topic, MqMessage mqMessage, Integer type) {
        ThreadUtil.execute(() -> {
            try {
                if (type == 1) {
                MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
                sendMessage.setQos(0);
                mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage);
                } else if (type == 2) {
                    MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
                    sendMessage.setQos(0);
                    mqttUtil.getMqttClient().publish(topic, sendMessage);
                }
            }catch (Exception e){
                e.printStackTrace();
            }