Springboot 整合 MQTT

1、簡述

????MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測(cè)傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的“輕量級(jí)”通訊協(xié)議,該協(xié)議構(gòu)建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布。MQTT最大優(yōu)點(diǎn)在于,可以以極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實(shí)時(shí)可靠的消息服務(wù)。作為一種低開銷、低帶寬占用的即時(shí)通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設(shè)備、移動(dòng)應(yīng)用等方面有較廣泛的應(yīng)用。?MQTT是一個(gè)基于客戶端-服務(wù)器的消息發(fā)布/訂閱傳輸協(xié)議。MQTT協(xié)議是輕量、簡單、開放和易于實(shí)現(xiàn)的,這些特點(diǎn)使它適用范圍非常廣泛。在很多情況下,包括受限的環(huán)境中,如:機(jī)器與機(jī)器(M2M)通信和物聯(lián)網(wǎng)(IoT)。其在,通過衛(wèi)星鏈路通信傳感器、偶爾撥號(hào)的醫(yī)療設(shè)備、智能家居、及一些小型化設(shè)備中已廣泛使用。

2、創(chuàng)建springboot工程

3、添加依賴

<dependency>

????????< groupId > org.springframework.bootgroupId >

????????< artifactId > spring-boot-starter-integration?< / artifactId >

< dependency >

< dependency >

????????< groupId > org.springframework.integration < / groupId >

????????< artifactId > spring-integration-stream < / artifactId >

< / dependency >

< dependency >

????????< groupId > org.springframework.integration < / groupId >

????????< artifactId > spring-integration-mqtt < / artifactId >

< /dependency >

<?dependency?>

????????<?groupId?>org.projectlombok< /?groupId?>

????????<?artifactId?>?lombok< /?artifactId?>

? ? ? ? <version>1.16.10</version>

? ? ? ? <scope>provided</scope>

< /dependency?>

4、MqttInboundProperties

inBound配置屬性

package com.chen.config;

import lombok.Data;

/**

* @author: ChenJie

* @date 2018/8/21

*/

@Data

public class MqttInboundProperties {

private Stringurl;

? ? private Stringusername;

? ? private Stringpassword;

? ? private StringclientId;

? ? private Stringtopics;

}

5、MqttOutboundProperties

outBound配置屬性

@Setter

@Getter

public class MqttOutboundProperties {

private Stringurls;

? ? private Stringusername;

? ? private Stringpassword;

? ? private StringclientId;

? ? private Stringtopic;

}

6、MqttProperties

配置類

@ConfigurationProperties(prefix ="com.mqtt")

public class MqttProperties {

????private MqttInboundPropertiesinbound;

? ? private MqttOutboundPropertiesoutbound;

? ? public MqttInboundPropertiesgetInbound() {

????????return inbound;

? ? }

????public void setInbound(MqttInboundProperties inbound) {

????????this.inbound = inbound;

? ? }

????public MqttOutboundPropertiesgetOutbound() {

????????return outbound;

? ? }

????public void setOutbound(MqttOutboundProperties outbound) {

????????this.outbound = outbound;

? ? }

}

7、MqttInboundConfiguration

消息接收處理類

@Configuration

@Slf4j

public class MqttInboundConfiguration {

@Autowired

? ? private MqttPropertiesmqttProperties;

? ? @Bean

? ? public MessageChannelmqttInputChannel() {

????????return new DirectChannel();

? ? }

@Bean

? ? public MessageProducerinbound(MqttPahoClientFactory mqttPahoClientFactory) {

????????String[] inboundTopics =mqttProperties.getInbound().getTopics().split(",");

? ? ? ? MqttPahoMessageDrivenChannelAdapter adapter =??new ????????????????????????????????????????????????????????????????????????????MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInbound().getUrl(), ????????????????????????????????????????????????????????????????????????????mqttProperties.getInbound().getClientId(), ? ??mqttPahoClientFactory,inboundTopics);

? ? ? ? adapter.setCompletionTimeout(5000);

? ? ? ? adapter.setConverter(new DefaultPahoMessageConverter());

? ? ? ? adapter.setQos(1);

? ? ? ? adapter.setOutputChannel(mqttInputChannel());

? ? ? ? return adapter;

? ? }

????@Bean

? ? @ServiceActivator(inputChannel ="mqttInputChannel")

? ? ?public MessageHandlerhandler() {

? ? ?????return new MessageHandler() {

????????????@Override

? ? ? ? ? ? public void handleMessage(Message message)throws MessagingException {

????????????????log.info("收到消息:"+(String) message.getPayload());

? ? ? ? ? ? }

? ? ? ? ? };

? ? }

}

8、MqttOutboundConfiguration

消息發(fā)送配置類

@Configuration

public class MqttOutboundConfiguration {

????@Autowired

? ? private MqttPropertiesmqttProperties;

? ? @Bean

? ? public MqttPahoClientFactorymqttClientFactory() {

????????String[] serverURIs =mqttProperties.getOutbound().getUrls().split(",");

? ? ? ? DefaultMqttPahoClientFactory factory =new DefaultMqttPahoClientFactory();

? ? ? ? factory.setServerURIs(serverURIs);

? ? ? ? factory.setCleanSession(false);

? ? ? ? factory.setUserName(mqttProperties.getOutbound().getUsername());

? ? ? ? factory.setPassword(mqttProperties.getOutbound().getPassword());

? ? ? ? return factory;

? ? }

????@Bean

? ? @ServiceActivator(inputChannel ="mqttOutboundChannel")

? ? public MessageHandlermqttOutbound() {

????????MqttPahoMessageHandler messageHandler =?new MqttPahoMessageHandler(mqttProperties.getOutbound().getClientId(),

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?mqttClientFactory());

? ? ? ? messageHandler.setAsync(true);

? ? ? ? messageHandler.setDefaultTopic(mqttProperties.getOutbound().getTopic());

? ? ? ? return messageHandler;

? ? }

????@Bean

? ? public MessageChannelmqttOutboundChannel() {

????????return new DirectChannel();

? ? }

}

9、MqttGateway

消息發(fā)送service,可以直接調(diào)用來發(fā)送消息

@Component

@MessagingGateway(defaultRequestChannel ="mqttOutboundChannel")

public interface MqttGateway {

????void sendToMqtt(String data);

? ? void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

? ? void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS)int qos, String payload);

}


10、http接口觸發(fā)mqtt消息分發(fā), ?MessageController

@RestController

public class MessageController {

????@Autowired

? ? MqttGatewaymqttGateway;

? ? @RequestMapping(value="/sendMsg")

? ? public StringsendMsg(@RequestParam String message){

? ? ? ? //調(diào)用網(wǎng)關(guān)接口發(fā)送消息

????????mqttGateway.sendToMqtt(message);

? ? ? ? return "success";

? ? }

}

11、springboot啟動(dòng)類

@SpringBootApplication

@Configuration

@EnableConfigurationProperties(MqttProperties.class)

public class MqttSpringbootApplication {

????public static void main(String[] args) {

????????SpringApplication.run(MqttSpringbootApplication.class, args);

? ? }

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容