【RabbitMQ-1】SpringBoot2.x集成使用

環(huán)境:
JDK8
SpringBoot 2.1.3.RELEASE

依賴:

<dependency> 
   <groupId>org.springframework.boot</groupId>  
   <artifactId>spring-boot-starter-amqp</artifactId> 
</dependency>

1. 提供者

配置文件:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true  # 確保消息不丟失
    publisher-returns: true # 確保消息不丟失

注冊組件:

因為RabbitMQ中Exchange有三種模式:訂閱、路由、通配符。

  1. direct(路由模式):根據(jù)routingKey值選擇對應(yīng)的Binding。
direct模式.png
  1. fanout(訂閱模式):每個和交換機綁定的隊列都會收到消息,相當于廣播。
fanout模式.png
  1. topic(通配符模式):支持routingKey的模糊匹配選擇Binding。
topic模式.png

用戶可以在項目啟動時,向MQ注冊一些Exchange和Queue。用戶可以自由組合注冊Binding關(guān)系。它們使用routingKey進行區(qū)別。

生產(chǎn)者發(fā)送消息需要指定ExchangeroutingKey。請求到達Exchange后,根據(jù)Exchange的模式和routingKey找到一個或一組Binding關(guān)系。并將消息發(fā)送Binding對應(yīng)到Queue中。

消費者監(jiān)聽對應(yīng)的Queue,來處理消息。


  1. direct也稱為路由模式,消息到達Exchange后,會根據(jù)指定的routingKey路由到指定routingKey的Binding上。
    所以需要在注冊Binding時,指定各個Binding的路由鍵。
@Configuration
public class DirectConfig {
    @Bean("directMessage1")
    public Queue directQueue1() {
        //name才是queue的名字,消費者實際監(jiān)聽的是dirQueue-1隊列
        return new Queue("dirQueue-1");
    }
    @Bean("directMessage2")
    public Queue directQueue2() {
        return new Queue("dirQueue-2");
    }
    //注冊交換機
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }
    @Bean
    public Binding bindingDirectExchange1(@Qualifier("directMessage1") Queue queue,
                                          DirectExchange directExchange) {
        String routingKey = "directExchange.message-1";
        return BindingBuilder.bind(queue).to(directExchange).with(routingKey);
    }
    @Bean
    public Binding bindingDirectExchange2(@Qualifier("directMessage2") Queue queue,
                                          DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with("directExchange.message-2");
    }
}
  1. fanout模式不需要指定routingKey關(guān)系,消息發(fā)送到Exchange后,將分發(fā)到與交換機綁定的各個Queue中。也稱為訂閱-發(fā)布模式。
@Configuration
public class FanooutConfig {
    @Bean(name = "AMessage")
    public Queue fanAMessage() {
        return new Queue("fanout.A");
    }
    @Bean(name = "BMessage")
    public Queue fanBMessage() {
        return new Queue("fanout.B");
    }
    @Bean(name = "CMessage")
    public Queue fanCMessage() {
        return new Queue("fanout.C");
    }
    //廣播模式
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }
    @Bean
    Binding bindingExchangeA(@Qualifier("AMessage") Queue message, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(message).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchangeB(@Qualifier("BMessage") Queue message, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(message).to(fanoutExchange);
    }
    @Bean
    Binding bindingExchangeC(@Qualifier("CMessage") Queue message, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(message).to(fanoutExchange);
    }
}
  1. topic通配符模式,注冊的Binding的routingKey可以使用#或者*來進行模糊匹配。

消息到達Exchange后,使用指定的routingKey模糊匹配到注冊的routingKey。

  • * 代表的是一個單詞。
  • # 代表的是一個或多個單詞。
@Configuration
public class TopicConfig {
    @Bean("message")
    public Queue queueMessage() {
        return new Queue("topic.message");
    }
    @Bean("messages")
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }
    @Bean
    public TopicExchange exchange(){
        return new TopicExchange("topicExchange");
    }
    //普通綁定
    @Bean
    Binding bindingExchangeMessage(@Qualifier("message") Queue message, TopicExchange exchange){
        return BindingBuilder.bind(message).to(exchange).with("topic.message");
    }
    //通配符綁定
    @Bean("topicBindingExchangeMessages")
    Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessage,TopicExchange exchange){
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.#");
    }
}

