[TOC]
概述
RocketMQ 4.9.1 版本 針對(duì) Broker 做了一系列性能優(yōu)化,提升了消息發(fā)送的 TPS。前文曾就 4.9.1 版本的優(yōu)化做了深入分析。
在 2022 年的 2 月底,RocketMQ 4.9.3 版本 發(fā)布,其對(duì) Broker 做了更進(jìn)一步的性能優(yōu)化,本次優(yōu)化中也包含了生產(chǎn)和消費(fèi)性能的提升。
本文將會(huì)詳解 4.9.3 版本中的性能優(yōu)化點(diǎn)。在 4.9.3 版本中對(duì)延遲消息的優(yōu)化已經(jīng)在另一篇文章中詳解。
本次和上次的性能優(yōu)化主要由快手的黃理老師提交,在 ISSUE#3585 中集中記錄。先來看一下本次性能優(yōu)化的所有優(yōu)化項(xiàng)
We have some performance improvements based on 4.9.2
- [Part A] eliminate reverse DNS lookup in MessageExt
- [Part B] Improve header encode/decode performance
- [Part B] Improve RocketMQSerializable performance with zero-copy
- [Part C] cache result for parseChannelRemoteAddr()
- [Part D] improve performance of createUniqID()
- [Part E] eliminate duplicated getNamespace() call when where is no namespace
- [Part F] eliminate regex match in topic/group name check
- [Part G] [Work in progress] support send batch message with different topic/queue
- [Part H] eliminate StringBuilder auto resize in PullRequestHoldService.buildKey() when topic length is greater than 14, this method called twice for each message.
- [Part I] Avoid unnecessary StringBuffer resizing and String Formatting
- [Part J] Use mmap buffer instead of FileChannel when writing consume queue and slave commit log, which greatly speed up consume tps.
- Part K move execution of notifyMessageArriving() from ReputMessageService thread to PullRequestHoldService thread.
These commits almost eliminate bad performance methods in the cpu flame graph in producer side.
下面來逐條剖析
性能優(yōu)化
想要優(yōu)化性能,首先需要找到 RocketMQ 的 Broker 在處理消息時(shí)性能損耗的點(diǎn)。使用火焰圖可以清晰地看出當(dāng)前耗時(shí)比較多的方法,從耗時(shí)較多的方法想辦法入手優(yōu)化,可以更大程度上提升性能。
具體的做法是開啟 Broker 的火焰圖采樣,然后對(duì)其進(jìn)行壓測(cè)(同時(shí)生產(chǎn)和消費(fèi)),然后觀察其火焰圖中方法的時(shí)間占用百分比,優(yōu)化占用時(shí)間高且可以優(yōu)化的地方。
A. 移除 MessageExt 中的反向 DNS 查找
eliminate reverse DNS lookup in MessageExt

inetAddress.getHostName() 方法中會(huì)有反向 DNS 查找,可能耗時(shí)較多。于是優(yōu)化成沒有反向 DNS 查找的 getHostString() 方法
(MessageExt#getBornHostNameString() 方法在一個(gè)異常流程中被調(diào)用,優(yōu)化此方法其實(shí)對(duì)性能沒有什么提升)
B.1. 優(yōu)化 RocketMQ 通信協(xié)議 Header 解碼性能
[Part B] Improve header encode/decode performance
(該提交未合入 4.9.3 版本,將于 4.9.4 版本發(fā)布)
PartB 有兩個(gè)提交,其實(shí)作用不同,但是由于第二個(gè)提交依賴第一個(gè)所以只能放到一起
尋找優(yōu)化點(diǎn)

RocketMQ 的通信協(xié)議定義了各種指令(消息發(fā)送、拉取等等)。其中 Header 是協(xié)議頭,數(shù)據(jù)是序列化后的json。json 的每個(gè) key 字段都是固定的,不同的通訊請(qǐng)求字段不一樣,但是其中有一個(gè) extField 是完全自定義的,每個(gè)指令都不一樣。所有指令當(dāng)前共用了一個(gè)通用的解析方法 RemotingCommand#decodeCommandCustomHeader,基于反射來解析和設(shè)置消息 Header。
// SendMessageRequestHeaderV2
{
"code":310,
"extFields":{
"f":"0",
"g":"1482158310125",
"d":"4",
"e":"0",
"b":"TopicTest",
"c":"TBW102",
"a":"please_rename_unique_group_name",
"j":"0",
"k":"false",
"h":"0",
"i":"TAGS\u0001TagA\u0002WAIT\u0001true\u0002"
},
"flag":0,
"language":"JAVA",
"opaque":206,
"version":79
}
上面是一個(gè)發(fā)送消息的請(qǐng)求 Header。由于各種指令對(duì)應(yīng)的 Header 的 extField 不同,這個(gè)解析 Header 方法內(nèi)部大量使用反射來設(shè)置屬性,效率很低。而且這個(gè)解碼方法應(yīng)用廣泛,在 RocketMQ 網(wǎng)絡(luò)通信時(shí)都會(huì)用到(如發(fā)送消息、拉取消息),所以很有優(yōu)化的必要。
優(yōu)化方案
優(yōu)化的方案是盡量減少反射的使用,將常用的指令解碼方法抽象出來。
這里引入了 FastCodesHeader 接口,只要實(shí)現(xiàn)這個(gè)接口,解碼時(shí)就走具體的實(shí)現(xiàn)類而不用反射。

然后為生產(chǎn)消息和消費(fèi)消息的協(xié)議單獨(dú)實(shí)現(xiàn)解碼方法,內(nèi)部可以不用反射而是直接進(jìn)行字段賦值,這樣雖然繁瑣但是執(zhí)行速度最快。
// SendMessageRequestHeaderV2.java
@Override
public void decode(HashMap<String, String> fields) throws RemotingCommandException {
String str = getAndCheckNotNull(fields, "a");
if (str != null) {
a = str;
}
str = getAndCheckNotNull(fields, "b");
if (str != null) {
b = str;
}
str = getAndCheckNotNull(fields, "c");
if (str != null) {
c = str;
}
str = getAndCheckNotNull(fields, "d");
if (str != null) {
d = Integer.parseInt(str);
}
// ......
}
B.2. 提高編解碼性能
[Part B] Improve RocketMQSerializable performance with zero-copy
(該提交未合入 4.9.3 版本,將于 4.9.4 版本發(fā)布)
改動(dòng)背景

RocketMQ 的協(xié)議 Header 序列化協(xié)議有倆
- RemotingSerializable:內(nèi)部用 fastjson 進(jìn)行序列化反序列化,為當(dāng)前版本使用的序列化協(xié)議。
- RocketMQSerializable:RocketMQ 實(shí)現(xiàn)的序列化協(xié)議,性能對(duì)比 fastjson 沒有決定性優(yōu)勢(shì),當(dāng)前默認(rèn)沒有使用。
// RemotingCommand.java
private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
優(yōu)化方法
這個(gè)提交優(yōu)化了 RocketMQSerializable 的性能,具體的方法是消除了 RocketMQSerializable 中多余的拷貝和對(duì)象創(chuàng)建,使用 Netty 的 ByteBuf 替換 Java 的 ByteBuffer,性能更高。
- 對(duì)于寫字符串:Netty 的
ByteBuf有直接 put 字符串的方法writeCharSequence(CharSequence sequence, Charset charset),少一次內(nèi)存拷貝,效率更高。 - 對(duì)于寫 Byte:Netty 的
writeByte(int value)傳入一個(gè)int,Java 傳入一個(gè)字節(jié)put(byte b)。當(dāng)前 CPU 都是 32 位、64 位的,對(duì) int 處理更高效。
(該改動(dòng)要在 Producer 和 Consumer 設(shè)置使用 RocketMQ 序列化協(xié)議才能生效)
System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());
提交說明上的 zero-copy 說的不是操作系統(tǒng)層面上的零拷貝,而是對(duì)于 ByteBuf 的零拷貝。
在 NettyEncoder 中用 fastEncodeHeader 替換原來的 encodeHeader 方法,直接傳入 ByteBuf 進(jìn)行操作,不需要用 Java 的 ByteBuffer 中轉(zhuǎn)一下,少了一次拷貝。

