RabbitMQ 簡(jiǎn)介

RabbitMQ 簡(jiǎn)介

1. RabbitMQ 介紹

    RabbitMQ,俗稱“兔子MQ”(可見(jiàn)其輕巧,敏捷),是目前非常熱門的一款開(kāi)源消息中間件,不管是互聯(lián)網(wǎng)

行業(yè)還是傳統(tǒng)行業(yè)都廣泛使用(最早是為了解決電信行業(yè)系統(tǒng)之間的可靠通信而設(shè)計(jì))。

1. 高可靠性、易擴(kuò)展(集群、橫向擴(kuò)展、插件)、高可用、功能豐富等 

2. 支持大多數(shù)(甚至冷門)的編程語(yǔ)言客戶端。

3. RabbitMQ遵循AMQP協(xié)議,自身采用Erlang(一種由愛(ài)立信開(kāi)發(fā)的通用面向并發(fā)編程的語(yǔ)言)編寫。

4. RabbitMQ也支持MQTT等其他協(xié)議。

RabbitMQ具有很強(qiáng)大的插件擴(kuò)展能力,官方和社區(qū)提供了非常豐富的插件可供選擇:

https://www.rabbitmq.com/community-plugins.html

1.1 RabbitMQ整體邏輯架構(gòu)

image-20210121091640247.png

1.2 Exchange類型

    RabbitMQ常用的交換器類型有: `fanout` 、 `direct` 、 `topic` 、 `headers` 四種。  

1.2.1 Fanout

    扇出交換器

    會(huì)把所有發(fā)送到該交換器的消息路由到所有與該交換器綁定的隊(duì)列中,如圖:  
image-20210121091848196.png

生產(chǎn)者發(fā)送消息到MQ,消費(fèi)者在線可以收到該消息,不在線的消費(fèi)者上線后不能收到上線之前生產(chǎn)者發(fā)送的消息

1.2.2 Direct

    直接交換器

    direct類型的交換器路由規(guī)則很簡(jiǎn)單,它會(huì)把消息路由到那些BindingKey和RoutingKey完全匹配的隊(duì)列中,

如下圖:

image-20210121091932351.png

1.2.3 Topic

    主題交換器

    topic類型的交換器在direct匹配規(guī)則上進(jìn)行了擴(kuò)展,也是將消息路由到BindingKey和RoutingKey相匹配的隊(duì)

列中,這里的匹配規(guī)則稍微不同,它約定:

BindingKey和RoutingKey一樣都是由"."分隔的字符串;BindingKey中可以存在兩種特殊字符“”和“#”,用于模糊匹配,其中""用于匹配一個(gè)單詞,"#"用于匹配多個(gè)單詞(可以是0個(gè))。

image-20210121092030157.png

1.2.4 Headers

    headers類型的交換器不依賴于路由鍵的匹配規(guī)則來(lái)路由信息,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)

行匹配。在綁定隊(duì)列和交換器時(shí)指定一組鍵值對(duì),當(dāng)發(fā)送的消息到交換器時(shí),RabbitMQ會(huì)獲取到該消息的

headers,對(duì)比其中的鍵值對(duì)是否完全匹配隊(duì)列和交換器綁定時(shí)指定的鍵值對(duì),如果匹配,消息就會(huì)路由到該隊(duì)

列。headers類型的交換器性能很差,不實(shí)用。

1.3 RabbitMQ數(shù)據(jù)存儲(chǔ)

1.3.1 消息類型

RabbitMQ消息有兩種類型:

  • 持久化消息
  • 非持久化消息

這兩種消息都會(huì)被寫入磁盤。

持久化消息在到達(dá)隊(duì)列時(shí)寫入磁盤,同時(shí)會(huì)內(nèi)存中保存一份備份,當(dāng)內(nèi)存吃緊時(shí),消息從內(nèi)存中清除。這會(huì)提高一定的性能。

非持久化消息一般只存于內(nèi)存中,當(dāng)內(nèi)存壓力大時(shí)數(shù)據(jù)刷盤處理,以節(jié)省內(nèi)存空間。

1.3.2 存儲(chǔ)層

RabbitMQ存儲(chǔ)層包含兩個(gè)部分:隊(duì)列索引和消息存儲(chǔ)。

image-20210121092348344.png

1.3.2.1 隊(duì)列索引

**隊(duì)列索引:rabbit_queue_index **

索引維護(hù)隊(duì)列的落盤消息的信息,如存儲(chǔ)地點(diǎn)、是否已被給消費(fèi)者接收、是否已被消費(fèi)者ack等。

每個(gè)隊(duì)列都有相對(duì)應(yīng)的索引。

    索引使用順序的段文件來(lái)存儲(chǔ),后綴為.idx,文件名從0開(kāi)始累加,每個(gè)段文件中包含固定的`segment_entry_count`條記錄,默認(rèn)值是**16384**。每個(gè)index從磁盤中讀取消息的時(shí)候,**至少要在內(nèi)存

中維護(hù)一個(gè)段文件,所以設(shè)置 queue_index_embed_msgs_below值得時(shí)候要格外謹(jǐn)慎**,一點(diǎn)點(diǎn)增大也
可能會(huì)引起內(nèi)存爆炸式增長(zhǎng)。

1.3.2.2 消息存儲(chǔ)
    **消息存儲(chǔ): rabbit_msg_store  **

    消息以**鍵值對(duì)的形式**存儲(chǔ)到文件中,一個(gè)虛擬主機(jī)上的所有隊(duì)列使用同一塊存儲(chǔ),每個(gè)節(jié)點(diǎn)只有一個(gè)。存儲(chǔ)

