From 5fd51c437819f1c9d027a936db4ba2ee7cd2e053 Mon Sep 17 00:00:00 2001 From: ustcyc <yincun@163.com> Date: 星期二, 07 一月 2025 15:02:58 +0800 Subject: [PATCH] 升级架构 --- zhitan-framework/src/main/java/com/zhitan/framework/mqtt/MqttMessageCallback.java | 89 ++++++++++++++++++++++++++++++++++++-------- 1 files changed, 72 insertions(+), 17 deletions(-) diff --git a/zhitan-framework/src/main/java/com/zhitan/framework/mqtt/MqttMessageCallback.java b/zhitan-framework/src/main/java/com/zhitan/framework/mqtt/MqttMessageCallback.java index 7f64714..e3c5e61 100644 --- a/zhitan-framework/src/main/java/com/zhitan/framework/mqtt/MqttMessageCallback.java +++ b/zhitan-framework/src/main/java/com/zhitan/framework/mqtt/MqttMessageCallback.java @@ -1,6 +1,11 @@ package com.zhitan.framework.mqtt; +import cn.hutool.core.date.DateTime; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.zhitan.framework.mqtt.domain.EletricData; import com.zhitan.realtimedata.data.influxdb.InfluxDBRepository; +import com.zhitan.realtimedata.domain.TagValue; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; @@ -8,9 +13,16 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + @Component @Slf4j public class MqttMessageCallback implements MqttCallback { + + private ExecutorService executorService = Executors.newFixedThreadPool(10); // 鍒涘缓涓�涓嚎绋嬫睜 @Autowired private InfluxDBRepository repository; @@ -24,24 +36,67 @@ } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { - log.info("鎺ユ敹鍒版秷鎭叿浣撲俊鎭�---->{}",new String(mqttMessage.getPayload())); -// final String msg = new String(mqttMessage.getPayload()); -// JSONObject jsonObject = JSON.parseObject(msg); -// final String values = jsonObject.getString("values"); -// final List<EletricData> eletricData = JSON.parseArray(values, EletricData.class); -// List<TagValue> tagValueList = new ArrayList<>(); -// //缁撳悎涓氬姟 缂栧啓鍏蜂綋淇℃伅鍗冲彲 -// eletricData.forEach(ele->{ -// final String key = ele.getKey(); -// TagValue tagValue = new TagValue(); -// tagValue.setDataTime(new DateTime(ele.getTime())); -// tagValue.setValue(ele.getVaule()); -// tagValue.setTagCode(key.substring(key.lastIndexOf(".") + 1)); -// tagValueList.add(tagValue); -// }); -// repository.store(tagValueList); + + executorService.submit(new Runnable() { + @Override + public void run() { + final String msg = new String(mqttMessage.getPayload()); + JSONObject jsonObject = JSON.parseObject(msg); + final String values = jsonObject.getString("values"); + final List<EletricData> eletricData = JSON.parseArray(values, EletricData.class); + List<TagValue> tagValueList = new ArrayList<>(); + + //EMS.HB.High.9_42302035834_Voltagec + //9 鏄厤鐢电锛�42302035834鏄數琛ㄥ彿 + //缁撳悎涓氬姟 缂栧啓鍏蜂綋淇℃伅鍗冲彲 + eletricData.forEach(ele->{ + final String key = ele.getKey(); + TagValue tagValue = new TagValue(); + tagValue.setDataTime(new DateTime(ele.getTime())); + tagValue.setValue(ele.getVaule()); + final String eNoIndex = key.substring(key.indexOf("_") + 1); + String tagValueString = formateTag(eNoIndex); + tagValue.setTagCode(tagValueString); + tagValueList.add(tagValue); + }); + repository.store(tagValueList); + } + }); + +// } + + //灏嗙數琛ㄥ彿鍜屾寚鏍� 42302035834_Voltagec 杞崲鎴愭垜浠郴缁熺殑 + private String formateTag(String tag) { + final String[] tags = tag.split("_"); + String index = tags[1]; + String tagValue = tags[0] + "_"; + switch (index){ + case "PowFactorT": //鍔熺巼鍥犳暟 + tagValue += "PowFactorT"; + break; + case "Imp": //姝e悜鐢佃兘 + tagValue += "ActiveZT"; + break; +// case "spare": // +// tagValue += ""; +// break; +// case "CurrentD": // +// tagValue += "CurrentD"; +// break; +// case "EXP": //鍙嶅悜鐢佃兘 +// tagValue += ""; +// break; +// case "Total": //鐢佃兘鎬诲拰 +// tagValue += ""; +// break; + default: + tagValue = tag; + } + return tagValue; + } + @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } -} +} \ No newline at end of file -- Gitblit v1.9.3