public void fastEncodeHeader(ByteBuf out) {
int bodySize = this.body != null ? this.body.length : 0;
int beginIndex = out.writerIndex();
// skip 8 bytes
out.writeLong(0);
int headerSize;
// 如果是 RocketMQ 序列化協(xié)議
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
if (customHeader != null && !(customHeader instanceof FastCodesHeader)) {
this.makeCustomHeaderToNet();
}
// 調(diào)用 RocketMQ 序列化協(xié)議編碼
headerSize = RocketMQSerializable.rocketMQProtocolEncode(this, out);
} else {
this.makeCustomHeaderToNet();
byte[] header = RemotingSerializable.encode(this);
headerSize = header.length;
out.writeBytes(header);
}
out.setInt(beginIndex, 4 + headerSize + bodySize);
out.setInt(beginIndex + 4, markProtocolType(headerSize, serializeTypeCurrentRPC));
}
rocketMQProtocolEncode 中直接操作 ByteBuf,沒有拷貝和新對(duì)象的創(chuàng)建。
public static int rocketMQProtocolEncode(RemotingCommand cmd, ByteBuf out) {
int beginIndex = out.writerIndex();
// int code(~32767)
out.writeShort(cmd.getCode());
// LanguageCode language
out.writeByte(cmd.getLanguage().getCode());
// int version(~32767)
out.writeShort(cmd.getVersion());
// int opaque
out.writeInt(cmd.getOpaque());
// int flag
out.writeInt(cmd.getFlag());
// String remark
String remark = cmd.getRemark();
if (remark != null && !remark.isEmpty()) {
writeStr(out, false, remark);
} else {
out.writeInt(0);
}
int mapLenIndex = out.writerIndex();
out.writeInt(0);
if (cmd.readCustomHeader() instanceof FastCodesHeader) {
((FastCodesHeader) cmd.readCustomHeader()).encode(out);
}
HashMap<String, String> map = cmd.getExtFields();
if (map != null && !map.isEmpty()) {
map.forEach((k, v) -> {
if (k != null && v != null) {
writeStr(out, true, k);
writeStr(out, false, v);
}
});
}
out.setInt(mapLenIndex, out.writerIndex() - mapLenIndex - 4);
return out.writerIndex() - beginIndex;
}
C. 緩存 parseChannelRemoteAddr() 方法的結(jié)果
cache the result of parseChannelRemoteAddr()
尋找優(yōu)化點(diǎn)

從火焰圖中可以看到,parseChannelRemoteAddr() 這個(gè)方法占用了 5% 左右的總耗時(shí)。
這個(gè)方法被客戶端在發(fā)送消息時(shí)調(diào)用,每次發(fā)送消息都會(huì)調(diào)用到這個(gè)方法,這也是他占用如此高 CPU 耗時(shí)百分比的原因。
那么這個(gè)方法做了什么?Netty 的 Channel 相當(dāng)于一個(gè) HTTP 連接,這個(gè)方法試圖從 Channel 中獲取遠(yuǎn)端的地址。
從火焰圖上看出,該方法的 toString占用大量時(shí)間,其中主要包含了復(fù)雜的 String 拼接和處理方法。
那么想要優(yōu)化這個(gè)方法最直接的方式就是——緩存其結(jié)果,避免多次調(diào)用。
具體優(yōu)化方法
Netty 提供了 AttributeKey 這個(gè)類,用于將 HTTP 連接的狀態(tài)保存在 Channel 上。AttributeKey 相當(dāng)于一個(gè) Key-Value 對(duì),用來存儲(chǔ)狀態(tài)。
要使用 AttributeKey,需要先初始化它的 Key,這樣它就可以預(yù)先計(jì)算 Key 的 HashCode,查詢?cè)?Key 的時(shí)候效率就很高了。
private static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr");
然后優(yōu)化該方法,第一次調(diào)用該方法時(shí)嘗試從 Channel 上獲取屬性RemoteAddr,如果獲取不到,則調(diào)用原來的邏輯去獲取并且緩存到該 AttributeKey 中。