分為持久化存儲(chǔ)(msg_store_persistent)和短暫存儲(chǔ)(msg_store_transient)。持久化存儲(chǔ)的內(nèi)容在broker重

啟后不會(huì)丟失,短暫存儲(chǔ)的內(nèi)容在broker重啟后丟失。

    store使用文件來(lái)存儲(chǔ),后綴為.rdq,經(jīng)過(guò)store處理的所有消息都會(huì)以追加的方式寫入到該文件中,當(dāng)該文件

的大小超過(guò)指定的限制(file_size_limit)后,將會(huì)關(guān)閉該文件并創(chuàng)建一個(gè)新的文件以供新的消息寫入。文件名從0

開(kāi)始進(jìn)行累加。在進(jìn)行消息的存儲(chǔ)時(shí),RabbitMQ會(huì)在ETS(Erlang TermStorage)表中記錄消息在文件中的位置

映射和文件的相關(guān)信息。

    消息(包括消息頭、消息體、屬性)可以直接存儲(chǔ)在index中,也可以存儲(chǔ)在store中。最佳的方式是較小的消息存在index中,而較大的消息存在store中。這個(gè)消息大小的界定可以通過(guò)`queue_index_embed_msgs_below`來(lái)配置,默認(rèn)值為4096B。當(dāng)一個(gè)消息小于設(shè)定的大小閾值時(shí),就可以存儲(chǔ)在index中,這樣性能上可以得到優(yōu)化。**一個(gè)完整的消息**大小小于這個(gè)值,就放到索引中,否則放到持久化消息文件中。  

    rabbitmq.conf中的配置信息:  
## Size in bytes below which to embed messages in the queue index.
## Related doc guide: https://rabbitmq.com/persistence-conf.html
##
# queue_index_embed_msgs_below = 4096
## You can also set this size in memory units
##
# queue_index_embed_msgs_below = 4kb
如果消息小于這個(gè)值,就在索引中存儲(chǔ),如果消息大于這個(gè)值就在store中存儲(chǔ):  

大于這個(gè)值的消息存儲(chǔ)于msg_store_persistent目錄中的<num>.rdq文件中:  

小于這個(gè)值的消息存儲(chǔ)于<num>.idx索引文件中:  



讀取消息時(shí),先根據(jù)消息的ID(msg_id)找到對(duì)應(yīng)存儲(chǔ)的文件,如果文件存在并且未被鎖住,則直接打開(kāi)文件,從指定位置讀取消息內(nèi)容。如果文件不存在或者被鎖住了,則發(fā)送請(qǐng)求由store進(jìn)行處理。  

刪除消息時(shí),只是從ETS表刪除指定消息的相關(guān)信息,同時(shí)更新消息對(duì)應(yīng)的存儲(chǔ)文件和相關(guān)信息。在執(zhí)行消息刪除操作時(shí),并不立即對(duì)文件中的消息進(jìn)行刪除,也就是說(shuō)消息依然在文件中,僅僅是標(biāo)記為垃圾數(shù)據(jù)而已。當(dāng)一個(gè)文件中都是垃圾數(shù)據(jù)時(shí)可以將這個(gè)文件刪除。當(dāng)檢測(cè)到前后兩個(gè)文件中的有效數(shù)據(jù)可以合并成一個(gè)文件,并且所有的垃圾數(shù)據(jù)的大小和所有文件(至少有3個(gè)文件存在的情況下)的數(shù)據(jù)大小的比值超過(guò)設(shè)置的閾值garbage_fraction(默認(rèn)值0.5)時(shí),才會(huì)觸發(fā)垃圾回收,將這兩個(gè)文件合并,執(zhí)行合并的兩個(gè)文件一定是邏輯上相鄰的兩個(gè)文件。合并邏輯:  
  1. 鎖定這兩個(gè)文件

  2. 先整理前面的文件的有效數(shù)據(jù),再整理后面的文件的有效數(shù)據(jù)

  3. 將后面文件的有效數(shù)據(jù)寫入到前面的文件中

  4. 更新消息在ETS表中的記錄

  5. 刪除后面文件

image-20210121093527206.png

1.3.3 隊(duì)列結(jié)構(gòu)

    通常隊(duì)列由rabbit_amqqueue_process和backing_queue這兩部分組成,rabbit_amqqueue_process負(fù)責(zé)協(xié)議相關(guān)的消息處理,即接收生產(chǎn)者發(fā)布的消息、向消費(fèi)者交付消息、處理消息的確認(rèn)(包括生產(chǎn)端的confirm和消費(fèi)端的ack)等。backing_queue是消息存儲(chǔ)的具體形式和引擎,并向rabbit_amqqueue_process提供相關(guān)的接口以供調(diào)用。   
image-20210121093716111.png
    如果消息投遞的目的隊(duì)列是空的,并且有消費(fèi)者訂閱了這個(gè)隊(duì)列,那么該消息會(huì)直接發(fā)送給消費(fèi)

者,不會(huì)經(jīng)過(guò)隊(duì)列這一步。當(dāng)消息無(wú)法直接投遞給消費(fèi)者時(shí),需要暫時(shí)將消息存入隊(duì)列,以便重新投
遞。

    `rabbit_variable_queue.erl `源碼中定義了RabbitMQ隊(duì)列的**4種狀態(tài)**:  

        1. alpha:消息索引和消息內(nèi)容都存內(nèi)存,最耗內(nèi)存,很少消耗CPU

        2. beta:消息索引存內(nèi)存,消息內(nèi)存存磁盤

        3. gama:消息索引內(nèi)存和磁盤都有,消息內(nèi)容存磁盤

        4. delta:消息索引和內(nèi)容都存磁盤,基本不消耗內(nèi)存,消耗更多CPU和I/O操作  

消息存入隊(duì)列后,不是固定不變的,它會(huì)隨著系統(tǒng)的負(fù)載在隊(duì)列中不斷流動(dòng),消息的狀態(tài)會(huì)不斷發(fā)送變化。  

持久化的消息,索引和內(nèi)容都必須先保存在磁盤上,才會(huì)處于上述狀態(tài)中的一種  

gama狀態(tài)只有持久化消息才會(huì)有的狀態(tài)。

在運(yùn)行時(shí),RabbitMQ會(huì)根據(jù)消息傳遞的速度定期計(jì)算一個(gè)當(dāng)前內(nèi)存中能夠保存的最大消息數(shù)量(target_ram_count),如果alpha狀態(tài)的消息數(shù)量大于此值,則會(huì)引起消息的狀態(tài)轉(zhuǎn)換,多余的消息可能會(huì)轉(zhuǎn)換到beta、gama或者delta狀態(tài)。區(qū)分這4種狀態(tài)的主要作用是滿足不同的內(nèi)存和CPU需求。  

    對(duì)于**普通沒(méi)有設(shè)置優(yōu)先級(jí)和鏡像**的隊(duì)列來(lái)說(shuō),backing_queue的默認(rèn)實(shí)現(xiàn)是rabbit_variable_queue,其內(nèi)部通過(guò)**5個(gè)子隊(duì)列**Q1、Q2、delta、Q3、Q4來(lái)體現(xiàn)消息的各個(gè)狀態(tài)。  
image-20210121094035159.png
image-20210121094048023.png
    消費(fèi)者獲取消息也會(huì)引起消息的狀態(tài)轉(zhuǎn)換。  

    當(dāng)消費(fèi)者獲取消息時(shí)

        1.首先會(huì)從Q4中獲取消息,如果獲取成功則返回。

        2.如果Q4為空,則嘗試從Q3中獲取消息,系統(tǒng)首先會(huì)判斷Q3是否為空,如果為空則返回隊(duì)列為空,即此時(shí)隊(duì)列中無(wú)消息。

        3.如果Q3不為空,則取出Q3中的消息;進(jìn)而再判斷此時(shí)Q3和Delta中的長(zhǎng)度,如果都為空,則可以認(rèn)為 Q2、Delta、 Q3、Q4 全部為空,此時(shí)將Q1中的消息直接轉(zhuǎn)移至Q4,下次直接從Q4 中獲取消息。

        4.如果Q3為空,Delta不為空,則將Delta的消息轉(zhuǎn)移至Q3中,下次可以直接從Q3中獲取消息。在將消息從Delta轉(zhuǎn)移到Q3的過(guò)程中,是按照索引分段讀取的,首先讀取某一段,然后判斷讀取的消息的個(gè)數(shù)與Delta中消息的個(gè)數(shù)是否相等,如果相等,則可以判定此時(shí)Delta中己無(wú)消息,則直接將Q2和剛讀取到的消息一并放入到Q3中,如果不相等,僅將此次讀取到的消息轉(zhuǎn)移到Q3。  

這里就有兩處疑問(wèn),第一個(gè)疑問(wèn)是:為什么Q3為空則可以認(rèn)定整個(gè)隊(duì)列為空?

  1. 試想一下,如果Q3為空,Delta不為空,那么在Q3取出最后一條消息的時(shí)候,Delta 上的消息就會(huì)被轉(zhuǎn)移到Q3這樣與 Q3 為空矛盾;
  2. 如果Delta 為空且Q2不為空,則在Q3取出最后一條消息時(shí)會(huì)將Q2的消息并入到Q3中,這樣也與Q3為空矛盾;
  3. 在Q3取出最后一條消息之后,如果Q2、Delta、Q3都為空,且Q1不為空時(shí),則Q1的消息會(huì)
    被轉(zhuǎn)移到Q4,這與Q4為空矛盾。

其實(shí)這一番論述也解釋了另一個(gè)問(wèn)題:為什么Q3和Delta都為空時(shí),則可以認(rèn)為 Q2、Delta、Q3、Q4全部為空?

通常在負(fù)載正常時(shí),如果消費(fèi)速度大于生產(chǎn)速度,對(duì)于不需要保證可靠不丟失的消息來(lái)說(shuō),極有可能只會(huì)處于alpha狀態(tài)。

對(duì)于持久化消息,它一定會(huì)進(jìn)入gamma狀態(tài),在開(kāi)啟publisher confirm機(jī)制時(shí),只有到了gamma 狀態(tài)時(shí)才會(huì)確認(rèn)該消息己被接收,若消息消費(fèi)速度足夠快、內(nèi)存也充足,這些消息也不會(huì)繼續(xù)走到下一個(gè)狀態(tài)。

1.4 為什么消息的堆積導(dǎo)致性能下降?

    在系統(tǒng)負(fù)載較高時(shí),消息若不能很快被消費(fèi)掉,這些消息就會(huì)進(jìn)入到很深的隊(duì)列中去,這樣會(huì)增加處理每個(gè)消息的平均開(kāi)銷。因?yàn)橐ǜ嗟臅r(shí)間和資源處理“堆積”的消息,如此用來(lái)處理新流入的消息的能力就會(huì)降低,使得后流入的消息又被積壓到很深的隊(duì)列中,繼續(xù)增大處理每個(gè)消息的平均開(kāi)銷,繼而情況變得越來(lái)越惡化,使得系統(tǒng)的處理能力大大降低。  

應(yīng)對(duì)這一問(wèn)題一般有3種措施:

  1. 增加prefetch_count的值,即一次發(fā)送多條消息給消費(fèi)者,加快消息被消費(fèi)的速度。
  2. 采用multiple ack,降低處理 ack 帶來(lái)的開(kāi)銷
  3. 流量控制

2. RabbitMQ 安裝配置

相關(guān)說(shuō)明

說(shuō)明 版本
系統(tǒng) CentOS7.4
Erlang erlang-23.0.2-1.el7.x86_64
RabbitMQ rabbitmq-server-3.8.4-1.el7.noarch

RabbitMQ的安裝需要首先安裝Erlang,因?yàn)樗腔贓rlang的VM運(yùn)行的。

RabbitMQ需要的依賴:socat和logrotate,logrotate操作系統(tǒng)中已經(jīng)存在了,只需要安裝socat就可以了。

RabbitMQ與Erlang的兼容關(guān)系詳見(jiàn)

https://www.rabbitmq.com/which-erlang.html
image-20210121095003686.png

1、安裝依賴

yum install socat -y

2、安裝Erlang

erlang-23.0.2-1.el7.x86_64.rpm下載地址:
https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.2/erlang-23.0.2-1.el7.x86_64.rpm

首先將erlang-23.0.2-1.el7.x86_64.rpm上傳至服務(wù)器,然后執(zhí)行下述命令:

rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm

3、安裝RabbitMQ

rabbitmq-server-3.8.4-1.el7.noarch.rpm下載地址:
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm

首先將rabbitmq-server-3.8.4-1.el7.noarch.rpm上傳至服務(wù)器,然后執(zhí)行下述命令:

rpm -ivh rabbitmq-server-3.8.4-1.el7.noarch.rpm

默認(rèn)安裝目錄

/usr/lib/rabbitmq

4、啟用RabbitMQ的管理插件

image-20210125093042227.png
rabbitmq-plugins enable rabbitmq_management
image-20210125093211965.png

5、開(kāi)啟RabbitMQ

systemctl start rabbitmq-server

rabbitmq-server

或者后臺(tái)啟動(dòng)

rabbitmq-server -detached

6、添加用戶

rabbitmqctl add_user root 123456

7、給用戶添加權(quán)限
給root用戶在虛擬主機(jī)"/"上的配置、寫、讀的權(quán)限

image-20210125094103434.png
rabbitmqctl set_permissions root -p / ".*" ".*" ".*"

8、給用戶設(shè)置標(biāo)簽

rabbitmqctl set_user_tags root administrator

用戶的標(biāo)簽和權(quán)限:

Tag Capabilities
(None) 沒(méi)有訪問(wèn)management插件的權(quán)限
management 可以使用消息協(xié)議做任何操作的權(quán)限,加上: <br />1. 可以使用AMQP協(xié)議登錄的虛擬主機(jī)的權(quán)限 <br />2. 查看它們能登錄的所有虛擬主機(jī)中所有隊(duì)列、交換器和綁定的權(quán)限 <br />3. 查看和關(guān)閉它們自己的通道和連接的權(quán)限 <br />4. 查看它們能訪問(wèn)的虛擬主機(jī)中的全局統(tǒng)計(jì)信息,包括其他用戶的活動(dòng)
policymaker 所有management標(biāo)簽可以做的,加上: <br />1. 在它們能通過(guò)AMQP協(xié)議登錄的虛擬主機(jī)上,查看、創(chuàng)建和刪除策略以及虛 擬主機(jī)參數(shù)的權(quán)限
monitoring 所有management能做的,加上:<br /> 1. 列出所有的虛擬主機(jī),包括列出不能使用消息協(xié)議訪問(wèn)的虛擬主機(jī)的權(quán)限 <br />2. 查看其他用戶連接和通道的權(quán)限 <br />3. 查看節(jié)點(diǎn)級(jí)別的數(shù)據(jù)如內(nèi)存使用和集群的權(quán)限 <br />4. 查看真正的全局所有虛擬主機(jī)統(tǒng)計(jì)數(shù)據(jù)的權(quán)限
administrator 所有policymaker和monitoring能做的,加上:<br /> 1. 創(chuàng)建刪除虛擬主機(jī)的權(quán)限 <br />2. 查看、創(chuàng)建和刪除用戶的權(quán)限 <br />3. 查看、創(chuàng)建和刪除權(quán)限的權(quán)限 <br />4. 關(guān)閉其他用戶連接的權(quán)限

9、打開(kāi)瀏覽器,訪問(wèn)http://IP:15672

10、使用剛才創(chuàng)建的用戶登錄:

3. RabbitMQ常用操作命令

# 前臺(tái)啟動(dòng)Erlang VM和RabbitMQ
rabbitmq-server

# 后臺(tái)啟動(dòng)
rabbitmq-server -detached

# 停止RabbitMQ和Erlang VM
rabbitmqctl stop

# 查看所有隊(duì)列
rabbitmqctl list_queues

# 查看所有虛擬主機(jī)
rabbitmqctl list_vhosts

# 在Erlang VM運(yùn)行的情況下啟動(dòng)RabbitMQ應(yīng)用
rabbitmqctl start_app
rabbitmqctl stop_app

