RocketMQ 4.9.3 版本 性能優(yōu)化 源碼剖析

[TOC]

概述

RocketMQ 4.9.1 版本 針對(duì) Broker 做了一系列性能優(yōu)化,提升了消息發(fā)送的 TPS。前文曾就 4.9.1 版本的優(yōu)化做了深入分析。

在 2022 年的 2 月底,RocketMQ 4.9.3 版本 發(fā)布,其對(duì) Broker 做了更進(jìn)一步的性能優(yōu)化,本次優(yōu)化中也包含了生產(chǎn)和消費(fèi)性能的提升。

本文將會(huì)詳解 4.9.3 版本中的性能優(yōu)化點(diǎn)。在 4.9.3 版本中對(duì)延遲消息的優(yōu)化已經(jīng)在另一篇文章中詳解。

本次和上次的性能優(yōu)化主要由快手的黃理老師提交,在 ISSUE#3585 中集中記錄。先來看一下本次性能優(yōu)化的所有優(yōu)化項(xiàng)

We have some performance improvements based on 4.9.2

  1. [Part A] eliminate reverse DNS lookup in MessageExt
  2. [Part B] Improve header encode/decode performance
  3. [Part B] Improve RocketMQSerializable performance with zero-copy
  4. [Part C] cache result for parseChannelRemoteAddr()
  5. [Part D] improve performance of createUniqID()
  6. [Part E] eliminate duplicated getNamespace() call when where is no namespace
  7. [Part F] eliminate regex match in topic/group name check
  8. [Part G] [Work in progress] support send batch message with different topic/queue
  9. [Part H] eliminate StringBuilder auto resize in PullRequestHoldService.buildKey() when topic length is greater than 14, this method called twice for each message.
  10. [Part I] Avoid unnecessary StringBuffer resizing and String Formatting
  11. [Part J] Use mmap buffer instead of FileChannel when writing consume queue and slave commit log, which greatly speed up consume tps.
  12. Part K move execution of notifyMessageArriving() from ReputMessageService thread to PullRequestHoldService thread.

These commits almost eliminate bad performance methods in the cpu flame graph in producer side.

下面來逐條剖析

性能優(yōu)化

想要優(yōu)化性能,首先需要找到 RocketMQ 的 Broker 在處理消息時(shí)性能損耗的點(diǎn)。使用火焰圖可以清晰地看出當(dāng)前耗時(shí)比較多的方法,從耗時(shí)較多的方法想辦法入手優(yōu)化,可以更大程度上提升性能。

具體的做法是開啟 Broker 的火焰圖采樣,然后對(duì)其進(jìn)行壓測(cè)(同時(shí)生產(chǎn)和消費(fèi)),然后觀察其火焰圖中方法的時(shí)間占用百分比,優(yōu)化占用時(shí)間高且可以優(yōu)化的地方。

A. 移除 MessageExt 中的反向 DNS 查找

eliminate reverse DNS lookup in MessageExt

#3586

image-20220411212011338

inetAddress.getHostName() 方法中會(huì)有反向 DNS 查找,可能耗時(shí)較多。于是優(yōu)化成沒有反向 DNS 查找的 getHostString() 方法

MessageExt#getBornHostNameString() 方法在一個(gè)異常流程中被調(diào)用,優(yōu)化此方法其實(shí)對(duì)性能沒有什么提升)

B.1. 優(yōu)化 RocketMQ 通信協(xié)議 Header 解碼性能

[Part B] Improve header encode/decode performance

#3588

(該提交未合入 4.9.3 版本,將于 4.9.4 版本發(fā)布)

PartB 有兩個(gè)提交,其實(shí)作用不同,但是由于第二個(gè)提交依賴第一個(gè)所以只能放到一起

尋找優(yōu)化點(diǎn)

RocketMQ 的通信協(xié)議定義了各種指令(消息發(fā)送、拉取等等)。其中 Header 是協(xié)議頭,數(shù)據(jù)是序列化后的json。json 的每個(gè) key 字段都是固定的,不同的通訊請(qǐng)求字段不一樣,但是其中有一個(gè) extField 是完全自定義的,每個(gè)指令都不一樣。所有指令當(dāng)前共用了一個(gè)通用的解析方法 RemotingCommand#decodeCommandCustomHeader,基于反射來解析和設(shè)置消息 Header。

