package com.zhitan.framework.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Objects; @Component @Slf4j public class MqttClientUtil { @Value("${mqtt.username}") private String username; @Value("${mqtt.password}") private String password; @Value("${mqtt.host}")//这个是安装mqtt的ip以及端口,1883是mqtt默认端口 private String host; @Value("${mqtt.clientId}")//这个随便写但是是唯一的。 private String clientId; @Value("${mqtt.topics}") //这个是mqtt发送消息的咱们要订阅的topic, cyt/# 代表以cyt/开始的所有topic都接收 private String topic; @Value("${mqtt.timeout}")//IOT_MQTT_Yield会block住timeout的时间去尝试接收数据,直到timeout才会退出。可以写在这里也可以写在yml配置文件中 private int timeOut; @Value("${mqtt.keepalive}") private int interval; @Autowired private MqttMessageCallback mqttMessageCallback2; private MqttClient mqttClient; private MqttConnectOptions mqttConnectOptions; @PostConstruct private void init(){ connect(host, clientId, topic); } /** * 链接mqtt * @param host * @param clientId */ private void connect(String host,String clientId,String topic){ try{ mqttClient = new MqttClient(host,clientId,new MemoryPersistence()); mqttConnectOptions = getMqttConnectOptions(); //设置回调函数 mqttClient.setCallback(mqttMessageCallback2); //链接mqtt mqttClient.connect(mqttConnectOptions); //订阅消息 mqttClient.subscribe(topic,2); }catch (Exception e){ log.error("mqtt服务链接异常!"); e.printStackTrace(); } } /** * 设置链接对象信息 * setCleanSession true 断开链接即清楚会话 false 保留链接信息 离线还会继续发消息 * @return */ private MqttConnectOptions getMqttConnectOptions(){ MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{host}); mqttConnectOptions.setKeepAliveInterval(interval); mqttConnectOptions.setConnectionTimeout(timeOut); mqttConnectOptions.setCleanSession(true); return mqttConnectOptions; } /** *mqtt链接状态 * @return */ private boolean isConnect(){ if(Objects.isNull(this.mqttClient)){ return false; } return mqttClient.isConnected(); } /** * 设置重连 * @throws Exception */ private void reConnect() throws Exception{ if(Objects.nonNull(this.mqttClient)){ log.info("mqtt 服务已重新链接..."); this.mqttClient.connect(this.mqttConnectOptions); } } /** * 断开链接 * @throws Exception */ private void closeConnect() throws Exception{ if(Objects.nonNull(this.mqttClient)){ log.info("mqtt 服务已断开链接..."); this.mqttClient.disconnect(); } } /** * 发布消息 * @param topic * @param message * @param qos * @throws Exception */ public void sendMessage(String topic,String message,int qos) throws Exception { if(Objects.nonNull(this.mqttClient) && this.mqttClient.isConnected()){ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(message.getBytes()); mqttMessage.setQos(qos); MqttTopic mqttTopic = mqttClient.getTopic(topic); if(Objects.nonNull(mqttTopic)){ try{ MqttDeliveryToken publish = mqttTopic.publish(mqttMessage); if(publish.isComplete()){ log.info("消息发送成功---->{}",message); } }catch(Exception e){ log.error("消息发送异常",e); } } }else{ reConnect(); } } }