環(huán)境:
JDK8
SpringBoot 2.1.3.RELEASE
依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1. 提供者
配置文件:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirms: true # 確保消息不丟失
publisher-returns: true # 確保消息不丟失
注冊組件:
因為RabbitMQ中Exchange有三種模式:訂閱、路由、通配符。
- direct(路由模式):根據(jù)routingKey值選擇對應(yīng)的Binding。

direct模式.png
- fanout(訂閱模式):每個和交換機綁定的隊列都會收到消息,相當于廣播。

fanout模式.png
- topic(通配符模式):支持routingKey的模糊匹配選擇Binding。

topic模式.png
用戶可以在項目啟動時,向MQ注冊一些Exchange和Queue。用戶可以自由組合,注冊Binding關(guān)系。它們使用routingKey進行區(qū)別。
生產(chǎn)者發(fā)送消息需要指定Exchange和routingKey。請求到達Exchange后,根據(jù)Exchange的模式和routingKey找到一個或一組Binding關(guān)系。并將消息發(fā)送Binding對應(yīng)到Queue中。
消費者監(jiān)聽對應(yīng)的Queue,來處理消息。
- direct也稱為路由模式,消息到達Exchange后,會根據(jù)指定的
routingKey路由到指定routingKey的Binding上。
所以需要在注冊Binding時,指定各個Binding的路由鍵。
@Configuration
public class DirectConfig {
@Bean("directMessage1")
public Queue directQueue1() {
//name才是queue的名字,消費者實際監(jiān)聽的是dirQueue-1隊列
return new Queue("dirQueue-1");
}
@Bean("directMessage2")
public Queue directQueue2() {
return new Queue("dirQueue-2");
}
//注冊交換機
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange");
}
@Bean
public Binding bindingDirectExchange1(@Qualifier("directMessage1") Queue queue,
DirectExchange directExchange) {
String routingKey = "directExchange.message-1";
return BindingBuilder.bind(queue).to(directExchange).with(routingKey);
}
@Bean
public Binding bindingDirectExchange2(@Qualifier("directMessage2") Queue queue,
DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("directExchange.message-2");
}
}
- fanout模式不需要指定
routingKey關(guān)系,消息發(fā)送到Exchange后,將分發(fā)到與交換機綁定的各個Queue中。也稱為訂閱-發(fā)布模式。
@Configuration
public class FanooutConfig {
@Bean(name = "AMessage")
public Queue fanAMessage() {
return new Queue("fanout.A");
}
@Bean(name = "BMessage")
public Queue fanBMessage() {
return new Queue("fanout.B");
}
@Bean(name = "CMessage")
public Queue fanCMessage() {
return new Queue("fanout.C");
}
//廣播模式
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(@Qualifier("AMessage") Queue message, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(message).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(@Qualifier("BMessage") Queue message, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(message).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(@Qualifier("CMessage") Queue message, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(message).to(fanoutExchange);
}
}
- topic通配符模式,注冊的Binding的
routingKey可以使用#或者*來進行模糊匹配。
消息到達Exchange后,使用指定的routingKey模糊匹配到注冊的routingKey。
- * 代表的是一個單詞。
- # 代表的是一個或多個單詞。
@Configuration
public class TopicConfig {
@Bean("message")
public Queue queueMessage() {
return new Queue("topic.message");
}
@Bean("messages")
public Queue queueMessages() {
return new Queue("topic.messages");
}
@Bean
public TopicExchange exchange(){
return new TopicExchange("topicExchange");
}
//普通綁定
@Bean
Binding bindingExchangeMessage(@Qualifier("message") Queue message, TopicExchange exchange){
return BindingBuilder.bind(message).to(exchange).with("topic.message");
}
//通配符綁定
@Bean("topicBindingExchangeMessages")
Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessage,TopicExchange exchange){
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.#");
}
}
生產(chǎn)者發(fā)送消息:
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
//直接向隊列中發(fā)送數(shù)據(jù)
@GetMapping("send")
public String send() {
String content = "Date:" + System.currentTimeMillis();
rabbitTemplate.convertAndSend("kinson", content);
return content;
}
@GetMapping("sendDirect")
public Book sendDirect() {
Book book = new Book();
book.setId("001");
book.setName("JAVA編思想");
book.setPrice(100);
book.setInfo("學(xué)習(xí)JAVA必備");
String id = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(id);
rabbitTemplate.convertAndSend("directExchange",
"directExchange.message", book, correlationData);
return book;
}
@GetMapping("sendFanout")
public Book sendFanout() {
Book book = new Book();
book.setId("005");
book.setName("深入理解JVM虛擬機");
String id = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(id);
rabbitTemplate.convertAndSend("fanoutExchange", ""
, book, correlationData);
return book;
}
@GetMapping("sendTopic")
public Book sendTopic() {
Book book = new Book();
book.setId("003");
book.setName("mysql高性能優(yōu)化");
String id = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(id);
rabbitTemplate.convertAndSend("topicExchange", "topic.message"
, book, correlationData);
return book;
}
/**
* * 可以代替一個單詞。
* # 可以替代零個或多個單詞。
*/
@GetMapping("sendTopic2")
public Book sendTopic2() {
Book book = new Book();
book.setId("004");
book.setName("高并發(fā)實戰(zhàn)");
String id = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(id);
rabbitTemplate.convertAndSend("topicExchange", "topic.xxx"
, book, correlationData);
return book;
}
}
2. 消費者
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
listener:
simple:
# acknowledge-mode: manual # 手動確定(默認自動確認)
concurrency: 5 # 消費端的監(jiān)聽個數(shù)
max-concurrency: 10 # 消費端的監(jiān)聽最大個數(shù)
connection-timeout: 15000 # 超時時間
@Component
@Slf4j
public class MyReceiver1 {
@RabbitListener(queues = {"kinson"})
public void receiver(Message msg, Channel channel) {
byte[] messageBytes = msg.getBody();
if (messageBytes != null && messageBytes.length > 0) {
//打印數(shù)據(jù)
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("開始消費:{}\n channel:{}", message, channel);
}
}
//監(jiān)聽的Queue
//沒有找到監(jiān)聽的Queue啟動時會出現(xiàn)的異常:(reply-code=404, reply-text=NOT_FOUND - no queue 'directMessage' in vhost '/', class-id=50, method-id=10)
@RabbitListener(queues = "dirQueue-1")
public void receiverDirect(Message msg, Channel channel) throws IOException, ClassNotFoundException {
log.info("【DirectExchange 綁定的隊列】");
byte[] messageBytes = msg.getBody();
Book book = (Book) deserializable(messageBytes);
log.info("開始消費:[{}]", JSON.toJSONString(book));
}
public static Object deserializable(byte[] bytes) throws IOException, ClassNotFoundException {
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
return in.readObject();
}
}
相關(guān)閱讀
【*** window10安裝RabbitMQ資源及教程 ***】
【*** SpringBoot2.x集成RabbitMQ***】
【*** SpringBoot2.x下RabbitMQ的并發(fā)參數(shù)(concurrency和prefetch)***】
推薦閱讀
RabbitMQ中 exchange、route、queue的關(guān)系