修改過后在火焰圖上已經(jīng)幾乎看不到該方法的用時(shí)。
D. 提升 createUniqID() 的性能
Improve performance of createUniqID().
尋找優(yōu)化點(diǎn)

createUniqID() 這個(gè)方法用于創(chuàng)建消息的全局唯一 ID,在客戶端每次發(fā)送消息時(shí)會(huì)調(diào)用,為每個(gè)消息創(chuàng)建全局唯一 ID。
RocketMQ 中包含兩個(gè)消息 ID,分別為全局唯一 ID(UNIQUE_ID,消息發(fā)送時(shí)由客戶端生產(chǎn))和偏移量 ID(offsetMsgId,Broker 保存消息時(shí)由保存的偏移量生成),關(guān)于這兩個(gè) ID 的生成方法和使用可以看丁威老師的 RocketMQ msgId與offsetMsgId釋疑。
原本生成全局 ID 的方法將客戶端 IP、進(jìn)程 ID 等信息組合計(jì)算生成一個(gè)字符串。方法邏輯里面包含了大量字符串和 ByteBuffer 操作,所以耗時(shí)較高。
優(yōu)化方法
原先的方法實(shí)現(xiàn)中,每次調(diào)用都會(huì)創(chuàng)建 StringBuilder 、ByteBuffer、多個(gè)字符串……包含大量字符串操作,字符串操作的 CPU 耗時(shí)開銷很大。
優(yōu)化的方法主要通過字符數(shù)組運(yùn)算替代字符串操作,避免多余的字符串對(duì)象產(chǎn)生;使用緩存,避免每次調(diào)用都重新計(jì)算和創(chuàng)建字符串對(duì)象。
-
將原來的
FIX_STRING字符串換成char[]字符數(shù)組,然后可以使用System.arraycopy替換原來的StringBuilder操作,避免多余對(duì)象產(chǎn)生。image-20220411221546009 -
新增了
void writeInt(char[] buffer, int pos, int value)和writeShort(char[] buffer, int pos, int value)方法,用于寫入字符串?dāng)?shù)組。image-20220411222306938原先的
byte2string方法創(chuàng)建了char[]對(duì)象和String對(duì)象,并且 String 對(duì)象構(gòu)造時(shí)需要拷貝一遍 char[]。優(yōu)化之后完全沒有新對(duì)象產(chǎn)生。image-20220411222509675
E. 當(dāng)沒有用到 namespace 時(shí),避免其被多次調(diào)用
eliminate duplicated getNamespace() call when where is no namespace
尋找優(yōu)化點(diǎn)

客戶端在發(fā)送消息時(shí)會(huì)調(diào)用 getNamespace 方法。Namespace 功能在 RocketMQ 中用的很少,它在 4.5.1 版本中被引進(jìn),具體可以看 #1120。它的作用是引入 Namespace 的概念,相同名稱的 Topic 如果 Namespace 不同,那么可以表示不同的 Topic。
優(yōu)化方法
由于大部分情況下都用不到 Namespace,所以可以增加一個(gè)判斷,如果不用 Namespace,就不走 Namespace 的一些驗(yàn)證和匹配邏輯。
具體的方法很簡(jiǎn)單,在 ClientConfig 設(shè)一個(gè)布爾值,用來表示 Namespace 是否初始化(是否使用),如果不使用,則跳過 getNamespace() 方法中后面的邏輯。

F. 去除 Topic/Group 名稱的正則匹配檢查
eliminate regex match in topic/group name check
每次發(fā)消息時(shí),無論是客戶端還是服務(wù)端都需要檢查一次這個(gè)消息的 Topic/Group 是否合法。檢查通過正則表達(dá)式匹配來進(jìn)行,匹配規(guī)則很簡(jiǎn)單,就是檢查這個(gè)名稱的字符是否在一些字符范圍內(nèi) String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$"。那么就可以把這個(gè)正則表達(dá)式匹配給優(yōu)化掉,使用字符來匹配,將正則匹配簡(jiǎn)化成位圖查表的過程,優(yōu)化性能。
因?yàn)檎齽t表達(dá)式匹配的字符編碼都在 128 范圍內(nèi),所以先創(chuàng)建一個(gè)位圖,大小為 128。
public static final boolean[] VALID_CHAR_BIT_MAP = new boolean[128];
然后用位圖匹配的方式替換正則匹配:檢查的字符串的每一個(gè)字符是否在位圖中。

注意這里有一句
// 將位圖從堆復(fù)制到棧里(本地變量),提高下面循環(huán)的變量訪問速度
boolean[] bitMap = VALID_CHAR_BIT_MAP;
將靜態(tài)變量位圖復(fù)制到局部變量中,這樣做的用意是將堆中的變量復(fù)制到棧上(因?yàn)榫植孔兞慷嘉挥跅#?,提高下面循環(huán)中訪問該位圖的速度。
- 棧上存儲(chǔ)的數(shù)據(jù),很大機(jī)會(huì)會(huì)被虛擬機(jī)分配至物理機(jī)器的高速寄存器中存儲(chǔ)。因而讀寫效率比從堆內(nèi)存中讀寫高很多。
- 棧上分配的數(shù)據(jù),釋放時(shí)只需要通過移動(dòng)棧頂指針,就可以隨著棧幀出棧而自動(dòng)銷毀。而堆內(nèi)存的釋放由垃圾回收器負(fù)責(zé)完成,這一過程中整理和回收內(nèi)存都需要消耗更多的時(shí)間。
- 棧操作可以被 JIT 優(yōu)化,得到 CPU 指令的加速
- 棧沒有碎片,尋址間距短,可以被 CPU 預(yù)測(cè)行為
- 棧無需釋放內(nèi)存和進(jìn)行隨機(jī)尋址
G. 支持發(fā)送 batch 消息時(shí)支持不同的 Topic/Queue
support send batch message with different topic/queue
該改動(dòng)依賴 Part.B ,還未提交 PR
H. 避免無謂的 StringBuilder 擴(kuò)容
eliminate StringBuilder auto resize in PullRequestHoldService.buildKey() when topic length is greater than 14, this method called twice for each message
在 Broker 處理消息消費(fèi)邏輯時(shí),如果長輪詢被啟用,PullRequestHoldService#buildKey 每條消息會(huì)被調(diào)用 2 次。長輪詢相關(guān)邏輯請(qǐng)移步之前的分析
該方法中初始化一個(gè) StringBuilder,默認(rèn)長度為 16。StringBuilder 會(huì)將 Topic 和 QueueId 進(jìn)行拼接,如果 Topic 名稱過長,會(huì)造成 StringBuilder 的擴(kuò)容,內(nèi)部包含字符串的拷貝。在比較壞的情況下,擴(kuò)容可能會(huì)發(fā)生多次。
那么既然已經(jīng)直到 Topic 的長度,為什么不在 StringBuilder 初始化的時(shí)候就設(shè)定長度呢?這就是這個(gè)優(yōu)化的改動(dòng)。

為什么這里是 toipic.length() + 5?因?yàn)橐话?QueueId 不會(huì)超過 4 位數(shù)(一個(gè) Topic 下面不會(huì)超過 9999 個(gè)隊(duì)列),再加上一個(gè)分隔符,得到 5。
I. 避免無謂的 StringBuffer 擴(kuò)容和 String 格式化
Avoid unnecessary StringBuffer resizing and String Formatting
尋找優(yōu)化點(diǎn)

