SpringBoot整合RabbitMQ的基礎搭建

RabbitMQ的整體概括

RabbitMQ是對于AMQP(高級消息隊列協(xié)議)的具體實現(xiàn),是一個用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息的網(wǎng)絡通信協(xié)議。

應用解耦

此模型表示,消息中間件broker是消息的容器,它從生產(chǎn)者接收消息,并根據(jù)路由規(guī)則把消息投遞給消費者。需要注意的是,生產(chǎn)者不是直接將消息投遞給broker,而是通過路由鍵(RoutingKey)投遞給交換機(Exchange),交換機綁定具體的隊列(Queue),而消費者消費的消息只與隊列有關(guān),消費將會有推拉兩種模式。

圖示如下:

AMQP協(xié)議模型

AMQP 協(xié)議層角色相關(guān)的概念:

  • 生產(chǎn)者(producer):產(chǎn)生消息的應用,能夠傳遞消息到消息中間件的應用。
  • 消息中間件(brokers):消息傳遞的中間載體,即我們今天的主角 RabbitMQ。
  • 消費者(consumers):接收并處理消息的應用。從消息中間件獲取消息并處理。
  • 連接(Connection):生產(chǎn)者或消費者和消息中間件之間需要建立起連接。AMQP 應用層協(xié)議使用的是能夠提供可靠投遞的 TCP 連接,AMQP 的連接通常是長連接,AMQP 使用認證機制并且提供 TLS(SSL)保護。當我們的生產(chǎn)者 或 消費者 不再需要連接到消息中間件的的時候,需要優(yōu)雅的釋放掉它們與消息中間件 TCP 連接,而不是直接將 TCP 連接關(guān)閉。
  • 信道(channel):通常情況下生產(chǎn)者 或 消費者 需要與 消息中間件之間建立多個連接。無論怎樣,同時開啟多個 TCP 連接都是不合適的,因為這樣做會消耗掉過多的系統(tǒng)資源。AMQP 協(xié)議提供了信道(channel)這個概念來處理多連接,可以把通道理解成共享一個 TCP 連接的多個輕量化連接。一個特定通道上的通訊與其他通道上的通訊是完全隔離的,因此每個 AMQP 方法都需要攜帶一個通道號,這樣客戶端就可以指定此方法是為哪個信道準備的。

消息中間件相關(guān)的概念:

  • 虛擬主機(vHosts):虛擬主機概念,一個 Virtual Host 里面可以有若干個 Exchange 和 Queue,我們可以控制用戶在 Virtual Host 的權(quán)限。
  • 用戶(User):最直接了當?shù)恼J證方式,誰可以使用當前的消息中間件。
  • 交換機(Exchange):交換機接收生產(chǎn)者發(fā)出的消息并且路由到由交換機類型和被稱作綁定(bindings)的規(guī)則所決定的到隊列中,交換機不存儲消息。
  • 消息(message):生產(chǎn)者產(chǎn)生的和消費者處理的消息。
  • 路由鍵(routing key):路由關(guān)鍵字,交換機 exchange 的路由規(guī)則利用這個關(guān)鍵字進行消息投遞到消息隊列。(路由鍵長度不能超過 255 個字節(jié))
  • 綁定(Binding):Binding 可以理解為交換機 Exchange 路由消息到消息隊列的路由規(guī)則關(guān)系(即消息隊列和交換機的綁定)。當交換機 Exchange 收到生產(chǎn)者傳遞的消息 Message 時會解析其 Routing Key,Exchange 根據(jù) Routing Key 與交換機類型 Exchange Type 將 Message 路由到消息隊列中去。

SpringBoot整合RabbitMQ

簡單了解了rabbitmq的概念,我們來實踐一下與springboot的整合。
首先引入jar包。


<groupId>org.springframework.boot</groupId>

    <artifactId>spring-boot-starter-amqp</artifactId>

    <version>1.7.9.RELEASE</version>

</dependency>

配置連接工廠

    /**
     * 創(chuàng)建連接工廠
     * @return
     */
    @Bean(name = "adapterConnectionFactory")
    @Order(value = 2)
    public ConnectionFactory adapterConnectionFactory() {
        //創(chuàng)建連接工廠
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        //設置集群方式
        connectionFactory.setAddresses(rabbitMQProperties.getAddress());
        //connectionFactory.setHost(rabbitMQProperties.getHost());設置單節(jié)點方式
        //設置端口
        connectionFactory.setPort(rabbitMQProperties.getPort());
        //設置用戶名
        connectionFactory.setUsername(rabbitMQProperties.getUsername());
        //設置密碼
        connectionFactory.setPassword(rabbitMQProperties.getPassword());
        //設置虛擬主機
        connectionFactory.setVirtualHost(rabbitMQProperties.getVirtualHost());
        //消息確認機制confirm-callback或return-callback,成功后confirm,失敗后回調(diào)
        connectionFactory.setPublisherReturns(true);
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

消息發(fā)送的組件RabbitTemplate

    /**
     * 創(chuàng)建消息發(fā)送組件
     * @return
     */
    @Bean(name = "adapterRabbitTemplate")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(adapterConnectionFactory());
        //exchange根據(jù)路由鍵匹配不到對應的queue時將會調(diào)用basic.return將消息返還給生產(chǎn)者
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            //消息成功發(fā)送到broker
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                ApiLog.info("mq message send (ACK)status =", ack);
            }
        });
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
             //消息發(fā)送失敗
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            }
        });
        return rabbitTemplate;
    }

