From be00ddc83f86599916eb8d0f581f448aa74c9d51 Mon Sep 17 00:00:00 2001 From: baoshiwei <baoshiwei@shlanbao.cn> Date: 星期四, 19 六月 2025 08:55:57 +0800 Subject: [PATCH] feat(mqtt): 增加对空调设备数据的处理 --- src/main/java/com/zhitan/config/mqtt/MqttTopic.java | 25 ++++++++ src/main/java/com/zhitan/util/KtDataMapper.java | 28 +++++++++ src/main/java/com/zhitan/model/entity/KtDataEntity.java | 14 ++++ src/main/java/com/zhitan/service/IDataService.java | 3 + src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java | 2 src/main/java/com/zhitan/service/impl/DataServiceImpl.java | 49 ++++++++++++++++ src/main/java/com/zhitan/handler/MqttMessageHandler.java | 39 ++++++++++++- 7 files changed, 156 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java b/src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java index 019c1d6..72c9277 100644 --- a/src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java +++ b/src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java @@ -31,7 +31,7 @@ // 璁㈤槄娑堟伅閫傞厤鍣� @Bean public MqttPahoMessageDrivenChannelAdapter inboundAdapter() { - return new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", mqttClientFactory, defaultTopic); + return new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", mqttClientFactory, defaultTopic,MqttTopic.DEVICE_KT_UP); } // 瀹氫箟娑堟伅澶勭悊娴� diff --git a/src/main/java/com/zhitan/config/mqtt/MqttTopic.java b/src/main/java/com/zhitan/config/mqtt/MqttTopic.java new file mode 100644 index 0000000..b5a48ca --- /dev/null +++ b/src/main/java/com/zhitan/config/mqtt/MqttTopic.java @@ -0,0 +1,25 @@ +package com.zhitan.config.mqtt; + +public enum MqttTopic { + SERVICE_DOWN("lanbao/nygl/service/down"); + + public final static String DEFAULT_TOPIC = "lanbao/nygl/device/up"; + + public final static String DEVICE_KT_UP = "lanbao/nygl/device/kt/up"; + + + + + + + private String topic; + MqttTopic(String topic) { + this.topic = topic; + } + public String getTopic() { + return topic; + } + public void setTopic(String topic) { + this.topic = topic; + } +} diff --git a/src/main/java/com/zhitan/handler/MqttMessageHandler.java b/src/main/java/com/zhitan/handler/MqttMessageHandler.java index 7913d74..d2f4ec0 100644 --- a/src/main/java/com/zhitan/handler/MqttMessageHandler.java +++ b/src/main/java/com/zhitan/handler/MqttMessageHandler.java @@ -1,10 +1,13 @@ package com.zhitan.handler; import com.fasterxml.jackson.databind.ObjectMapper; +import com.zhitan.config.mqtt.MqttTopic; import com.zhitan.model.entity.DeviceData; import com.zhitan.model.entity.ElectricPower; +import com.zhitan.model.entity.KtDataEntity; import com.zhitan.model.entity.PowerEntity; import com.zhitan.service.IDataService; +import com.zhitan.util.KtDataMapper; import com.zhitan.util.PowerDataMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.integration.mqtt.support.MqttHeaders; @@ -31,14 +34,44 @@ String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC); String payload = (String) message.getPayload(); System.out.println("Received message from topic " + topic + ": " + payload); - -// dataService.writeTimeSeriesData(payload); -// ObjectMapper objectMapper = new ObjectMapper(); try { // 灏� JSON 瀛楃涓茶浆鎹负 SensorData 瀵硅薄 //ElectricPower electricPower = objectMapper.readValue(payload, ElectricPower.class); DeviceData data = objectMapper.readValue(payload, DeviceData.class); + if (topic != null) { + switch ( topic) { + case MqttTopic.DEFAULT_TOPIC: + handleDeviceUpMessage(data); + break; + case MqttTopic.DEVICE_KT_UP: + handleDeviceKtUpMessage(data); + break; + default: + log.error("Invalid topic: " + topic); + } + } + } catch (Exception e) { + log.error(e.getMessage()); + } + + + + } + + private void handleDeviceKtUpMessage(DeviceData data) { + try { + KtDataEntity ktDataEntity = KtDataMapper.mapToEntity(data); + dataService.writeTimeSeriesData(ktDataEntity); + // dataService.writeTimeSeriesData(electricPower) + } catch (Exception e) { + log.error(e.getMessage()); + } + + } + + private void handleDeviceUpMessage(DeviceData data) { + try { List<PowerEntity> powerMeters = PowerDataMapper.mapToEntities(data); for (PowerEntity powerMeter : powerMeters) { dataService.writeTimeSeriesData(powerMeter); diff --git a/src/main/java/com/zhitan/model/entity/KtDataEntity.java b/src/main/java/com/zhitan/model/entity/KtDataEntity.java new file mode 100644 index 0000000..a2e1521 --- /dev/null +++ b/src/main/java/com/zhitan/model/entity/KtDataEntity.java @@ -0,0 +1,14 @@ +package com.zhitan.model.entity; + +import lombok.Data; + +@Data +public class KtDataEntity { + + private String sn; + private String imei; + private Long time; + private Double tmp; + private Double hum; + private Double ia; +} diff --git a/src/main/java/com/zhitan/service/IDataService.java b/src/main/java/com/zhitan/service/IDataService.java index 4edd3ac..ed8f886 100644 --- a/src/main/java/com/zhitan/service/IDataService.java +++ b/src/main/java/com/zhitan/service/IDataService.java @@ -1,6 +1,7 @@ package com.zhitan.service; import com.zhitan.model.entity.ElectricPower; +import com.zhitan.model.entity.KtDataEntity; import com.zhitan.model.entity.PowerEntity; import org.jetbrains.annotations.NotNull; @@ -29,4 +30,6 @@ * @param powerEntity 鍥哄畾鏍煎紡鐨勬暟鎹� */ void writeTimeSeriesData(@NotNull PowerEntity powerEntity); + + void writeTimeSeriesData(KtDataEntity ktDataEntity); } diff --git a/src/main/java/com/zhitan/service/impl/DataServiceImpl.java b/src/main/java/com/zhitan/service/impl/DataServiceImpl.java index e977412..1b31f0d 100644 --- a/src/main/java/com/zhitan/service/impl/DataServiceImpl.java +++ b/src/main/java/com/zhitan/service/impl/DataServiceImpl.java @@ -11,6 +11,7 @@ import com.zhitan.model.entity.ElectricPower; import com.zhitan.influxdb.InfluxdbRepository; import com.zhitan.mapper.CommonMapper; +import com.zhitan.model.entity.KtDataEntity; import com.zhitan.model.entity.PowerEntity; import com.zhitan.redis.RedisCache; import com.zhitan.service.IDataService; @@ -201,6 +202,54 @@ } } } +// repository.writePoints(points); + } + + + /** + * 鍐欏叆KT鏁版嵁 + * + * @param ktDataEntity 鍥哄畾鏍煎紡鐨勬暟鎹� + */ + @Override + public void writeTimeSeriesData(KtDataEntity ktDataEntity) { + List<IndexTemplate> templates = getIndexTemplate(); + // 鑾峰彇绫讳腑鎵�鏈夊0鏄庣殑瀛楁 + Field[] fields = ktDataEntity.getClass().getDeclaredFields(); + List<Point> points = new ArrayList<>(); + for (Field field : fields) { + IndexTemplate indexTemplate = templates.stream().filter(template -> + field.getName().equalsIgnoreCase(template.getGatewayKey())) + .findFirst().orElse(null); + if (indexTemplate != null) { + Point point = Point + .measurement(influxdbConfig.getMeasurement()) + .addTag(TAG, ktDataEntity.getSn() + "_" + indexTemplate.getCode()) + .time(Instant.now(), WritePrecision.S); + field.setAccessible(true); + if (Number.class.isAssignableFrom(field.getType()) || field.getType().isPrimitive()) { + try { + // 鑾峰彇瀛楁鍊� + Object o = field.get(ktDataEntity); + if (o!=null) { + // 瀹夊叏绫诲瀷杞崲 + if (o instanceof Number) { + double value = ((Number) o).doubleValue(); + point.addField(FIELD_VALUE, value); + // 浣跨敤 value... + } else { + log.error("瀛楁 {} 绫诲瀷闈炴硶: {}", field.getName(), o.getClass()); + } + } + points.add(point); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + } + + + } repository.writePoints(points); } diff --git a/src/main/java/com/zhitan/util/KtDataMapper.java b/src/main/java/com/zhitan/util/KtDataMapper.java new file mode 100644 index 0000000..8735928 --- /dev/null +++ b/src/main/java/com/zhitan/util/KtDataMapper.java @@ -0,0 +1,28 @@ +package com.zhitan.util; + +import com.zhitan.model.entity.DeviceData; +import com.zhitan.model.entity.KtDataEntity; + +public class KtDataMapper { + public static KtDataEntity mapToEntity(DeviceData deviceData) + { + KtDataEntity ktDataEntity = new KtDataEntity(); + ktDataEntity.setSn(deviceData.getParams().getSysSn()); + ktDataEntity.setImei(deviceData.getParams().getSysImei()); + ktDataEntity.setTime(deviceData.getParams().getSysTime()); + deviceData.getParams().getRData().forEach(item -> { + if ("tmp".equals(item.getName())) { + ktDataEntity.setTmp(parseDouble(item.getValue())); + } else if ("hum".equals(item.getName())) { + ktDataEntity.setHum(parseDouble(item.getValue())); + } else if ("I".equals(item.getName())) { + ktDataEntity.setIa(parseDouble(item.getValue())); + } + }); + return ktDataEntity; + } + + private static Double parseDouble(String s) { + return s != null ? Double.parseDouble(s) : null; + } +} -- Gitblit v1.9.3