baoshiwei
2025-06-19 be00ddc83f86599916eb8d0f581f448aa74c9d51
feat(mqtt): 增加对空调设备数据的处理

- 新增 MqttTopic 枚举类,定义 MQTT 主题常量
- 修改 MqttInboundConfig,订阅新的空调设备主题
- 更新 MqttMessageHandler,增加对不同主题的处理逻辑
- 新增 KtDataEntity 类,用于存储空调设备数据
- 修改 DataServiceImpl,支持写入空调设备数据
- 新增 KtDataMapper 类,用于将设备数据映射到 KtDataEntity
已添加3个文件
已修改4个文件
160 ■■■■■ 文件已修改
src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/config/mqtt/MqttTopic.java 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/handler/MqttMessageHandler.java 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/model/entity/KtDataEntity.java 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/service/IDataService.java 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/service/impl/DataServiceImpl.java 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/util/KtDataMapper.java 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java
@@ -31,7 +31,7 @@
    // 订阅消息适配器
    /**
     * Creates the inbound MQTT channel adapter, using {@code clientId + "-inbound"} as the client id
     * and {@code mqttClientFactory} as the client factory.
     *
     * NOTE(review): this excerpt is a diff hunk rendered without +/- markers. The two consecutive
     * return statements below appear to be the removed line followed by its replacement; only one
     * of them can exist in the real file, since a statement after a return is unreachable.
     */
    @Bean
    public MqttPahoMessageDrivenChannelAdapter inboundAdapter() {
        // Presumably the pre-change line: only defaultTopic is passed as a topic.
        return new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", mqttClientFactory, defaultTopic);
        // Presumably the post-change line: MqttTopic.DEVICE_KT_UP is passed as an additional topic.
        return new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", mqttClientFactory, defaultTopic,MqttTopic.DEVICE_KT_UP);
    }
    // 定义消息处理流
src/main/java/com/zhitan/config/mqtt/MqttTopic.java
对比新文件
@@ -0,0 +1,25 @@
package com.zhitan.config.mqtt;
/**
 * MQTT topic names used by the application.
 *
 * <p>Device uplink topics are exposed as plain {@code String} constants rather than enum
 * constants, which lets them serve as compile-time constants (for example as {@code case}
 * labels in a {@code switch} over the received topic).
 */
public enum MqttTopic {

    /** Enum constant carrying the topic {@code lanbao/nygl/service/down}. */
    SERVICE_DOWN("lanbao/nygl/service/down");

    /** Default device uplink topic. */
    public static final String DEFAULT_TOPIC = "lanbao/nygl/device/up";

    /** Uplink topic for air-conditioner (KT) devices. */
    public static final String DEVICE_KT_UP = "lanbao/nygl/device/kt/up";

    // NOTE(review): non-final because setTopic is public; every user of a constant shares this value.
    private String topic;

    MqttTopic(String value) {
        topic = value;
    }

    /**
     * Returns the topic string currently held by this constant.
     *
     * @return the topic string
     */
    public String getTopic() {
        return this.topic;
    }

    /**
     * Replaces the topic string held by this constant.
     *
     * @param topic the new topic string
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }
}
src/main/java/com/zhitan/handler/MqttMessageHandler.java
@@ -1,10 +1,13 @@
package com.zhitan.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zhitan.config.mqtt.MqttTopic;
import com.zhitan.model.entity.DeviceData;
import com.zhitan.model.entity.ElectricPower;
import com.zhitan.model.entity.KtDataEntity;
import com.zhitan.model.entity.PowerEntity;
import com.zhitan.service.IDataService;
import com.zhitan.util.KtDataMapper;
import com.zhitan.util.PowerDataMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.support.MqttHeaders;
@@ -31,14 +34,44 @@
        String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
        String payload = (String) message.getPayload();
        System.out.println("Received message from topic " + topic + ": " + payload);
//        dataService.writeTimeSeriesData(payload);
//
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            // 将 JSON 字符串转换为 SensorData 对象
            //ElectricPower electricPower = objectMapper.readValue(payload, ElectricPower.class);
            DeviceData data = objectMapper.readValue(payload, DeviceData.class);
            if (topic != null) {
                switch ( topic) {
                    case MqttTopic.DEFAULT_TOPIC:
                        handleDeviceUpMessage(data);
                        break;
                    case MqttTopic.DEVICE_KT_UP:
                        handleDeviceKtUpMessage(data);
                        break;
                    default:
                        log.error("Invalid topic: " + topic);
                }
            }
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }
    private void handleDeviceKtUpMessage(DeviceData data) {
        try {
            KtDataEntity ktDataEntity = KtDataMapper.mapToEntity(data);
            dataService.writeTimeSeriesData(ktDataEntity);
            // dataService.writeTimeSeriesData(electricPower)
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }
    private void handleDeviceUpMessage(DeviceData data) {
        try {
            List<PowerEntity> powerMeters = PowerDataMapper.mapToEntities(data);
            for (PowerEntity powerMeter : powerMeters) {
                dataService.writeTimeSeriesData(powerMeter);
src/main/java/com/zhitan/model/entity/KtDataEntity.java
对比新文件
@@ -0,0 +1,14 @@
package com.zhitan.model.entity;
import lombok.Data;
/**
 * Air-conditioner (KT) device data record.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}. DataServiceImpl reads the declared
 * fields of this class reflectively and matches each field name (case-insensitively) against
 * an index template's gateway key, so renaming a field changes which template it maps to.
 */
@Data
public class KtDataEntity {
    /** Populated from the payload's {@code sysSn}; presumably the device serial number. */
    private String sn;
    /** Populated from the payload's {@code sysImei}; presumably the modem IMEI. */
    private String imei;
    /** Populated from the payload's {@code sysTime}; NOTE(review): unit/epoch not visible here — confirm. */
    private Long time;
    /** Reading named {@code "tmp"} in the payload; presumably temperature (unit unconfirmed). */
    private Double tmp;
    /** Reading named {@code "hum"} in the payload; presumably humidity (unit unconfirmed). */
    private Double hum;
    /** Reading named {@code "I"} in the payload; presumably a current value (unit unconfirmed). */
    private Double ia;
}
src/main/java/com/zhitan/service/IDataService.java
@@ -1,6 +1,7 @@
package com.zhitan.service;
import com.zhitan.model.entity.ElectricPower;
import com.zhitan.model.entity.KtDataEntity;
import com.zhitan.model.entity.PowerEntity;
import org.jetbrains.annotations.NotNull;
@@ -29,4 +30,6 @@
     * @param powerEntity 固定格式的数据
     */
    void writeTimeSeriesData(@NotNull PowerEntity powerEntity);
    /**
     * Writes air-conditioner (KT) device data.
     *
     * NOTE(review): the PowerEntity overload declares its parameter {@code @NotNull};
     * consider annotating this parameter the same way for consistency.
     *
     * @param ktDataEntity the KT data record to write
     */
    void writeTimeSeriesData(KtDataEntity ktDataEntity);
}
src/main/java/com/zhitan/service/impl/DataServiceImpl.java
@@ -11,6 +11,7 @@
import com.zhitan.model.entity.ElectricPower;
import com.zhitan.influxdb.InfluxdbRepository;
import com.zhitan.mapper.CommonMapper;
import com.zhitan.model.entity.KtDataEntity;
import com.zhitan.model.entity.PowerEntity;
import com.zhitan.redis.RedisCache;
import com.zhitan.service.IDataService;
@@ -201,6 +202,54 @@
                }
            }
        }
