RabbitMQ快速入門

什么是消息中間件?

定義一:消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。

定義二:消息隊(duì)列中間件(Message Queue Middleware,MQ)是指利用高效可靠的消息傳遞機(jī)制進(jìn)行與平臺(tái)無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進(jìn)行分布式系統(tǒng)的集成。通過提供消息傳遞和消息排隊(duì)模型,它可以在分布式環(huán)境下擴(kuò)展進(jìn)程間的通信。

目前開源的主流消息中間件包括:RabbitMQ、Kafka、ActiveMQ、RocketMQ等。

消息隊(duì)列中間件(也可以稱為消息隊(duì)列或者消息中間件)一般有兩種傳遞模式:點(diǎn)對(duì)點(diǎn)模式(P2P)和發(fā)布/訂閱模式(Pub/Sub)。

· 點(diǎn)對(duì)點(diǎn)模式是基于隊(duì)列的,消息生產(chǎn)者發(fā)送消息到隊(duì)列,消息消費(fèi)者從隊(duì)列接收消息,隊(duì)列的存在使得消息的異步傳輸成為可能。

· 發(fā)布/訂閱模式定義了如何向一個(gè)內(nèi)容節(jié)點(diǎn)發(fā)布和訂閱消息,這個(gè)內(nèi)容節(jié)點(diǎn)稱為主題。主題可以認(rèn)為是消息傳遞的中介,消息發(fā)布者將消息發(fā)布到某個(gè)主題,而消息訂閱者則從主題中訂閱消息。主題使得消息的訂閱者與消息的發(fā)布者互相保持獨(dú)立,不需要進(jìn)行接觸即可保證消息的傳遞,發(fā)布/訂閱模式在消息的一對(duì)多廣播時(shí)采用。

面向消息的中間件(Message Oriented Middleware,MOM)提供了以松散耦合的靈活方式集成應(yīng)用的一種機(jī)制:

它們提供了基于存儲(chǔ)和轉(zhuǎn)發(fā)的應(yīng)用程序之間的異步數(shù)據(jù)的發(fā)送,即應(yīng)用之間彼此不直接通信,而是與作為中介的消息中間件通信,消息中間件提供了有保證的消息發(fā)送,應(yīng)用程序開發(fā)人員無須了解遠(yuǎn)程過程調(diào)用(RPC)和網(wǎng)絡(luò)通信協(xié)議的細(xì)節(jié)。

消息中間件適用于需要可靠數(shù)據(jù)傳輸?shù)姆植际江h(huán)境,采用消息中間件的系統(tǒng)中,不同對(duì)象之間通過傳遞消息來激活對(duì)方的事件完成相應(yīng)操作。發(fā)送者將消息發(fā)送給消息服務(wù)器,消息服務(wù)器將消息存放在若干隊(duì)列中,在合適的時(shí)候再將消息轉(zhuǎn)發(fā)給接收者。

消息中間件能在不同平臺(tái)之間通信,常被用來屏蔽各種平臺(tái)及協(xié)議之間的特性,實(shí)現(xiàn)應(yīng)用程序之間的協(xié)同。其優(yōu)點(diǎn)在于能夠在客戶和服務(wù)器之間提供同步和異步的連接,并且在任何時(shí)刻都可以將消息進(jìn)行傳送或者存儲(chǔ)轉(zhuǎn)發(fā),這是它優(yōu)于遠(yuǎn)程過程調(diào)用的原因。


消息中間件的作用

解耦:消息中間件在處理過程中插入了一個(gè)隱含的、基于數(shù)據(jù)的接口層,兩邊的處理過程都要實(shí)現(xiàn)這一接口。這允許開發(fā)者獨(dú)立地?cái)U(kuò)展或修改兩邊的處理過程,只要確保遵循相同的接口約束即可。

冗余(存儲(chǔ)):在某些情況下處理數(shù)據(jù)的過程會(huì)失敗,消息中間件可以把數(shù)據(jù)進(jìn)行持久化知道完全被處理。通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)。在把一個(gè)消息從消息中間件刪除之前,需要處理系統(tǒng)明確地支出該消息已經(jīng)被處理完成,從而確保數(shù)據(jù)被安全的保存直到使用完畢。

擴(kuò)展性:消息中間件解耦了應(yīng)用處理過程,提高消息入隊(duì)和處理效率只需要增加額外的處理過程,不需要其他更改。

削峰:消息中間件可以使得關(guān)鍵組件支撐突發(fā)訪問壓力,不會(huì)因?yàn)橥话l(fā)的超負(fù)荷請(qǐng)求而完全崩潰。

可恢復(fù)性:當(dāng)一部分失效時(shí),不會(huì)影響到整個(gè)系統(tǒng)。消息中間件降低了進(jìn)程間的耦合度,即使一個(gè)處理消息的進(jìn)程掛掉,加入消息中間件的消息仍然可以在系統(tǒng)恢復(fù)后進(jìn)行處理。

順序保證:在大多數(shù)使用場(chǎng)景下,數(shù)據(jù)處理的順序很重要,大部分消息中間件支持一定程度上的順序性。

緩沖:在任何重要的系統(tǒng)中,都會(huì)存在不同處理時(shí)間的元素。消息中間件通過一個(gè)緩沖層來幫助任務(wù)最高效率的執(zhí)行,寫入消息中間件的處理會(huì)盡可能快速。該緩沖層有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度。

異步通信:消息中間件提供了異步處理機(jī)制,允許應(yīng)用把一些消息放入消息中間件中,但不立即處理它,再之后需要的時(shí)候再慢慢處理。


相關(guān)概念介紹

生產(chǎn)者(Producer):就是投遞消息的一方。

生產(chǎn)者創(chuàng)建消息然后發(fā)布到RabbitMQ中。消息一般可以包含兩個(gè)部分:消息體(PayLoad)和標(biāo)簽(Label),在實(shí)際應(yīng)用中消息體一般是一個(gè)帶有業(yè)務(wù)邏輯結(jié)構(gòu)的數(shù)據(jù),比如一個(gè)JSON字符串,當(dāng)然也可以進(jìn)一步對(duì)消息體進(jìn)行序列化操作。消息的標(biāo)簽用來表述這條信息,比如一個(gè)交換器的名稱和一個(gè)路由鍵。生產(chǎn)者把消息交給RabbitMQ,RabbitMQ根據(jù)標(biāo)簽把消息發(fā)送給感興趣的消費(fèi)者(Consumer)。

消費(fèi)者(Consumer):就是接收消息的一方。

消費(fèi)者連接到RabbitMQ服務(wù)器,并訂閱到隊(duì)列上。當(dāng)消費(fèi)者消費(fèi)一條信息時(shí)只是消費(fèi)了消息的消息體(PayLoad),在消息路由的過程中消息的標(biāo)簽會(huì)丟棄,存入到隊(duì)列的消息只有消息體,消費(fèi)者也只會(huì)消費(fèi)到消息體,也就不知道消息的生產(chǎn)者是誰(也不需要知道)。

服務(wù)節(jié)點(diǎn)(Broker):消息中間件的服務(wù)節(jié)點(diǎn)。對(duì)于RabbitMQ來說,一個(gè)RabbitMQ Broker可以看作一個(gè)服務(wù)節(jié)點(diǎn)(服務(wù)實(shí)例),大多數(shù)情況下也可以將其看作一臺(tái)RabbitMQ的服務(wù)器。

首先生產(chǎn)者將業(yè)務(wù)方數(shù)據(jù)進(jìn)行包裝封裝成消息,發(fā)送(在AMQP協(xié)議里命令為Basic.Publish)到Broker中,消費(fèi)者訂閱并接收消息(在AMQP協(xié)議里命令為Basic.Consume或Basic.Get),經(jīng)過解包處理得到原始數(shù)據(jù)再進(jìn)行業(yè)務(wù)處理邏輯。這個(gè)業(yè)務(wù)的處理邏輯不一定和接收消息的邏輯使用同一個(gè)線程,消費(fèi)者進(jìn)程可以使用一個(gè)線程去接收消息存入內(nèi)存(如Java中的BlockingQueue),業(yè)務(wù)邏輯使用另一個(gè)線程從內(nèi)存讀取,進(jìn)一步解耦應(yīng)用,提高處理效率。

隊(duì)列(Queue):是RabbitMQ的內(nèi)部對(duì)象,用于存儲(chǔ)消息。RabbitMQ中消息只能存儲(chǔ)在隊(duì)列中,這一點(diǎn)和Kafka相反。Kafka將消息存儲(chǔ)在topic(主題)這個(gè)邏輯層面,而相對(duì)應(yīng)的隊(duì)列邏輯知識(shí)topic實(shí)際存儲(chǔ)文件中的位移標(biāo)識(shí)。

RabbitMQ的生產(chǎn)者生產(chǎn)消息并最終投遞到隊(duì)列中,消費(fèi)者可以從隊(duì)列中獲取消息并消費(fèi)。多個(gè)消費(fèi)者可以訂閱同一個(gè)隊(duì)列,這時(shí)隊(duì)列中的消息會(huì)通過輪詢策略平均分?jǐn)偨o多個(gè)消費(fèi)者進(jìn)行消費(fèi),而不是每個(gè)消費(fèi)者都收到所有的消息進(jìn)行處理。RabbitMQ不支持隊(duì)列層面的廣播消費(fèi),雖然可以二次開發(fā)實(shí)現(xiàn)功能但會(huì)使處理邏輯異常復(fù)雜,不建議這樣做。

交換器(Exchange)、路由鍵(RoutingKey)、綁定(Binding):實(shí)際上生產(chǎn)者不會(huì)直接將消息投遞到隊(duì)列中,而是生產(chǎn)者將消息發(fā)送到交換器,由交換器將消息路由到一個(gè)或多個(gè)隊(duì)列中(如果路由不到,可能會(huì)返回給多個(gè)消費(fèi)者,也可能直接丟棄)。

RabbitMQ中的交換器有四種,每種類型的交換器有不同的策略。生產(chǎn)者將消息發(fā)送給交換器的時(shí)候,一般會(huì)指定一個(gè)路由鍵用來指定這個(gè)消息的路由規(guī)則,路由鍵需要和綁定鍵(BindingKey)聯(lián)合使用才能生效。在交換器類型和綁定鍵固定的情況下,生產(chǎn)者可以通過指定路由鍵來決定信息流向哪里。在綁定多個(gè)隊(duì)列到同一個(gè)交換器的時(shí)候,這些綁定允許使用相同的綁定鍵,但綁定鍵是否生效取決于交換器類型。


交換器類型

RabbitMQ常用的交換器類型有 fanout、direct、topic、headers 四種,AMQP協(xié)議里還提供了 System 和 自定義 兩種類型。

fanout:會(huì)把所有發(fā)送到該交換器的消息路由到所有與該交換器綁定的隊(duì)列中。

direct:會(huì)把消息路由到那些BindingKey和RoutingKey完全匹配的隊(duì)列中。

topic:與direct的匹配規(guī)則類似,但增加了模糊匹配。

我們將使用點(diǎn)號(hào)分割的字符串稱為單詞(適用于RoutingKey和BindingKey),在BindingKey中存在兩種特殊字符 * 和 # 用于模糊匹配,其中 * 用于匹配一個(gè)單詞 # 用于匹配多規(guī)格單詞(也可以是0個(gè))。

headers:不依賴于路由鍵的匹配規(guī)則來路由消息,而是根據(jù)發(fā)送的消息內(nèi)容中的headers屬性進(jìn)行匹配。

在綁定隊(duì)列和交換器時(shí)指定一組鍵值對(duì),當(dāng)發(fā)送消息到交換器時(shí),RabbitMQ會(huì)獲取到該消息的headers(即一個(gè)鍵值對(duì)的形式)對(duì)比其中的將支隊(duì)是否完全匹配隊(duì)列和交換器綁定時(shí)指定的鍵值對(duì)。如果完全匹配則消息會(huì)路由到該隊(duì)列,否則就不會(huì)。headers類型的交換器性能很差且不實(shí)用,基本上不會(huì)被使用。


RabbitMQ的安裝運(yùn)行

· 安裝Erlang:yum install erlang

· 安裝RabbitMQ:yum install rabbitmq-server

· 啟動(dòng)RabbitMQ:rabbitmq-server

· 驗(yàn)證啟動(dòng):rabbitmqctl status

· 驗(yàn)證集群:rabbitmqctl cluster_status


