From be00ddc83f86599916eb8d0f581f448aa74c9d51 Mon Sep 17 00:00:00 2001
From: baoshiwei <baoshiwei@shlanbao.cn>
Date: 星期四, 19 六月 2025 08:55:57 +0800
Subject: [PATCH] feat(mqtt): 增加对空调设备数据的处理

---
 src/main/java/com/zhitan/config/mqtt/MqttTopic.java         |   25 ++++++++
 src/main/java/com/zhitan/util/KtDataMapper.java             |   28 +++++++++
 src/main/java/com/zhitan/model/entity/KtDataEntity.java     |   14 ++++
 src/main/java/com/zhitan/service/IDataService.java          |    3 +
 src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java |    2 
 src/main/java/com/zhitan/service/impl/DataServiceImpl.java  |   49 ++++++++++++++++
 src/main/java/com/zhitan/handler/MqttMessageHandler.java    |   39 ++++++++++++-
 7 files changed, 156 insertions(+), 4 deletions(-)

diff --git a/src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java b/src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java
index 019c1d6..72c9277 100644
--- a/src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java
+++ b/src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java
@@ -31,7 +31,7 @@
     // 璁㈤槄娑堟伅閫傞厤鍣�
     @Bean
     public MqttPahoMessageDrivenChannelAdapter inboundAdapter() {
-        return new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", mqttClientFactory, defaultTopic);
+        return new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", mqttClientFactory, defaultTopic,MqttTopic.DEVICE_KT_UP);
     }
 
     // 瀹氫箟娑堟伅澶勭悊娴�
diff --git a/src/main/java/com/zhitan/config/mqtt/MqttTopic.java b/src/main/java/com/zhitan/config/mqtt/MqttTopic.java
new file mode 100644
index 0000000..b5a48ca
--- /dev/null
+++ b/src/main/java/com/zhitan/config/mqtt/MqttTopic.java
@@ -0,0 +1,25 @@
+package com.zhitan.config.mqtt;
+
+public enum MqttTopic {
+    SERVICE_DOWN("lanbao/nygl/service/down");
+
+    public final static String DEFAULT_TOPIC = "lanbao/nygl/device/up";
+
+    public final static String DEVICE_KT_UP = "lanbao/nygl/device/kt/up";
+
+
+
+
+
+
+    private String topic;
+    MqttTopic(String topic) {
+        this.topic = topic;
+    }
+    public String getTopic() {
+        return topic;
+    }
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+}
diff --git a/src/main/java/com/zhitan/handler/MqttMessageHandler.java b/src/main/java/com/zhitan/handler/MqttMessageHandler.java
index 7913d74..d2f4ec0 100644
--- a/src/main/java/com/zhitan/handler/MqttMessageHandler.java
+++ b/src/main/java/com/zhitan/handler/MqttMessageHandler.java
@@ -1,10 +1,13 @@
 package com.zhitan.handler;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.zhitan.config.mqtt.MqttTopic;
 import com.zhitan.model.entity.DeviceData;
 import com.zhitan.model.entity.ElectricPower;
+import com.zhitan.model.entity.KtDataEntity;
 import com.zhitan.model.entity.PowerEntity;
 import com.zhitan.service.IDataService;
+import com.zhitan.util.KtDataMapper;
 import com.zhitan.util.PowerDataMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.integration.mqtt.support.MqttHeaders;
@@ -31,14 +34,44 @@
         String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
         String payload = (String) message.getPayload();
         System.out.println("Received message from topic " + topic + ": " + payload);
