使用場景
異步處理、服務(wù)解耦、流量控制。
消息隊列的典型應(yīng)用場景:
? 訂單系統(tǒng):在電商系統(tǒng)中,訂單的創(chuàng)建、支付、發(fā)貨等步驟可以通過消息隊列進行異步處理和解耦。
? 日志處理:使用消息隊列將日志從應(yīng)用系統(tǒng)傳輸?shù)饺罩咎幚硐到y(tǒng),實現(xiàn)實時分析和監(jiān)控。
? 任務(wù)調(diào)度:在批量任務(wù)處理、任務(wù)調(diào)度系統(tǒng)中,通過消息隊列將任務(wù)分發(fā)給多個工作節(jié)點,進行并行處理。
? 數(shù)據(jù)同步:在數(shù)據(jù)同步系統(tǒng)中,消息隊列可以用于將變更的數(shù)據(jù)異步同步到不同的存儲系統(tǒng)或服務(wù)。
整體架構(gòu)

● Producer: 負責生產(chǎn)消息,一般由業(yè)務(wù)系統(tǒng)生產(chǎn)消息,可通過集群方式部署。RocketMQ提供多種發(fā)送方式,同步發(fā)送、異步發(fā)送、順序發(fā)送、單向發(fā)送。同步和異步方式均需要Broker返回確認信息,單向發(fā)送不需要。
● Consumer: 負責消費消息,一般是后臺系統(tǒng)負責異步消費,可通過集群方式部署。一個消息消費者會從Broker服務(wù)器拉取消息、并將其提供給應(yīng)用程序。提供pull/push兩者消費模式。
● Broker Server: 負責存儲消息、轉(zhuǎn)發(fā)消息。RocketMQ系統(tǒng)中負責接收從生產(chǎn)者發(fā)送來的消息并存儲、同時為消費者的拉取請求作準備,存儲消息相關(guān)的元數(shù)據(jù),包括消費者組、消費進度偏移和主題和隊列消息等。
● Name Server: 名字服務(wù),充當路由消息的提供者。生產(chǎn)者或消費者能夠通過名字服務(wù)查找各主題相應(yīng)的Broker IP列表。多個NameServer實例組成集群,相互獨立,沒有信息交換。
● Topic(邏輯概念):表示一類消息的集合,每個Topic包含若干條消息,每條消息只能屬于一個Topic,是RocketMQ進行消息訂閱的基本單位。創(chuàng)建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發(fā)送消息時自動創(chuàng)建Topic。一個Topic可以分片在多個Broker集群上,每一個Topic分片包含多個queue,具體結(jié)構(gòu)可以參考下圖:

● CommitLog(物理文件):消息存儲文件,所有Topic的消息都存儲在CommitLog文件中。單個文件大小默認1G, 文件名長度為 20 位,左邊補零,剩余為起始偏移量,比如 00000000000000000000 代表了第一個文件,起始偏移量為 0,文件大小為 1G=1073741824;當?shù)谝粋€文件寫滿了,第二個文件為 00000000001073741824,起始偏移量為 1073741824,以此類推。數(shù)據(jù)結(jié)構(gòu)如下圖:

下面表格說明下每條消息包含哪些字段,以及這些字段占用空間大小和字段簡介。

