(一)消息隊(duì)列

引用:?https://www.cnblogs.com/tianqing/p/7110468.html

消息隊(duì)列都應(yīng)用到了哪些實(shí)際的應(yīng)用場(chǎng)景中?

一、談消息隊(duì)列的應(yīng)用場(chǎng)景

異步處理:例如短信通知、終端狀態(tài)推送、App推送、用戶注冊(cè)等

數(shù)據(jù)同步:業(yè)務(wù)數(shù)據(jù)推送同步

重試補(bǔ)償:記賬失敗重試

系統(tǒng)解耦:通訊上下行、終端異常監(jiān)控、分布式事件中心

流量消峰:秒殺場(chǎng)景下的下單處理

發(fā)布訂閱:HSF的服務(wù)狀態(tài)變化通知、分布式事件中心

高并發(fā)緩沖:日志服務(wù)、監(jiān)控上報(bào)

但是,我們對(duì)消息隊(duì)列的底層技術(shù)和原理還是不了解,那么我們馬上開(kāi)始吧…

二、消息隊(duì)列的一些基本概念和簡(jiǎn)單原理

1. Broker

Broker的概念來(lái)自與Apache ActiveMQ,通俗的講就是MQ的服務(wù)器。

2.?消息的生產(chǎn)者、消費(fèi)者

消息生產(chǎn)者Producer:發(fā)送消息到消息隊(duì)列。

消息消費(fèi)者Consumer:從消息隊(duì)列接收消息。

3.?點(diǎn)對(duì)點(diǎn)消息隊(duì)列模型

消息生產(chǎn)者向一個(gè)特定的隊(duì)列發(fā)送消息,消息消費(fèi)者從該隊(duì)列中接收消息;

消息的生產(chǎn)者和消費(fèi)者可以不同時(shí)處于運(yùn)行狀態(tài)。

每一個(gè)成功處理的消息都由消息消費(fèi)者簽收確認(rèn)(Acknowledge)。如圖:

4.?發(fā)布訂閱消息模型-Topic

發(fā)布訂閱消息模型中,支持向一個(gè)特定的主題Topic發(fā)布消息,0個(gè)或多個(gè)訂閱者接收來(lái)自這個(gè)消息主題的消息。在這種模型下,發(fā)布者和訂閱者彼此不知道對(duì)方。實(shí)際操作過(guò)程中,

發(fā)布訂閱消息模型中,支持向一個(gè)特定的主題Topic發(fā)布消息,0個(gè)或多個(gè)訂閱者接收來(lái)自這個(gè)消息主題的消息。在這種模型下,發(fā)布者和訂閱者彼此不知道對(duì)方。實(shí)際操作過(guò)程中,

必須先訂閱,再發(fā)送消息,而后接收訂閱的消息,這個(gè)順序必須保證。

5.?消息的順序性保證

基于Queue消息模型,利用FIFO先進(jìn)先出的特性,可以保證消息的順序性。

6.?消息的ACK確認(rèn)機(jī)制

即消息的Ackownledge確認(rèn)機(jī)制,

為了保證消息不丟失,消息隊(duì)列提供了消息Acknowledge機(jī)制,即ACK機(jī)制,當(dāng)Consumer確認(rèn)消息已經(jīng)被消費(fèi)處理,發(fā)送一個(gè)ACK給消息隊(duì)列,此時(shí)消息隊(duì)列便可以刪除這個(gè)消息了。如果Consumer宕機(jī)/關(guān)閉,沒(méi)有發(fā)送ACK,消息隊(duì)列將認(rèn)為這個(gè)消息沒(méi)有被處理,會(huì)將這個(gè)消息重新發(fā)送給其他的Consumer重新消費(fèi)處理。

7.?消息的持久化

消息的持久化,對(duì)于一些關(guān)鍵的核心業(yè)務(wù)來(lái)說(shuō)是非常重要的,啟用消息持久化后,消息隊(duì)列宕機(jī)重啟后,消息可以從持久化存儲(chǔ)恢復(fù),消息不丟失,可以繼續(xù)消費(fèi)處理。

8.?消息的同步和異步收發(fā)

同步:消息的收發(fā)支持同步收發(fā)的方式。

同時(shí)還有另一種同步方式:同步收發(fā)場(chǎng)景下,消息生產(chǎn)者和消費(fèi)者雙向應(yīng)答模式,例如:張三寫封信送到郵局中轉(zhuǎn)站,然后李四從中轉(zhuǎn)站獲得信,然后在寫一份回執(zhí)信,放到中轉(zhuǎn)站,然后張三去取,當(dāng)然張三寫信的時(shí)候就得寫明回信地址

消息的接收如果以同步的方式(Pull)進(jìn)行接收,如果隊(duì)列中為空,此時(shí)接收將處于同步阻塞狀態(tài),會(huì)一直等待,直到消息的到達(dá)。

異步:消息的收發(fā)同樣支持異步方式:異步發(fā)送消息,不需要等待消息隊(duì)列的接收確認(rèn);異步接收消息,以Push的方式觸發(fā)消息消費(fèi)者接收消息。

9.?消息的事務(wù)支持

消息的收發(fā)處理支持事務(wù),例如:在任務(wù)中心場(chǎng)景中,一次處理可能涉及多個(gè)消息的接收、處理,這處于同一個(gè)事務(wù)范圍內(nèi),如果一個(gè)消息處理失敗,事務(wù)回滾,消息重新回到隊(duì)列中。

三、我們對(duì)消息隊(duì)列的實(shí)際使用

我們使用了兩種消息隊(duì)列組件:

RabbitMQ:高可用、高可靠消息應(yīng)用場(chǎng)景,例如記賬失敗重試、通知服務(wù),消息不允許丟

Kafka:高性能消息應(yīng)用場(chǎng)景,例如日志、監(jiān)控,消息允許丟失。

在此之上,我們封裝了消息應(yīng)用中心、日志服務(wù)等核心組件和服務(wù)。那么,消息應(yīng)用中心和日志都用到了消息隊(duì)列什么技術(shù)? 干貨來(lái)了…

1.?消息應(yīng)用中心

消息應(yīng)用中心(任務(wù)中心)使用了消息隊(duì)列的異步處理、數(shù)據(jù)同步、重試補(bǔ)償、系統(tǒng)解耦、流量消峰等特性。其中:

消息應(yīng)用中心(任務(wù)中心),支持RabbitMQ和Kafka兩種消息通道,支持在任務(wù)元數(shù)據(jù)層面設(shè)置

任務(wù):就是一個(gè)包含了任務(wù)執(zhí)行上下文的消息,同時(shí)代表了異步處理

任務(wù)發(fā)送者(ITaskSender)發(fā)送任務(wù):消息的生產(chǎn)者將任務(wù)消息發(fā)送的消息隊(duì)列

任務(wù)類型:消息隊(duì)列名稱,例如:HaKeepAcco***Queue,充電補(bǔ)償記賬隊(duì)列

消息隊(duì)列:任務(wù)的臨時(shí)存儲(chǔ)

任務(wù)中心:任務(wù)集中處理,消息消費(fèi)者

任務(wù)處理完成:消息Ack確認(rèn)

任務(wù)的多級(jí)重試:多個(gè)重試消息隊(duì)列,HaSysTaskStore2Queue

2.?日志組件

日志組件,使用了消息隊(duì)列的高并發(fā)緩沖和發(fā)布訂閱特性。其中:

日志組件使用Kafka作為消息通道,因?yàn)镵afka的性能好,吞吐量大, 可以容忍偶爾的消息數(shù)據(jù)丟失

日志組件使用發(fā)布訂閱的消息模型

日志組件包含日志服務(wù)SDK和日志HSF服務(wù),二者都是消息的生產(chǎn)者Producer

日志類型:消息的Topic主題

日志處理器:消息的消費(fèi)者、Topic的訂閱、日志數(shù)據(jù)處理(Hbase\ES\其他)

3.???? RPC服務(wù)狀態(tài)變化通知

RPC服務(wù)狀態(tài)變化通知,使用了消息隊(duì)列的發(fā)布訂閱特性。其中:

RPC服務(wù)狀態(tài)變化通知,使用了RabbitMQ消息隊(duì)列技術(shù)

使用發(fā)布訂閱的消息模型

Topic:RPCServiceState

RPCService.Proxy:RPC服務(wù)狀態(tài)變化消息的訂閱者

RPC服務(wù)注冊(cè)、發(fā)布:消息的生產(chǎn)者,發(fā)送RPC服務(wù)狀態(tài)變化消息。

四、消息隊(duì)列使用的最佳實(shí)踐

1. ?RabbitMQ的連接,底層都是Socket連接,長(zhǎng)連接?or?短連接?

RabbitMQ每個(gè)在創(chuàng)建每個(gè)連接的同時(shí),會(huì)自動(dòng)創(chuàng)建一個(gè)監(jiān)視線程來(lái)定時(shí)(默認(rèn)60s)偵測(cè)連接的狀態(tài),如果連接斷開(kāi),觸發(fā)ConnectionShutdown事件。

??? 用長(zhǎng)連接,還是用短連接??

??? 發(fā)送端:建議使用短連接,用完即釋放,避免長(zhǎng)連接帶來(lái)的端口占用,因?yàn)榘l(fā)送端無(wú)處不在,發(fā)送操作短而急促。

接收端:建議使用長(zhǎng)連接,時(shí)刻接收處理消息,因?yàn)橄⒌慕邮障M(fèi)比較集中,接收操作久而彌堅(jiān)。

2.?網(wǎng)絡(luò)是有抖動(dòng)的,連接的斷開(kāi)是正常的,如何應(yīng)對(duì)?

??? 發(fā)送端:發(fā)送失敗重試

接收端:注冊(cè)ConnectionShutdown事件同時(shí)捕獲消息接收異常,重新建立連接,接收消費(fèi)消息

3. RabbitMQ Exchange(Topic)模式下帶來(lái)的消息隊(duì)列數(shù)量激增

??? 只是創(chuàng)建了一個(gè)Exchange(Topic),為什么會(huì)增加這么多Queue。

?? 因?yàn)?,每個(gè)Topic的訂閱都是綁定一個(gè)Queue用作消息的消費(fèi)。

4.?需求的演變,消息結(jié)構(gòu)的變更,如何平滑過(guò)度?

??? 消息是byte[]數(shù)組,我們將復(fù)雜對(duì)象消息二進(jìn)制序列化。

??? 接收到消息后,我們將二進(jìn)制數(shù)組反序列化為實(shí)體類。

??? 當(dāng)我們的實(shí)體類消息體的結(jié)構(gòu)發(fā)生變化后,因?yàn)槭芏M(jìn)制序列化處理的

影響,導(dǎo)致無(wú)法反序列化。

??? 解決方案:

??? 消息體預(yù)留一些string類型的擴(kuò)展字段

?? 消息隊(duì)列版本化,支持多個(gè)版本的消息體。

5. Kafka Consumer Group

?? 同一Topic的一條消息只能被同一個(gè)Group內(nèi)的一個(gè)Consumer消費(fèi)

?? 多個(gè)Consumer Group可同時(shí)消費(fèi)同一條消息


6.?消息的積壓

消息的積壓產(chǎn)生的原因:消息接收消費(fèi)的速率低,發(fā)送的速度>接收的速度。

消息積壓后的影響:

消息大量積壓后,當(dāng)新的消費(fèi)者連接上MQ并開(kāi)始接收消息時(shí),發(fā)送速率會(huì)大幅降低。

消息隊(duì)列集群的壓力增加,大量的消息要持久化存儲(chǔ)和同步。

如何減少消息積壓:快速消費(fèi)消息,同時(shí)保持消息體的不要過(guò)大。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容