RabbitMQ 簡(jiǎn)介
1. RabbitMQ 介紹
RabbitMQ,俗稱“兔子MQ”(可見(jiàn)其輕巧,敏捷),是目前非常熱門的一款開(kāi)源消息中間件,不管是互聯(lián)網(wǎng)
行業(yè)還是傳統(tǒng)行業(yè)都廣泛使用(最早是為了解決電信行業(yè)系統(tǒng)之間的可靠通信而設(shè)計(jì))。
1. 高可靠性、易擴(kuò)展(集群、橫向擴(kuò)展、插件)、高可用、功能豐富等
2. 支持大多數(shù)(甚至冷門)的編程語(yǔ)言客戶端。
3. RabbitMQ遵循AMQP協(xié)議,自身采用Erlang(一種由愛(ài)立信開(kāi)發(fā)的通用面向并發(fā)編程的語(yǔ)言)編寫。
4. RabbitMQ也支持MQTT等其他協(xié)議。
RabbitMQ具有很強(qiáng)大的插件擴(kuò)展能力,官方和社區(qū)提供了非常豐富的插件可供選擇:
https://www.rabbitmq.com/community-plugins.html
1.1 RabbitMQ整體邏輯架構(gòu)

1.2 Exchange類型
RabbitMQ常用的交換器類型有: `fanout` 、 `direct` 、 `topic` 、 `headers` 四種。
1.2.1 Fanout
扇出交換器
會(huì)把所有發(fā)送到該交換器的消息路由到所有與該交換器綁定的隊(duì)列中,如圖:

生產(chǎn)者發(fā)送消息到MQ,消費(fèi)者在線可以收到該消息,不在線的消費(fèi)者上線后不能收到上線之前生產(chǎn)者發(fā)送的消息
1.2.2 Direct
直接交換器
direct類型的交換器路由規(guī)則很簡(jiǎn)單,它會(huì)把消息路由到那些BindingKey和RoutingKey完全匹配的隊(duì)列中,
如下圖:

1.2.3 Topic
主題交換器
topic類型的交換器在direct匹配規(guī)則上進(jìn)行了擴(kuò)展,也是將消息路由到BindingKey和RoutingKey相匹配的隊(duì)
列中,這里的匹配規(guī)則稍微不同,它約定:
BindingKey和RoutingKey一樣都是由"."分隔的字符串;BindingKey中可以存在兩種特殊字符“”和“#”,用于模糊匹配,其中""用于匹配一個(gè)單詞,"#"用于匹配多個(gè)單詞(可以是0個(gè))。

1.2.4 Headers
headers類型的交換器不依賴于路由鍵的匹配規(guī)則來(lái)路由信息,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)
行匹配。在綁定隊(duì)列和交換器時(shí)指定一組鍵值對(duì),當(dāng)發(fā)送的消息到交換器時(shí),RabbitMQ會(huì)獲取到該消息的
headers,對(duì)比其中的鍵值對(duì)是否完全匹配隊(duì)列和交換器綁定時(shí)指定的鍵值對(duì),如果匹配,消息就會(huì)路由到該隊(duì)
列。headers類型的交換器性能很差,不實(shí)用。
1.3 RabbitMQ數(shù)據(jù)存儲(chǔ)
1.3.1 消息類型
RabbitMQ消息有兩種類型:
- 持久化消息
- 非持久化消息
這兩種消息都會(huì)被寫入磁盤。
持久化消息在到達(dá)隊(duì)列時(shí)寫入磁盤,同時(shí)會(huì)內(nèi)存中保存一份備份,當(dāng)內(nèi)存吃緊時(shí),消息從內(nèi)存中清除。這會(huì)提高一定的性能。
非持久化消息一般只存于內(nèi)存中,當(dāng)內(nèi)存壓力大時(shí)數(shù)據(jù)刷盤處理,以節(jié)省內(nèi)存空間。
1.3.2 存儲(chǔ)層
RabbitMQ存儲(chǔ)層包含兩個(gè)部分:隊(duì)列索引和消息存儲(chǔ)。

1.3.2.1 隊(duì)列索引
**隊(duì)列索引:rabbit_queue_index **
索引維護(hù)隊(duì)列的落盤消息的信息,如存儲(chǔ)地點(diǎn)、是否已被給消費(fèi)者接收、是否已被消費(fèi)者ack等。
每個(gè)隊(duì)列都有相對(duì)應(yīng)的索引。
索引使用順序的段文件來(lái)存儲(chǔ),后綴為.idx,文件名從0開(kāi)始累加,每個(gè)段文件中包含固定的`segment_entry_count`條記錄,默認(rèn)值是**16384**。每個(gè)index從磁盤中讀取消息的時(shí)候,**至少要在內(nèi)存
中維護(hù)一個(gè)段文件,所以設(shè)置 queue_index_embed_msgs_below值得時(shí)候要格外謹(jǐn)慎**,一點(diǎn)點(diǎn)增大也
可能會(huì)引起內(nèi)存爆炸式增長(zhǎng)。
1.3.2.2 消息存儲(chǔ)
**消息存儲(chǔ): rabbit_msg_store **
消息以**鍵值對(duì)的形式**存儲(chǔ)到文件中,一個(gè)虛擬主機(jī)上的所有隊(duì)列使用同一塊存儲(chǔ),每個(gè)節(jié)點(diǎn)只有一個(gè)。存儲(chǔ)
分為持久化存儲(chǔ)(msg_store_persistent)和短暫存儲(chǔ)(msg_store_transient)。持久化存儲(chǔ)的內(nèi)容在broker重
啟后不會(huì)丟失,短暫存儲(chǔ)的內(nèi)容在broker重啟后丟失。
store使用文件來(lái)存儲(chǔ),后綴為.rdq,經(jīng)過(guò)store處理的所有消息都會(huì)以追加的方式寫入到該文件中,當(dāng)該文件
的大小超過(guò)指定的限制(file_size_limit)后,將會(huì)關(guān)閉該文件并創(chuàng)建一個(gè)新的文件以供新的消息寫入。文件名從0
開(kāi)始進(jìn)行累加。在進(jìn)行消息的存儲(chǔ)時(shí),RabbitMQ會(huì)在ETS(Erlang TermStorage)表中記錄消息在文件中的位置
映射和文件的相關(guān)信息。
消息(包括消息頭、消息體、屬性)可以直接存儲(chǔ)在index中,也可以存儲(chǔ)在store中。最佳的方式是較小的消息存在index中,而較大的消息存在store中。這個(gè)消息大小的界定可以通過(guò)`queue_index_embed_msgs_below`來(lái)配置,默認(rèn)值為4096B。當(dāng)一個(gè)消息小于設(shè)定的大小閾值時(shí),就可以存儲(chǔ)在index中,這樣性能上可以得到優(yōu)化。**一個(gè)完整的消息**大小小于這個(gè)值,就放到索引中,否則放到持久化消息文件中。
rabbitmq.conf中的配置信息:
## Size in bytes below which to embed messages in the queue index.
## Related doc guide: https://rabbitmq.com/persistence-conf.html
##
# queue_index_embed_msgs_below = 4096
## You can also set this size in memory units
##
# queue_index_embed_msgs_below = 4kb
如果消息小于這個(gè)值,就在索引中存儲(chǔ),如果消息大于這個(gè)值就在store中存儲(chǔ):
大于這個(gè)值的消息存儲(chǔ)于msg_store_persistent目錄中的<num>.rdq文件中:
小于這個(gè)值的消息存儲(chǔ)于<num>.idx索引文件中:
讀取消息時(shí),先根據(jù)消息的ID(msg_id)找到對(duì)應(yīng)存儲(chǔ)的文件,如果文件存在并且未被鎖住,則直接打開(kāi)文件,從指定位置讀取消息內(nèi)容。如果文件不存在或者被鎖住了,則發(fā)送請(qǐng)求由store進(jìn)行處理。
刪除消息時(shí),只是從ETS表刪除指定消息的相關(guān)信息,同時(shí)更新消息對(duì)應(yīng)的存儲(chǔ)文件和相關(guān)信息。在執(zhí)行消息刪除操作時(shí),并不立即對(duì)文件中的消息進(jìn)行刪除,也就是說(shuō)消息依然在文件中,僅僅是標(biāo)記為垃圾數(shù)據(jù)而已。當(dāng)一個(gè)文件中都是垃圾數(shù)據(jù)時(shí)可以將這個(gè)文件刪除。當(dāng)檢測(cè)到前后兩個(gè)文件中的有效數(shù)據(jù)可以合并成一個(gè)文件,并且所有的垃圾數(shù)據(jù)的大小和所有文件(至少有3個(gè)文件存在的情況下)的數(shù)據(jù)大小的比值超過(guò)設(shè)置的閾值garbage_fraction(默認(rèn)值0.5)時(shí),才會(huì)觸發(fā)垃圾回收,將這兩個(gè)文件合并,執(zhí)行合并的兩個(gè)文件一定是邏輯上相鄰的兩個(gè)文件。合并邏輯:
鎖定這兩個(gè)文件
先整理前面的文件的有效數(shù)據(jù),再整理后面的文件的有效數(shù)據(jù)
將后面文件的有效數(shù)據(jù)寫入到前面的文件中
更新消息在ETS表中的記錄
刪除后面文件

1.3.3 隊(duì)列結(jié)構(gòu)
通常隊(duì)列由rabbit_amqqueue_process和backing_queue這兩部分組成,rabbit_amqqueue_process負(fù)責(zé)協(xié)議相關(guān)的消息處理,即接收生產(chǎn)者發(fā)布的消息、向消費(fèi)者交付消息、處理消息的確認(rèn)(包括生產(chǎn)端的confirm和消費(fèi)端的ack)等。backing_queue是消息存儲(chǔ)的具體形式和引擎,并向rabbit_amqqueue_process提供相關(guān)的接口以供調(diào)用。

