liulingling.177216
2024-08-26 349f1cfc5fa77fbc636d542df0d8050fddec48c2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package com.dingzhuo.energy.project.conglomeratepush.mqtt;
 
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
 
/**
 * MQTT工具类操作
 *
 * @author zhw
 * @since 2022/04/05
 */
@Slf4j
@Component
public class MQTTConnect {
 
  @Value("${mqtt.host}")
  private String HOST;
  private final String clientId = "ClientHBT" + (int) (Math.random() * 100000000);
  private MqttClient mqttClient;
 
  /**
   * 客户端connect连接mqtt服务器
   *
   * @param username 用户名
   * @param password 密码
   * @param mqttCallback 回调函数
   **/
  public void setMqttClient(String username, String password, MqttCallback mqttCallback)
      throws MqttException {
    System.out.println("MQTT服务器连接开始...........");
    MqttConnectOptions options = mqttConnectOptions(username, password);
    mqttClient.setCallback(mqttCallback);
    mqttClient.connect(options);
  }
 
  /**
   * MQTT连接参数设置
   */
  private MqttConnectOptions mqttConnectOptions(String userName, String passWord)
      throws MqttException {
    mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence());
    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName(userName);
    options.setPassword(passWord.toCharArray());
    options.setConnectionTimeout(10);///默认:30
    options.setAutomaticReconnect(true);//默认:false
    options.setCleanSession(false);//默认:true
    //options.setKeepAliveInterval(20);//默认:60
    return options;
  }
 
  /**
   * 关闭MQTT连接
   */
  public void close() throws MqttException {
    try {
      if(mqttClient!=null){
        if(mqttClient.isConnected()){
          mqttClient.unsubscribe(clientId);
        }
        mqttClient.disconnect();
        mqttClient.close();
      }
 
    }catch (Exception e){
      e.printStackTrace();
    }
 
  }
 
  /**
   * 向某个主题发布消息 默认qos:1
   */
  public void pub(String topic, String msg) throws MqttException {
    MqttMessage mqttMessage = new MqttMessage();
    //mqttMessage.setQos(2);
    mqttMessage.setPayload(msg.getBytes());
    MqttTopic mqttTopic = mqttClient.getTopic(topic);
    MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
    token.waitForCompletion();
  }
 
  /**
   * 向某个主题发布消息
   *
   * @param topic: 发布的主题
   * @param msg: 发布的消息
   * @param qos: 消息质量    Qos:0、1、2
   */
  public void pub(String topic, String msg, int qos) throws MqttException {
    MqttMessage mqttMessage = new MqttMessage();
    mqttMessage.setQos(qos);
    mqttMessage.setPayload(msg.getBytes());
    MqttTopic mqttTopic = mqttClient.getTopic(topic);
    MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
    token.waitForCompletion();
  }
 
  /**
   * 订阅某一个主题 ,此方法默认的的Qos等级为:1
   *
   * @param topic 主题
   */
  public void sub(String topic) throws MqttException {
    mqttClient.subscribe(topic);
  }
 
  /**
   * 订阅某一个主题,可携带Qos
   *
   * @param topic 所要订阅的主题
   * @param qos 消息质量:0、1、2
   */
  public void sub(String topic, int qos) throws MqttException {
    mqttClient.subscribe(topic, qos);
  }
 
  public static void main(String[] args) throws MqttException {
    //测试连接及主动订阅信息 输出 通过
    //要做推送 直接在计划任务中连接推送即可,做全局conn对象,然后 每次计划任务轮询发送前判断
    //对象是否存在且 没有关闭 要进行处理
//    MQTTConnect mqttConnect = new MQTTConnect();
//    String msg = "Mr.Qu" + (int) (Math.random() * 100000000);
//    mqttConnect.setMqttClient("dingzhuo", "dingzhuo@2022", new InitCallback());
//    mqttConnect.sub("iot-2/evt/waconn/#",0);
//    try {
//      Thread.sleep(5000);
//    } catch (InterruptedException e) {
//      e.printStackTrace();
//    }
//    mqttConnect.close();
  }
}