// SendMessageRequestHeaderV2
{  
    "code":310,
    "extFields":{  
        "f":"0",
        "g":"1482158310125",
        "d":"4",
        "e":"0",
        "b":"TopicTest",
        "c":"TBW102",
        "a":"please_rename_unique_group_name",
        "j":"0",
        "k":"false",
        "h":"0",
        "i":"TAGS\u0001TagA\u0002WAIT\u0001true\u0002"
    },
    "flag":0,
    "language":"JAVA",
    "opaque":206,
    "version":79
}

上面是一個(gè)發(fā)送消息的請(qǐng)求 Header。由于各種指令對(duì)應(yīng)的 Header 的 extField 不同,這個(gè)解析 Header 方法內(nèi)部大量使用反射來設(shè)置屬性,效率很低。而且這個(gè)解碼方法應(yīng)用廣泛,在 RocketMQ 網(wǎng)絡(luò)通信時(shí)都會(huì)用到(如發(fā)送消息、拉取消息),所以很有優(yōu)化的必要。

優(yōu)化方案

優(yōu)化的方案是盡量減少反射的使用,將常用的指令解碼方法抽象出來。

這里引入了 FastCodesHeader 接口,只要實(shí)現(xiàn)這個(gè)接口,解碼時(shí)就走具體的實(shí)現(xiàn)類而不用反射。

然后為生產(chǎn)消息和消費(fèi)消息的協(xié)議單獨(dú)實(shí)現(xiàn)解碼方法,內(nèi)部可以不用反射而是直接進(jìn)行字段賦值,這樣雖然繁瑣但是執(zhí)行速度最快。

// SendMessageRequestHeaderV2.java
@Override
public void decode(HashMap<String, String> fields) throws RemotingCommandException {

    String str = getAndCheckNotNull(fields, "a");
    if (str != null) {
        a = str;
    }

    str = getAndCheckNotNull(fields, "b");
    if (str != null) {
        b = str;
    }

    str = getAndCheckNotNull(fields, "c");
    if (str != null) {
        c = str;
    }

    str = getAndCheckNotNull(fields, "d");
    if (str != null) {
        d = Integer.parseInt(str);
    }

    // ......
}

B.2. 提高編解碼性能

[Part B] Improve RocketMQSerializable performance with zero-copy

#3588

(該提交未合入 4.9.3 版本,將于 4.9.4 版本發(fā)布)

改動(dòng)背景

RocketMQ 的協(xié)議 Header 序列化協(xié)議有倆

  • RemotingSerializable:內(nèi)部用 fastjson 進(jìn)行序列化反序列化,為當(dāng)前版本使用的序列化協(xié)議。
  • RocketMQSerializable:RocketMQ 實(shí)現(xiàn)的序列化協(xié)議,性能對(duì)比 fastjson 沒有決定性優(yōu)勢(shì),當(dāng)前默認(rèn)沒有使用。
// RemotingCommand.java
private static SerializeType serializeTypeConfigInThisServer = SerializeType.JSON;

private byte[] headerEncode() {
    this.makeCustomHeaderToNet();
    if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
        return RocketMQSerializable.rocketMQProtocolEncode(this);
    } else {
        return RemotingSerializable.encode(this);
    }
}

優(yōu)化方法

這個(gè)提交優(yōu)化了 RocketMQSerializable 的性能,具體的方法是消除了 RocketMQSerializable 中多余的拷貝和對(duì)象創(chuàng)建,使用 Netty 的 ByteBuf 替換 Java 的 ByteBuffer,性能更高。

  • 對(duì)于寫字符串:Netty 的 ByteBuf 有直接 put 字符串的方法 writeCharSequence(CharSequence sequence, Charset charset),少一次內(nèi)存拷貝,效率更高。
  • 對(duì)于寫 Byte:Netty 的 writeByte(int value) 傳入一個(gè) int,Java 傳入一個(gè)字節(jié) put(byte b)。當(dāng)前 CPU 都是 32 位、64 位的,對(duì) int 處理更高效。

(該改動(dòng)要在 Producer 和 Consumer 設(shè)置使用 RocketMQ 序列化協(xié)議才能生效)

System.setProperty(RemotingCommand.SERIALIZE_TYPE_PROPERTY, SerializeType.ROCKETMQ.name());

