Chapter 1 Overview
Spring Boot Messaging
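The Spring Framework provides extensive support for integrating with messaging systems, from simplified use of the JMS API through JmsTemplate to a complete infrastructure for receiving messages asynchronously. Spring AMQP provides a similar feature set for the Advanced Message Queuing Protocol. Spring Boot also provides auto-configuration options for RabbitTemplate and RabbitMQ. Spring WebSocket natively supports STOMP messaging, and Spring Boot supports this through starters and a small amount of auto-configuration. Spring Boot also supports Apache Kafka.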
AMQP
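The Advanced Message Queuing Protocol (AMQP) is a platform-neutral, wire-level protocol for message-oriented middleware. The Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. Spring Boot offers several conveniences for working with AMQP through RabbitMQ, including the spring-boot-starter-amqp "Starter".
RabbitMQ Support
RabbitMQ is a lightweight, reliable, scalable, and portable message broker based on the AMQP protocol. Spring communicates with RabbitMQ over AMQP.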
Chapter 2 Writing the Code
2.1 Starting RabbitMQ
Install the RabbitMQ server. On a Mac, for example:
brew install rabbitmq
Once the installation finishes, change to the installation directory '/usr/local/Cellar/rabbitmq/3.7.4/sbin' and start the server:
cd /usr/local/Cellar/rabbitmq/3.7.4/sbin
./rabbitmq-server
If the server starts successfully, you will see:
## ##
## ## RabbitMQ 3.7.4. Copyright (C) 2007-2018 Pivotal Software, Inc.
########## Licensed under the MPL. See http://www.rabbitmq.com/
###### ##
########## Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
/usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log
Starting broker...
completed with 6 plugins.
2.2 Code
2.2.1 File Structure
src/
 +- main/
     +- java/
     |   +- com/
     |       +- lee/
     |           +- springbootdemo/
     |               +- config/
     |               |   +- RabbitMQConfig.java
     |               +- pojo/
     |               |   +- Receiver.java
     |               |   +- Runner.java
     |               +- SpringBootDemoApplication.java
     +- resources/
         +- <other resource>
2.2.2 Creating the RabbitMQ Message Receiver
src/main/java/com/lee/springbootdemo/pojo/Receiver.java
package com.lee.springbootdemo.pojo;
import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;
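@Component
public class Receiver {

    // Counted down once a message arrives, so the sender can wait for delivery.
    private final CountDownLatch latch = new CountDownLatch(1);

    // Invoked by the listener adapter for each message taken from the queue.
    public void receiveMessage(String message) {
        System.out.println("Received <" + message + ">");
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}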
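The latch in Receiver exists so that the sending side can block until a message has actually been handled. The following is a minimal sketch of that handshake using only the JDK, with no broker involved: a plain thread stands in for the RabbitMQ listener container. The class name LatchSketch and the helper methods deliverAndWait and awaitQuietly are hypothetical names introduced for illustration and are not part of the project.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch; a thread plays the role of the listener container.
class LatchSketch {

    // Same shape as the Receiver above, minus the Spring annotation.
    static class Receiver {
        private final CountDownLatch latch = new CountDownLatch(1);

        public void receiveMessage(String message) {
            System.out.println("Received <" + message + ">");
            latch.countDown();
        }

        public CountDownLatch getLatch() {
            return latch;
        }
    }

    // Waits up to timeoutMillis; returns true only if the latch reached zero in time.
    static boolean awaitQuietly(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Delivers the message on another thread, then waits for the receiver to handle it.
    static boolean deliverAndWait(String message, long timeoutMillis) {
        Receiver receiver = new Receiver();
        Thread listener = new Thread(() -> receiver.receiveMessage(message));
        listener.start();
        return awaitQuietly(receiver.getLatch(), timeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println("Delivered in time: "
                + deliverAndWait("Hello from RabbitMQ!", 10000));
    }
}
```

If nothing ever calls receiveMessage, await simply returns false after the timeout, so the waiting side does not hang forever when a message fails to arrive.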
2.2.3 Registering the Listener and Sending Messages
src/main/java/com/lee/springbootdemo/config/RabbitMQConfig.java
package com.lee.springbootdemo.config;
import com.lee.springbootdemo.pojo.Receiver;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
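@Configuration
public class RabbitMQConfig {

    public static final String topicExchangeName = "spring-boot-exchange";
    public static final String queueName = "spring-boot";

    // Non-durable queue: it does not survive a broker restart.
    @Bean
    public Queue queue() {
        return new Queue(queueName, false);
    }

    // A topic exchange is required for the '#' wildcard in the binding to take effect.
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    // Routes every message whose routing key starts with "foo.bar" to the queue.
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                                    MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    // Delegates each incoming message to Receiver.receiveMessage(String).
    @Bean
    public MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}
The queue() method creates an AMQP queue.
The exchange() method creates a topic exchange.
The binding() method binds the two together and so defines what happens when RabbitTemplate publishes to the exchange.
Spring AMQP requires the Queue, the TopicExchange, and the Binding to be declared as top-level Spring beans so that they are set up correctly.
The exchange must be a topic exchange because only topic exchanges interpret the # wildcard; a direct exchange compares routing keys literally and would never deliver a message sent with a different key.
Here, the routing key that binds the queue to the topic exchange is
foo.bar.#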
2.2.4 Testing Message Publishing
src/main/java/com/lee/springbootdemo/pojo/Runner.java
package com.lee.springbootdemo.pojo;
import com.lee.springbootdemo.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
public class Runner implements CommandLineRunner {

    private final RabbitTemplate rabbitTemplate;
    private final Receiver receiver;

    public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.receiver = receiver;
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("Sending message ...");
        rabbitTemplate.convertAndSend(RabbitMQConfig.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
        // Wait up to 10 seconds for the receiver to count the latch down.
        receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
    }
}
The routing key used when sending here is foo.bar.baz. On a topic exchange, this key matches the binding pattern foo.bar.#.
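To make the relationship between the binding key foo.bar.# and the routing key foo.bar.baz concrete, here is a small sketch of topic-style matching written in plain Java. It follows the AMQP topic rules (words are separated by dots, * stands for exactly one word, # stands for zero or more words) but it is only an illustration, not RabbitMQ's actual implementation. The class TopicPatternSketch and its methods are hypothetical names introduced here.

```java
// Hypothetical illustration of topic vs. direct routing-key matching.
class TopicPatternSketch {

    // Topic rules: '*' matches exactly one word, '#' matches zero or more words.
    static boolean topicMatches(String pattern, String routingKey) {
        String[] p = pattern.split("\\.");
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(p, 0, k, 0);
    }

    // Direct rules: the routing key must equal the binding key exactly.
    static boolean directMatches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            return ki == k.length; // pattern used up: key must be used up too
        }
        if (p[pi].equals("#")) {
            // Let '#' absorb 0, 1, 2, ... of the remaining words.
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            return false; // pattern still expects a word, key has none left
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return matches(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(topicMatches("foo.bar.#", "foo.bar.baz"));  // true
        System.out.println(topicMatches("foo.bar.#", "foo.baz"));      // false
        System.out.println(directMatches("foo.bar.#", "foo.bar.baz")); // false
    }
}
```

The last line shows why the exchange type matters: under exact comparison, a binding key containing # never equals foo.bar.baz, so the message would not reach the queue.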
2.2.5 Running
Run the SpringBootDemoApplication.main() method. If it runs successfully, you will see:
Sending message ...
Received <Hello from RabbitMQ!>