如果消息投遞的目的隊(duì)列是空的,并且有消費(fèi)者訂閱了這個(gè)隊(duì)列,那么該消息會(huì)直接發(fā)送給消費(fèi)
者,不會(huì)經(jīng)過(guò)隊(duì)列這一步。當(dāng)消息無(wú)法直接投遞給消費(fèi)者時(shí),需要暫時(shí)將消息存入隊(duì)列,以便重新投
遞。
`rabbit_variable_queue.erl `源碼中定義了RabbitMQ隊(duì)列的**4種狀態(tài)**:
1. alpha:消息索引和消息內(nèi)容都存內(nèi)存,最耗內(nèi)存,很少消耗CPU
2. beta:消息索引存內(nèi)存,消息內(nèi)存存磁盤
3. gama:消息索引內(nèi)存和磁盤都有,消息內(nèi)容存磁盤
4. delta:消息索引和內(nèi)容都存磁盤,基本不消耗內(nèi)存,消耗更多CPU和I/O操作
消息存入隊(duì)列后,不是固定不變的,它會(huì)隨著系統(tǒng)的負(fù)載在隊(duì)列中不斷流動(dòng),消息的狀態(tài)會(huì)不斷發(fā)送變化。
持久化的消息,索引和內(nèi)容都必須先保存在磁盤上,才會(huì)處于上述狀態(tài)中的一種
gama狀態(tài)只有持久化消息才會(huì)有的狀態(tài)。
在運(yùn)行時(shí),RabbitMQ會(huì)根據(jù)消息傳遞的速度定期計(jì)算一個(gè)當(dāng)前內(nèi)存中能夠保存的最大消息數(shù)量(target_ram_count),如果alpha狀態(tài)的消息數(shù)量大于此值,則會(huì)引起消息的狀態(tài)轉(zhuǎn)換,多余的消息可能會(huì)轉(zhuǎn)換到beta、gama或者delta狀態(tài)。區(qū)分這4種狀態(tài)的主要作用是滿足不同的內(nèi)存和CPU需求。
對(duì)于**普通沒(méi)有設(shè)置優(yōu)先級(jí)和鏡像**的隊(duì)列來(lái)說(shuō),backing_queue的默認(rèn)實(shí)現(xiàn)是rabbit_variable_queue,其內(nèi)部通過(guò)**5個(gè)子隊(duì)列**Q1、Q2、delta、Q3、Q4來(lái)體現(xiàn)消息的各個(gè)狀態(tài)。


消費(fèi)者獲取消息也會(huì)引起消息的狀態(tài)轉(zhuǎn)換。
當(dāng)消費(fèi)者獲取消息時(shí)
1.首先會(huì)從Q4中獲取消息,如果獲取成功則返回。
2.如果Q4為空,則嘗試從Q3中獲取消息,系統(tǒng)首先會(huì)判斷Q3是否為空,如果為空則返回隊(duì)列為空,即此時(shí)隊(duì)列中無(wú)消息。
3.如果Q3不為空,則取出Q3中的消息;進(jìn)而再判斷此時(shí)Q3和Delta中的長(zhǎng)度,如果都為空,則可以認(rèn)為 Q2、Delta、 Q3、Q4 全部為空,此時(shí)將Q1中的消息直接轉(zhuǎn)移至Q4,下次直接從Q4 中獲取消息。
4.如果Q3為空,Delta不為空,則將Delta的消息轉(zhuǎn)移至Q3中,下次可以直接從Q3中獲取消息。在將消息從Delta轉(zhuǎn)移到Q3的過(guò)程中,是按照索引分段讀取的,首先讀取某一段,然后判斷讀取的消息的個(gè)數(shù)與Delta中消息的個(gè)數(shù)是否相等,如果相等,則可以判定此時(shí)Delta中己無(wú)消息,則直接將Q2和剛讀取到的消息一并放入到Q3中,如果不相等,僅將此次讀取到的消息轉(zhuǎn)移到Q3。
這里就有兩處疑問(wèn),第一個(gè)疑問(wèn)是:為什么Q3為空則可以認(rèn)定整個(gè)隊(duì)列為空?
- 試想一下,如果Q3為空,Delta不為空,那么在Q3取出最后一條消息的時(shí)候,Delta 上的消息就會(huì)被轉(zhuǎn)移到Q3這樣與 Q3 為空矛盾;
- 如果Delta 為空且Q2不為空,則在Q3取出最后一條消息時(shí)會(huì)將Q2的消息并入到Q3中,這樣也與Q3為空矛盾;
- 在Q3取出最后一條消息之后,如果Q2、Delta、Q3都為空,且Q1不為空時(shí),則Q1的消息會(huì)
被轉(zhuǎn)移到Q4,這與Q4為空矛盾。
其實(shí)這一番論述也解釋了另一個(gè)問(wèn)題:為什么Q3和Delta都為空時(shí),則可以認(rèn)為 Q2、Delta、Q3、Q4全部為空?
通常在負(fù)載正常時(shí),如果消費(fèi)速度大于生產(chǎn)速度,對(duì)于不需要保證可靠不丟失的消息來(lái)說(shuō),極有可能只會(huì)處于alpha狀態(tài)。
對(duì)于持久化消息,它一定會(huì)進(jìn)入gamma狀態(tài),在開(kāi)啟publisher confirm機(jī)制時(shí),只有到了gamma 狀態(tài)時(shí)才會(huì)確認(rèn)該消息己被接收,若消息消費(fèi)速度足夠快、內(nèi)存也充足,這些消息也不會(huì)繼續(xù)走到下一個(gè)狀態(tài)。
1.4 為什么消息的堆積導(dǎo)致性能下降?
在系統(tǒng)負(fù)載較高時(shí),消息若不能很快被消費(fèi)掉,這些消息就會(huì)進(jìn)入到很深的隊(duì)列中去,這樣會(huì)增加處理每個(gè)消息的平均開(kāi)銷。因?yàn)橐ǜ嗟臅r(shí)間和資源處理“堆積”的消息,如此用來(lái)處理新流入的消息的能力就會(huì)降低,使得后流入的消息又被積壓到很深的隊(duì)列中,繼續(xù)增大處理每個(gè)消息的平均開(kāi)銷,繼而情況變得越來(lái)越惡化,使得系統(tǒng)的處理能力大大降低。
應(yīng)對(duì)這一問(wèn)題一般有3種措施:
- 增加prefetch_count的值,即一次發(fā)送多條消息給消費(fèi)者,加快消息被消費(fèi)的速度。
- 采用multiple ack,降低處理 ack 帶來(lái)的開(kāi)銷
- 流量控制
2. RabbitMQ 安裝配置
相關(guān)說(shuō)明
| 說(shuō)明 | 版本 |
|---|---|
| 系統(tǒng) | CentOS7.4 |
| Erlang | erlang-23.0.2-1.el7.x86_64 |
| RabbitMQ | rabbitmq-server-3.8.4-1.el7.noarch |
RabbitMQ的安裝需要首先安裝Erlang,因?yàn)樗腔贓rlang的VM運(yùn)行的。
RabbitMQ需要的依賴:socat和logrotate,logrotate操作系統(tǒng)中已經(jīng)存在了,只需要安裝socat就可以了。
RabbitMQ與Erlang的兼容關(guān)系詳見(jiàn)
https://www.rabbitmq.com/which-erlang.html

1、安裝依賴
yum install socat -y
2、安裝Erlang
erlang-23.0.2-1.el7.x86_64.rpm下載地址:
https://github.com/rabbitmq/erlang-rpm/releases/download/v23.0.2/erlang-23.0.2-1.el7.x86_64.rpm
首先將erlang-23.0.2-1.el7.x86_64.rpm上傳至服務(wù)器,然后執(zhí)行下述命令:
rpm -ivh erlang-23.0.2-1.el7.x86_64.rpm
3、安裝RabbitMQ
rabbitmq-server-3.8.4-1.el7.noarch.rpm下載地址:
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.5/rabbitmq-server-3.8.5-1.el7.noarch.rpm
首先將rabbitmq-server-3.8.4-1.el7.noarch.rpm上傳至服務(wù)器,然后執(zhí)行下述命令:
rpm -ivh rabbitmq-server-3.8.4-1.el7.noarch.rpm
默認(rèn)安裝目錄
/usr/lib/rabbitmq
4、啟用RabbitMQ的管理插件

rabbitmq-plugins enable rabbitmq_management

5、開(kāi)啟RabbitMQ
systemctl start rabbitmq-server
或
rabbitmq-server
或者后臺(tái)啟動(dòng)
rabbitmq-server -detached
6、添加用戶
rabbitmqctl add_user root 123456
7、給用戶添加權(quán)限
給root用戶在虛擬主機(jī)"/"上的配置、寫、讀的權(quán)限

rabbitmqctl set_permissions root -p / ".*" ".*" ".*"
8、給用戶設(shè)置標(biāo)簽
rabbitmqctl set_user_tags root administrator
用戶的標(biāo)簽和權(quán)限:
| Tag | Capabilities |
|---|---|
| (None) | 沒(méi)有訪問(wèn)management插件的權(quán)限 |
| management | 可以使用消息協(xié)議做任何操作的權(quán)限,加上: <br />1. 可以使用AMQP協(xié)議登錄的虛擬主機(jī)的權(quán)限 <br />2. 查看它們能登錄的所有虛擬主機(jī)中所有隊(duì)列、交換器和綁定的權(quán)限 <br />3. 查看和關(guān)閉它們自己的通道和連接的權(quán)限 <br />4. 查看它們能訪問(wèn)的虛擬主機(jī)中的全局統(tǒng)計(jì)信息,包括其他用戶的活動(dòng) |
| policymaker | 所有management標(biāo)簽可以做的,加上: <br />1. 在它們能通過(guò)AMQP協(xié)議登錄的虛擬主機(jī)上,查看、創(chuàng)建和刪除策略以及虛 擬主機(jī)參數(shù)的權(quán)限 |
| monitoring | 所有management能做的,加上:<br /> 1. 列出所有的虛擬主機(jī),包括列出不能使用消息協(xié)議訪問(wèn)的虛擬主機(jī)的權(quán)限 <br />2. 查看其他用戶連接和通道的權(quán)限 <br />3. 查看節(jié)點(diǎn)級(jí)別的數(shù)據(jù)如內(nèi)存使用和集群的權(quán)限 <br />4. 查看真正的全局所有虛擬主機(jī)統(tǒng)計(jì)數(shù)據(jù)的權(quán)限 |
| administrator | 所有policymaker和monitoring能做的,加上:<br /> 1. 創(chuàng)建刪除虛擬主機(jī)的權(quán)限 <br />2. 查看、創(chuàng)建和刪除用戶的權(quán)限 <br />3. 查看、創(chuàng)建和刪除權(quán)限的權(quán)限 <br />4. 關(guān)閉其他用戶連接的權(quán)限 |
9、打開(kāi)瀏覽器,訪問(wèn)http://IP:15672
10、使用剛才創(chuàng)建的用戶登錄:
3. RabbitMQ常用操作命令
# 前臺(tái)啟動(dòng)Erlang VM和RabbitMQ
rabbitmq-server
# 后臺(tái)啟動(dòng)
rabbitmq-server -detached
# 停止RabbitMQ和Erlang VM
rabbitmqctl stop
# 查看所有隊(duì)列
rabbitmqctl list_queues
# 查看所有虛擬主機(jī)
rabbitmqctl list_vhosts
# 在Erlang VM運(yùn)行的情況下啟動(dòng)RabbitMQ應(yīng)用
rabbitmqctl start_app
rabbitmqctl stop_app
# 查看節(jié)點(diǎn)狀態(tài)
rabbitmqctl status
# 查看所有可用的插件
rabbitmq-plugins list
# 啟用插件
rabbitmq-plugins enable <plugin-name>
# 停用插件
rabbitmq-plugins disable <plugin-name>
# 添加用戶
rabbitmqctl add_user username password
# 列出所有用戶:
rabbitmqctl list_users
# 刪除用戶:
rabbitmqctl delete_user username
# 清除用戶權(quán)限:
rabbitmqctl clear_permissions -p vhostpath username
# 列出用戶權(quán)限:
rabbitmqctl list_user_permissions username
# 修改密碼:
rabbitmqctl change_password username newpassword
# 設(shè)置用戶權(quán)限:
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*"
# 創(chuàng)建虛擬主機(jī):
rabbitmqctl add_vhost vhostpath
# 列出所以虛擬主機(jī):
rabbitmqctl list_vhosts
# 列出虛擬主機(jī)上的所有權(quán)限:
rabbitmqctl list_permissions -p vhostpath
# 刪除虛擬主機(jī):
rabbitmqctl delete_vhost vhost vhostpath
# 移除所有數(shù)據(jù),要在 rabbitmqctl stop_app 之后使用:
rabbitmqctl reset
4. RabbitMQ工作流程詳解
4.1 生產(chǎn)者發(fā)送消息的流程
- 生產(chǎn)者連接RabbitMQ,建立TCP連接( Connection),開(kāi)啟信道(Channel)
- 生產(chǎn)者聲明一個(gè)Exchange(交換器),并設(shè)置相關(guān)屬性,比如交換器類型、是否持久化等
- 生產(chǎn)者聲明一個(gè)隊(duì)列井設(shè)置相關(guān)屬性,比如是否排他、是否持久化、是否自動(dòng)刪除等
- 生產(chǎn)者通過(guò) bindingKey (綁定Key)將交換器和隊(duì)列綁定( binding )起來(lái)
- 生產(chǎn)者發(fā)送消息至RabbitMQ Broker,其中包含 routingKey (路由鍵)、交換器等信息
- 相應(yīng)的交換器根據(jù)接收到的 routingKey 查找相匹配的隊(duì)列。
- 如果找到,則將從生產(chǎn)者發(fā)送過(guò)來(lái)的消息存入相應(yīng)的隊(duì)列中。
- 如果沒(méi)有找到,則根據(jù)生產(chǎn)者配置的屬性選擇丟棄還是回退給生產(chǎn)者
- 關(guān)閉信道。
- 關(guān)閉連接。
4.2 消費(fèi)者接收消息的過(guò)程
- 消費(fèi)者連接到RabbitMQ Broker ,建立一個(gè)連接(Connection ) ,開(kāi)啟一個(gè)信道(Channel) 。
- 消費(fèi)者向RabbitMQ Broker 請(qǐng)求消費(fèi)相應(yīng)隊(duì)列中的消息,可能會(huì)設(shè)置相應(yīng)的回調(diào)函數(shù), 以及做一些準(zhǔn)備工作
- 等待RabbitMQ Broker 回應(yīng)并投遞相應(yīng)隊(duì)列中的消息, 消費(fèi)者接收消息。
- 消費(fèi)者確認(rèn)( ack) 接收到的消息。
- RabbitMQ 從隊(duì)列中刪除相應(yīng)己經(jīng)被確認(rèn)的消息。
- 關(guān)閉信道。
- 關(guān)閉連接。
4.3 案例
詳見(jiàn) demo_02
4.4 Connection 和 Channel關(guān)系
生產(chǎn)者和消費(fèi)者,需要與RabbitMQ Broker 建立TCP連接,也就是Connection 。一旦TCP 連接建立起來(lái),客戶端緊接著創(chuàng)建一個(gè)AMQP 信道(Channel),每個(gè)信道都會(huì)被指派一個(gè)唯一的ID。信道是建立在Connection 之上的虛擬連接, RabbitMQ 處理的每條AMQP 指令都是通過(guò)信道完成的。

**為什么不直接使用TCP連接,而是使用信道? **
RabbitMQ 采用類似NIO的做法,復(fù)用TCP 連接,減少性能開(kāi)銷,便于管理。
當(dāng)每個(gè)信道的流量不是很大時(shí),復(fù)用單一的Connection 可以在產(chǎn)生性能瓶頸的情況下有效地節(jié)省TCP 連接資源。
當(dāng)信道本身的流量很大時(shí),一個(gè)Connection 就會(huì)產(chǎn)生性能瓶頸,流量被限制。需要建立多個(gè)Connection ,分?jǐn)傂诺?。具體的調(diào)優(yōu)看業(yè)務(wù)需要。
信道在AMQP 中是一個(gè)很重要的概念,大多數(shù)操作都是在信道這個(gè)層面進(jìn)行的。
channel.exchangeDeclare
channel.queueDeclare
channel.basicPublish
channel.basicConsume
// ...
RabbitMQ 相關(guān)的API與AMQP緊密相連,比如channel.basicPublish 對(duì)應(yīng)AMQP 的Basic.Publish命令。
4.5 RabbitMQ工作模式詳解
https://www.rabbitmq.com/getstarted.htm
4.5.1 Work Queue(direct)
生產(chǎn)者發(fā)消息,啟動(dòng)多個(gè)消費(fèi)者實(shí)例來(lái)消費(fèi)消息,每個(gè)消費(fèi)者僅消費(fèi)部分信息,可達(dá)到負(fù)載均衡的效果。

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個(gè)消息隊(duì)列
channel.queueDeclare("queue.wq", true, false, false, null);
// 聲明一個(gè) 交換器
channel.exchangeDeclare("ex.wq", BuiltinExchangeType.DIRECT, true, false, null);
// 將消息隊(duì)列和交換器綁定,并制定綁卡鍵
channel.queueBind("queue.wq", "ex.wq", "key.wq");
for (int i = 0; i < 15; i++) {
channel.basicPublish("ex.wq", "key.wq", null, ("工作隊(duì)列" + i).getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個(gè)消息隊(duì)列
channel.queueDeclare("queue.wq", true, false, false, null);
channel.basicConsume("queue.wq", new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("推送來(lái)的消息:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("CancelCallback:" + consumerTag);
}
});
}
}
4.5.2 發(fā)布訂閱(fanout)
使用fanout類型交換器,routingKey忽略。每個(gè)消費(fèi)者定義生成一個(gè)隊(duì)列并綁定到同一個(gè)Exchange,每個(gè)消費(fèi)者都可以消費(fèi)到完整的消息。
消息廣播給所有訂閱該消息的消費(fèi)者。
在RabbitMQ中,生產(chǎn)者不是將消息直接發(fā)送給消息隊(duì)列,實(shí)際上生產(chǎn)者根本不知道一個(gè)消息被發(fā)送到哪個(gè)隊(duì)列。
生產(chǎn)者將消息發(fā)送給交換器。交換器非常簡(jiǎn)單,從生產(chǎn)者接收消息,將消息推送給消息隊(duì)列。交換器必須清楚地知道要怎么處理接收到的消息。應(yīng)該是追加到一個(gè)指定的隊(duì)列,還是追加到多個(gè)隊(duì)列,還是丟棄。規(guī)則就是交換器類型。

交換器的類型前面已經(jīng)介紹過(guò)了: direct 、 topic 、 headers 和 fanout 四種類型。發(fā)布訂閱使用fanout。創(chuàng)建交換器,名字叫 logs :
channel.exchangeDeclare("logs", "fanout");
`fanout` 交換器很簡(jiǎn)單,從名字就可以看出來(lái)(用風(fēng)扇吹出去),將所有收到的消息發(fā)送給它知道的所有的隊(duì)列。
rabbitmqctl list_exchanges
列出RabbitMQ的交換器,包括了 amq.* 的和默認(rèn)的(未命名)的交換器。

未命名交換器
在前面的那里中我們沒(méi)有指定交換器,但是依然可以向隊(duì)列發(fā)送消息。這是因?yàn)槲覀兪褂昧四J(rèn)的交換器。
channel.basicPublish("", "hello", null, message.getBytes());
第一個(gè)參數(shù)就是交換器名稱,為空字符串。直接使用routingKey向隊(duì)列發(fā)送消息,如果該routingKey指定的隊(duì)列存在的話
現(xiàn)在,向指定的交換器發(fā)布消息:
channel.basicPublish("logs", "", null, message.getBytes());
臨時(shí)隊(duì)列
前面我們使用隊(duì)列的名稱,生產(chǎn)者和消費(fèi)者都是用該名稱來(lái)發(fā)送和接收該隊(duì)列中的消息。
首先,我們無(wú)論何時(shí)連接RabbitMQ的時(shí)候,都需要一個(gè)新的,空的隊(duì)列。我們可以使用隨機(jī)的名字創(chuàng)建隊(duì)列,也可以讓服務(wù)器幫我們生成隨機(jī)的消息隊(duì)列名字。
其次,一旦我們斷開(kāi)到消費(fèi)者的連接,該隊(duì)列應(yīng)該自動(dòng)刪除。
String queueName = channel.queueDeclare().getQueue();
上述代碼我們聲明了一個(gè)非持久化的、排他的、自動(dòng)刪除的隊(duì)列,并且名字是服務(wù)器隨機(jī)生成的。
queueName一般的格式類似: amq.gen-JzTY20BRgKO-HjmUJj0wLg 。
綁定

在創(chuàng)建了消息隊(duì)列和 fanout 類型的交換器之后,我們需要將兩者進(jìn)行綁定,讓交換器將消息發(fā)送給該隊(duì)列
channel.queueBind(queueName, "logs", "");
此時(shí), logs 交換器會(huì)將接收到的消息追加到我們的隊(duì)列中。
可以使用下述命令列出RabbitMQ中交換器的綁定關(guān)系:
rabbitmqctl list_bindings

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個(gè) fanout 類型的交換器
channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null);
for (int i = 0; i < 20; i++) {
// fanout 類型的交換器不需要指定路由鍵
channel.basicPublish("ex.myfan", "", null, ("hello word fan " + i).getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class Consumer3 {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明零時(shí)隊(duì)列,隊(duì)列的名字由RabbitMQ自動(dòng)生成
String queueName = channel.queueDeclare().getQueue();
System.out.println("生成的零時(shí)隊(duì)列名字為:" + queueName);
// 聲明一個(gè) fanout 類型的交換器
channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null);
// fanout 類型的交換器不需要綁定路由鍵
channel.queueBind(queueName, "ex.myfan", "");
channel.basicConsume(queueName, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到的消息:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
}
}
消費(fèi)者日志
生成的零時(shí)隊(duì)列名字為:amq.gen-xcaoVeYdLVWP3OHpHjNzXQ
收到的消息:hello word fan 0
收到的消息:hello word fan 1
收到的消息:hello word fan 2
收到的消息:hello word fan 3
收到的消息:hello word fan 4
收到的消息:hello word fan 5
收到的消息:hello word fan 6
收到的消息:hello word fan 7
收到的消息:hello word fan 8
收到的消息:hello word fan 9
收到的消息:hello word fan 10
收到的消息:hello word fan 11
收到的消息:hello word fan 12
收到的消息:hello word fan 13
收到的消息:hello word fan 14
收到的消息:hello word fan 15
收到的消息:hello word fan 16
收到的消息:hello word fan 17
收到的消息:hello word fan 18
收到的消息:hello word fan 19
4.5.3 路由模式
使用 direct 類型的Exchange,發(fā)N條消費(fèi)并使用不同的 routingKey ,消費(fèi)者定義隊(duì)列并將隊(duì)列、routingKey 、Exchange綁定。此時(shí)使用 direct 模式Exchagne必須要 routingKey 完全匹配的情況下消息才會(huì)轉(zhuǎn)發(fā)到對(duì)應(yīng)的隊(duì)列中被消費(fèi)。
現(xiàn)在我們想讓接收者只接收部分消息,如,我們通過(guò)直接模式的交換器將關(guān)鍵的錯(cuò)誤信息記錄到log文件,同時(shí)在控制臺(tái)正常打印所有的日志信息。
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Random;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class Producer {
private static final String[] LOG_LEVEL = {"ERROR", "FATAL", "WARN"};
private static Random random = new Random();
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個(gè) DIRECT 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
channel.exchangeDeclare("ex.routing", BuiltinExchangeType.DIRECT, false, false, null);
for (int i = 0; i < 100; i++) {
String level = LOG_LEVEL[random.nextInt(100) % LOG_LEVEL.length];
channel.basicPublish("ex.routing", level, null, ("這是[" + level + "]消息").getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class ConsumerError {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個(gè) DIRECT 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
channel.exchangeDeclare("ex.routing", BuiltinExchangeType.DIRECT, false, false, null);
// 此處也可以聲明為零時(shí)的消息隊(duì)列,但看消息是否重要
channel.queueDeclare("queue.error", false, false, false, null);
// 消息隊(duì)列 綁定 路由鍵
channel.queueBind("queue.error", "ex.routing", "ERROR");
channel.basicConsume("queue.error", new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("ERROR 收到的消息:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class ConsumerFatal {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個(gè) DIRECT 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
channel.exchangeDeclare("ex.routing", BuiltinExchangeType.DIRECT, false, false, null);
// 此處也可以聲明為零時(shí)的消息隊(duì)列,但看消息是否重要
channel.queueDeclare("queue.fatal", false, false, false, null);
// 消息隊(duì)列 綁定 路由鍵
channel.queueBind("queue.fatal", "ex.routing", "FATAL");
channel.basicConsume("queue.fatal", new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("FATAL 收到的消息:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class ConsumerWarn {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個(gè) DIRECT 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
channel.exchangeDeclare("ex.routing", BuiltinExchangeType.DIRECT, false, false, null);
// 此處也可以聲明為零時(shí)的消息隊(duì)列,但看消息是否重要
channel.queueDeclare("queue.warn", false, false, false, null);
// 消息隊(duì)列 綁定 路由鍵
channel.queueBind("queue.warn", "ex.routing", "WARN");
channel.basicConsume("queue.warn", new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("WARN 收到的消息:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
}
}
4.5.4 主題模式
使用 topic 類型的交換器,隊(duì)列綁定到交換器、 bindingKey 時(shí)使用通配符,交換器將消息路由轉(zhuǎn)發(fā)到具體隊(duì)列時(shí)會(huì)根據(jù)消息 routingKey 模糊匹配,比較靈活。
上個(gè)模式中,我們通過(guò) direct 類型的交換器做到了根據(jù)日志級(jí)別的不同,將消息發(fā)送給了不同隊(duì)列的。
這里有一個(gè)限制,加入現(xiàn)在我不僅想根據(jù)日志級(jí)別劃分日志消息,還想根據(jù)日志來(lái)源劃分日志,怎么做?
比如,我想監(jiān)聽(tīng)cron服務(wù)發(fā)送的 error 消息,又想監(jiān)聽(tīng)從kern服務(wù)發(fā)送的所有消息。
此時(shí)可以使用RabbitMQ的主題模式( Topic )
要想 topic 類型的交換器, routingKey 就不能隨便寫了,它必須得是點(diǎn)分單詞。單詞可以隨便寫,生產(chǎn)中一般使用消息的特征。如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”等。**該點(diǎn)分單詞字符串最長(zhǎng)255字節(jié) **
bindingKey 也必須是這種形式。 topic 類型的交換器背后原理跟 direct 類型的類似:只要隊(duì)列的 bindingKey 的值與消息的 routingKey 匹配,隊(duì)列就可以收到該消息。有兩個(gè)不同:
-
*(star)匹配一個(gè)單詞 -
#匹配0到多個(gè)單詞

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Random;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class Producer {
private static final String[] LOG_LEVEL = {"ERROR", "FATAL", "WARN"};
private static final String[] LOG_AREA = {"beijing", "shanghai", "chengdu"};
private static final String[] LOG_BIZ = {"edu-online", "biz-online", "emp-online"};
private static final Random RANDOM = new Random();
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個(gè) TOPIC 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
channel.exchangeDeclare("ex.topic", BuiltinExchangeType.TOPIC, false, false, null);
String routingKey, message;
String level, area, biz;
for (int i = 0; i < 100; i++) {
level = LOG_LEVEL[RANDOM.nextInt(LOG_LEVEL.length)];
area = LOG_AREA[RANDOM.nextInt(LOG_AREA.length)];
biz = LOG_BIZ[RANDOM.nextInt(LOG_BIZ.length)];
// 路由鍵由多個(gè)維度組成
routingKey = area + "." + biz + "." + level;
message = "LOG:[" + level + "]:這是 [" + area + "] 地址 [" + biz + "] 服務(wù)器發(fā)來(lái)的消息,MSQ_SEQ = " + i;
channel.basicPublish("ex.topic", routingKey, null, message.getBytes("utf-8"));
}
channel.close();
connection.close();
}
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class ConsumerBeijing {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個(gè) TOPIC 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
channel.exchangeDeclare("ex.topic", BuiltinExchangeType.TOPIC, false, false, null);
// 零時(shí)隊(duì)列
String queue = channel.queueDeclare().getQueue();
// 消息隊(duì)列 綁定 路由鍵
// 只要是以 beijing 開(kāi)頭的,不管后邊有幾個(gè)點(diǎn)分單詞都可以接受
channel.queueBind(queue, "ex.topic", "beijing.#");
channel.basicConsume(queue, (consumerTag, message) -> {
System.out.println("Beijing 收到的消息:" + new String(message.getBody(), "utf-8"));
}, consumerTag -> {
});
}
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class ConsumerChengduEdu {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個(gè) TOPIC 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
channel.exchangeDeclare("ex.topic", BuiltinExchangeType.TOPIC, false, false, null);
// 零時(shí)隊(duì)列
String queue = channel.queueDeclare().getQueue();
// 消息隊(duì)列 綁定 路由鍵
channel.queueBind(queue, "ex.topic", "chengdu.edu-online.*");
channel.basicConsume(queue, (consumerTag, message) -> {
System.out.println("chengdu edu-online 收到的消息:" + new String(message.getBody(), "utf-8"));
}, consumerTag -> {
});
}
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class ConsumerError {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個(gè) TOPIC 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
channel.exchangeDeclare("ex.topic", BuiltinExchangeType.TOPIC, false, false, null);
// 零時(shí)隊(duì)列
String queue = channel.queueDeclare().getQueue();
// 消息隊(duì)列 綁定 路由鍵
// 不管前面有幾個(gè)點(diǎn)分單詞,只有最后一個(gè)單詞是ERROR
channel.queueBind(queue, "ex.topic", "#.ERROR");
channel.basicConsume(queue, (consumerTag, message) -> {
System.out.println("ERROR 收到的消息:" + new String(message.getBody(), "utf-8"));
}, consumerTag -> {
});
}
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author jie.luo
* @since 2021/1/25
*/
public class ConsumerShanghaiError {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@192.168.110.151:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明一個(gè) TOPIC 類型的交換器 , 交換器和消息隊(duì)列的綁定不需要在這里處理
channel.exchangeDeclare("ex.topic", BuiltinExchangeType.TOPIC, false, false, null);
// 零時(shí)隊(duì)列
String queue = channel.queueDeclare().getQueue();
// 消息隊(duì)列 綁定 路由鍵
channel.queueBind(queue, "ex.topic", "shanghai.*.ERROR");
channel.basicConsume(queue, (consumerTag, message) -> {
System.out.println("shanghai error 收到的消息:" + new String(message.getBody(), "utf-8"));
}, consumerTag -> {
});
}
}