提交說明上的 zero-copy 說的不是操作系統(tǒng)層面上的零拷貝,而是對(duì)于 ByteBuf 的零拷貝。

NettyEncoder 中用 fastEncodeHeader 替換原來的 encodeHeader 方法,直接傳入 ByteBuf 進(jìn)行操作,不需要用 Java 的 ByteBuffer 中轉(zhuǎn)一下,少了一次拷貝。

public void fastEncodeHeader(ByteBuf out) {
    int bodySize = this.body != null ? this.body.length : 0;
    int beginIndex = out.writerIndex();
    // skip 8 bytes
    out.writeLong(0);
    int headerSize;
    // 如果是 RocketMQ 序列化協(xié)議
    if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
        if (customHeader != null && !(customHeader instanceof FastCodesHeader)) {
            this.makeCustomHeaderToNet();
        }
        // 調(diào)用 RocketMQ 序列化協(xié)議編碼
        headerSize = RocketMQSerializable.rocketMQProtocolEncode(this, out);
    } else {
        this.makeCustomHeaderToNet();
        byte[] header = RemotingSerializable.encode(this);
        headerSize = header.length;
        out.writeBytes(header);
    }
    out.setInt(beginIndex, 4 + headerSize + bodySize);
    out.setInt(beginIndex + 4, markProtocolType(headerSize, serializeTypeCurrentRPC));
}

rocketMQProtocolEncode 中直接操作 ByteBuf,沒有拷貝和新對(duì)象的創(chuàng)建。

public static int rocketMQProtocolEncode(RemotingCommand cmd, ByteBuf out) {
    int beginIndex = out.writerIndex();
    // int code(~32767)
    out.writeShort(cmd.getCode());
    // LanguageCode language
    out.writeByte(cmd.getLanguage().getCode());
    // int version(~32767)
    out.writeShort(cmd.getVersion());
    // int opaque
    out.writeInt(cmd.getOpaque());
    // int flag
    out.writeInt(cmd.getFlag());
    // String remark
    String remark = cmd.getRemark();
    if (remark != null && !remark.isEmpty()) {
        writeStr(out, false, remark);
    } else {
        out.writeInt(0);
    }

    int mapLenIndex = out.writerIndex();
    out.writeInt(0);
    if (cmd.readCustomHeader() instanceof FastCodesHeader) {
        ((FastCodesHeader) cmd.readCustomHeader()).encode(out);
    }
    HashMap<String, String> map = cmd.getExtFields();
    if (map != null && !map.isEmpty()) {
        map.forEach((k, v) -> {
            if (k != null && v != null) {
                writeStr(out, true, k);
                writeStr(out, false, v);
            }
        });
    }
    out.setInt(mapLenIndex, out.writerIndex() - mapLenIndex - 4);
    return out.writerIndex() - beginIndex;
}

C. 緩存 parseChannelRemoteAddr() 方法的結(jié)果

cache the result of parseChannelRemoteAddr()

#3589

尋找優(yōu)化點(diǎn)

image-20220411213226971

從火焰圖中可以看到,parseChannelRemoteAddr() 這個(gè)方法占用了 5% 左右的總耗時(shí)。

這個(gè)方法被客戶端在發(fā)送消息時(shí)調(diào)用,每次發(fā)送消息都會(huì)調(diào)用到這個(gè)方法,這也是他占用如此高 CPU 耗時(shí)百分比的原因。

那么這個(gè)方法做了什么?Netty 的 Channel 相當(dāng)于一個(gè) HTTP 連接,這個(gè)方法試圖從 Channel 中獲取遠(yuǎn)端的地址。

從火焰圖上看出,該方法的 toString占用大量時(shí)間,其中主要包含了復(fù)雜的 String 拼接和處理方法。

那么想要優(yōu)化這個(gè)方法最直接的方式就是——緩存其結(jié)果,避免多次調(diào)用。

具體優(yōu)化方法

Netty 提供了 AttributeKey 這個(gè)類,用于將 HTTP 連接的狀態(tài)保存在 Channel 上。AttributeKey 相當(dāng)于一個(gè) Key-Value 對(duì),用來存儲(chǔ)狀態(tài)。

要使用 AttributeKey,需要先初始化它的 Key,這樣它就可以預(yù)先計(jì)算 Key 的 HashCode,查詢?cè)?Key 的時(shí)候效率就很高了。

    private static final AttributeKey<String> REMOTE_ADDR_KEY = AttributeKey.valueOf("RemoteAddr");

然后優(yōu)化該方法,第一次調(diào)用該方法時(shí)嘗試從 Channel 上獲取屬性RemoteAddr,如果獲取不到,則調(diào)用原來的邏輯去獲取并且緩存到該 AttributeKey 中。

image-20220411215152793

修改過后在火焰圖上已經(jīng)幾乎看不到該方法的用時(shí)。

D. 提升 createUniqID() 的性能

Improve performance of createUniqID().

#3590

尋找優(yōu)化點(diǎn)

image-20220411222721408

createUniqID() 這個(gè)方法用于創(chuàng)建消息的全局唯一 ID,在客戶端每次發(fā)送消息時(shí)會(huì)調(diào)用,為每個(gè)消息創(chuàng)建全局唯一 ID。

RocketMQ 中包含兩個(gè)消息 ID,分別為全局唯一 ID(UNIQUE_ID,消息發(fā)送時(shí)由客戶端生產(chǎn))和偏移量 ID(offsetMsgId,Broker 保存消息時(shí)由保存的偏移量生成),關(guān)于這兩個(gè) ID 的生成方法和使用可以看丁威老師的 RocketMQ msgId與offsetMsgId釋疑。

原本生成全局 ID 的方法將客戶端 IP、進(jìn)程 ID 等信息組合計(jì)算生成一個(gè)字符串。方法邏輯里面包含了大量字符串和 ByteBuffer 操作,所以耗時(shí)較高。

優(yōu)化方法

原先的方法實(shí)現(xiàn)中,每次調(diào)用都會(huì)創(chuàng)建 StringBuilder 、ByteBuffer、多個(gè)字符串……包含大量字符串操作,字符串操作的 CPU 耗時(shí)開銷很大。

優(yōu)化的方法主要通過字符數(shù)組運(yùn)算替代字符串操作,避免多余的字符串對(duì)象產(chǎn)生;使用緩存,避免每次調(diào)用都重新計(jì)算和創(chuàng)建字符串對(duì)象。

  1. 將原來的 FIX_STRING 字符串換成 char[] 字符數(shù)組,然后可以使用 System.arraycopy 替換原來的 StringBuilder 操作,避免多余對(duì)象產(chǎn)生。

    image-20220411221546009
  2. 新增了 void writeInt(char[] buffer, int pos, int value)writeShort(char[] buffer, int pos, int value) 方法,用于寫入字符串?dāng)?shù)組。

    image-20220411222306938

    原先的 byte2string 方法創(chuàng)建了 char[] 對(duì)象和 String 對(duì)象,并且 String 對(duì)象構(gòu)造時(shí)需要拷貝一遍 char[]。優(yōu)化之后完全沒有新對(duì)象產(chǎn)生。

    image-20220411222509675

E. 當(dāng)沒有用到 namespace 時(shí),避免其被多次調(diào)用

eliminate duplicated getNamespace() call when where is no namespace

#3591

尋找優(yōu)化點(diǎn)

image-20220411223612434

客戶端在發(fā)送消息時(shí)會(huì)調(diào)用 getNamespace 方法。Namespace 功能在 RocketMQ 中用的很少,它在 4.5.1 版本中被引進(jìn),具體可以看 #1120。它的作用是引入 Namespace 的概念,相同名稱的 Topic 如果 Namespace 不同,那么可以表示不同的 Topic。

優(yōu)化方法

由于大部分情況下都用不到 Namespace,所以可以增加一個(gè)判斷,如果不用 Namespace,就不走 Namespace 的一些驗(yàn)證和匹配邏輯。

具體的方法很簡(jiǎn)單,在 ClientConfig 設(shè)一個(gè)布爾值,用來表示 Namespace 是否初始化(是否使用),如果不使用,則跳過 getNamespace() 方法中后面的邏輯。

image-20220411224424160

F. 去除 Topic/Group 名稱的正則匹配檢查

eliminate regex match in topic/group name check

#3594

每次發(fā)消息時(shí),無論是客戶端還是服務(wù)端都需要檢查一次這個(gè)消息的 Topic/Group 是否合法。檢查通過正則表達(dá)式匹配來進(jìn)行,匹配規(guī)則很簡(jiǎn)單,就是檢查這個(gè)名稱的字符是否在一些字符范圍內(nèi) String VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$"。那么就可以把這個(gè)正則表達(dá)式匹配給優(yōu)化掉,使用字符來匹配,將正則匹配簡(jiǎn)化成位圖查表的過程,優(yōu)化性能。

因?yàn)檎齽t表達(dá)式匹配的字符編碼都在 128 范圍內(nèi),所以先創(chuàng)建一個(gè)位圖,大小為 128。

public static final boolean[] VALID_CHAR_BIT_MAP = new boolean[128];

然后用位圖匹配的方式替換正則匹配:檢查的字符串的每一個(gè)字符是否在位圖中。

image-20220411231805018

注意這里有一句

// 將位圖從堆復(fù)制到棧里(本地變量),提高下面循環(huán)的變量訪問速度
boolean[] bitMap = VALID_CHAR_BIT_MAP;

將靜態(tài)變量位圖復(fù)制到局部變量中,這樣做的用意是將堆中的變量復(fù)制到棧上(因?yàn)榫植孔兞慷嘉挥跅#?,提高下面循環(huán)中訪問該位圖的速度。

  • 棧上存儲(chǔ)的數(shù)據(jù),很大機(jī)會(huì)會(huì)被虛擬機(jī)分配至物理機(jī)器的高速寄存器中存儲(chǔ)。因而讀寫效率比從堆內(nèi)存中讀寫高很多。
  • 棧上分配的數(shù)據(jù),釋放時(shí)只需要通過移動(dòng)棧頂指針,就可以隨著棧幀出棧而自動(dòng)銷毀。而堆內(nèi)存的釋放由垃圾回收器負(fù)責(zé)完成,這一過程中整理和回收內(nèi)存都需要消耗更多的時(shí)間。
  • 棧操作可以被 JIT 優(yōu)化,得到 CPU 指令的加速
  • 棧沒有碎片,尋址間距短,可以被 CPU 預(yù)測(cè)行為
  • 棧無需釋放內(nèi)存和進(jìn)行隨機(jī)尋址

G. 支持發(fā)送 batch 消息時(shí)支持不同的 Topic/Queue

support send batch message with different topic/queue

該改動(dòng)依賴 Part.B ,還未提交 PR

H. 避免無謂的 StringBuilder 擴(kuò)容

eliminate StringBuilder auto resize in PullRequestHoldService.buildKey() when topic length is greater than 14, this method called twice for each message

#3612

在 Broker 處理消息消費(fèi)邏輯時(shí),如果長輪詢被啟用,PullRequestHoldService#buildKey 每條消息會(huì)被調(diào)用 2 次。長輪詢相關(guān)邏輯請(qǐng)移步之前的分析

該方法中初始化一個(gè) StringBuilder,默認(rèn)長度為 16。StringBuilder 會(huì)將 Topic 和 QueueId 進(jìn)行拼接,如果 Topic 名稱過長,會(huì)造成 StringBuilder 的擴(kuò)容,內(nèi)部包含字符串的拷貝。在比較壞的情況下,擴(kuò)容可能會(huì)發(fā)生多次。

那么既然已經(jīng)直到 Topic 的長度,為什么不在 StringBuilder 初始化的時(shí)候就設(shè)定長度呢?這就是這個(gè)優(yōu)化的改動(dòng)。

image-20220411232605135

為什么這里是 toipic.length() + 5?因?yàn)橐话?QueueId 不會(huì)超過 4 位數(shù)(一個(gè) Topic 下面不會(huì)超過 9999 個(gè)隊(duì)列),再加上一個(gè)分隔符,得到 5。

I. 避免無謂的 StringBuffer 擴(kuò)容和 String 格式化

Avoid unnecessary StringBuffer resizing and String Formatting

#3619

尋找優(yōu)化點(diǎn)

從火焰圖上看出,在 Broker 處理消息消費(fèi)消息請(qǐng)求時(shí),有許多 String.format 方法開銷非常大,這些方法都是數(shù)據(jù)統(tǒng)計(jì)用的,用來拼接數(shù)據(jù)統(tǒng)計(jì)字典的 Key??梢韵朕k法進(jìn)行優(yōu)化。

優(yōu)化方法

首先這里面有使用 StringBuffer 拼接的邏輯,也沒有預(yù)先設(shè)定長度,存在擴(kuò)容可能性。這里也沒有多線程的情況,所以改成 StringBuilder,并且先計(jì)算好長度,避免擴(kuò)容。

image-20220411234502755

J. 在寫 ConsumeQueue 和 從節(jié)點(diǎn)的 CommitLog 時(shí),使用 MMap 而不是 FileChannel,提升消息消費(fèi) TPS

Use MappedByteBuffer instead of FileChannel to write consume queue and slave commitlog.

#3657

當(dāng)消費(fèi)的 Queue 數(shù)量特別多時(shí)( 600 個(gè)),消費(fèi)的 TPS 跟不上。即在 Queue 比較少時(shí)(72 個(gè))消費(fèi)速度可以跟上生產(chǎn)速度(20W),但是當(dāng) Queue 比較多時(shí),消費(fèi)速度只有 7W。

這個(gè)修改可以提升 Queue 特別多時(shí)的消費(fèi)速度。

  • 72 個(gè) Queue,消費(fèi)速度從 7W 提升到 20W
  • 600 個(gè) Queue,消費(fèi)速度從 7W 提升到 11W

尋找優(yōu)化點(diǎn)

對(duì) Broker 進(jìn)行采樣,發(fā)現(xiàn)創(chuàng)建消費(fèi)索引的 reput 線程中有較大的耗時(shí)占比。

從火焰圖上可以看出,F(xiàn)ileChannel 寫數(shù)據(jù)的耗時(shí)占比比較大,有沒有辦法來優(yōu)化一下?

優(yōu)化方法

我們知道 RocketMQ 寫 CommitLog 是利用 MMap 來提升寫入速度。但是在寫 ConsumeQueue 時(shí)原先用的是 FileChannel 來寫,于是這里改成也使用 MMap 來寫入。

MappedFile.java

image-20220411235301250
image-20220411235759752

具體修改如上兩圖所示,這樣修改之后會(huì)影響兩個(gè)地方:ConsumeQueue (消費(fèi)索引)的寫入和 Slave 節(jié)點(diǎn) CommitLog 的寫入

image-20220411235323472
image-20220411235923338

優(yōu)化過后構(gòu)建 ConsumeQueue 的時(shí)間占比大大減少

K. 將 notifyMessageArriving() 的調(diào)用從 ReputMessageService 線程移到 PullRequestHoldService 線程

move execution of notifyMessageArriving() from ReputMessageService thread to PullRequestHoldService thread

This commit speed up consume qps greatly, in our test up to 200,000 qps.

#3659

(該提交未合入 4.9.3 版本,當(dāng)前仍未合入)

這一部分其實(shí)也是為了優(yōu)化 Part.J 中所說的消費(fèi)速度所做的另一個(gè)改動(dòng)。經(jīng)過 Part.J 的修改,600 隊(duì)列下的消費(fèi) TPS 能夠達(dá)到 10w(生產(chǎn) 20w)。這個(gè)修改將消費(fèi) TPS 提升到 20w。

尋找優(yōu)化點(diǎn)

依然是通過查看火焰圖的方法,查看到構(gòu)造消費(fèi)索引的方法中包含了 notifyMessageArriving() 這樣一個(gè)方法,占用較大快的 CPU 時(shí)間。

這個(gè)方法具體在 輪詢機(jī)制 這篇文章中有詳細(xì)解釋。消息消費(fèi)的輪詢機(jī)制指的是在 Push 消費(fèi)時(shí),如果沒有新消息不會(huì)馬上返回,而是掛起一段時(shí)間再重試查詢。

notifyMessageArriving() 的作用是在收到消息時(shí)提醒消費(fèi)者,有新消息來了可以消費(fèi)了,這樣消費(fèi)者會(huì)馬上解除掛起狀態(tài)開始消費(fèi)消息。

這里的優(yōu)化點(diǎn)就是想辦法把這個(gè)方法邏輯從構(gòu)建消費(fèi)索引的邏輯中抽離出去。

優(yōu)化方案 1

首先想到的方法是將 notifyMessageArriving() 用一個(gè)單獨(dú)的線程異步調(diào)用。于是在 PullRequestHoldService 里面采用生產(chǎn)-消費(fèi)模式,啟動(dòng)了一個(gè)新的工作線程,將 notify 任務(wù)扔到一個(gè)隊(duì)列中,讓工作線程去處理,主線程直接返回。

工作線程每次從隊(duì)列中 poll 一批任務(wù),批量進(jìn)行處理(1000 個(gè))。經(jīng)過這個(gè)改動(dòng),TPS 可以上升到 20w,但這帶來了另一個(gè)問題——消息消費(fèi)的延遲變高,達(dá)到 40+ms。

循環(huán)等待 0.1s 直到新消息來喚醒線程
新消息來了創(chuàng)建異步任務(wù)并喚醒線程

延遲變高的原因是—— RocketMQ 中 ServiceThread 工作線程的 wakeup()waitForRunning() 是弱一致的,沒有加鎖而是采用 CAS 的方法,造成多線程情況下可能會(huì)等待直到超時(shí)。

public void wakeup() {
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); // notify
    }
}

protected void waitForRunning(long interval) {
    if (hasNotified.compareAndSet(true, false)) {
        this.onWaitEnd();
        return;
    }

    //entry to wait
    waitPoint.reset();

    try {
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        hasNotified.set(false);
        this.onWaitEnd();
    }
}

優(yōu)化方案 2

這個(gè)方案是實(shí)際提交的優(yōu)化方案,方案比較復(fù)雜。主要的思想就是將原先的每條消息都通知一次轉(zhuǎn)化為批通知,減少通知次數(shù),減少通知開銷以提升性能。

同樣用生產(chǎn)-消費(fèi)模式,為了同時(shí)保證低延遲和高吞吐引入了 PullNotifyQueue。生產(chǎn)者和消費(fèi)者仍然是操作通知任務(wù)

生產(chǎn)者線程將消息 put 到隊(duì)列中,消費(fèi)者調(diào)用 drain 方法消費(fèi)。

drain 方法中根據(jù)消費(fèi) TPS 做了判斷

  • 如果 TPS 小于閾值,則拉到一個(gè)任務(wù)馬上進(jìn)行處理
  • 如果 TPS 大于閾值(默認(rèn) 10w),批量拉任務(wù)進(jìn)行通知。一批任務(wù)只需要一次 notify(原先每個(gè)消息都會(huì)通知一次)。此時(shí)會(huì)略微增加消費(fèi)時(shí)延,換來的是消費(fèi)性能大幅提升。

小結(jié)

本文介紹了 RocketMQ 4.9.3 版本中的性能優(yōu)化,主要優(yōu)化了消息生產(chǎn)的速度和大量隊(duì)列情況下消息消費(fèi)的速度。

優(yōu)化的步驟是根據(jù) CPU 耗時(shí)進(jìn)行采樣形成火焰圖,觀察火焰圖中時(shí)間占比較高的方法進(jìn)行針對(duì)性優(yōu)化。

總結(jié)一下用到的優(yōu)化方法主要有

  • 代碼硬編碼屬性,用代碼復(fù)雜度換性能
  • 對(duì)字符串和字節(jié)數(shù)組操作時(shí)減少創(chuàng)建和拷貝
  • 對(duì)于要多次計(jì)算的操作,緩存其結(jié)果
  • 鎖內(nèi)的操作盡量移動(dòng)到鎖外進(jìn)行,提前進(jìn)行計(jì)算或者用函數(shù)式接口懶加載
  • 使用更高效的容器,如 Netty ByteBuf
  • 使用容器時(shí)在初始化時(shí)指定長度,避免動(dòng)態(tài)擴(kuò)容
  • 主流程上的分支操作,使用異步而非同步
  • 對(duì)于磁盤 I/O,MMap 和 FileChannel 的選擇,需要實(shí)際壓測(cè),大部分情況下 MMap 速度更快且更穩(wěn)定;每次寫入較大數(shù)據(jù)長度時(shí)(4k 左右) FileChannel 速度才更快。具體壓測(cè)結(jié)果請(qǐng)看 java-io-benchmark

歡迎關(guān)注公眾號(hào)【消息中間件】,更新消息中間件的源碼解析和最新動(dòng)態(tài)!

本文由博客一文多發(fā)平臺(tái) OpenWrite 發(fā)布!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容