監(jiān)聽器容器工廠SimpleRabbitListenerContainerFactory

     /**
     * 消費者監(jiān)聽
     *
     * @return
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(adapterConnectionFactory());
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //單臺并發(fā)消費者數(shù)量
        factory.setConcurrentConsumers(10);
        //單臺并發(fā)消費的最大消費者數(shù)量
        factory.setMaxConcurrentConsumers(30);
        //預取消費數(shù)量,unacked數(shù)量超過這個值broker將不會接收消息
        factory.setPrefetchCount(5);
        //有事務時處理的消息數(shù)
        factory.setTxSize(1);
        //消息確認機制
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //構(gòu)建retryConfig,用于在JavaConfig的模式下讀取并發(fā)參數(shù)
        RabbitProperties.AmqpContainer config = rabbitMQProperties.getListener().getSimple();
        RabbitProperties.ListenerRetry retryConfig = config.getRetry();
        RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
                ? RetryInterceptorBuilder.stateless()
                : RetryInterceptorBuilder.stateful());
        //最大重試次數(shù),消費者異常之后
        builder.maxAttempts(retryConfig.getMaxAttempts());
        builder.backOffOptions(retryConfig.getInitialInterval(),
                retryConfig.getMultiplier(), retryConfig.getMaxInterval());
        MessageRecoverer recoverer = (this.messageRecoverer != null
                ? this.messageRecoverer : new RejectAndDontRequeueRecoverer());
        builder.recoverer(recoverer);
        factory.setAdviceChain(builder.build());
        return factory;
    }

重試參數(shù)說明,其他參數(shù)略

spring.rabbitmq.listener.simple.retry.enabled=true
//消費者異常之后的最大重試次數(shù),JavaConfig方式需顯示構(gòu)建retryConfig
spring.rabbitmq.listener.simple.retry.max-attempts=4
spring.rabbitmq.listener.simple.retry.initial-interval=2000
spring.rabbitmq.listener.simple.default-requeue-rejected=true

最佳實踐

初始化

 @Autowired
    private ConnectionFactory connectionFactory;

    @Value("${mq.queue.callback_queue}")
    private String callbackQueueKey;


    @Value("${mq.exchange}")
    private String exchange;

    @PostConstruct
    public void init(){
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        //聲明exchange
        Exchange topicExchange = new TopicExchange(exchange);
        admin.declareExchange(topicExchange);
        //聲明queue
        Queue callbackQueue = new Queue(callbackQueueKey, true);
        admin.declareQueue(callbackQueue);
        //Binding
        admin.declareBinding(BindingBuilder.bind(callbackQueue)
                .to(topicExchange)
                .with(callbackQueueKey).noargs());
    }

生產(chǎn)者

        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setExchange(exchange);
        rabbitTemplate.setRoutingKey(callbackQueueKey);
        rabbitTemplate.convertAndSend(callBackRequest, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties properties = message.getMessageProperties();
                properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                properties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, Obj.class);
                return message;
            }
        });

消費者

@RabbitListener(queues = "queueName", containerFactory = "singleListenerContainer")
    public void consumeMessage(@Payload Object obj, Channel channel, Message message) {
    doSometing...
}

需要注意的是,消費者的異常沒有捕獲或拋出,或者catch塊里出現(xiàn)異常,在消息確認機制是AUTO的前提下將會無限重試進入死循環(huán),這個時候可以設置最大重試次數(shù)或手動進行ack來處理。

如果需要手動ack,需要實現(xiàn)ChannelAwareMessageListener

@Override
public void onMessage(Message message, Channel channel) throws Exception {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

以上只是項目使用中的簡單闡述,除此還有exchange的4種模式,死信隊列,消息堆積,可靠性投遞機制等待補充。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,200評論 2 11
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,525評論 19 139
  • 這個指導提供一個AMQP 0-9-1協(xié)議的概述,它是RabbitMq支持的一個協(xié)議。 什么是AMQP 0-9-1?...
    浪_6e80閱讀 815評論 0 1
  • 什么叫消息隊列? 消息(Message)是指在應用間傳送的數(shù)據(jù)。消息可以非常簡單,比如只包含文本字符串,也可以更復...
    Agile_dev閱讀 2,432評論 0 24
  • AMQP大致內(nèi)容就是,將消息和隊列綁定起來,規(guī)定讓進入到交換機中的具有某個路由鍵的消息進入到指定隊列中去。 Rab...
    StevenMD閱讀 1,988評論 0 3

友情鏈接更多精彩內(nèi)容