ustcyc
2025-01-07 de5d55508afd27fb2b47e6d4d6fd9984525c222c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.zhitan.framework.mqtt;
 
import com.zhitan.realtimedata.data.influxdb.InfluxDBRepository;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
@Component
@Slf4j
public class MqttMessageCallback implements MqttCallback {
 
    @Autowired
    private InfluxDBRepository repository;
    /**
     * 链接丢失时处理
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        //可以做重连 或者 其他业务处理
    }
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("接收到消息具体信息---->{}",new String(mqttMessage.getPayload()));
//        final String msg = new String(mqttMessage.getPayload());
//        JSONObject jsonObject = JSON.parseObject(msg);
//        final String values = jsonObject.getString("values");
//        final List<EletricData> eletricData = JSON.parseArray(values, EletricData.class);
//        List<TagValue> tagValueList = new ArrayList<>();
//        //结合业务 编写具体信息即可
//        eletricData.forEach(ele->{
//            final String key = ele.getKey();
//            TagValue tagValue = new TagValue();
//            tagValue.setDataTime(new DateTime(ele.getTime()));
//            tagValue.setValue(ele.getVaule());
//            tagValue.setTagCode(key.substring(key.lastIndexOf(".") + 1));
//            tagValueList.add(tagValue);
//        });
//        repository.store(tagValueList);
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}