| | |
| | | public class MqttSampleCallback implements MqttCallback { |
| | | @Value(value = "${jeecg.mqtt.role}") |
| | | private String role; |
| | | |
| | | |
| | | |
| | | @Autowired |
| | | private MqttUtil mqttUtil; |
| | | @Autowired |
| | |
| | | |
| | | @Override |
| | | public void messageArrived(String topic, MqttMessage mqttMessage) { |
| | | System.out.println("收到消息: \n topic:" + topic + "\n Qos:" + mqttMessage.getQos() + "\n payload:" |
| | | + new String(mqttMessage.getPayload())); |
| | | // System.out.println("收到消息: \n topic:" + topic + "\n Qos:" + mqttMessage.getQos() + "\n payload:" |
| | | // + new String(mqttMessage.getPayload())); |
| | | |
| | | switch (role) { |
| | | // 管理员 |
| | |
| | | //clientid |
| | | String clientid = messageJson.getString("clientid"); |
| | | item.put("clientid", clientid); |
| | | // 不符合的设备不进行管理 |
| | | if(!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return; |
| | | //是否连接 |
| | | item.put("connected", true); |
| | | //根据clientid解析(注意配置文件中clientid格式 例:client-1000) |
| | |
| | | String[] info = clientid.split("-"); |
| | | item.put("type", info[0]); |
| | | item.put("tenantId", info[1]); |
| | | //item.put("code", info[2]); |
| | | item.put("code", info[2]); |
| | | redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); |
| | | System.err.println(String.format("设备: %s上线", clientid)); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); |
| | | System.err.println(String.format("设备: %s上线", clientid)); |
| | | |
| | | |
| | | } |
| | | |
| | | } |
| | |
| | | }); |
| | | break; |
| | | |
| | | // 接收设备实时数据 |
| | | // 接收设备实时数据 TODO 20250718暂不使用,使用TENANT_UP_PREFIX_REALTIME_DATA_EQP |
| | | case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | | RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); |
| | | realTimeDataService.realTimeDataHandle(vo); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | }); |
| | | // ThreadUtil.execute(() -> { |
| | | // try { |
| | | // RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); |
| | | // realTimeDataService.realTimeDataHandle(vo); |
| | | // } catch (Exception e) { |
| | | // e.printStackTrace(); |
| | | // } |
| | | // }); |
| | | |
| | | break; |
| | | // 接收设备实时数据-机台 |
| | |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | | RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class); |
| | | realTimeDataService.realTimeDataHandle(vo); |
| | | synchronized (realTimeDataService) { |
| | | realTimeDataService.realTimeDataHandle(vo); |
| | | } |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | }); |
| | | break; |
| | | //各租户上传的实时报警数据 |
| | | //各租户上传的实时报警数据 TODO 20250721暂不使用,统一使用TENANT_UP_PREFIX_REALTIME_DATA_EQP |
| | | case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: { |
| | | |
| | | MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { |
| | | }); |
| | | //故障数据 |
| | | Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); |
| | | //租户id |
| | | String tenantId = realFaultMessage.getTentId(); |
| | | // MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { |
| | | // }); |
| | | // //故障数据 |
| | | // Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); |
| | | // //租户id |
| | | // String tenantId = realFaultMessage.getTentId(); |
| | | //收到租户实时报警数据存入redis |
| | | //转换为 Map<String, Object> |
| | | Map<String, Object> objectMap = dryFaultMap.entrySet().stream() |
| | | .collect(Collectors.toMap( |
| | | Map.Entry::getKey, |
| | | entry -> (Object) entry.getValue() |
| | | )); |
| | | redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap); |
| | | //广播发送给各租户下移动设备 |
| | | if (dryFaultMap.isEmpty()) { |
| | | return; |
| | | } |
| | | String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); |
| | | //数据转换 |
| | | List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); |
| | | MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); |
| | | //发送广播 |
| | | System.err.println("广播给:" + recTopic); |
| | | sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); |
| | | // //转换为 Map<String, Object> |
| | | // Map<String, Object> objectMap = dryFaultMap.entrySet().stream() |
| | | // .collect(Collectors.toMap( |
| | | // Map.Entry::getKey, |
| | | // entry -> (Object) entry.getValue() |
| | | // )); |
| | | // redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap); |
| | | // //广播发送给各租户下移动设备 |
| | | // if (dryFaultMap.isEmpty()) { |
| | | // return; |
| | | // } |
| | | // String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); |
| | | // //数据转换 |
| | | // List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); |
| | | // MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); |
| | | // //发送广播 |
| | | // System.err.println("广播给:" + recTopic); |
| | | // sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); |
| | | } |
| | | break; |
| | | //移动端主动请求设备实时故障数据(用于页面刚打开时拉取一次数据) |
| | |
| | | if (req.toString().isEmpty() || tenantId == null) { |
| | | return; |
| | | } |
| | | Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId)); |
| | | Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId)); |
| | | //转换为 Map<String, DryFaultRecordVo> |
| | | Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream() |
| | | .collect(Collectors.toMap( |