# 查看節(jié)點(diǎn)狀態(tài)
rabbitmqctl status

# 查看所有可用的插件
rabbitmq-plugins list

# 啟用插件
rabbitmq-plugins enable <plugin-name>

# 停用插件
rabbitmq-plugins disable <plugin-name>

# 添加用戶
rabbitmqctl add_user username password

# 列出所有用戶:
rabbitmqctl list_users

# 刪除用戶:
rabbitmqctl delete_user username

# 清除用戶權(quán)限:
rabbitmqctl clear_permissions -p vhostpath username

# 列出用戶權(quán)限:
rabbitmqctl list_user_permissions username

# 修改密碼:
rabbitmqctl change_password username newpassword

# 設(shè)置用戶權(quán)限:
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"

# 創(chuàng)建虛擬主機(jī):
rabbitmqctl add_vhost vhostpath

# 列出所以虛擬主機(jī):
rabbitmqctl list_vhosts

# 列出虛擬主機(jī)上的所有權(quán)限:
rabbitmqctl list_permissions -p vhostpath

# 刪除虛擬主機(jī):
rabbitmqctl delete_vhost vhost vhostpath

# 移除所有數(shù)據(jù),要在 rabbitmqctl stop_app 之后使用:
rabbitmqctl reset

4. RabbitMQ工作流程詳解

4.1 生產(chǎn)者發(fā)送消息的流程

  1. 生產(chǎn)者連接RabbitMQ,建立TCP連接( Connection),開(kāi)啟信道(Channel)
  2. 生產(chǎn)者聲明一個(gè)Exchange(交換器),并設(shè)置相關(guān)屬性,比如交換器類型、是否持久化等
  3. 生產(chǎn)者聲明一個(gè)隊(duì)列井設(shè)置相關(guān)屬性,比如是否排他、是否持久化、是否自動(dòng)刪除等
  4. 生產(chǎn)者通過(guò) bindingKey (綁定Key)將交換器和隊(duì)列綁定( binding )起來(lái)
  5. 生產(chǎn)者發(fā)送消息至RabbitMQ Broker,其中包含 routingKey (路由鍵)、交換器等信息
  6. 相應(yīng)的交換器根據(jù)接收到的 routingKey 查找相匹配的隊(duì)列。
  7. 如果找到,則將從生產(chǎn)者發(fā)送過(guò)來(lái)的消息存入相應(yīng)的隊(duì)列中。
  8. 如果沒(méi)有找到,則根據(jù)生產(chǎn)者配置的屬性選擇丟棄還是回退給生產(chǎn)者
  9. 關(guān)閉信道。
  10. 關(guān)閉連接。

4.2 消費(fèi)者接收消息的過(guò)程

  1. 消費(fèi)者連接到RabbitMQ Broker ,建立一個(gè)連接(Connection ) ,開(kāi)啟一個(gè)信道(Channel) 。
  2. 消費(fèi)者向RabbitMQ Broker 請(qǐng)求消費(fèi)相應(yīng)隊(duì)列中的消息,可能會(huì)設(shè)置相應(yīng)的回調(diào)函數(shù), 以及做一些準(zhǔn)備工作
  3. 等待RabbitMQ Broker 回應(yīng)并投遞相應(yīng)隊(duì)列中的消息, 消費(fèi)者接收消息。
  4. 消費(fèi)者確認(rèn)( ack) 接收到的消息。
  5. RabbitMQ 從隊(duì)列中刪除相應(yīng)己經(jīng)被確認(rèn)的消息。
  6. 關(guān)閉信道。
  7. 關(guān)閉連接。

4.3 案例

詳見(jiàn) demo_02

4.4 Connection 和 Channel關(guān)系

    生產(chǎn)者和消費(fèi)者,需要與RabbitMQ Broker 建立TCP連接,也就是Connection 。一旦TCP 連接建立起來(lái),客戶端緊接著創(chuàng)建一個(gè)AMQP 信道(Channel),每個(gè)信道都會(huì)被指派一個(gè)唯一的ID。信道是建立在Connection 之上的虛擬連接, RabbitMQ 處理的每條AMQP 指令都是通過(guò)信道完成的。  
image-20210125113915612.png

**為什么不直接使用TCP連接,而是使用信道? **

RabbitMQ 采用類似NIO的做法,復(fù)用TCP 連接,減少性能開(kāi)銷,便于管理。

當(dāng)每個(gè)信道的流量不是很大時(shí),復(fù)用單一的Connection 可以在產(chǎn)生性能瓶頸的情況下有效地節(jié)省TCP 連接資源。

當(dāng)信道本身的流量很大時(shí),一個(gè)Connection 就會(huì)產(chǎn)生性能瓶頸,流量被限制。需要建立多個(gè)Connection ,分?jǐn)傂诺?。具體的調(diào)優(yōu)看業(yè)務(wù)需要。

信道在AMQP 中是一個(gè)很重要的概念,大多數(shù)操作都是在信道這個(gè)層面進(jìn)行的。

channel.exchangeDeclare
channel.queueDeclare
channel.basicPublish
channel.basicConsume
// ...

RabbitMQ 相關(guān)的API與AMQP緊密相連,比如channel.basicPublish 對(duì)應(yīng)AMQP 的Basic.Publish命令。

4.5 RabbitMQ工作模式詳解

https://www.rabbitmq.com/getstarted.htm

4.5.1 Work Queue(direct)

生產(chǎn)者發(fā)消息,啟動(dòng)多個(gè)消費(fèi)者實(shí)例來(lái)消費(fèi)消息,每個(gè)消費(fèi)者僅消費(fèi)部分信息,可達(dá)到負(fù)載均衡的效果。

image-20210125114123821.png
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author jie.luo
 * @since 2021/1/25
 */
public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明一個(gè)消息隊(duì)列
        channel.queueDeclare("queue.wq", true, false, false, null);
        // 聲明一個(gè) 交換器
        channel.exchangeDeclare("ex.wq", BuiltinExchangeType.DIRECT, true, false, null);
        // 將消息隊(duì)列和交換器綁定,并制定綁卡鍵
        channel.queueBind("queue.wq", "ex.wq", "key.wq");

        for (int i = 0; i < 15; i++) {
            channel.basicPublish("ex.wq", "key.wq", null, ("工作隊(duì)列" + i).getBytes("utf-8"));
        }

        channel.close();
        connection.close();
    }
}
public class Consumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明一個(gè)消息隊(duì)列
        channel.queueDeclare("queue.wq", true, false, false, null);

        channel.basicConsume("queue.wq", new DeliverCallback() {
            public void handle(String consumerTag, Delivery message) throws IOException {
                System.out.println("推送來(lái)的消息:" + new String(message.getBody(), "utf-8"));
            }
        }, new CancelCallback() {
            public void handle(String consumerTag) throws IOException {
                System.out.println("CancelCallback:" + consumerTag);
            }
        });

    }
}

4.5.2 發(fā)布訂閱(fanout)

    使用fanout類型交換器,routingKey忽略。每個(gè)消費(fèi)者定義生成一個(gè)隊(duì)列并綁定到同一個(gè)Exchange,每個(gè)消費(fèi)者都可以消費(fèi)到完整的消息。

    消息廣播給所有訂閱該消息的消費(fèi)者。  

    在RabbitMQ中,生產(chǎn)者不是將消息直接發(fā)送給消息隊(duì)列,實(shí)際上生產(chǎn)者根本不知道一個(gè)消息被發(fā)送到哪個(gè)隊(duì)列。  

    生產(chǎn)者將消息發(fā)送給交換器。交換器非常簡(jiǎn)單,從生產(chǎn)者接收消息,將消息推送給消息隊(duì)列。交換器必須清楚地知道要怎么處理接收到的消息。應(yīng)該是追加到一個(gè)指定的隊(duì)列,還是追加到多個(gè)隊(duì)列,還是丟棄。規(guī)則就是交換器類型。  
image-20210125144116722.png

交換器的類型前面已經(jīng)介紹過(guò)了: direct 、 topic 、 headersfanout 四種類型。發(fā)布訂閱使用fanout。創(chuàng)建交換器,名字叫 logs :

channel.exchangeDeclare("logs", "fanout");
`fanout` 交換器很簡(jiǎn)單,從名字就可以看出來(lái)(用風(fēng)扇吹出去),將所有收到的消息發(fā)送給它知道的所有的隊(duì)列。  
rabbitmqctl list_exchanges

列出RabbitMQ的交換器,包括了 amq.* 的和默認(rèn)的(未命名)的交換器。

image-20210125145221866.png

未命名交換器

在前面的那里中我們沒(méi)有指定交換器,但是依然可以向隊(duì)列發(fā)送消息。這是因?yàn)槲覀兪褂昧四J(rèn)的交換器。

channel.basicPublish("", "hello", null, message.getBytes());

第一個(gè)參數(shù)就是交換器名稱,為空字符串。直接使用routingKey向隊(duì)列發(fā)送消息,如果該routingKey指定的隊(duì)列存在的話

現(xiàn)在,向指定的交換器發(fā)布消息:

channel.basicPublish("logs", "", null, message.getBytes());
臨時(shí)隊(duì)列

前面我們使用隊(duì)列的名稱,生產(chǎn)者和消費(fèi)者都是用該名稱來(lái)發(fā)送和接收該隊(duì)列中的消息。

首先,我們無(wú)論何時(shí)連接RabbitMQ的時(shí)候,都需要一個(gè)新的,空的隊(duì)列。我們可以使用隨機(jī)的名字創(chuàng)建隊(duì)列,也可以讓服務(wù)器幫我們生成隨機(jī)的消息隊(duì)列名字。

其次,一旦我們斷開(kāi)到消費(fèi)者的連接,該隊(duì)列應(yīng)該自動(dòng)刪除。

String queueName = channel.queueDeclare().getQueue();

上述代碼我們聲明了一個(gè)非持久化的、排他的、自動(dòng)刪除的隊(duì)列,并且名字是服務(wù)器隨機(jī)生成的。

queueName一般的格式類似: amq.gen-JzTY20BRgKO-HjmUJj0wLg 。

綁定
image-20210125144545132.png

在創(chuàng)建了消息隊(duì)列和 fanout 類型的交換器之后,我們需要將兩者進(jìn)行綁定,讓交換器將消息發(fā)送給該隊(duì)列

channel.queueBind(queueName, "logs", "");

此時(shí), logs 交換器會(huì)將接收到的消息追加到我們的隊(duì)列中。

可以使用下述命令列出RabbitMQ中交換器的綁定關(guān)系:

rabbitmqctl list_bindings
image-20210125144646707.png
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author jie.luo
 * @since 2021/1/25
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明一個(gè) fanout 類型的交換器
        channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null);

        for (int i = 0; i < 20; i++) {
            // fanout 類型的交換器不需要指定路由鍵
            channel.basicPublish("ex.myfan", "", null, ("hello word fan " + i).getBytes("utf-8"));
        }

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author jie.luo
 * @since 2021/1/25
 */
public class Consumer3 {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明零時(shí)隊(duì)列,隊(duì)列的名字由RabbitMQ自動(dòng)生成
        String queueName = channel.queueDeclare().getQueue();
        System.out.println("生成的零時(shí)隊(duì)列名字為:" + queueName);

        // 聲明一個(gè) fanout 類型的交換器
        channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null);

        // fanout 類型的交換器不需要綁定路由鍵
        channel.queueBind(queueName, "ex.myfan", "");

        channel.basicConsume(queueName, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                System.out.println("收到的消息:" + new String(message.getBody(), "utf-8"));
            }
        }, new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
            }
        });

    }
}

消費(fèi)者日志

生成的零時(shí)隊(duì)列名字為:amq.gen-xcaoVeYdLVWP3OHpHjNzXQ
收到的消息:hello word fan 0
收到的消息:hello word fan 1
收到的消息:hello word fan 2
收到的消息:hello word fan 3
收到的消息:hello word fan 4
收到的消息:hello word fan 5
收到的消息:hello word fan 6
收到的消息:hello word fan 7
收到的消息:hello word fan 8
收到的消息:hello word fan 9
收到的消息:hello word fan 10
收到的消息:hello word fan 11
收到的消息:hello word fan 12
收到的消息:hello word fan 13
收到的消息:hello word fan 14
收到的消息:hello word fan 15
收到的消息:hello word fan 16
收到的消息:hello word fan 17
收到的消息:hello word fan 18
收到的消息:hello word fan 19

4.5.3 路由模式

    使用 direct 類型的Exchange,發(fā)N條消費(fèi)并使用不同的 routingKey ,消費(fèi)者定義隊(duì)列并將隊(duì)列、routingKey 、Exchange綁定。此時(shí)使用 direct 模式Exchagne必須要 routingKey 完全匹配的情況下消息才會(huì)轉(zhuǎn)發(fā)到對(duì)應(yīng)的隊(duì)列中被消費(fèi)。  

現(xiàn)在我們想讓接收者只接收部分消息,如,我們通過(guò)直接模式的交換器將關(guān)鍵的錯(cuò)誤信息記錄到log文件,同時(shí)在控制臺(tái)正常打印所有的日志信息。

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Random;

/**
 * @author jie.luo
 * @since 2021/1/25
 */
public class Producer {

    private static final String[] LOG_LEVEL = {"ERROR", "FATAL", "WARN"};

    private static Random random = new Random();

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明一個(gè) DIRECT 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
        channel.exchangeDeclare("ex.routing", BuiltinExchangeType.DIRECT, false, false, null);

        for (int i = 0; i < 100; i++) {
            String level = LOG_LEVEL[random.nextInt(100) % LOG_LEVEL.length];
            channel.basicPublish("ex.routing", level, null, ("這是[" + level + "]消息").getBytes("utf-8"));
        }

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author jie.luo
 * @since 2021/1/25
 */
public class ConsumerError {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明一個(gè) DIRECT 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
        channel.exchangeDeclare("ex.routing", BuiltinExchangeType.DIRECT, false, false, null);

        // 此處也可以聲明為零時(shí)的消息隊(duì)列,但看消息是否重要
        channel.queueDeclare("queue.error", false, false, false, null);

        // 消息隊(duì)列 綁定 路由鍵
        channel.queueBind("queue.error", "ex.routing", "ERROR");

        channel.basicConsume("queue.error", new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                System.out.println("ERROR 收到的消息:" + new String(message.getBody(), "utf-8"));
            }
        }, new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {

            }
        });

    }
}
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author jie.luo
 * @since 2021/1/25
 */
public class ConsumerFatal {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明一個(gè) DIRECT 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
        channel.exchangeDeclare("ex.routing", BuiltinExchangeType.DIRECT, false, false, null);

        // 此處也可以聲明為零時(shí)的消息隊(duì)列,但看消息是否重要
        channel.queueDeclare("queue.fatal", false, false, false, null);

        // 消息隊(duì)列 綁定 路由鍵
        channel.queueBind("queue.fatal", "ex.routing", "FATAL");

        channel.basicConsume("queue.fatal", new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                System.out.println("FATAL 收到的消息:" + new String(message.getBody(), "utf-8"));
            }
        }, new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {

            }
        });

    }
}
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author jie.luo
 * @since 2021/1/25
 */
public class ConsumerWarn {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明一個(gè) DIRECT 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
        channel.exchangeDeclare("ex.routing", BuiltinExchangeType.DIRECT, false, false, null);

        // 此處也可以聲明為零時(shí)的消息隊(duì)列,但看消息是否重要
        channel.queueDeclare("queue.warn", false, false, false, null);

        // 消息隊(duì)列 綁定 路由鍵
        channel.queueBind("queue.warn", "ex.routing", "WARN");

        channel.basicConsume("queue.warn", new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
                System.out.println("WARN 收到的消息:" + new String(message.getBody(), "utf-8"));
            }
        }, new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {

            }
        });

    }
}

4.5.4 主題模式

使用 topic 類型的交換器,隊(duì)列綁定到交換器、 bindingKey 時(shí)使用通配符,交換器將消息路由轉(zhuǎn)發(fā)到具體隊(duì)列時(shí)會(huì)根據(jù)消息 routingKey 模糊匹配,比較靈活。

上個(gè)模式中,我們通過(guò) direct 類型的交換器做到了根據(jù)日志級(jí)別的不同,將消息發(fā)送給了不同隊(duì)列的。

這里有一個(gè)限制,加入現(xiàn)在我不僅想根據(jù)日志級(jí)別劃分日志消息,還想根據(jù)日志來(lái)源劃分日志,怎么做?

比如,我想監(jiān)聽(tīng)cron服務(wù)發(fā)送的 error 消息,又想監(jiān)聽(tīng)從kern服務(wù)發(fā)送的所有消息。

此時(shí)可以使用RabbitMQ的主題模式( Topic )

要想 topic 類型的交換器, routingKey 就不能隨便寫了,它必須得是點(diǎn)分單詞。單詞可以隨便寫,生產(chǎn)中一般使用消息的特征。如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”等。**該點(diǎn)分單詞字符串最長(zhǎng)255字節(jié) **

bindingKey 也必須是這種形式。 topic 類型的交換器背后原理跟 direct 類型的類似:只要隊(duì)列的 bindingKey 的值與消息的 routingKey 匹配,隊(duì)列就可以收到該消息。有兩個(gè)不同:

  1. *(star)匹配一個(gè)單詞
  2. # 匹配0到多個(gè)單詞
image-20210125161534181.png
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Random;

/**
 * @author jie.luo
 * @since 2021/1/25
 */
public class Producer {

    private static final String[] LOG_LEVEL = {"ERROR", "FATAL", "WARN"};
    private static final String[] LOG_AREA = {"beijing", "shanghai", "chengdu"};
    private static final String[] LOG_BIZ = {"edu-online", "biz-online", "emp-online"};

    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明一個(gè) TOPIC 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
        channel.exchangeDeclare("ex.topic", BuiltinExchangeType.TOPIC, false, false, null);

        String routingKey, message;

        String level, area, biz;

        for (int i = 0; i < 100; i++) {

            level = LOG_LEVEL[RANDOM.nextInt(LOG_LEVEL.length)];
            area = LOG_AREA[RANDOM.nextInt(LOG_AREA.length)];
            biz = LOG_BIZ[RANDOM.nextInt(LOG_BIZ.length)];

            // 路由鍵由多個(gè)維度組成
            routingKey = area + "." + biz + "." + level;
            message = "LOG:[" + level + "]:這是 [" + area + "] 地址 [" + biz + "] 服務(wù)器發(fā)來(lái)的消息,MSQ_SEQ = " + i;

            channel.basicPublish("ex.topic", routingKey, null, message.getBytes("utf-8"));
        }

        channel.close();
        connection.close();
    }

}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author jie.luo
 * @since 2021/1/25
 */
public class ConsumerBeijing {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明一個(gè) TOPIC 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
        channel.exchangeDeclare("ex.topic", BuiltinExchangeType.TOPIC, false, false, null);

        // 零時(shí)隊(duì)列
        String queue = channel.queueDeclare().getQueue();

        // 消息隊(duì)列 綁定 路由鍵
        // 只要是以 beijing 開(kāi)頭的,不管后邊有幾個(gè)點(diǎn)分單詞都可以接受
        channel.queueBind(queue, "ex.topic", "beijing.#");

        channel.basicConsume(queue, (consumerTag, message) -> {
            System.out.println("Beijing 收到的消息:" + new String(message.getBody(), "utf-8"));
        }, consumerTag -> {

        });
    }
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author jie.luo
 * @since 2021/1/25
 */
public class ConsumerChengduEdu {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明一個(gè) TOPIC 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
        channel.exchangeDeclare("ex.topic", BuiltinExchangeType.TOPIC, false, false, null);

        // 零時(shí)隊(duì)列
        String queue = channel.queueDeclare().getQueue();

        // 消息隊(duì)列 綁定 路由鍵
        channel.queueBind(queue, "ex.topic", "chengdu.edu-online.*");

        channel.basicConsume(queue, (consumerTag, message) -> {
            System.out.println("chengdu edu-online 收到的消息:" + new String(message.getBody(), "utf-8"));
        }, consumerTag -> {

        });

    }
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author jie.luo
 * @since 2021/1/25
 */
public class ConsumerError {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明一個(gè) TOPIC 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
        channel.exchangeDeclare("ex.topic", BuiltinExchangeType.TOPIC, false, false, null);

        // 零時(shí)隊(duì)列
        String queue = channel.queueDeclare().getQueue();

        // 消息隊(duì)列 綁定 路由鍵
        // 不管前面有幾個(gè)點(diǎn)分單詞,只有最后一個(gè)單詞是ERROR
        channel.queueBind(queue, "ex.topic", "#.ERROR");

        channel.basicConsume(queue, (consumerTag, message) -> {
            System.out.println("ERROR 收到的消息:" + new String(message.getBody(), "utf-8"));
        }, consumerTag -> {

        });
    }
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author jie.luo
 * @since 2021/1/25
 */
public class ConsumerShanghaiError {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 聲明一個(gè) TOPIC 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
        channel.exchangeDeclare("ex.topic", BuiltinExchangeType.TOPIC, false, false, null);

        // 零時(shí)隊(duì)列
        String queue = channel.queueDeclare().getQueue();

        // 消息隊(duì)列 綁定 路由鍵
        channel.queueBind(queue, "ex.topic", "shanghai.*.ERROR");

        channel.basicConsume(queue, (consumerTag, message) -> {
            System.out.println("shanghai error 收到的消息:" + new String(message.getBody(), "utf-8"));
        }, consumerTag -> {

        });

    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容