-
-//        dataService.writeTimeSeriesData(payload);
-//
         ObjectMapper objectMapper = new ObjectMapper();
         try {
             // 灏� JSON 瀛楃涓茶浆鎹负 SensorData 瀵硅薄
             //ElectricPower electricPower = objectMapper.readValue(payload, ElectricPower.class);
             DeviceData data = objectMapper.readValue(payload, DeviceData.class);
+            if (topic != null) {
+                switch ( topic) {
+                    case MqttTopic.DEFAULT_TOPIC:
+                        handleDeviceUpMessage(data);
+                        break;
+                    case MqttTopic.DEVICE_KT_UP:
+                        handleDeviceKtUpMessage(data);
+                        break;
+                    default:
+                        log.error("Invalid topic: " + topic);
+                }
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage());
+        }
+
+
+
+    }
+
+    private void handleDeviceKtUpMessage(DeviceData data) {
+        try {
+            KtDataEntity ktDataEntity = KtDataMapper.mapToEntity(data);
+            dataService.writeTimeSeriesData(ktDataEntity);
+            // dataService.writeTimeSeriesData(electricPower)
+        } catch (Exception e) {
+            log.error(e.getMessage());
+        }
+
+    }
+
+    private void handleDeviceUpMessage(DeviceData data) {
+        try {
             List<PowerEntity> powerMeters = PowerDataMapper.mapToEntities(data);
             for (PowerEntity powerMeter : powerMeters) {
                 dataService.writeTimeSeriesData(powerMeter);
diff --git a/src/main/java/com/zhitan/model/entity/KtDataEntity.java b/src/main/java/com/zhitan/model/entity/KtDataEntity.java
new file mode 100644
index 0000000..a2e1521
--- /dev/null
+++ b/src/main/java/com/zhitan/model/entity/KtDataEntity.java
@@ -0,0 +1,14 @@
+package com.zhitan.model.entity;
+
+import lombok.Data;
+
+@Data
+public class KtDataEntity {
+
+    private String sn;
+    private String imei;
+    private Long time;
+    private Double tmp;
+    private Double hum;
+    private Double ia;
+}
diff --git a/src/main/java/com/zhitan/service/IDataService.java b/src/main/java/com/zhitan/service/IDataService.java
index 4edd3ac..ed8f886 100644
--- a/src/main/java/com/zhitan/service/IDataService.java
+++ b/src/main/java/com/zhitan/service/IDataService.java
@@ -1,6 +1,7 @@
 package com.zhitan.service;
 
 import com.zhitan.model.entity.ElectricPower;
+import com.zhitan.model.entity.KtDataEntity;
 import com.zhitan.model.entity.PowerEntity;
 import org.jetbrains.annotations.NotNull;
 
@@ -29,4 +30,6 @@
      * @param powerEntity 鍥哄畾鏍煎紡鐨勬暟鎹�
      */
     void writeTimeSeriesData(@NotNull PowerEntity powerEntity);
+
+    void writeTimeSeriesData(KtDataEntity ktDataEntity);
 }
diff --git a/src/main/java/com/zhitan/service/impl/DataServiceImpl.java b/src/main/java/com/zhitan/service/impl/DataServiceImpl.java
index e977412..1b31f0d 100644
--- a/src/main/java/com/zhitan/service/impl/DataServiceImpl.java
+++ b/src/main/java/com/zhitan/service/impl/DataServiceImpl.java
@@ -11,6 +11,7 @@
 import com.zhitan.model.entity.ElectricPower;
 import com.zhitan.influxdb.InfluxdbRepository;
 import com.zhitan.mapper.CommonMapper;
+import com.zhitan.model.entity.KtDataEntity;
 import com.zhitan.model.entity.PowerEntity;
 import com.zhitan.redis.RedisCache;
 import com.zhitan.service.IDataService;
@@ -201,6 +202,54 @@
                 }
             }
         }
+//        repository.writePoints(points);
+    }
+
+
+    /**
+     * 鍐欏叆KT鏁版嵁
+     *
+     * @param ktDataEntity 鍥哄畾鏍煎紡鐨勬暟鎹�
+     */
+    @Override
+    public void writeTimeSeriesData(KtDataEntity ktDataEntity) {
+        List<IndexTemplate> templates = getIndexTemplate();
+        // 鑾峰彇绫讳腑鎵�鏈夊0鏄庣殑瀛楁
+        Field[] fields = ktDataEntity.getClass().getDeclaredFields();
+        List<Point> points = new ArrayList<>();
+        for (Field field : fields) {
+            IndexTemplate indexTemplate = templates.stream().filter(template ->
+                            field.getName().equalsIgnoreCase(template.getGatewayKey()))
+                    .findFirst().orElse(null);
+                    if (indexTemplate != null) {
+                        Point point = Point
+                                .measurement(influxdbConfig.getMeasurement())
+                                .addTag(TAG, ktDataEntity.getSn() + "_" + indexTemplate.getCode())
+                                .time(Instant.now(), WritePrecision.S);
+                        field.setAccessible(true);
+                        if (Number.class.isAssignableFrom(field.getType()) || field.getType().isPrimitive()) {
+                            try {
+                                // 鑾峰彇瀛楁鍊�
+                                Object o = field.get(ktDataEntity);
+                                if (o!=null) {
+                                    // 瀹夊叏绫诲瀷杞崲
+                                    if (o instanceof Number) {
+                                        double value = ((Number) o).doubleValue();
+                                        point.addField(FIELD_VALUE, value);
+                                        // 浣跨敤 value...
+                                    } else {
+                                        log.error("瀛楁 {} 绫诲瀷闈炴硶: {}", field.getName(), o.getClass());
+                                    }
+                                }
+                                points.add(point);
+                            } catch (IllegalAccessException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    }
+
+
+        }
         repository.writePoints(points);
     }
 
diff --git a/src/main/java/com/zhitan/util/KtDataMapper.java b/src/main/java/com/zhitan/util/KtDataMapper.java
new file mode 100644
index 0000000..8735928
--- /dev/null
+++ b/src/main/java/com/zhitan/util/KtDataMapper.java
@@ -0,0 +1,28 @@
+package com.zhitan.util;
+
+import com.zhitan.model.entity.DeviceData;
+import com.zhitan.model.entity.KtDataEntity;
+
+public class KtDataMapper {
+    public static KtDataEntity mapToEntity(DeviceData deviceData)
+    {
+        KtDataEntity ktDataEntity = new KtDataEntity();
+        ktDataEntity.setSn(deviceData.getParams().getSysSn());
+        ktDataEntity.setImei(deviceData.getParams().getSysImei());
+        ktDataEntity.setTime(deviceData.getParams().getSysTime());
+        deviceData.getParams().getRData().forEach(item -> {
+            if ("tmp".equals(item.getName())) {
+                ktDataEntity.setTmp(parseDouble(item.getValue()));
+            } else if ("hum".equals(item.getName())) {
+                ktDataEntity.setHum(parseDouble(item.getValue()));
+            } else if ("I".equals(item.getName())) {
+                ktDataEntity.setIa(parseDouble(item.getValue()));
+            }
+        });
+        return ktDataEntity;
+    }
+
+    private static Double parseDouble(String s) {
+        return s != null ? Double.parseDouble(s) : null;
+    }
+}

--
Gitblit v1.9.3