| | |
| | | |
| | | break; |
| | | //各租户上传的实时报警数据 |
| | | case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: |
| | | case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: { |
| | | |
| | | MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { |
| | | }); |
| | | //故障数据 |
| | | Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); |
| | | //租户id |
| | | String tentId = realFaultMessage.getTentId(); |
| | | String tenantId = realFaultMessage.getTentId(); |
| | | //收到租户实时报警数据存入redis |
| | | //转换为 Map<String, Object> |
| | | Map<String, Object> objectMap = dryFaultMap.entrySet().stream() |
| | |
| | | if(dryFaultMap.isEmpty()){ |
| | | return; |
| | | } |
| | | String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tentId); |
| | | String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); |
| | | //数据转换 |
| | | List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); |
| | | MqMessage< List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList,tentId,recTopic); |
| | | MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); |
| | | //发送广播 |
| | | System.err.println("广播给:" + recTopic); |
| | | sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT,mqMessage); |
| | | sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); |
| | | |
| | | |
| | | } |
| | | break; |
| | | // 接收设备报警数据 |
| | | //移动端主动请求设备实时故障数据(用于页面刚打开时拉取一次数据) |
| | | case MqttConstant.MOBILE_REQ_EQU_REAL_FAULT: { |
| | | String tenantId = (String) messageJson.get("tenantId"); |
| | | if (req.toString().isEmpty() || tenantId == null) { |
| | | return; |
| | | } |
| | | Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId)); |
| | | //转换为 Map<String, DryFaultRecordVo> |
| | | Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream() |
| | | .collect(Collectors.toMap( |
| | | entry -> entry.getKey().toString(), |
| | | entry -> (DryFaultRecordVo) entry.getValue() |
| | | )); |
| | | |
| | | |
| | | if (dryFaultMap.isEmpty()) { |
| | | return; |
| | | } |
| | | String resTopic = String.format(MqttConstant.SERVICE_ONECE_TENANT_REAL_FAULT, req); |
| | | //数据转换 |
| | | List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); |
| | | MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, resTopic); |
| | | //发送请求设备 |
| | | System.err.println("发送给:" + resTopic); |
| | | sendMqttMessage(resTopic, mqMessage, 2); |
| | | |
| | | } |
| | | break; |
| | | // 接收设备报警历史数据 |
| | | case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA: |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | |
| | | } |
| | | |
| | | |
| | | |
| | | } |
| | | |
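| | | /** |
| | | * Sends an MQTT message; the publish is handed off to ThreadUtil.execute. |
| | | * |
| | | * @param topic topic to publish to; when type is 1 it is treated as a format template and filled with the message's tenant id, when type is 2 it is used as-is |
| | | * @param mqMessage message body, serialized to JSON before publishing |
| | | * @param type 1 - send to the tenant; 2 - send to a fixed id |
| | | */ |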
| | | private void sendMqttMessage(String topic, MqMessage mqMessage){ |
| | | private void sendMqttMessage(String topic, MqMessage mqMessage, Integer type) { |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | | if (type == 1) { |
| | | MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); |
| | | sendMessage.setQos(0); |
| | | mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage); |
| | | } else if (type == 2) { |
| | | MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); |
| | | sendMessage.setQos(0); |
| | | mqttUtil.getMqttClient().publish(topic, sendMessage); |
| | | } |
| | | |
| | | }catch (Exception e){ |
| | | e.printStackTrace(); |
| | | } |