package com.dingzhuo.energy.project.conglomeratepush.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * MQTT工具类操作 * * @author zhw * @since 2022/04/05 */ @Slf4j @Component public class MQTTConnect { @Value("${mqtt.host}") private String HOST; private final String clientId = "ClientHBT" + (int) (Math.random() * 100000000); private MqttClient mqttClient; /** * 客户端connect连接mqtt服务器 * * @param username 用户名 * @param password 密码 * @param mqttCallback 回调函数 **/ public void setMqttClient(String username, String password, MqttCallback mqttCallback) throws MqttException { System.out.println("MQTT服务器连接开始..........."); MqttConnectOptions options = mqttConnectOptions(username, password); mqttClient.setCallback(mqttCallback); mqttClient.connect(options); } /** * MQTT连接参数设置 */ private MqttConnectOptions mqttConnectOptions(String userName, String passWord) throws MqttException { mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(userName); options.setPassword(passWord.toCharArray()); options.setConnectionTimeout(10);///默认:30 options.setAutomaticReconnect(true);//默认:false options.setCleanSession(false);//默认:true //options.setKeepAliveInterval(20);//默认:60 return options; } /** * 关闭MQTT连接 */ public void close() throws MqttException { try { if(mqttClient!=null){ if(mqttClient.isConnected()){ mqttClient.unsubscribe(clientId); } mqttClient.disconnect(); mqttClient.close(); } }catch (Exception e){ e.printStackTrace(); } } /** * 向某个主题发布消息 默认qos:1 */ public void pub(String topic, String msg) throws MqttException { MqttMessage mqttMessage = new MqttMessage(); //mqttMessage.setQos(2); mqttMessage.setPayload(msg.getBytes()); MqttTopic mqttTopic = mqttClient.getTopic(topic); MqttDeliveryToken token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } /** * 向某个主题发布消息 * * @param topic: 发布的主题 * @param msg: 发布的消息 * @param qos: 消息质量 Qos:0、1、2 */ public void pub(String topic, String msg, int qos) throws MqttException { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(qos); mqttMessage.setPayload(msg.getBytes()); MqttTopic mqttTopic = mqttClient.getTopic(topic); MqttDeliveryToken token = mqttTopic.publish(mqttMessage); token.waitForCompletion(); } /** * 订阅某一个主题 ,此方法默认的的Qos等级为:1 * * @param topic 主题 */ public void sub(String topic) throws MqttException { mqttClient.subscribe(topic); } /** * 订阅某一个主题,可携带Qos * * @param topic 所要订阅的主题 * @param qos 消息质量:0、1、2 */ public void sub(String topic, int qos) throws MqttException { mqttClient.subscribe(topic, qos); } public static void main(String[] args) throws MqttException { //测试连接及主动订阅信息 输出 通过 //要做推送 直接在计划任务中连接推送即可,做全局conn对象,然后 每次计划任务轮询发送前判断 //对象是否存在且 没有关闭 要进行处理 // MQTTConnect mqttConnect = new MQTTConnect(); // String msg = "Mr.Qu" + (int) (Math.random() * 100000000); // mqttConnect.setMqttClient("dingzhuo", "dingzhuo@2022", new InitCallback()); // mqttConnect.sub("iot-2/evt/waconn/#",0); // try { // Thread.sleep(5000); // } catch (InterruptedException e) { // e.printStackTrace(); // } // mqttConnect.close(); } }