RocketMQ中CommitLog的生命周期及其清理時機如下:
- 默認情況下,Broker會清理單個CommitLog文件中最后一條消息超過 72小時的CommitLog文件。除了用戶手動清理外,以下幾種情況會被默認清理。
- 如果CommitLog所在磁盤分區(qū)的磁盤占用率超過75%,則會觸發(fā)CommitLog文件清理。
- 如果CommitLog所在磁盤分區(qū)的磁盤占用率超過85%,則會強制刪除CommitLog文件。
- 如果磁盤占用率達到系統(tǒng)危險警戒線(默認90%),Broker將拒絕消息寫入。
- Broker在啟動的時候會注冊定時任務(wù),定時清理過期的數(shù)據(jù),默認是每10s執(zhí)行一次,分別清理CommitLog文件和ConsumeQueue文件。
- 通常情況下每天凌晨4點刪除超過72小時的CommitLog;如果CommitLog所在磁盤分區(qū)的磁盤占用率超過75%,則會觸發(fā)CommitLog文件清理。
在RocketMQ中,可以調(diào)整以下參數(shù)來設(shè)置CommitLog的生命周期:
- fileReservedTime:文件的保留時間,默認72小時。這個參數(shù)可以設(shè)置消息在CommitLog中保留的時間。
- deleteWhen:清理過期日志時間,例如設(shè)置為"04"表示凌晨4點開始清理。
- diskMaxUsedSpaceRatio:磁盤最大使用率,超過使用率會發(fā)起日志清理操作。
● ConsumeQueue:消息消費隊列,消息到達CommitLog文件后,將異步轉(zhuǎn)發(fā)ConsumeQueue,供消費者消費。ConsumeQueue僅記錄消息的偏移位置以及消息長度,以及tag信息.實際消息內(nèi)容存在CommitLog。ConsumeQueue 文件采取定長設(shè)計,每一個條目共 20 個字節(jié),分別為 8 字節(jié)的 CommitLog 物理偏移量、4 字節(jié)的消息長度、8 字節(jié) tag hashcode,單個文件由 30W 個條目組成,可以像數(shù)組一樣隨機訪問每一個條目,每個 ConsumeQueue 文件大小約 5.72M。數(shù)據(jù)結(jié)構(gòu)如下圖:

● IndexFile:CommitLog的索引文件,提供了一種可以通過 key 或時間區(qū)間來查詢消息的方法。單個 IndexFile 文件大小約為 400M,一個 IndexFile 可以保存 2000W 個索引,IndexFile 的底層存儲設(shè)計類似 JDK 的 HashMap 數(shù)據(jù)結(jié)構(gòu)。主要由 Header、Slot Table、Index Linked List 三部分組成。

Header:IndexFile 的頭部,占 40 個字節(jié)。主要包含以下字段:

Slot Table:默認包含 500w 個 Hash 槽,每個 Hash 槽存儲的是相同 hash 值的第一個 IndexItem 存儲位置 。
Index Linked List:默認最多包含 2000w 個 IndexItem