從火焰圖上看出,在 Broker 處理消息消費(fèi)消息請(qǐng)求時(shí),有許多 String.format 方法開銷非常大,這些方法都是數(shù)據(jù)統(tǒng)計(jì)用的,用來拼接數(shù)據(jù)統(tǒng)計(jì)字典的 Key??梢韵朕k法進(jìn)行優(yōu)化。
優(yōu)化方法
首先這里面有使用 StringBuffer 拼接的邏輯,也沒有預(yù)先設(shè)定長度,存在擴(kuò)容可能性。這里也沒有多線程的情況,所以改成 StringBuilder,并且先計(jì)算好長度,避免擴(kuò)容。

J. 在寫 ConsumeQueue 和 從節(jié)點(diǎn)的 CommitLog 時(shí),使用 MMap 而不是 FileChannel,提升消息消費(fèi) TPS
Use MappedByteBuffer instead of FileChannel to write consume queue and slave commitlog.
當(dāng)消費(fèi)的 Queue 數(shù)量特別多時(shí)( 600 個(gè)),消費(fèi)的 TPS 跟不上。即在 Queue 比較少時(shí)(72 個(gè))消費(fèi)速度可以跟上生產(chǎn)速度(20W),但是當(dāng) Queue 比較多時(shí),消費(fèi)速度只有 7W。
這個(gè)修改可以提升 Queue 特別多時(shí)的消費(fèi)速度。
- 72 個(gè) Queue,消費(fèi)速度從 7W 提升到 20W
- 600 個(gè) Queue,消費(fèi)速度從 7W 提升到 11W
尋找優(yōu)化點(diǎn)
對(duì) Broker 進(jìn)行采樣,發(fā)現(xiàn)創(chuàng)建消費(fèi)索引的 reput 線程中有較大的耗時(shí)占比。

從火焰圖上可以看出,F(xiàn)ileChannel 寫數(shù)據(jù)的耗時(shí)占比比較大,有沒有辦法來優(yōu)化一下?
優(yōu)化方法
我們知道 RocketMQ 寫 CommitLog 是利用 MMap 來提升寫入速度。但是在寫 ConsumeQueue 時(shí)原先用的是 FileChannel 來寫,于是這里改成也使用 MMap 來寫入。
MappedFile.java


具體修改如上兩圖所示,這樣修改之后會(huì)影響兩個(gè)地方:ConsumeQueue (消費(fèi)索引)的寫入和 Slave 節(jié)點(diǎn) CommitLog 的寫入


優(yōu)化過后構(gòu)建 ConsumeQueue 的時(shí)間占比大大減少

K. 將 notifyMessageArriving() 的調(diào)用從 ReputMessageService 線程移到 PullRequestHoldService 線程
move execution of notifyMessageArriving() from ReputMessageService thread to PullRequestHoldService thread
This commit speed up consume qps greatly, in our test up to 200,000 qps.
(該提交未合入 4.9.3 版本,當(dāng)前仍未合入)
這一部分其實(shí)也是為了優(yōu)化 Part.J 中所說的消費(fèi)速度所做的另一個(gè)改動(dòng)。經(jīng)過 Part.J 的修改,600 隊(duì)列下的消費(fèi) TPS 能夠達(dá)到 10w(生產(chǎn) 20w)。這個(gè)修改將消費(fèi) TPS 提升到 20w。
尋找優(yōu)化點(diǎn)

依然是通過查看火焰圖的方法,查看到構(gòu)造消費(fèi)索引的方法中包含了 notifyMessageArriving() 這樣一個(gè)方法,占用較大快的 CPU 時(shí)間。
這個(gè)方法具體在 輪詢機(jī)制 這篇文章中有詳細(xì)解釋。消息消費(fèi)的輪詢機(jī)制指的是在 Push 消費(fèi)時(shí),如果沒有新消息不會(huì)馬上返回,而是掛起一段時(shí)間再重試查詢。
notifyMessageArriving() 的作用是在收到消息時(shí)提醒消費(fèi)者,有新消息來了可以消費(fèi)了,這樣消費(fèi)者會(huì)馬上解除掛起狀態(tài)開始消費(fèi)消息。
這里的優(yōu)化點(diǎn)就是想辦法把這個(gè)方法邏輯從構(gòu)建消費(fèi)索引的邏輯中抽離出去。
優(yōu)化方案 1
首先想到的方法是將 notifyMessageArriving() 用一個(gè)單獨(dú)的線程異步調(diào)用。于是在 PullRequestHoldService 里面采用生產(chǎn)-消費(fèi)模式,啟動(dòng)了一個(gè)新的工作線程,將 notify 任務(wù)扔到一個(gè)隊(duì)列中,讓工作線程去處理,主線程直接返回。
工作線程每次從隊(duì)列中 poll 一批任務(wù),批量進(jìn)行處理(1000 個(gè))。經(jīng)過這個(gè)改動(dòng),TPS 可以上升到 20w,但這帶來了另一個(gè)問題——消息消費(fèi)的延遲變高,達(dá)到 40+ms。


延遲變高的原因是—— RocketMQ 中 ServiceThread 工作線程的 wakeup() 和 waitForRunning() 是弱一致的,沒有加鎖而是采用 CAS 的方法,造成多線程情況下可能會(huì)等待直到超時(shí)。
public void wakeup() {
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
//entry to wait
waitPoint.reset();
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}
優(yōu)化方案 2
這個(gè)方案是實(shí)際提交的優(yōu)化方案,方案比較復(fù)雜。主要的思想就是將原先的每條消息都通知一次轉(zhuǎn)化為批通知,減少通知次數(shù),減少通知開銷以提升性能。
同樣用生產(chǎn)-消費(fèi)模式,為了同時(shí)保證低延遲和高吞吐引入了 PullNotifyQueue。生產(chǎn)者和消費(fèi)者仍然是操作通知任務(wù)
生產(chǎn)者線程將消息 put 到隊(duì)列中,消費(fèi)者調(diào)用 drain 方法消費(fèi)。
drain 方法中根據(jù)消費(fèi) TPS 做了判斷
- 如果 TPS 小于閾值,則拉到一個(gè)任務(wù)馬上進(jìn)行處理
- 如果 TPS 大于閾值(默認(rèn) 10w),批量拉任務(wù)進(jìn)行通知。一批任務(wù)只需要一次 notify(原先每個(gè)消息都會(huì)通知一次)。此時(shí)會(huì)略微增加消費(fèi)時(shí)延,換來的是消費(fèi)性能大幅提升。
小結(jié)
本文介紹了 RocketMQ 4.9.3 版本中的性能優(yōu)化,主要優(yōu)化了消息生產(chǎn)的速度和大量隊(duì)列情況下消息消費(fèi)的速度。
優(yōu)化的步驟是根據(jù) CPU 耗時(shí)進(jìn)行采樣形成火焰圖,觀察火焰圖中時(shí)間占比較高的方法進(jìn)行針對(duì)性優(yōu)化。
總結(jié)一下用到的優(yōu)化方法主要有
- 代碼硬編碼屬性,用代碼復(fù)雜度換性能
- 對(duì)字符串和字節(jié)數(shù)組操作時(shí)減少創(chuàng)建和拷貝
- 對(duì)于要多次計(jì)算的操作,緩存其結(jié)果
- 鎖內(nèi)的操作盡量移動(dòng)到鎖外進(jìn)行,提前進(jìn)行計(jì)算或者用函數(shù)式接口懶加載
- 使用更高效的容器,如 Netty
ByteBuf - 使用容器時(shí)在初始化時(shí)指定長度,避免動(dòng)態(tài)擴(kuò)容
- 主流程上的分支操作,使用異步而非同步
- 對(duì)于磁盤 I/O,MMap 和 FileChannel 的選擇,需要實(shí)際壓測(cè),大部分情況下 MMap 速度更快且更穩(wěn)定;每次寫入較大數(shù)據(jù)長度時(shí)(4k 左右) FileChannel 速度才更快。具體壓測(cè)結(jié)果請(qǐng)看 java-io-benchmark
歡迎關(guān)注公眾號(hào)【消息中間件】,更新消息中間件的源碼解析和最新動(dòng)態(tài)!
本文由博客一文多發(fā)平臺(tái) OpenWrite 發(fā)布!


