package com.zhitan.framework.mqtt; import cn.hutool.core.date.DateTime; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.zhitan.framework.mqtt.domain.EletricData; import com.zhitan.realtimedata.data.influxdb.InfluxDBRepository; import com.zhitan.realtimedata.domain.TagValue; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Component @Slf4j public class MqttMessageCallback implements MqttCallback { private ExecutorService executorService = Executors.newFixedThreadPool(10); // 创建一个线程池 @Autowired private InfluxDBRepository repository; /** * 链接丢失时处理 * @param throwable */ @Override public void connectionLost(Throwable throwable) { //可以做重连 或者 其他业务处理 } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { executorService.submit(new Runnable() { @Override public void run() { final String msg = new String(mqttMessage.getPayload()); JSONObject jsonObject = JSON.parseObject(msg); final String values = jsonObject.getString("values"); final List eletricData = JSON.parseArray(values, EletricData.class); List tagValueList = new ArrayList<>(); //EMS.HB.High.9_42302035834_Voltagec //9 是配电箱,42302035834是电表号 //结合业务 编写具体信息即可 eletricData.forEach(ele->{ final String key = ele.getKey(); TagValue tagValue = new TagValue(); tagValue.setDataTime(new DateTime(ele.getTime())); tagValue.setValue(ele.getVaule()); final String eNoIndex = key.substring(key.indexOf("_") + 1); String tagValueString = formateTag(eNoIndex); tagValue.setTagCode(tagValueString); tagValueList.add(tagValue); }); repository.store(tagValueList); } }); // } //将电表号和指标 42302035834_Voltagec 转换成我们系统的 private String formateTag(String tag) { final String[] tags = tag.split("_"); String index = tags[1]; String tagValue = tags[0] + "_"; switch (index){ case "PowFactorT": //功率因数 tagValue += "PowFactorT"; break; case "Imp": //正向电能 tagValue += "ActiveZT"; break; // case "spare": // // tagValue += ""; // break; // case "CurrentD": // // tagValue += "CurrentD"; // break; // case "EXP": //反向电能 // tagValue += ""; // break; // case "Total": //电能总和 // tagValue += ""; // break; default: tagValue = tag; } return tagValue; } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }