1、簡述
????MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測(cè)傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的“輕量級(jí)”通訊協(xié)議,該協(xié)議構(gòu)建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布。MQTT最大優(yōu)點(diǎn)在于,可以以極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實(shí)時(shí)可靠的消息服務(wù)。作為一種低開銷、低帶寬占用的即時(shí)通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設(shè)備、移動(dòng)應(yīng)用等方面有較廣泛的應(yīng)用。?MQTT是一個(gè)基于客戶端-服務(wù)器的消息發(fā)布/訂閱傳輸協(xié)議。MQTT協(xié)議是輕量、簡單、開放和易于實(shí)現(xiàn)的,這些特點(diǎn)使它適用范圍非常廣泛。在很多情況下,包括受限的環(huán)境中,如:機(jī)器與機(jī)器(M2M)通信和物聯(lián)網(wǎng)(IoT)。其在,通過衛(wèi)星鏈路通信傳感器、偶爾撥號(hào)的醫(yī)療設(shè)備、智能家居、及一些小型化設(shè)備中已廣泛使用。
2、創(chuàng)建springboot工程
3、添加依賴
<dependency>
????????< groupId > org.springframework.bootgroupId >
????????< artifactId > spring-boot-starter-integration?< / artifactId >
< dependency >
< dependency >
????????< groupId > org.springframework.integration < / groupId >
????????< artifactId > spring-integration-stream < / artifactId >
< / dependency >
< dependency >
????????< groupId > org.springframework.integration < / groupId >
????????< artifactId > spring-integration-mqtt < / artifactId >
< /dependency >
<?dependency?>
????????<?groupId?>org.projectlombok< /?groupId?>
????????<?artifactId?>?lombok< /?artifactId?>
? ? ? ? <version>1.16.10</version>
? ? ? ? <scope>provided</scope>
< /dependency?>
4、MqttInboundProperties
inBound配置屬性
package com.chen.config;
import lombok.Data;
/**
* @author: ChenJie
* @date 2018/8/21
*/
@Data
public class MqttInboundProperties {
private Stringurl;
? ? private Stringusername;
? ? private Stringpassword;
? ? private StringclientId;
? ? private Stringtopics;
}
5、MqttOutboundProperties
outBound配置屬性
@Setter
@Getter
public class MqttOutboundProperties {
private Stringurls;
? ? private Stringusername;
? ? private Stringpassword;
? ? private StringclientId;
? ? private Stringtopic;
}
6、MqttProperties
配置類
@ConfigurationProperties(prefix ="com.mqtt")
public class MqttProperties {
????private MqttInboundPropertiesinbound;
? ? private MqttOutboundPropertiesoutbound;
? ? public MqttInboundPropertiesgetInbound() {
????????return inbound;
? ? }
????public void setInbound(MqttInboundProperties inbound) {
????????this.inbound = inbound;
? ? }
????public MqttOutboundPropertiesgetOutbound() {
????????return outbound;
? ? }
????public void setOutbound(MqttOutboundProperties outbound) {
????????this.outbound = outbound;
? ? }
}
7、MqttInboundConfiguration
消息接收處理類
@Configuration
@Slf4j
public class MqttInboundConfiguration {
@Autowired
? ? private MqttPropertiesmqttProperties;
? ? @Bean
? ? public MessageChannelmqttInputChannel() {
????????return new DirectChannel();
? ? }
@Bean
? ? public MessageProducerinbound(MqttPahoClientFactory mqttPahoClientFactory) {
????????String[] inboundTopics =mqttProperties.getInbound().getTopics().split(",");
? ? ? ? MqttPahoMessageDrivenChannelAdapter adapter =??new ????????????????????????????????????????????????????????????????????????????MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInbound().getUrl(), ????????????????????????????????????????????????????????????????????????????mqttProperties.getInbound().getClientId(), ? ??mqttPahoClientFactory,inboundTopics);
? ? ? ? adapter.setCompletionTimeout(5000);
? ? ? ? adapter.setConverter(new DefaultPahoMessageConverter());
? ? ? ? adapter.setQos(1);
? ? ? ? adapter.setOutputChannel(mqttInputChannel());
? ? ? ? return adapter;
? ? }
????@Bean
? ? @ServiceActivator(inputChannel ="mqttInputChannel")
? ? ?public MessageHandlerhandler() {
? ? ?????return new MessageHandler() {
????????????@Override
? ? ? ? ? ? public void handleMessage(Message message)throws MessagingException {
????????????????log.info("收到消息:"+(String) message.getPayload());
? ? ? ? ? ? }
? ? ? ? ? };
? ? }
}
8、MqttOutboundConfiguration
消息發(fā)送配置類
@Configuration
public class MqttOutboundConfiguration {
????@Autowired
? ? private MqttPropertiesmqttProperties;
? ? @Bean
? ? public MqttPahoClientFactorymqttClientFactory() {
????????String[] serverURIs =mqttProperties.getOutbound().getUrls().split(",");
? ? ? ? DefaultMqttPahoClientFactory factory =new DefaultMqttPahoClientFactory();
? ? ? ? factory.setServerURIs(serverURIs);
? ? ? ? factory.setCleanSession(false);
? ? ? ? factory.setUserName(mqttProperties.getOutbound().getUsername());
? ? ? ? factory.setPassword(mqttProperties.getOutbound().getPassword());
? ? ? ? return factory;
? ? }
????@Bean
? ? @ServiceActivator(inputChannel ="mqttOutboundChannel")
? ? public MessageHandlermqttOutbound() {
????????MqttPahoMessageHandler messageHandler =?new MqttPahoMessageHandler(mqttProperties.getOutbound().getClientId(),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?mqttClientFactory());
? ? ? ? messageHandler.setAsync(true);
? ? ? ? messageHandler.setDefaultTopic(mqttProperties.getOutbound().getTopic());
? ? ? ? return messageHandler;
? ? }
????@Bean
? ? public MessageChannelmqttOutboundChannel() {
????????return new DirectChannel();
? ? }
}
9、MqttGateway
消息發(fā)送service,可以直接調(diào)用來發(fā)送消息
@Component
@MessagingGateway(defaultRequestChannel ="mqttOutboundChannel")
public interface MqttGateway {
????void sendToMqtt(String data);
? ? void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
? ? void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS)int qos, String payload);
}
10、http接口觸發(fā)mqtt消息分發(fā), ?MessageController
@RestController
public class MessageController {
????@Autowired
? ? MqttGatewaymqttGateway;
? ? @RequestMapping(value="/sendMsg")
? ? public StringsendMsg(@RequestParam String message){
? ? ? ? //調(diào)用網(wǎng)關(guān)接口發(fā)送消息
????????mqttGateway.sendToMqtt(message);
? ? ? ? return "success";
? ? }
}
11、springboot啟動(dòng)類
@SpringBootApplication
@Configuration
@EnableConfigurationProperties(MqttProperties.class)
public class MqttSpringbootApplication {
????public static void main(String[] args) {
????????SpringApplication.run(MqttSpringbootApplication.class, args);
? ? }
}