5. Spring Boot Messaging

第一章 說明

Spring Boot Messaging
Spring框架提供了與消息傳遞系統(tǒng)集成的廣泛支持,從使用JmsTemplate簡化JMS API到異步接收消息的完整基礎(chǔ)結(jié)構(gòu)。Spring AMQP為高級消息隊(duì)列協(xié)議提供了類似的特性集。Spring Boot還為RabbitTemplate和RabbitMQ提供了自動(dòng)配置選項(xiàng)。Spring WebSocket本身就支持STOMP消息傳遞,Spring Boot通過啟動(dòng)器和少量的自動(dòng)配置支持這一點(diǎn)。Spring Boot也支持Apache Kafka。

AMQP
高級消息隊(duì)列協(xié)議(Advanced Message queue Protocol, AMQP)是面向消息中間件的平臺(tái)無關(guān)的、線級協(xié)議。Spring AMQP項(xiàng)目將核心Spring概念應(yīng)用于基于AMQP的消息傳遞解決方案的開發(fā)。Spring Boot為通過RabbitMQ使用AMQP提供了一些便利,包括Spring-Boot-Starter-AMQP“Starter”。

RabbitMQ 擴(kuò)展
RabbitMQ是一個(gè)基于AMQP協(xié)議的輕量級、可靠、可伸縮和可移植的消息代理。Spring使用RabbitMQ通過AMQP協(xié)議進(jìn)行通信。

第二章 編寫代碼

2.1 啟動(dòng) RabbitMQ

安裝 RabbitMQ 服務(wù),以 Mac 為例

brew install rabbitmq

等待安裝完成后,進(jìn)入安裝目錄 '/usr/local/Cellar/rabbitmq/3.7.4/sbin',啟動(dòng)

cd /usr/local/Cellar/rabbitmq/3.7.4/sbin
./rabbitmq-server

如果啟動(dòng)成功,將會(huì)看到

  ##  ##
  ##  ##      RabbitMQ 3.7.4. Copyright (C) 2007-2018 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See http://www.rabbitmq.com/
  ######  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
                    /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log

              Starting broker...
 completed with 6 plugins.

2.2 代碼

2.2.1 文件結(jié)構(gòu)

src/
 +- main/
     +- java/
         +- com/
             +- lee/
                 +- springbootdemo/
                     +- config
                         +- RabbitMQConfig.java
                     +- pojo/
                         +- Receiver.java
                         +- Runner.java
                     +- SpringBootDemoApplication.java
     +- resources/
         +- <other resource>

2.2.2 創(chuàng)建 RabbitMQ 消息接受

src/main/java/com/lee/springbootdemo/pojo/Receiver.java

package com.lee.springbootdemo.pojo;

import org.springframework.stereotype.Component;
import java.util.concurrent.CountDownLatch;

@Component
public class Receiver {

    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(String message) {
        System.out.println("Received <" + message + ">");
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

2.2.3 注冊監(jiān)聽和發(fā)送消息

src/main/java/com/lee/springbootdemo/config/RabbitMQConfig.java

package com.lee.springbootdemo.config;

import com.lee.springbootdemo.pojo.Receiver;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    public static final String topicExchangeName = "spring-boot-exchange";
    public static final String queueName = "spring-boot";

    @Bean
    public Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(topicExchangeName);
    }

    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);

        return container;
    }

    @Bean
    public MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}
  • queue() 方法創(chuàng)建一個(gè) AMQP 隊(duì)列

  • exchange() 方法創(chuàng)建一個(gè) direct 交換器

  • binding() 方法把上面兩個(gè)內(nèi)容綁定在一起,定義 RabbitTemplate 發(fā)布消息的行為

    Spring AMQP要求將隊(duì)列、TopicExchange和綁定聲明為頂級Spring bean,以便正確設(shè)置。

  • 在這里,我們把 direct exchange 和 queue 綁定的路由鍵(routing key)是 foo.bar.#

2.2.4 測試發(fā)布消息

src/main/java/com/lee/springbootdemo/pojo/Runner.java

package com.lee.springbootdemo.pojo;

import com.lee.springbootdemo.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;

@Component
public class Runner implements CommandLineRunner {

    private final RabbitTemplate rabbitTemplate;
    private final Receiver receiver;

    public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.receiver = receiver;
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("Sending message ...");
        rabbitTemplate.convertAndSend(RabbitMQConfig.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
        receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
    }
}

這里我們定義的路由鍵(routing key)是foo.bar.baz

2.2.5 運(yùn)行

運(yùn)行 SpringBootDemoApplication.main() 方法,如果運(yùn)行成功,將會(huì)看到

Sending message ...
Received <Hello from RabbitMQ!>

附錄 A 參考

  1. Spring Boot Messaging 官方文檔
  2. Spring Messaging 代碼案例

上一篇:4. Spring Boot Caching Redis

下一篇:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容