package com.zhitan.config.mqtt;
|
|
import com.zhitan.handler.MqttMessageHandler;
|
import com.zhitan.service.IDataService;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Configuration;
|
import org.springframework.integration.dsl.IntegrationFlow;
|
import org.springframework.integration.dsl.IntegrationFlows;
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
|
@Configuration
|
public class MqttInboundConfig {
|
|
private final MqttPahoClientFactory mqttClientFactory;
|
|
private final IDataService dataService;
|
|
@Value("${spring.mqtt.client-id}")
|
private String clientId;
|
|
@Value("${spring.mqtt.default-topic}")
|
private String defaultTopic;
|
|
public MqttInboundConfig(MqttPahoClientFactory mqttClientFactory, IDataService dataService) {
|
this.mqttClientFactory = mqttClientFactory;
|
this.dataService = dataService;
|
}
|
|
// 订阅消息适配器
|
@Bean
|
public MqttPahoMessageDrivenChannelAdapter inboundAdapter() {
|
return new MqttPahoMessageDrivenChannelAdapter(clientId + "-inbound", mqttClientFactory, defaultTopic);
|
}
|
|
// 定义消息处理流
|
@Bean
|
public IntegrationFlow mqttInFlow() {
|
return IntegrationFlows.from(inboundAdapter())
|
.handle(new MqttMessageHandler(dataService))
|
.get();
|
}
|
}
|