生產(chǎn)者發(fā)送消息:

@RestController
public class RabbitMQController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //直接向隊列中發(fā)送數(shù)據(jù)
    @GetMapping("send")
    public String send() {
        String content = "Date:" + System.currentTimeMillis();
        rabbitTemplate.convertAndSend("kinson", content);
        return content;
    }


    @GetMapping("sendDirect")
    public Book sendDirect() {
        Book book = new Book();
        book.setId("001");
        book.setName("JAVA編思想");
        book.setPrice(100);
        book.setInfo("學(xué)習(xí)JAVA必備");
        String id = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(id);
        rabbitTemplate.convertAndSend("directExchange",
                "directExchange.message", book, correlationData);
        return book;
    }

    @GetMapping("sendFanout")
    public Book sendFanout() {
        Book book = new Book();
        book.setId("005");
        book.setName("深入理解JVM虛擬機");
        String id = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(id);
        rabbitTemplate.convertAndSend("fanoutExchange", ""
                , book, correlationData);
        return book;
    }

    @GetMapping("sendTopic")
    public Book sendTopic() {
        Book book = new Book();
        book.setId("003");
        book.setName("mysql高性能優(yōu)化");
        String id = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(id);
        rabbitTemplate.convertAndSend("topicExchange", "topic.message"
                , book, correlationData);
        return book;
    }

    /**
     * * 可以代替一個單詞。
     * # 可以替代零個或多個單詞。
     */
    @GetMapping("sendTopic2")
    public Book sendTopic2() {
        Book book = new Book();
        book.setId("004");
        book.setName("高并發(fā)實戰(zhàn)");
        String id = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(id);
        rabbitTemplate.convertAndSend("topicExchange", "topic.xxx"
                , book, correlationData);
        return book;
    }
}

2. 消費者

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
#        acknowledge-mode: manual  # 手動確定(默認自動確認)
        concurrency: 5 # 消費端的監(jiān)聽個數(shù)
        max-concurrency: 10 # 消費端的監(jiān)聽最大個數(shù)
    connection-timeout: 15000   # 超時時間
@Component
@Slf4j
public class MyReceiver1 {
    @RabbitListener(queues = {"kinson"})
    public void receiver(Message msg, Channel channel) {

        byte[] messageBytes = msg.getBody();

        if (messageBytes != null && messageBytes.length > 0) {
            //打印數(shù)據(jù)
            String message = new String(msg.getBody(), StandardCharsets.UTF_8);
            log.info("開始消費:{}\n channel:{}", message, channel);
        }
    }
    //監(jiān)聽的Queue
    //沒有找到監(jiān)聽的Queue啟動時會出現(xiàn)的異常:(reply-code=404, reply-text=NOT_FOUND - no queue 'directMessage' in vhost '/', class-id=50, method-id=10)
    @RabbitListener(queues = "dirQueue-1")
    public void receiverDirect(Message msg, Channel channel) throws IOException, ClassNotFoundException {
        log.info("【DirectExchange 綁定的隊列】");
        byte[] messageBytes = msg.getBody();
        Book book = (Book) deserializable(messageBytes);
        log.info("開始消費:[{}]", JSON.toJSONString(book));
    }


    public static Object deserializable(byte[] bytes) throws IOException, ClassNotFoundException {
        ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
        return in.readObject();
    }
}

相關(guān)閱讀

【*** window10安裝RabbitMQ資源及教程 ***】

【*** SpringBoot2.x集成RabbitMQ***】

【*** SpringBoot2.x下RabbitMQ的并發(fā)參數(shù)(concurrency和prefetch)***】

推薦閱讀

RabbitMQ中 exchange、route、queue的關(guān)系

SpringBoot 2.1.3.RELEASE整合amqp官方文檔

RabbitMQ官網(wǎng)—MQ的消息模型

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容