src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/zhitan/config/mqtt/MqttTopic.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/zhitan/handler/MqttMessageHandler.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/zhitan/model/entity/KtDataEntity.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/zhitan/service/IDataService.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/zhitan/service/impl/DataServiceImpl.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/zhitan/util/KtDataMapper.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
src/main/java/com/zhitan/config/mqtt/MqttInboundConfig.java
@@ -31,7 +31,7 @@ // è®¢é æ¶æ¯éé å¨ @Bean public MqttPahoMessageDrivenChannelAdapter inboundAdapter() { return new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", mqttClientFactory, defaultTopic); return new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", mqttClientFactory, defaultTopic,MqttTopic.DEVICE_KT_UP); } // å®ä¹æ¶æ¯å¤çæµ src/main/java/com/zhitan/config/mqtt/MqttTopic.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,25 @@ package com.zhitan.config.mqtt; public enum MqttTopic { SERVICE_DOWN("lanbao/nygl/service/down"); public final static String DEFAULT_TOPIC = "lanbao/nygl/device/up"; public final static String DEVICE_KT_UP = "lanbao/nygl/device/kt/up"; private String topic; MqttTopic(String topic) { this.topic = topic; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } } src/main/java/com/zhitan/handler/MqttMessageHandler.java
@@ -1,10 +1,13 @@ package com.zhitan.handler; import com.fasterxml.jackson.databind.ObjectMapper; import com.zhitan.config.mqtt.MqttTopic; import com.zhitan.model.entity.DeviceData; import com.zhitan.model.entity.ElectricPower; import com.zhitan.model.entity.KtDataEntity; import com.zhitan.model.entity.PowerEntity; import com.zhitan.service.IDataService; import com.zhitan.util.KtDataMapper; import com.zhitan.util.PowerDataMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.integration.mqtt.support.MqttHeaders; @@ -31,14 +34,44 @@ String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC); String payload = (String) message.getPayload(); System.out.println("Received message from topic " + topic + ": " + payload); // dataService.writeTimeSeriesData(payload); // ObjectMapper objectMapper = new ObjectMapper(); try { // å° JSON å符串转æ¢ä¸º SensorData 对象 //ElectricPower electricPower = objectMapper.readValue(payload, ElectricPower.class); DeviceData data = objectMapper.readValue(payload, DeviceData.class); if (topic != null) { switch ( topic) { case MqttTopic.DEFAULT_TOPIC: handleDeviceUpMessage(data); break; case MqttTopic.DEVICE_KT_UP: handleDeviceKtUpMessage(data); break; default: log.error("Invalid topic: " + topic); } } } catch (Exception e) { log.error(e.getMessage()); } } private void handleDeviceKtUpMessage(DeviceData data) { try { KtDataEntity ktDataEntity = KtDataMapper.mapToEntity(data); dataService.writeTimeSeriesData(ktDataEntity); // dataService.writeTimeSeriesData(electricPower) } catch (Exception e) { log.error(e.getMessage()); } } private void handleDeviceUpMessage(DeviceData data) { try { List<PowerEntity> powerMeters = PowerDataMapper.mapToEntities(data); for (PowerEntity powerMeter : powerMeters) { dataService.writeTimeSeriesData(powerMeter); src/main/java/com/zhitan/model/entity/KtDataEntity.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,14 @@ package com.zhitan.model.entity; import lombok.Data; @Data public class KtDataEntity { private String sn; private String imei; private Long time; private Double tmp; private Double hum; private Double ia; } src/main/java/com/zhitan/service/IDataService.java
@@ -1,6 +1,7 @@ package com.zhitan.service; import com.zhitan.model.entity.ElectricPower; import com.zhitan.model.entity.KtDataEntity; import com.zhitan.model.entity.PowerEntity; import org.jetbrains.annotations.NotNull; @@ -29,4 +30,6 @@ * @param powerEntity åºå®æ ¼å¼çæ°æ® */ void writeTimeSeriesData(@NotNull PowerEntity powerEntity); void writeTimeSeriesData(KtDataEntity ktDataEntity); } src/main/java/com/zhitan/service/impl/DataServiceImpl.java
@@ -11,6 +11,7 @@ import com.zhitan.model.entity.ElectricPower; import com.zhitan.influxdb.InfluxdbRepository; import com.zhitan.mapper.CommonMapper; import com.zhitan.model.entity.KtDataEntity; import com.zhitan.model.entity.PowerEntity; import com.zhitan.redis.RedisCache; import com.zhitan.service.IDataService; @@ -201,6 +202,54 @@ } } } // repository.writePoints(points); } /** * åå ¥KTæ°æ® * * @param ktDataEntity åºå®æ ¼å¼çæ°æ® */ @Override public void writeTimeSeriesData(KtDataEntity ktDataEntity) { List<IndexTemplate> templates = getIndexTemplate(); // è·åç±»ä¸ææå£°æçåæ®µ Field[] fields = ktDataEntity.getClass().getDeclaredFields(); List<Point> points = new ArrayList<>(); for (Field field : fields) { IndexTemplate indexTemplate = templates.stream().filter(template -> field.getName().equalsIgnoreCase(template.getGatewayKey())) .findFirst().orElse(null); if (indexTemplate != null) { Point point = Point .measurement(influxdbConfig.getMeasurement()) .addTag(TAG, ktDataEntity.getSn() + "_" + indexTemplate.getCode()) .time(Instant.now(), WritePrecision.S); field.setAccessible(true); if (Number.class.isAssignableFrom(field.getType()) || field.getType().isPrimitive()) { try { // è·ååæ®µå¼ Object o = field.get(ktDataEntity); if (o!=null) { // å®å ¨ç±»åè½¬æ¢ if (o instanceof Number) { double value = ((Number) o).doubleValue(); point.addField(FIELD_VALUE, value); // ä½¿ç¨ value... } else { log.error("åæ®µ {} ç±»åéæ³: {}", field.getName(), o.getClass()); } } points.add(point); } catch (IllegalAccessException e) { throw new RuntimeException(e); } } } } repository.writePoints(points); } src/main/java/com/zhitan/util/KtDataMapper.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,28 @@ package com.zhitan.util; import com.zhitan.model.entity.DeviceData; import com.zhitan.model.entity.KtDataEntity; public class KtDataMapper { public static KtDataEntity mapToEntity(DeviceData deviceData) { KtDataEntity ktDataEntity = new KtDataEntity(); ktDataEntity.setSn(deviceData.getParams().getSysSn()); ktDataEntity.setImei(deviceData.getParams().getSysImei()); ktDataEntity.setTime(deviceData.getParams().getSysTime()); deviceData.getParams().getRData().forEach(item -> { if ("tmp".equals(item.getName())) { ktDataEntity.setTmp(parseDouble(item.getValue())); } else if ("hum".equals(item.getName())) { ktDataEntity.setHum(parseDouble(item.getValue())); } else if ("I".equals(item.getName())) { ktDataEntity.setIa(parseDouble(item.getValue())); } }); return ktDataEntity; } private static Double parseDouble(String s) { return s != null ? Double.parseDouble(s) : null; } }