package com.zhitan.framework.mqtt; import com.zhitan.realtimedata.data.influxdb.InfluxDBRepository; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @Slf4j public class MqttMessageCallback implements MqttCallback { @Autowired private InfluxDBRepository repository; /** * 链接丢失时处理 * @param throwable */ @Override public void connectionLost(Throwable throwable) { //可以做重连 或者 其他业务处理 } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("接收到消息具体信息---->{}",new String(mqttMessage.getPayload())); // final String msg = new String(mqttMessage.getPayload()); // JSONObject jsonObject = JSON.parseObject(msg); // final String values = jsonObject.getString("values"); // final List eletricData = JSON.parseArray(values, EletricData.class); // List tagValueList = new ArrayList<>(); // //结合业务 编写具体信息即可 // eletricData.forEach(ele->{ // final String key = ele.getKey(); // TagValue tagValue = new TagValue(); // tagValue.setDataTime(new DateTime(ele.getTime())); // tagValue.setValue(ele.getVaule()); // tagValue.setTagCode(key.substring(key.lastIndexOf(".") + 1)); // tagValueList.add(tagValue); // }); // repository.store(tagValueList); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }