RabbitMQ

MQ簡介

什么是MQ?

消息總線(Message Queue),一組跨進程、異步的通信機制,用于上下游消息傳遞。由消息系統(tǒng)來確保消息的可靠性。
MQ作用
應用解耦、異步通信、流量削峰、數(shù)據(jù)分發(fā)、錯峰流控、日志收集等。
MQ衡量標準
服務性能、數(shù)據(jù)存儲、集群架構(gòu)

主流競品分析

當前市場MQ產(chǎn)品很多,比如RocketMQ、ActiveMQ、Kafka、RabbitMQ、ZeroMQ,甚至redis也支持MQ功能。

ActiveMQ

apache出品的開源消息總線,完全支持JMS規(guī)范的消息中間件,提供豐富API,在中小企業(yè)應用廣泛。
性能
ActiveMQ在性能方便相對于其他MQ產(chǎn)品,性能較差,在高并發(fā)場景會出現(xiàn)阻塞、延遲、堆積。
存儲
Active默認基于內(nèi)存的KahaDB存儲,如果要保證消息的可靠性,可以采用關(guān)系數(shù)據(jù)庫存儲。
集群
ActiveMQ支持master-salave模式、和NETWORK模式。其集群高可用性借助Zookeeper實現(xiàn)。

master-salave模式

master-salave:通過Zookeeper進行集群管理,由Master節(jié)點對外提供服務,Master節(jié)點出現(xiàn)故障,由Zookeeper會高效的下掉Master節(jié)點,由Salave節(jié)點提供服務。
NETWORK模式:兩套master-salave,由網(wǎng)絡通聯(lián),組成分布式集群。
NETWORK模式

Kafka

LinkedIn開源的發(fā)布-訂閱系統(tǒng),目前是歸屬于apache頂級項目。Kafka基于Pull模式來處理消息消費,追求高吞吐量,設計目的就是用于日志收集和處理。Kafka不支持事務,對消息的重復、丟失、錯誤沒有嚴格要求。適用于大量數(shù)據(jù)的互聯(lián)網(wǎng)服務的數(shù)據(jù)收集。Kafk基于操作系統(tǒng)的底層Page Cache實現(xiàn)高效的讀寫,僅使用內(nèi)存管理,不存在內(nèi)存與磁盤之前的IO操作。Kafka借助Zookeeper實現(xiàn)高可用性。


kafka集群

RokectMQ

阿里開源消息中間件,孵化為apache頂級項目。Rocket使用純java開發(fā),具有高吞吐、高可靠性、適合大型分布式系統(tǒng)應用特點。RoekectMQ 2.X集群基于Zookeeper管理,3.X集群基于NameServer管理。
RokectMQ能夠保證消息的順序消費,提供了豐富的消息拉取等處理模式,開發(fā)者可以高效的進行水平擴展,能夠承載上億數(shù)據(jù)量級。
RockerMQ集群支持Master-Salave、雙Master-Salave、多主多從模式。
存儲技術(shù)支持同步雙寫、異步復制,使用零拷貝技術(shù)。
RockerMQ缺點是上下游交互核心不開源,社區(qū)提供的JMS其性能、兼容性、可靠性無法保證。


Rocket

RabbitMQ簡介

RabbitMQ是開源的消息代理和隊列服務器,用于通過普通協(xié)議在不同應用間共享數(shù)據(jù)。RabbitMQ使用Erlang語言,基于AMQP協(xié)議實現(xiàn)。

AMQP

AMQP(Advanced Messae Queueing Protocol)高級消息隊列協(xié)議: 面向消息中間件的開放式標準應用層協(xié)議,定義了以下特性:

  • 消息方向
  • 消息隊列
  • 消息路由(包括點對點、發(fā)布訂閱)
  • 可靠性
  • 安全性

AMQP要求消息的提供者和客戶端接受者的行為實現(xiàn)對不同供應商可以使用相同的方式進行互相操作。AMQP增減了Exchange和Bindings角色。生產(chǎn)者(Priducer)把消息發(fā)布到Exchange上,消息最終達到隊列并被消費者接受,而Bindings決定Exchange消息應該達到那個隊列。


JMS

JMS(Java Message Service): java消息服務應用程序接口,是java平臺中關(guān)于面向消息中間件(MOM)的API,JMS定義了一個API以及一組消息收發(fā)必須實現(xiàn)的行為。

RabbitMQ優(yōu)勢

  • 可靠性:使用了如持久化、傳輸確認、發(fā)布確認等機制來保證消息的可靠性;
  • 靈活的路由:在消息進入隊列之前,通過Exchange來路由消息。Rabbit通過內(nèi)置的Exchange已經(jīng)實現(xiàn)了典型的路由。針對復雜路由可以通過將多個Exchange綁定組合或者通過插件機制實現(xiàn)自己的路由規(guī)則;
  • 消息集群(Clustering):多個MQ服務器可以組成一個集群,形成一個邏輯Broker;
  • 高可用性(HA):隊列可以在集群集群上進行鏡像,使得其中一個節(jié)點故障情況,隊列仍可以使用;
  • 多協(xié)議支持:支持多種消息隊列協(xié)議,如STOMP、MQTT等;
  • 管理界面:提供易于使用的管理界面,可以方便監(jiān)控和管理消息隊列;
  • 追蹤機制:Rabbit提供了消息追蹤機制,可以找出消息出現(xiàn)的問題;
  • 插件機制:提供許多插件,也可以自己實現(xiàn)插件。

基本概念

  • Broker:消息隊列服務器的實體,多個Rabbit服務器可以組成一個邏輯Broker,負責接收生產(chǎn)者的消息,并將消息轉(zhuǎn)發(fā)給其他Broker或消息消費者;
  • Exchange:消息交換機,消息第一個到達的地方,消息通過Exchange指定的路由規(guī)則分發(fā)到不同的隊列;
  • Queue:消息隊列,消息通過發(fā)送和路由到達的地方,到達Queue后,消息進入等待消費狀態(tài)。每個消息可以發(fā)送到一個或多個隊列;
  • Binding:把Exchange和Queue按照路由規(guī)則綁定起來,Exchange和Queue之間的虛擬連接;
  • Routing Key:路由關(guān)鍵字,Exchange通過這個關(guān)鍵字進行消息傳遞;
  • Virtual host: 虛擬主機,對Broker的虛擬劃分,將消費者、生產(chǎn)者和他們依賴的- AMQP服務進行隔離,每個vHost都可以看做是獨立的mini版RabbitMQ服務器。vHost是AMQP的基礎,在鏈接時必須指定,RabbitMQ 默認是 /;
  • Channel:消費通道,連接生產(chǎn)者和消費者的邏輯結(jié)構(gòu),多路復用連接中的一條雙向數(shù)據(jù)流通道。Channel是建立在真實TCP連接內(nèi)的虛擬連接,AMQP命令都是通過Channel發(fā)出,發(fā)布消息、訂閱隊列、接受消息這些動作都是通過Channel完成;
  • Producer: 消息生產(chǎn)者;
  • Customer:消息消費者;
  • Collection:生產(chǎn)者和消費者之間通信的物理網(wǎng)絡。


Exchange類型

Exchange分發(fā)消息時,各種類型不同分發(fā)策略也有所不同。目前主要有三種類型:

  • Direct Exchange:完全根據(jù)路由Key投遞消息,路由key與隊列名一致。消息中的Routing Key與Binding的 binding key 一致時,交換器就將消息分發(fā)到對應隊列。
  • Fanout Exchange:完全不適用key,采用廣播模式,一有消息進來就會分發(fā)到所有綁定的隊列。Fanout類型交換機轉(zhuǎn)發(fā)消息時最快的。
  • Topic Exchange:對key進行模糊匹配后投遞。它將Routing Key和Binding Key的字符串切分為單詞,單詞之間用點隔開。Topic只識別#號和號兩個通配符,#號匹配一個或多個單詞,號匹配一個單詞。例如,abc.#匹配abc.ghi.chf,abc.*匹配abc.ghi。

RabbitMQ持久化

RabbitMQ支持消息的持久化,消息持久化包括三部分:

  • Exchange持久化:在聲明時指定durable =>1;
  • Queue持久化:在聲明時指定durable =>1;
  • 消息持久化:在投遞消息時指定delivery_mode=>2(1是非持久化)

如果Exchange和Queue都是持久化的,那么Binding也是持久化的,如果Exchange和Queue之間有一個是持久化,一個是非持久化那么不允許綁定。

TTL

TTL(Time To Live):生存時間,RabbitMQ支持消息的過期時間,一共兩種:

  • 在發(fā)送消息時指定,通過配置消息體的properties,可以指定消息的過期時間;
  • 在創(chuàng)建Exchange是指定,消息進入隊列開始計算,超過隊列的超時時間配置,那么消息就會自動清除。

死信隊列DLX

死信隊列(DLX Dead-Letter-Exchange):利用DLX,當消息在一個隊列變成死信(Dead-Message)之后,它能重新pulish到另一個Exchange,這個Exchange就是DLX。DLX也是正常Exchange,和一般Exchang沒有區(qū)別,能在任何隊列被指定,實際就是設定某個隊列屬性。
消息變成死信的場景

  • 消息被拒絕(basic.reject/basic.nack)并且requeue=false;
  • 消息TTL過期;
  • 隊列達到最大長度

死信隊列設置
需要設置死信隊列的Exchange和Queue,然后通過Routing Key進行綁定。只是在隊列加上一個參數(shù)即可。

       Map<String, Object> arguments = Maps.newHashMapWithExpectedSize(3);
       arguments.put("x-message-ttl", dlx-ttl);
       arguments.put("x-dead-letter-exchange", "exchange-name");
       arguments.put("x-dead-letter-routing-key", "routing-key");
       Queue ret = QueueBuilder.burable("queue-name".withArguments(arguments).build();

只需要監(jiān)聽該死信隊列即可處理死信消息。還可以通過死信隊列實現(xiàn)延遲消息。

消費端ACK和NACK

消費端進行消費時,如果由于業(yè)務異??梢赃M行日志的記錄,然后進行補償。由于服務器宕機嚴重,我們需要手動進行ACK保障消費端消費成功。
消費端重回隊列是為了對沒有處理成功消息,把消息重新返回Broker。一般,在實際應用中都會關(guān)閉重回隊列,也就是設置false。
生產(chǎn)者確認機制(confirm)

  • 消息的確認:當生產(chǎn)者投遞消息后,如果Broker收到消息,則會給生產(chǎn)者一個應答。
  • 生產(chǎn)者進行接收應答,用來確認這條消息是否正常到達Broker,這種方式也是消息可靠性的核心保障。


    AFK
  1. 開Channel開啟確認模式,Channel.confirmSelect();
  2. 在Channe上開啟監(jiān)聽,addConfrimListener,監(jiān)聽成功和失敗的處理結(jié)果,對消息進行重發(fā)或記錄日志等待進一步處理。

Return消息機制
Return Listener用于處理一些不可路由消息。
在MQ正常情況下,我們生產(chǎn)者將消息通過指定Exchange和Routing,把消息送達到某一個隊列,然后消費者監(jiān)聽這個隊列進行消息的處理。
但是在某種情況,我們發(fā)送消息時,當前Exchange不存在或指定的路由Key路由不到,這時我們監(jiān)聽不可到達消息就需要Return Listener。
在MQ基礎API中有個關(guān)鍵配置:Mandatory,如果設置true,那么監(jiān)聽器會收到不可到達消息,然后處理,如果設置false,那么Broker默認會自動刪除不可到達消息。
消費端自定義監(jiān)聽(推模式和拉模式pull/push)

  • 通過while循環(huán)進行consumer.nextDelivery()方法獲取下一條消息進行消費。(通過死循環(huán)將拉模式模擬成推模式,死循環(huán)會消耗CPU資源)。
  • 自定義consumer,實現(xiàn)更加方便、可讀性更強、解耦性更強的方式。(默認使用的模式,直接訂閱到Queue,等待MQ推送消息)。

如果是高并發(fā)場景,要實現(xiàn)高吞吐量,消費者應該使用basic.Consume方法,直接訂閱隊列,將信道設置為接收模式,直到取消隊列的訂閱。在訂閱期間,MQ會不間斷推送消息到消費者。推送的消息受到Basic.Qos限制。
如果只想從隊列獲取單條消息,那么應該使用Basic.Get,但是不能講Basic.Get放到死循環(huán)中,這樣會嚴重影響MQ性能。

消息冪等性保證

消費者實現(xiàn)了消息的冪等性,可以防止消息被多次消費。

  • 利用數(shù)據(jù)庫唯一約束去重。
  1. 創(chuàng)建消息去重庫,把全局唯一ID+指紋碼作為唯一約束,如果插入成功則表示沒有消費這條消息,插入失敗則消息已消費。
  2. 優(yōu)點:實現(xiàn)簡單
  3. 缺點:高并發(fā)場景存在數(shù)據(jù)庫寫入瓶頸
  4. 解決方案:根據(jù)ID進行分庫分表進行算法路由
  • 使用狀態(tài)機或者版本號,基于數(shù)據(jù)庫樂觀鎖CAS方式
  • 版本號機制
update t_payment set orderId = #{orderId} , money=#{money},  payStatus=#{payStatus}  version=#{ version } +1 
where id=#{id} and version=#{version}
  • 狀態(tài)機機制
update table set status = 2... where status = 1
if(!update)
進行業(yè)務處理
else
重復處理
  • 利用redis原子性實現(xiàn)
    redis是單線程的,且性能非常好,并且提供許多原子命名。利用redis.setnx命令,將消息唯一ID作為主鍵執(zhí)行setnx,
    如果執(zhí)行成功,那么消息未被消費,如果執(zhí)行失敗表示消息已消費

消息可靠性保證

  • 消息成功發(fā)出
  • 保障MQ節(jié)點成功接受到消息
  • 發(fā)送端收到MQ節(jié)點(Broker)確認應答
  • 完善補償機制

解決方案

  1. 消息落庫,對消息狀態(tài)進行變更



    缺點是對數(shù)據(jù)庫有多次操作,不適合高并發(fā)場景。

  2. 消息的延遲投遞,做二次確認,回調(diào)檢查


拆分出一個回調(diào)服務,將落庫、狀態(tài)檢查等操作安排到回調(diào)服務中。
1:消息發(fā)送者發(fā)送信息到MQ,消費者為下游業(yè)務方。
1.1 成功后,作為發(fā)送方發(fā)送消息到MQ,消費者為回調(diào)服務
1.1.1 回調(diào)服務接受數(shù)據(jù)后,落庫。
2.2 失敗,等待發(fā)送者的延時投遞信息
2:發(fā)送者發(fā)送延時投遞信息到MQ,消費者為回調(diào)服務
2.1 查庫,確認下游消費已成功
2.2 確認下游消費已失敗,通過RPC接口調(diào)用發(fā)送者的接口重新發(fā)送
回調(diào)模式減少了數(shù)據(jù)庫操作,但是不能保證消息的百分之百可靠。

MQ限流

當海量消息瞬時推送過來,消費者無法同時處理那么多數(shù)據(jù),嚴重甚至導致宕機,這時需要流量削峰。
RabbitMQ提供了一直Qos(服務質(zhì)量保證)功能。即在非自動確認消息的前提下(非ACK),如果一定數(shù)目的消息(通過基于consume或channel的qos參數(shù)設置)未被確認前,不進行消費新的消息。

RabbitMQ集群

RabbitMQ最大的優(yōu)勢就是內(nèi)建集群,其設計目的是允許消費者和生產(chǎn)者在節(jié)點崩潰的情況下繼續(xù)運行,以及添加更多節(jié)點來線性擴展消息通信的吞吐量。
RabbitMQ會始終記錄四種內(nèi)部元數(shù)據(jù):

  • 隊列元數(shù)據(jù):包括隊列名稱和屬性,比如是否支持持久化、是否自動刪除。
  • 交換器元數(shù)據(jù):交換器名稱、類型、屬性。
  • 綁定元數(shù)據(jù):內(nèi)部是一張表格,記錄如何將消息路由到隊列。
  • vhost元數(shù)據(jù): 為vhost內(nèi)部的隊列、交換器、綁定提供命名空間和安全屬性。
    RabbitMQ會將這些信息全部保存到內(nèi)存中,同時將標記為可持久化的隊列、交換器、綁定存儲到磁盤中。存儲到磁盤保證節(jié)點重啟隊列和交換器能夠重建。
    集群配置方式
    http://www.itdecent.cn/p/b7cc32b94d2a
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容