//        repository.writePoints(points);
    }
    /**
     * å†™å…¥KT数据
     *
     * @param ktDataEntity å›ºå®šæ ¼å¼çš„æ•°æ®
     */
    @Override
    public void writeTimeSeriesData(KtDataEntity ktDataEntity) {
        List<IndexTemplate> templates = getIndexTemplate();
        // èŽ·å–ç±»ä¸­æ‰€æœ‰å£°æ˜Žçš„å­—æ®µ
        Field[] fields = ktDataEntity.getClass().getDeclaredFields();
        List<Point> points = new ArrayList<>();
        for (Field field : fields) {
            IndexTemplate indexTemplate = templates.stream().filter(template ->
                            field.getName().equalsIgnoreCase(template.getGatewayKey()))
                    .findFirst().orElse(null);
                    if (indexTemplate != null) {
                        Point point = Point
                                .measurement(influxdbConfig.getMeasurement())
                                .addTag(TAG, ktDataEntity.getSn() + "_" + indexTemplate.getCode())
                                .time(Instant.now(), WritePrecision.S);
                        field.setAccessible(true);
                        if (Number.class.isAssignableFrom(field.getType()) || field.getType().isPrimitive()) {
                            try {
                                // èŽ·å–å­—æ®µå€¼
                                Object o = field.get(ktDataEntity);
                                if (o!=null) {
                                    // å®‰å…¨ç±»åž‹è½¬æ¢
                                    if (o instanceof Number) {
                                        double value = ((Number) o).doubleValue();
                                        point.addField(FIELD_VALUE, value);
                                        // ä½¿ç”¨ value...
                                    } else {
                                        log.error("字段 {} ç±»åž‹éžæ³•: {}", field.getName(), o.getClass());
                                    }
                                }
                                points.add(point);
                            } catch (IllegalAccessException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
        }
        repository.writePoints(points);
    }
src/main/java/com/zhitan/util/KtDataMapper.java
对比新文件
@@ -0,0 +1,28 @@
package com.zhitan.util;
import com.zhitan.model.entity.DeviceData;
import com.zhitan.model.entity.KtDataEntity;
public class KtDataMapper {
    public static KtDataEntity mapToEntity(DeviceData deviceData)
    {
        KtDataEntity ktDataEntity = new KtDataEntity();
        ktDataEntity.setSn(deviceData.getParams().getSysSn());
        ktDataEntity.setImei(deviceData.getParams().getSysImei());
        ktDataEntity.setTime(deviceData.getParams().getSysTime());
        deviceData.getParams().getRData().forEach(item -> {
            if ("tmp".equals(item.getName())) {
                ktDataEntity.setTmp(parseDouble(item.getValue()));
            } else if ("hum".equals(item.getName())) {
                ktDataEntity.setHum(parseDouble(item.getValue()));
            } else if ("I".equals(item.getName())) {
                ktDataEntity.setIa(parseDouble(item.getValue()));
            }
        });
        return ktDataEntity;
    }
    private static Double parseDouble(String s) {
        return s != null ? Double.parseDouble(s) : null;
    }
}