Topic,CommitLog,ConsumeQueue 三者的關(guān)系如下圖:
生產(chǎn)端選擇需要發(fā)送的隊列(Topic),一個Topic可以對應(yīng)多個ConsumeQueue,消息是順序?qū)懙紺ommmitLog里面.ConsumeQueue、IndexFile都是基于CommitLog文件構(gòu)建的。一個完整的消息寫入流程包括:同步寫入 Commitlog 文件緩存區(qū),異步構(gòu)建 ConsumeQueue、IndexFile 文件。
RocketMQ通過開啟一個線程ReputMessageServcie來實時讀取CommitLog文件新增內(nèi)容,使用reputFromOffset來標記已經(jīng)追蹤到的位置。
image.png
功能
普通消息
SendReceipt sendReceipt = producer.send(message);
順序消息
? 生產(chǎn)者:使用 MessageQueueSelector 將具有相同 orderId 的消息發(fā)送到同一個消息隊列(MessageQueue),確保這些消息被發(fā)送到同一隊列,從而保證順序。
? 消費者:通過 DefaultMQPushConsumer 訂閱 OrderTopic,RocketMQ 保證每個隊列中的消息是按順序消費的。只要相同訂單 ID 的消息在同一隊列中,就可以保證它們的消費順序。
實現(xiàn)MQ順序消息關(guān)鍵點
在發(fā)送時設(shè)置分片路由規(guī)則,讓相同key的消息只落到指定queue上,然后消費過程中對順序消息所在的queue加鎖,保證消息的有序性,讓這個queue上的消息就按照FIFO順序來進行消費。因此我們滿足以下三個條件是否就可以呢?
1)消息順序發(fā)送: 多線程發(fā)送的消息無法保證有序性,因此,需要業(yè)務(wù)方在發(fā)送時,針對同一個業(yè)務(wù)編號(如同一筆訂單)的消息需要保證在一個線程內(nèi)順序發(fā)送,在上一個消息發(fā)送成功后,在進行下一個消息的發(fā)送。對應(yīng)到mq中,消息發(fā)送方法就得使用同步發(fā)送,異步發(fā)送無法保證順序性。
//采用的同步發(fā)送方式,在一個線程內(nèi)順序發(fā)送,異步發(fā)送方式為:producer.send(msg, new SendCallback() {...})
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {//…}
2)消息順序存儲:MQ 的topic下會存在多個queue,要保證消息的順序存儲,同一個業(yè)務(wù)編號的消息需要被發(fā)送到一個queue中。對應(yīng)到mq中,需要使用MessageQueueSelector來選擇要發(fā)送的queue。即可以對業(yè)務(wù)編號設(shè)置路由規(guī)則,像根據(jù)隊列數(shù)量對業(yè)務(wù)字段hash取余,將消息發(fā)送到一個queue中。
//使用"%"操作,使得訂單id取余后相同的數(shù)據(jù)路由到同一個queue中,也可以自定義路由規(guī)則
long index = id % mqs.size();
return mqs.get((int) index);
3)消息順序消費:要保證消息順序消費,同一個queue就只能被一個消費者所消費,因此對broker中消費隊列加鎖是無法避免的。同一時刻,一個消費隊列只能被一個消費者消費,消費者內(nèi)部,也只能有一個消費線程來消費該隊列。這里RocketMQ已經(jīng)為我們實現(xiàn)好了。
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
//....省略
}
}
消費者重新負載,并且分配完消費隊列后,需要向mq服務(wù)器發(fā)起消息拉取請求,代碼實現(xiàn)在RebalanceImpl#updateProcessQueueTableInRebalance()中,針對順序消息的消息拉取,mq做了以上判斷,即消費客戶端先向broker端發(fā)起對messageQueue的加鎖請求,只有加鎖成功時才創(chuàng)建pullRequest進行消息拉取
public boolean lock(final MessageQueue mq) {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.getMqSet().add(mq);
try {
Set<MessageQueue> lockedMq =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mmqq : lockedMq) {
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
boolean lockOK = lockedMq.contains(mq);
log.info("the message queue lock {}, {} {}",
lockOK ? "OK" : "Failed",
this.consumerGroup,
mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
}
}
return false;
}
可以看到,就是調(diào)用lockBatchMQ方法發(fā)送了一個加鎖請求,成功獲取到消息處理隊列就設(shè)為獲取到鎖,返回鎖定成功,如果加鎖成功,同一時刻只有一個線程進行消息消費。加鎖失敗,會延遲1000ms重新嘗試向broker端申請鎖定messageQueue,鎖定成功后重新提交消費請求。
順序消息可能存在的坑
經(jīng)過我們正常發(fā)送順序消息,存儲順序消息,消費者只需要老老實實地按照拉取到地消息順序消費,就可以保證順序性。
但是,仍然會出現(xiàn)一些問題,比如消費者如果消費失敗了,對于一個普通消息來說,它就會進入消息重試,如果默認設(shè)置的16次重試都失敗了,消息就會進入死信隊列,需要人工介入處理。
這樣對于順序消息來說可能是不可接受的,因為前置的消息消費失敗了,后續(xù)的消息若是消費成功,消息的順序性就被打亂了。
比如下單操作,用戶下單的消息消費失敗了,相當于訂單未能創(chuàng)建,但是付款的消息卻執(zhí)行成功了,用戶一看,錢都扣了咋訂單沒了??于是客服電話又雙叒叕被打爆了!
因此我們需要考慮順序消息能不能重試,能不能放入死信隊列?
RocketMQ 對于順序消息的重試默認實現(xiàn)是執(zhí)行 Integer.MAX_VALUE 次。 若是一直無法消費成功,那么就可能會把業(yè)務(wù)堵塞了。
因此順序消息消費失敗的處理的一個方法是將可能需要支持相關(guān)聯(lián)的所有消息都直接失敗,然后找個地方持久化保存這些消息,等待后續(xù)修復后再重新消費。
重平衡機制的影響
消費者的重平衡機制也會影響消息的順序消費。當一個消費者突然宕機了,當前隊列沒有消費者,那么就會觸發(fā)重平衡機制,讓其他消費者頂替這個掛掉的消費者的位置。
如果這個掛掉的消費者此時沒有消費消息倒是還好,新來的消費者就能繼續(xù)完成后續(xù)消費。但是如果掛掉的消費者剛好消費消息到一半,那么此時它還未去修改 Broker 的消費位點,那么就會導致新來的消費者重復消費,甚至可能使得順序不一致。
順序消費的三把鎖
第一把鎖:分布式鎖
RocketMQ 提供了一個 ConsumeMessageOrderlyService 類來保證順序消費。
這個 service 啟動的時候會向 Broker 申請當前消費者負責的隊列鎖,會將消費組、客戶端ID、以及負責的隊列發(fā)往 Broker。 Broker 會將這個隊列與當前消費者進行綁定,將關(guān)系存儲到本地。
這個鎖實現(xiàn)了同一個消費者組內(nèi),只有一個消費者可以消費這個隊列。
這個鎖有過期時間,消費者會定期(默認20s)給這個鎖續(xù)期,確保對分布式鎖的占有。
別的消費者要想消費隊列,就必須也來加分布式鎖,但是如果隊列已經(jīng)被別的消費者給綁定了,那么就無法消費。
第二把鎖: Synchronized
這把鎖確保了同一時刻只能有一個線程去消費這個隊列。
這里需要了解一個前提:
一個消費者可以同時消費多個隊列,而一個隊列某刻只能被一個消費者消費。
由于占有分布式鎖的消費者拿到消息后,它仍然是丟到線程池(因為消費者可能在消費多個隊列,并發(fā)消費不同隊列可以增加性能)去并發(fā)消費,但是我們的順序消息必須保證有序,因此只能讓一個線程去消費順序消息。
第三把鎖:ReentrantLock
線程獲取到 Synchronized 鎖之后,還需要再到 ProcessQueue 中獲取到 consumeLock 鎖。這是一個 ReentrantLock。
這把鎖是為了表明消費者正在消費消息
由于 RocketMQ 存在重平衡機制,我們前面了解到,如果當前消費者正在處理消費消息,還未向 Broker 提交消費點位,如果當前的消費者掛了,觸發(fā)重平衡機制,那么新來的消費者就可能導致重復消費。
這個鎖的功能就是表明當前這個隊列還有消息正在被消費,無法重平衡,等待下一次重平衡。
當消費者去請求占有這個鎖的時候,如果獲取失敗,就說明隊列正在被消費,則重平衡失敗。如果獲取鎖成功,那么就表明當前隊列沒有被消費消息,就可以去 Broker 中解除分布式鎖,讓新的消費者接管這個隊列了。
實際上,這三把鎖并不能完全保障順序性和不重復
例如:當一個 Broker 掛了,那么 Broker 對應(yīng)的隊列就全部不可用了,此時會讓集群中其他的 Broker 頂替這個掛掉的 Broker,原來隊列中相關(guān)的消息只能被發(fā)送到別的隊列里,那么就會被別的消費者消費。
要想保證順序性,那么就得犧牲可用性,不能讓消息發(fā)送到別得隊列。要想保證可用性又會犧牲順序性。
RocketMQ 對這兩個模式都提供了方案:如果要絕對的順序性,則創(chuàng)建 Topic 時要指定 -o 參數(shù)(–order)為true,且 NameServer 中的配置 orderMessageEnable 和 returnOrderTopicConfigToBroker 必須是 true。
列中相關(guān)的消息只能被發(fā)送到別的隊列里,那么就會被別的消費者消費。
要想保證順序性,那么就得犧牲可用性,不能讓消息發(fā)送到別得隊列。要想保證可用性又會犧牲順序性。
RocketMQ 對這兩個模式都提供了方案:如果要絕對的順序性,則創(chuàng)建 Topic 時要指定 -o 參數(shù)(–order)為true,且 NameServer 中的配置 orderMessageEnable 和 returnOrderTopicConfigToBroker 必須是 true。
延時消息
message.setDelayTimeLevel(2);
SendResult sendResult = producer.send(message);
1)預定義延時級別
● RocketMQ不支持任意時間的延時消息,而是提供了18個等級的延遲時間,包括1秒、5秒、10秒、30秒、1分鐘、2分鐘、3分鐘、4分鐘、5分鐘、6分鐘、7分鐘、8分鐘、9分鐘、10分鐘、20分鐘、30分鐘、1小時和2小時。這種設(shè)計主要是出于性能考慮,如果支持任意時間的延遲,就會涉及到消息的排序,會有一定的性能損耗。而RocketMQ這種利用固定延遲級別到單個隊列的實現(xiàn)方式是一種妥協(xié),靈活性和極致性能的妥協(xié)。此外,RocketMQ 5.0版本引入了新的時間輪算法,簡單理解就是把時間按照精度劃分成N個Slot,消息會按照延遲時間加入到對應(yīng)的Slot,然后線程定時掃描時間輪,把Slot對應(yīng)的到期消息重新投遞即可。
2)存儲轉(zhuǎn)換
? 當一條延時消息被發(fā)送到 RocketMQ 后,并不會立即存入目標主題(Topic)的消息隊列中,而是首先被發(fā)送到了一個特殊的內(nèi)部主題 SCHEDULE_TOPIC_XXXX 中。
? 在這個內(nèi)部主題中,消息根據(jù)其延時級別被分配到不同的隊列里,每個隊列對應(yīng)一個延時級別。
3)定時調(diào)度
? RocketMQ 內(nèi)部有一個調(diào)度服務(wù)(Schedule Service),它會定期掃描 SCHEDULE_TOPIC_XXXX 主題下的消息隊列。
? 根據(jù)消息的延時級別以及當前時間判斷是否已經(jīng)到達了該消息應(yīng)該被投遞的時間點。如果達了,則將這條消息重新發(fā)送到最初指定的目標 Topic 中,供消費者消費。
事務(wù)消息

1.在消息隊列上開啟一個事務(wù)主題。
2.事務(wù)中第一個執(zhí)行的服務(wù)發(fā)送一條“半消息”(半消息和普通消息的唯一區(qū)別是,在事務(wù)提交之前,對于消費者來說,這個消息是不可見的)給消息隊列。
3.半消息發(fā)送成功后,發(fā)送半消息的服務(wù)就會開始執(zhí)行本地事務(wù),根據(jù)本地事務(wù)執(zhí)行結(jié)果來決定事務(wù)消息提交或者回滾。
補償流程:RocketMQ 提供事務(wù)反查來解決異常情況,如果 RocketMQ 沒有收到提交或者回滾的請求,Broker 會定時到生產(chǎn)者上去反查本地事務(wù)的狀態(tài),然后根據(jù)生產(chǎn)者本地事務(wù)的狀態(tài)來處理這個“半消息”是提交還是回滾。值得注意的是我們需要根據(jù)自己的業(yè)務(wù)邏輯來實現(xiàn)反查邏輯接口,然后根據(jù)返回值 Broker 決定是提交還是回滾。而且這個反查接口需要是無狀態(tài)的,請求到任意一個生產(chǎn)者節(jié)點都會返回正確的數(shù)據(jù)。
4.本地事務(wù)成功后會讓這個“半消息”變成正常消息,供分布式事務(wù)后面的步驟執(zhí)行自己的本地事務(wù)。(這里的事務(wù)消息,Producer 不會因為 Consumer 消費失敗而做回滾,采用事務(wù)消息的應(yīng)用,其所追求的是高可用和最終一致性,消息消費失敗的話,RocketMQ 自己會負責重推消息,直到消費成功。)
其中,補償流程用于解決消息 Commit 或者 Rollback 發(fā)生超時或者失敗的情況。在 RocketMQ 事務(wù)消息的主要流程中,一階段的消息如何對用戶不可見。其中,事務(wù)消息相對普通消息最大的特點就是一階段發(fā)送的消息對用戶是不可見的。那么,如何做到寫入消息但是對用戶不可見呢?RocketMQ 事務(wù)消息的做法是:如果消息是“半消息”,將備份原消息的主題與消息消費隊列,然后改變主題為 RMQ_SYS_TRANS_HALF_TOPIC。由于消費組未訂閱該主題,故消費端無法消費“半消息”的消息,然后 RocketMQ 會開啟一個定時任務(wù),從 Topic 為 RMQ_SYS_TRANS_HALF_TOPIC 中拉取消息進行消費,根據(jù)生產(chǎn)者組獲取一個服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請求,根據(jù)事務(wù)狀態(tài)來決定是提交或回滾消息。
常見問題
消息不丟失

1.生產(chǎn)者需要處理好 Broker 的響應(yīng),出錯情況下利用重試、報警等手段。
2.Broker需要控制響應(yīng)的時機,單機情況下是消息刷盤后返回響應(yīng),集群多副本情況下,即發(fā)送至兩個副本及以上的情況下再返回響應(yīng)。
3.消費者需要在執(zhí)行完真正的業(yè)務(wù)邏輯之后再返回響應(yīng)給 Broker。
但是要注意消息可靠性增強了,性能就下降了,等待消息刷盤、多副本同步后返回都會影響性能。因此還是看業(yè)務(wù),例如日志的傳輸可能丟那么一兩條關(guān)系不大,因此沒必要等消息刷盤再響應(yīng)。
消息的ACK模式
消息被消費,那么如何保證被消費成功呢?這里只有使用方控制,只有使用方確認成功了,才會消費成功,否則會重新投遞。
RocketMQ其實是通過ACK機制來對失敗消息進行重試和通知的,具體流程如下所示:

重試間隔
10秒/30秒/1分鐘/2分鐘/3分鐘/4分鐘/5分鐘/6分鐘/7分鐘/8分鐘/9分鐘/10分鐘/20分鐘/30分鐘/1小時/2小時
消息堆積
消息堆積是指在消息隊列中,消息的生產(chǎn)速度遠大于消費速度,導致大量消息積壓在隊列中。
我們需要先定位消費慢的原因,如果是 bug 則處理 bug,同時可以臨時擴容增加消費速率,減少線上的資損。
如果是因為本身消費能力較弱,則可以優(yōu)化下消費邏輯
常見有以下幾種方式提升消費者的消費能力:
- 增加消費者線程數(shù)量:提高并發(fā)消費能力。
- 增加消費實例:在分布式系統(tǒng)中,可以水平擴展多個消費實例,從而提高消費速率。
- 優(yōu)化消費者邏輯:檢查消費者的代碼,減少單個消息的處理時間。例如,減少 I/O 操作、使用批量處理等。
注意上述的第二點:
? 增加消費實例,一定要注意注意 Topic 對應(yīng)的分區(qū)/隊列數(shù)需要大于等于消費實例數(shù),不然新增加的消費者是沒東西消費的。因為一個 Topic中,一個分區(qū)/隊列只會分配給一個消費實例
除此之外還可以進行限流和降級處理:
? 對消息生產(chǎn)端進行限流,降低生產(chǎn)速率,避免消息積壓進一步惡化。
? 對非關(guān)鍵消息進行丟棄或延遲處理,只保留高優(yōu)先級的消息,提高系統(tǒng)的響應(yīng)速度。
優(yōu)化消費者邏輯常見做法
批量消費:
? 通過一次性從隊列中消費多條消息(如批量讀取 100 條),可以減少每次拉取消息的網(wǎng)絡(luò)開銷,提高處理效率。
異步消費:
? 使用異步處理方法,在消費的同時不阻塞后續(xù)消息的消費。處理完一條消息后立即開始處理下一條消息,提升并發(fā)度(但是要注意消息丟失的風險)。
優(yōu)化數(shù)據(jù)庫操作:
? 如果消費者在處理消息時需要頻繁訪問數(shù)據(jù)庫,可以通過數(shù)據(jù)庫連接池、SQL 優(yōu)化、緩存等手段減少數(shù)據(jù)庫操作的時間。
? 使用批量插入或更新操作,而不是逐條處理,可以顯著提升效率。
臨時擴展隊列的策略
臨時擴展多個消費者隊列:
? 在消息積壓嚴重時,可以通過臨時擴展多個消費者隊列,將積壓的消息分配到不同的隊列中進行消費。消費完成后,可以將這些臨時隊列關(guān)閉。
使用多隊列調(diào)度機制:
? 例如,使用 RabbitMQ 的 Exchange 機制,將消息按照特定規(guī)則路由到多個隊列中。這樣可以在消息堆積時,將不同類型的消息分開處理。
限流與降級的實現(xiàn)方式
生產(chǎn)者限流:
? 在生產(chǎn)者端增加限流邏輯,使用令牌桶、漏桶算法等限流策略,限制生產(chǎn)者的發(fā)送速率,從而避免消息隊列被快速填滿。
? 例如,在 Kafka 中可以通過配置生產(chǎn)者的 linger.ms 和 batch.size 來緩解消息發(fā)送的速度。
消費者降級:
? 在消息堆積嚴重時,對低優(yōu)先級的消息進行丟棄或延遲處理。只保留高優(yōu)先級的消費,確保系統(tǒng)核心功能的正常運行。
? 可以在消費端增加優(yōu)先級隊列或通過消息屬性區(qū)分優(yōu)先級,先處理高優(yōu)先級的消息。
RocketMQ常見調(diào)優(yōu)
- 磁盤瓶頸的識別與優(yōu)化:首先需要通過工具來監(jiān)測當前系統(tǒng)的磁盤I/O操作狀況,這有助于理解目前磁盤是否在IOPS或吞吐量上遇到了限制。如果發(fā)現(xiàn)是讀吞吐量達到瓶頸,而IOPS仍有空間,可以嘗試減少預讀設(shè)置。若IOPS已接近極限但吞吐量較低,則可適當增加預讀大小以提高效率。對于無法單純通過調(diào)整配置解決的情況,考慮橫向(添加更多節(jié)點)或縱向(升級單個節(jié)點配置)擴展存儲資源。
- 內(nèi)存利用與“冷讀”處理:由于RocketMQ依賴PageCache緩存數(shù)據(jù),因此部署時應(yīng)優(yōu)先選用內(nèi)存較大的機器,這樣能有效減少直接從磁盤讀取數(shù)據(jù)的機會。對于5.1.2版本及以上版本的RocketMQ,當遇到因拉取長期積壓消息導致的大量磁盤訪問時,可以通過設(shè)置 dataReadAheadEnable=false 來降低CommitLog文件的預讀量,從而緩解這一問題。
- 文件清理機制調(diào)優(yōu):合理設(shè)定參數(shù)值,根據(jù)業(yè)務(wù)特點調(diào)整 deleteWhen 參數(shù)指定的消息清理時間點,設(shè)置合適的 fileReservedTime 控制消息保留周期,調(diào)整 diskMaxUsedSpaceRatio 確保磁盤利用率保持在一個健康水平。
- 集群參數(shù)調(diào)優(yōu):對Broker的幾個屬性可能影響到集群性能的穩(wěn)定性,下面進行特別說明。開啟異步刷盤提高集群吞吐,開啟Slave讀權(quán)限提高Master內(nèi)存利用率,消費一次拉取消息數(shù)量由broker和consumer客戶端共同決定,發(fā)送隊列等待時間由參數(shù)waitTimeMillsInSendQueue設(shè)置,主從異步復制提高集群性能,提高集群穩(wěn)定性。
- 消息延遲問題優(yōu)化:分散定時消息的觸發(fā)時間,合理選擇延遲等級,增加集群資源,監(jiān)控與調(diào)優(yōu)。
