SpringBoot2.x集成MQTT实现消息推送
1.引⼊相关的依赖
org.springframework.boot
spring-boot-starter-integration
org.springframework.integration spring-integration-stream
org.springframework.integration spring-integration-mqtt
2.在配置⽂件中配置MQTT服务器信息
spring.mqtt.username = usernamespring.mqtt.password = password
spring.mqtt.url = tcp://xx.xx.xx.xx:18083 spring.mqtt.client.id = clientid spring.mqtt.default.topic = topic
spring.mqtt.default.completionTimeout = 3000
3.配置MQTT消息推送配置
/**
* @Author: songyaru * @Date: 2020/9/1 13:42 * @Version 1.0 */
@Configuration
@IntegrationComponentScanpublic class MqttSenderConfig {
@Value(\"${spring.mqtt.username}\") private String username;
@Value(\"${spring.mqtt.password}\") private String password; @Value(\"${spring.mqtt.url}\") private String hostUrl;
@Value(\"${spring.mqtt.client.id}\") private String clientId;
@Value(\"${spring.mqtt.default.topic}\") private String defaultTopic;
@Value(\"${spring.mqtt.default.completionTimeout}\") private int completionTimeout;
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setKeepAliveInterval(90);
mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); mqttConnectOptions.setKeepAliveInterval(2); return mqttConnectOptions; }
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; }
@Bean
@ServiceActivator(inputChannel = \"mqttOutboundChannel\") public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory()); messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic); return messageHandler; }
@Bean
public MessageChannel mqttOutboundChannel() { return new DirectChannel(); }}
4.MQTT消息推送接⼝
/**
* @Author: songyaru * @Date: 2020/9/1 13:51 * @Version 1.0 */
@MessagingGateway(defaultRequestChannel = \"mqttOutboundChannel\")public interface MqttGateway {
void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);}
5.MQTT消息推送API
/**
* @Author: songyaru * @Date: 2020/9/1 13:52 * @Version 1.0 */
@RestController
public class MessageController { @Autowired
MqttGateway mqttGateway;
/***
* 发布消息,⽤于其他客户端消息接收测试 */
@RequestMapping(\"/sendMqttMessage\")
public String sendMqttMessage(String message, String topic) { mqttGateway.sendToMqtt(message, topic); return \"ok\"; }}
6、测试
在POSTMAN中进⾏测试了,输⼊消息内容和主题,就可以在相应的频道发送消息了。使⽤其它的消息客户端进⾏测试,可以接受到消息。