生產(chǎn)消息的工作流程

· 生產(chǎn)者連接到RabbitMQ Broker,建立一個(gè)連接(Connection),開啟一個(gè)信道(Channel)

· 生產(chǎn)者聲明一個(gè)交換器,并設(shè)置相關(guān)屬性(類型、持久化等)

· 生產(chǎn)者聲明一個(gè)隊(duì)列,并設(shè)置相關(guān)屬性(排他、持久化、自動(dòng)刪除等)

· 生產(chǎn)者通過路由鍵將交換器和隊(duì)列綁定起來

· 生產(chǎn)者發(fā)送消息至RabbitMQ Broker,其中包含路由鍵、交換器等信息

· 相應(yīng)的交換器根據(jù)接收到的路由鍵查找相匹配的隊(duì)列

· 如果找到則將生產(chǎn)者發(fā)送的消息存入相應(yīng)的隊(duì)列中,如果沒有則根據(jù)配置選額丟棄還是退回

· 關(guān)閉信道和連接


RabbitMQ的消息生產(chǎn)

代碼片段

消費(fèi)消息的工作流程

· 消費(fèi)者連接到RabbitMQ Broker,建立一個(gè)連接(Connection),開啟一個(gè)信道(Channel)

· 消費(fèi)者向RabbitMQ Broker請(qǐng)求消費(fèi)相應(yīng)隊(duì)列中的消息,可能會(huì)設(shè)置相應(yīng)的回調(diào)函數(shù)及一些準(zhǔn)備工作

· 等到RabbitMQ Broker回應(yīng)并投遞相應(yīng)隊(duì)列中的消息,接收消息

· 消費(fèi)者確認(rèn)(ack)接收到的消息

· RabbitMQ從隊(duì)列中刪除相應(yīng)已經(jīng)被確認(rèn)的消息

· 關(guān)閉信道和連接


RabbitMQ的消息消費(fèi)

代碼片段

AMQP協(xié)議介紹

RabbitMQ是遵從AMQP協(xié)議的Erlang實(shí)現(xiàn)(RabbitMQ還支持STOMP、MQTT等協(xié)議),AMQP的模型架構(gòu)和RabbitMQ的模型架構(gòu)是一樣的,RabbitMQ中的交換器、交換器類型、隊(duì)列、綁定、路由鍵等都是遵循AMQP協(xié)議中的相應(yīng)概念。

AMQP協(xié)議本身包括三層:

· Module Layer:協(xié)議最高層,主要定義了一些供客戶端調(diào)用的命令,客戶端可以利用這些命令實(shí)現(xiàn)業(yè)務(wù)邏輯。

· Session Layer:中間層,主要負(fù)責(zé)將客戶端的命令發(fā)送給服務(wù)器,再將服務(wù)端的應(yīng)答返回給客戶端,主要為客戶端與服務(wù)器之間的通信提供可靠性同步機(jī)制和錯(cuò)誤處理。

· Transport Layer:最底層,主要傳輸二進(jìn)制數(shù)據(jù)流,提供幀的處理、信道復(fù)用、錯(cuò)誤檢測(cè)和數(shù)據(jù)表示等

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 前面用三篇文章斷斷續(xù)續(xù)寫了Celery+RabbitMQ相關(guān)的文章。爬蟲架構(gòu)|Celery+RabbitMQ快速入...
    小怪聊職場(chǎng)閱讀 7,363評(píng)論 0 39
  • 1、相關(guān)概念 RabbitMQ是一個(gè)消息代理,事實(shí)上,它接收生產(chǎn)者產(chǎn)生的消息,然后將消息傳遞給消費(fèi)者。在這個(gè)過程中...
    AubreyXue閱讀 2,594評(píng)論 0 37
  • # HelloWorld 簡介 RabbitMQ:接受消息再傳遞消息,可以視為一個(gè)“郵局”。發(fā)送者和接受者通過隊(duì)列...
    xncode閱讀 1,519評(píng)論 0 1
  • 什么叫消息隊(duì)列? 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡單,比如只包含文本字符串,也可以更復(fù)...
    Agile_dev閱讀 2,438評(píng)論 0 24
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,209評(píng)論 2 11

友情鏈接更多精彩內(nèi)容