干燥机配套车间生产管理系统/云平台服务端
baoshiwei
2024-12-11 7c585586e9bea943161676bd9d127e81123891c3
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -85,7 +85,7 @@
                JSONObject messageJson = JSONObject.parseObject(message);
                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
                    JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT,messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
                    JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
                    if (client == null) {
                        JSONObject item = new JSONObject();
                        //username
@@ -108,7 +108,7 @@
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId") ), clientid, item);
                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                        System.err.println(String.format("设备: %s上线", clientid));
                    }
@@ -116,7 +116,7 @@
                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
                    try {
                        String clientid = messageJson.getString("clientid");
                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientid.split("-")[1]),  clientid);
                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid);
                        System.err.println(String.format("设备: %s下线", clientid));
                    } catch (Exception e) {
                        e.printStackTrace();
@@ -163,7 +163,7 @@
        }
        // 实时数据上传太频繁且数据内容超过字段大小不记录日志
        if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)) {
           // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
            // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
        }
        switch (topic) {
@@ -172,7 +172,7 @@
                System.err.println("admin收到" + topic);
                // 根据设备id查询设备mqtt在线状态
                String clientId = messageJson.getString("clientId");
                JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientId.split("-")[1]) , clientId);
                JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId);
                ThreadUtil.execute(() -> {
@@ -216,13 +216,14 @@
                break;
            //各租户上传的实时报警数据
            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA:
            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: {
                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
                });
                //故障数据
                Map<String, DryFaultRecordVo> dryFaultMap =  realFaultMessage.getData();
                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
                //租户id
                String tentId = realFaultMessage.getTentId();
                String tenantId = realFaultMessage.getTentId();
                //收到租户实时报警数据存入redis
                //转换为 Map<String, Object>
                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
@@ -230,21 +231,51 @@
                                Map.Entry::getKey,
                                entry -> (Object) entry.getValue()
                        ));
                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT,realFaultMessage.getTentId()), objectMap);
                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
                //广播发送给各租户下移动设备
                if(dryFaultMap.isEmpty()){
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tentId);
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage< List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList,tentId,recTopic);
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
                //发送广播
                System.err.println("广播给:" + recTopic);
                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT,mqMessage);
                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
                break;
            // 接收设备报警数据
            }
            break;
            //移动端主动请求设备实时故障数据(用于页面刚打开时拉取一次数据)
            case MqttConstant.MOBILE_REQ_EQU_REAL_FAULT: {
                String tenantId = (String) messageJson.get("tenantId");
                if (req.toString().isEmpty() || tenantId == null) {
                    return;
                }
                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId));
                //转换为 Map<String, DryFaultRecordVo>
                Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                entry -> entry.getKey().toString(),
                                entry -> (DryFaultRecordVo) entry.getValue()
                        ));
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String resTopic = String.format(MqttConstant.SERVICE_ONECE_TENANT_REAL_FAULT, req);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, resTopic);
                //发送请求设备
                System.err.println("发送给:" + resTopic);
                sendMqttMessage(resTopic, mqMessage, 2);
            }
            break;
            // 接收设备报警历史数据
            case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA:
                ThreadUtil.execute(() -> {
                    try {
@@ -372,21 +403,29 @@
        }
    }
    /**
     * 发送消息
     * @param topic       订阅
     * @param mqMessage   消息体
     *
     * @param topic     订阅
     * @param mqMessage 消息体
     * @param type      1-发送给租户   2-发送给固定id
     */
    private void sendMqttMessage(String topic, MqMessage mqMessage){
    private void sendMqttMessage(String topic, MqMessage mqMessage, Integer type) {
        ThreadUtil.execute(() -> {
            try {
                MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
                sendMessage.setQos(0);
                mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage);
            }catch (Exception e){
                if (type == 1) {
                    MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
                    sendMessage.setQos(0);
                    mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage);
                } else if (type == 2) {
                    MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
                    sendMessage.setQos(0);
                    mqttUtil.getMqttClient().publish(topic, sendMessage);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });