RabbitMQ基本原理

一、基本結(jié)構(gòu)

工作原理

所有中間件技術(shù)都是基于 TCP/IP 協(xié)議基礎(chǔ)之上進(jìn)行構(gòu)建新的協(xié)議規(guī)范,RabbitMQ遵循的是AMQP協(xié)議(Advanced Message Queuing Protocol - 高級消息隊列協(xié)議)。
生產(chǎn)者發(fā)送消息流程:

  • 1、生產(chǎn)者和Broker建立TCP連接;
  • 2、生產(chǎn)者和Broker建立通道;
  • 3、生產(chǎn)者通過通道消息發(fā)送給Broker,由Exchange將消息進(jìn)行轉(zhuǎn)發(fā);
  • 4、Exchange將消息轉(zhuǎn)發(fā)到指定的Queue(隊列)。

【詳細(xì)】

1、消息生產(chǎn)者連接到`RabbitMQ Broker`,建立鏈接(Connection),在鏈接(Connection)上開啟一個信道(Channel);
2、聲明一個交換機(jī)(Exchange),并設(shè)置相關(guān)屬性,比如交換機(jī)類型、是否持久化等;
3、聲明一個隊列(Queue),并設(shè)置相關(guān)屬性,比如是否排他、是否持久化、是否自動刪除等;
4、使用路由鍵(RoutingKey)將隊列(Queue)和交換機(jī)(Exchange)綁定起來;
5、生產(chǎn)者發(fā)送消息至 RabbitMQ Broker,其中包含路由鍵、交換器等信息,根據(jù)路由鍵(RoutingKey)發(fā)送消息到交換機(jī)(Exchange);
6、相應(yīng)的交換器(Exchange)根據(jù)接收到的路由鍵(RoutingKey)查找相匹配的隊列如果找到 ,則將從生產(chǎn)者發(fā)送過來的消息存入相應(yīng)的隊列中;
7、如果沒有找到 ,則根據(jù)生產(chǎn)者配置的屬性選擇丟棄還是回退給生產(chǎn)者;
8、關(guān)閉信道(Channel);
9、關(guān)閉鏈接(Connection);

消費者接收消息流程:

  • 1、消費者和Broker建立TCP連接;
  • 2、消費者和Broker建立通道;
  • 3、消費者監(jiān)聽指定的Queue(隊列);
  • 4、當(dāng)有消息到達(dá)QueueBroker默認(rèn)將消息推送給消費者;
  • 5、消費者接收到消息;
  • 6、ack回復(fù)。

【詳細(xì)】

- 1、建立鏈接(Connection);
- 2、在鏈接(Connection)上開啟一個信道(Channel);
- 3、請求消費指定隊列(Queue)的消息,并設(shè)置回調(diào)函數(shù)(onMessage);
- 4、[MQ]將消息推送給消費者,消費者接收消息;
- 5、消費者發(fā)送消息確定(Ack[acknowledge]);
- 6、[MQ]刪除被確認(rèn)的消息;
- 7、關(guān)閉信道(Channel);
- 8、關(guān)閉鏈接(Connection);

MQ消費消息分發(fā)原理

1)一種是Pull模式,對應(yīng)的方法是basicGet。
消息存放在服務(wù)端,只有消費者主動獲取才能拿到消息。如果每擱一段時間獲取一次消息,消息的實時性會降低。
但是好處是可以根據(jù)自己的消費能力決定消息的頻率。

2)另一種是push,對應(yīng)的方法是BasicConsume,只要生產(chǎn)者發(fā)消息到服務(wù)器,就馬上推送給消費者,
消息保存客戶端,實時性很高,如果消費不過來有可能會造成消息積壓。Spring AMQP是push方式,
通過事件機(jī)制對隊列進(jìn)行監(jiān)聽,只要有消息到達(dá)隊列,就會觸發(fā)消費消息的方法。

二、RabbitMQ組成部分說明

  • Producer: 消息生產(chǎn)者,即生產(chǎn)方客戶端,生產(chǎn)方客戶端將消息發(fā)送;

  • Connection:TCP連接,生產(chǎn)者或消費者與消息隊列RabbitMQ版間的物理TCP連接;

    1)Connection會執(zhí)行認(rèn)證、IP解析、路由等底層網(wǎng)絡(luò)任務(wù)。
    2)應(yīng)用與消息隊列RabbitMQ版完成Connection建立大約需要15個TCP報文交互,因而會消耗大量的網(wǎng)絡(luò)資源和消息隊列RabbitMQ版資源。
    3)一個進(jìn)程對應(yīng)一個Connection,一個進(jìn)程中的多個線程則分別對應(yīng)一個Connection中的多個Channel。
    4)Producer和Consumer分別使用不同的Connection進(jìn)行消息發(fā)送和消費;

  • Channel:在客戶端的每個物理TCP連接里,可建立多個Channel,每個Channel代表一個會話任務(wù)。

    1)Channel是物理TCP連接中的虛擬連接。
    2)當(dāng)應(yīng)用通過Connection與消息隊列RabbitMQ版建立連接后,所有的AMQP協(xié)議操作(例如創(chuàng)建隊列、發(fā)送消息、接收消息等)都會通過Connection中的Channel完成。
    3) Channel可以復(fù)用Connection,即一個Connection下可以建立多個Channel。
    4) Channel不能脫離Connection獨立存在,而必須存活在Connection中。
    5) 當(dāng)某個Connection斷開時,該Connection下的所有Channel都會斷開。

  • Broker:消息隊列服務(wù)進(jìn)程,此進(jìn)程包括兩個部分:Exchange和Queue;

  • Exchange(交換器):生產(chǎn)者將消息發(fā)送到Exchange,由Exchange將消息路由到一個或多個Queue中。Exchange根據(jù)消息的屬性或內(nèi)容路由消息。

  • Queue:消息隊列,存儲消息的隊列,每個消息都會被投入到一個或多個Queue里;

  • Consumer:消息消費者,即消費方客戶端,接收MQ轉(zhuǎn)發(fā)的消息;

  • Routing Key(路由鍵):生產(chǎn)者在向Exchange發(fā)送消息時,需要指定一個Routing Key來設(shè)定該消息的路由規(guī)則。 Routing Key需要與Exchange類型及Binding Key聯(lián)合使用才能生效。一般情況下,生產(chǎn)者在向Exchange發(fā)送消息時,可以通過指定Routing Key來決定消息被路由到哪個或哪些Queue;

  • Binding:一套綁定規(guī)則,用于告訴Exchange消息應(yīng)該被存儲到哪個Queue。它的作用是把Exchange和Queue按照路由規(guī)則綁定起來。

  • Binding Key(綁定鍵):用于告知Exchange應(yīng)該將消息投遞到哪些Queue中(生產(chǎn)者將消息發(fā)送給哪個Exchange是需要由RoutingKey決定的,生產(chǎn)者需要將Exchange與哪個隊列綁定時需要由BindingKey決定的);

  • Virtual Host:虛擬主機(jī),本質(zhì)上是一個mini版的RabbitMQ服務(wù)器,擁有自己的隊列、交換機(jī)、綁定和權(quán)限機(jī)制,vhost是共享相同的身份認(rèn)證和加密環(huán)境的獨立服務(wù)器域。vhost是AMQP的基礎(chǔ),必須在連接時指定,RabbitMQ默認(rèn)的vhost是/。

三、交換模式

Direct Exchange(直連模式)

【路由規(guī)則】 Direct Exchange根據(jù)Binding Key和Routing Key完全匹配的規(guī)則路由消息。
【使用場景】 Direct Exchange適用于通過簡單字符標(biāo)識符區(qū)分消息的場景。
Direct Exchange常用于單播路由。


Direct

Fanout Exchange(廣播模式)

【路由規(guī)則】 Fanout Exchange忽略Routing Key和Binding Key的匹配規(guī)則,將消息路由到所有綁定的Queue。
【使用場景】 Fanout Exchange適用于廣播消息的場景。例如,分發(fā)系統(tǒng)使用Fanout Exchange來廣播各種狀態(tài)和配置更新。


廣播模式

Topic Exchange(主題模式)

【路由規(guī)則】 Topic Exchange根據(jù)Binding Key和Routing Key通配符匹配的規(guī)則路由消息。
Topic Exchange支持的通配符包括星號(*)和井號(#)。 星號(*)代表一個英文單詞(例如cn)。 井號(#)代表零個、一個或多個英文單詞,英文單詞間通過英文句號(.)分隔,例如cn.zj.hz。
【使用場景】 Topic Exchange適用于通過通配符區(qū)分消息的場景。
Topic Exchange常用于多播路由。例如,使用Topic Exchange分發(fā)有關(guān)于特定地理位置的數(shù)據(jù)。

主題模式

Headers Exchange (頭部交換機(jī))

【路由規(guī)則】 Headers Exchange可以被視為Direct Exchange的另一種表現(xiàn)形式。
Headers Exchange可以像Direct Exchange一樣工作,不同之處在于Headers Exchange使用Headers屬性代替Routing Key進(jìn)行路由匹配。
在綁定Headers Exchange和Queue時,可以設(shè)置綁定屬性的鍵值對。然后,在向Headers Exchange發(fā)送消息時,設(shè)置消息的Headers屬性鍵值對。
Headers Exchange將根據(jù)消息Headers屬性鍵值對和綁定屬性鍵值對的匹配情況路由消息。
匹配算法由一個特殊的綁定屬性鍵值對控制。該屬性為x-match,只有以下兩種取值:

  • 1)all:所有除x-match以外的綁定屬性鍵值對必須和消息Headers屬性鍵值對匹配才會路由消息。
  • 2)any:只要有一組除x-match以外的綁定屬性鍵值對和消息Headers屬性鍵值對匹配就會路由消息。
    以下兩種情況下,認(rèn)為消息Headers屬性鍵值對和綁定屬性鍵值對匹配:
    • 1、 消息Headers屬性的鍵和值與綁定屬性的鍵和值完全相同;
    • 2、 消息Headers屬性的鍵和綁定屬性的鍵完全相同,但綁定屬性的值為空。

【使用場景】 Headers Exchange適用于通過多組Headers屬性區(qū)分消息的場景。Headers Exchange常用于多播路由。例如,涉及到分類或者標(biāo)簽的新聞更新。

生產(chǎn)者確認(rèn)機(jī)制

1、確認(rèn)原理
生產(chǎn)者將消息發(fā)送到exchange,exchange根據(jù)路由規(guī)則將消息投遞到了queue。

  • 1)Confirm確認(rèn):生產(chǎn)者發(fā)送消息到交換機(jī)時會存在消息丟失的情景,開啟事務(wù)會導(dǎo)致吞吐量下降,Confirm機(jī)制就是消息發(fā)送到交換機(jī)(Exchange)時會觸發(fā)Confirm回調(diào)。通過 publisher confirm (發(fā)送方確認(rèn)機(jī)制)可以確定消息是否被成功路由到MQ broker從而選擇是否重發(fā)等步驟。當(dāng)生產(chǎn)者開啟 publisher confirm 消息發(fā)送到MQ端之后,MQ會回一個ack給生產(chǎn)者,ack是個boolean值,為true消息成功發(fā)送到MQ。反之發(fā)送失敗。
  • 2)Return確認(rèn):從交換機(jī)到隊列也有可能出現(xiàn)路由失敗導(dǎo)致消息丟失情景(可能是MQ出問題導(dǎo)致queue和exchange綁定丟失,或者失誤刪除了綁定關(guān)系等),Return機(jī)制可解決這個問題,路由失敗時可以通過Return回調(diào)來將路由失敗的消息記錄下來。

消費者確認(rèn)機(jī)制

1、消費者確認(rèn)原理
消費者確認(rèn)是指當(dāng)一條消息投遞到消費者處理后,消費者發(fā)送給MQ broker的確認(rèn)
(通俗的說就是 告知服務(wù)器這條消息已經(jīng)被我消費了,可以在隊列刪掉 ,這樣以后就不會再發(fā)了, 否則消息服務(wù)器以為這條消息沒處理掉 重啟應(yīng)用后還會在發(fā))。

有auto和manual兩種

  • 1)auto則由broker自行選擇時機(jī),一般可認(rèn)為消息發(fā)送到消費者后就直接被ack,也即消息會被從隊列中移除掉而不顧消息的處理邏輯是否成功;

  • 2)manual則是需要消費者顯式的去手動ack后消息才會被從隊列中移除掉,通過這個機(jī)制可以限制在消息處理完之后再Ack或者nack; 開啟手動確認(rèn)模式,即由消費方自行決定何時應(yīng)該ack,通過設(shè)置autoAck=false開啟手動確認(rèn)模式;

消息持久化

消息發(fā)送并保存到隊列之后如果不做特殊處理是保存在內(nèi)存中,當(dāng)節(jié)點宕機(jī)重啟或者內(nèi)存故障等,會導(dǎo)致消息丟失,通過對消息進(jìn)行持久化到磁盤可以降低這種風(fēng)險, 除了對消息進(jìn)行持久化還是不夠,還需要對queue、exchange進(jìn)行持久化。

RabbitMQ解決消息丟失問題

消息確認(rèn)機(jī)制

RabbitMQ提供了消息確認(rèn)機(jī)制,即生產(chǎn)者在發(fā)送消息后,可以等待RabbitMQ服務(wù)器返回確認(rèn)信息,以確保消息已經(jīng)被正確地接收和處理。如果RabbitMQ服務(wù)器沒有返回確認(rèn)信息,生產(chǎn)者可以選擇重新發(fā)送消息或者采取其他的補(bǔ)救措施。

生產(chǎn)者確認(rèn)消息和重試

  1. 使用緩存:在confirmCallback中,將ackfalse的消息存到緩存中。然后,可以使用另外的線程或者定時任務(wù)來處理這些失敗的消息,進(jìn)行重試。

  2. 設(shè)置重試次數(shù):為了避免無限重試,我們可以設(shè)置一個重試次數(shù)的上限。當(dāng)達(dá)到這個上限后,我們可以選擇將消息發(fā)送到死信隊列,或者進(jìn)行其他的錯誤處理。

  3. 使用死信隊列:在RabbitMQ中,我們可以設(shè)置一個死信隊列來存儲那些無法被正常處理的消息。當(dāng)消息在主隊列中被拒絕或者過期后,它們會被發(fā)送到死信隊列。然后,我們可以對死信隊列中的消息進(jìn)行人工處理,或者在一段時間后再次進(jìn)行處理。

事務(wù)機(jī)制

1.首先需要配置一個事務(wù)管理器

 @Bean
 PlatformTransactionManager platformTransactionManager(ConnectionFactory connectionFactory) {
      return new RabbitTransactionManager(connectionFactory);
  }

2.然后在生產(chǎn)者上添加事務(wù)注解以及設(shè)置通信通道為事務(wù)模式。

@Transactional

  1. 開啟事務(wù)機(jī)制就三步:
  • 配置事務(wù)管理器
  • 使用 @Transactional 注解開啟事務(wù)
  • 調(diào)用 setChannelTransacted 方法設(shè)置消息通道為事務(wù)模式,即設(shè)置為 true

4.當(dāng)我們開啟事務(wù)模式之后,RabbitMQ 生產(chǎn)者發(fā)送消息會有這樣幾個步驟:

  • (1) 客服端發(fā)出請求,將通信管道設(shè)置為事務(wù)模式
  • (2) 服務(wù)端給出回復(fù),同意將通信管道設(shè)置為事務(wù)模式
  • (3) 客戶端發(fā)送消息
  • (4) 客戶端提交事務(wù)
  • (5) 服務(wù)端給出響應(yīng),確認(rèn)事務(wù)提交

6.兩步 RabbitMQ 都有提供解決的方案。那么,如果確保消息成功到達(dá) RabbitMQ 呢?

  • (1) 開啟事務(wù)機(jī)制

  • (2) 發(fā)送方確認(rèn)機(jī)制
    注意:這是兩種不同的方案,不可以同時開啟,只能二選其一。如果同時開啟,則會報錯

消息持久化

RabbitMQ支持將消息持久化到磁盤,即使RabbitMQ服務(wù)器宕機(jī)或重啟,消息也不會丟失。在發(fā)布消息時,可以設(shè)置消息的持久化標(biāo)志,這樣消息就會被寫入磁盤中,而不是僅僅保存在內(nèi)存中。
其中我們交換機(jī)和隊列都要設(shè)置對應(yīng)的持久化,在創(chuàng)建時,我們會設(shè)置持久化參數(shù)。同時為了避免單點故障,RabbitMQ應(yīng)該做成集群模式,以免一臺機(jī)器損壞,出現(xiàn)數(shù)據(jù)丟失的問題。

消費端確認(rèn)和重試

對于消費者來說,該配置不僅起到了連接作用,同時也啟動了重試機(jī)制,默認(rèn)重試 2 次。

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.listener.simple.retry.enabled=true # 開啟消費者重試機(jī)制
spring.rabbitmq.listener.simple.retry.max-attempts=3 # 最大重試次數(shù)
spring.rabbitmq.listener.simple.retry.initial-interval=3000 # 重試時間間隔

確認(rèn)的話需要我們做簽收操作

 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

死信隊列配置

被拒收的消息,或者是過期的消息,或者是隊列已經(jīng)滿了的消息,都會進(jìn)入死信隊列,死信隊列有一個默認(rèn)的生效時間,如果沒有做任務(wù)配置,到了時間會自動刪除消息。
java中配置死信隊列

    /**
     * 延遲隊列,又叫死信隊列 “
     *
     * @return
     */
    @Bean
    public Queue delayQueue() {
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "test_ex");
        arguments.put("x-dead-letter-routing-key", "test_ex.dead");
        // 消息過期時間 2分鐘
        arguments.put("x-message-ttl", 60000);
        return new Queue("delayQueue", true, false, false, arguments);
    }

以上參數(shù)說明:

  • x-dead-letter-exchange:死信隊列過期以后往指定交換機(jī)發(fā)
  • x-dead-letter-routing-key:死信隊列過期指定路由鍵
  • x-message-ttl: 死信隊列過期時間,單位是毫秒

RabbitMQ解決消息積壓問題

RabbitMQ消息積壓問題通常是由于消費者無法及時消費消息或消費速度過慢或發(fā)送者流量太大導(dǎo)致的。以下是一些解決方法:

1.增加消費者數(shù)量:可以通過增加消費者的數(shù)量來提高消費速度,減少消息積壓??梢酝ㄟ^添加更多的消費者進(jìn)程或者增加消費者的線程數(shù)來實現(xiàn)。

2.調(diào)整消費者的QoS參數(shù):消費者的QoS參數(shù)可以控制消費者每次從RabbitMQ服務(wù)器獲取的消息數(shù)量,以及未確認(rèn)消息的最大數(shù)量??梢赃m當(dāng)調(diào)整這些參數(shù),以減少消息積壓。

3.設(shè)置消費者的超時時間:可以設(shè)置消費者的超時時間,如果消費者在指定的時間內(nèi)沒有消費消息,就將消息重新投遞到隊列中,以便其他消費者消費。

4.增加隊列的容量:可以增加隊列的容量,以便存儲更多的消息。但是,如果隊列容量過大,可能會導(dǎo)致內(nèi)存占用過高,影響系統(tǒng)的性能。

5.使用死信隊列:可以將未能及時消費的消息轉(zhuǎn)移到死信隊列中,以便后續(xù)處理??梢栽O(shè)置死信隊列的超時時間,以便在一定時間內(nèi)處理這些消息。

6.監(jiān)控和調(diào)整:可以使用RabbitMQ的監(jiān)控工具來監(jiān)控隊列的狀態(tài)和消費者的消費速度,及時發(fā)現(xiàn)并解決消息積壓問題。

RabbitMQ解決消息重復(fù)消費問題

RabbitMQ提供了消息去重的機(jī)制來解決消息重復(fù)消費的問題。具體來說,可以使用以下兩種方式來實現(xiàn):

1.消息去重插件
RabbitMQ提供了一個消息去重插件,可以通過在RabbitMQ節(jié)點上安裝該插件來實現(xiàn)消息去重。該插件會在消息傳輸之前對消息進(jìn)行唯一性校驗,如果消息已經(jīng)被消費過,那么該消息將被丟棄。該插件的實現(xiàn)原理是將已經(jīng)消費過的消息ID保存在內(nèi)存中,當(dāng)新消息到達(dá)時,會檢查該消息ID是否已經(jīng)存在,如果存在則丟棄該消息。

2.消息冪等性設(shè)計
消息冪等性是指對于同一條消息,無論消費多少次,最終的結(jié)果都是一致的。因此,可以通過在消息的生產(chǎn)者或消費者端實現(xiàn)消息冪等性來解決消息重復(fù)消費的問題。具體實現(xiàn)方式包括:

  • 在消息生產(chǎn)者端,為每條消息生成唯一的ID,將該ID與消息一起發(fā)送到RabbitMQ,消費者在消費消息時根據(jù)該ID進(jìn)行冪等性校驗;
  • 在消息消費者端,記錄已經(jīng)消費過的消息ID,當(dāng)重復(fù)消費同一條消息時,直接忽略該消息。

需要注意的是,實現(xiàn)消息冪等性需要考慮業(yè)務(wù)邏輯的復(fù)雜性和消息處理的性能。如果業(yè)務(wù)邏輯比較簡單,可以通過對消息進(jìn)行去重來解決問題;如果業(yè)務(wù)邏輯比較復(fù)雜,可以通過實現(xiàn)消息冪等性來保證消息的正確性。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、背景 RabbitMQ是一個由erlang開發(fā)的AMQP(Advanved Message Queue)的開源...
    初心myp閱讀 1,035評論 0 1
  • 1.RabbitMQ(基本理解) 2.學(xué)前必讀 3.事宜人群 4.學(xué)習(xí)順序 5.基本流程圖 也可以定義虛擬主機(jī) (...
    hello9geg閱讀 407評論 0 0
  • 01AMQP協(xié)議 1.1概述 AMQP:是具有現(xiàn)代特征的二進(jìn)制協(xié)議。是一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級消息隊列...
    程序員姜戈閱讀 641評論 0 0
  • 一、RabbitMQ的工作原理 RabbitMQ是一種開源的消息隊列中間件,它實現(xiàn)了高級消息隊列協(xié)議(AMQP)的...
    眺望77閱讀 565評論 0 0
  • 1.RabbitMQ和消息隊列是什么? (1)RabbitMQ其實就是一個消息中間件,主要就是負(fù)責(zé) 消息發(fā)送和消息...
    奔向金字塔閱讀 807評論 0 0

友情鏈接更多精彩內(nèi)容