19 消息隊列:如何降低消息隊列系統(tǒng)中消息的延遲?

你好,我是唐揚。

學完前面兩節(jié)課之后,相信你對在垂直電商項目中,如何使用消息隊列應對秒殺時的峰值流量已經(jīng)有所了解。當然了,你也應該知道要如何做,才能保證消息不會丟失,盡量避免消息重復帶來的影響。那么我想讓你思考一下:除了這些內(nèi)容,你在使用消息隊列時還需要關(guān)注哪些點呢?

先來看一個場景:在你的垂直電商項目中,你會在用戶下單支付之后,向消息隊列里面發(fā)送一條消息,隊列處理程序消費了消息后,會增加用戶的積分,或者給用戶發(fā)送優(yōu)惠券。那么用戶在下單之后,等待幾分鐘或者十幾分鐘拿到積分和優(yōu)惠券是可以接受的,但是一旦消息隊列出現(xiàn)大量堆積,用戶消費完成后幾小時還拿到優(yōu)惠券,那就會有用戶投訴了。

這時,你要關(guān)注的就是消息隊列中,消息的延遲了,這其實是消費性能的問題,那么你要如何提升消費性能,保證更短的消息延遲呢?在我看來,你首先需要掌握如何來監(jiān)控消息的延遲,因為有了數(shù)據(jù)之后,你才可以知道目前的延遲數(shù)據(jù)是否滿足要求,也可以評估優(yōu)化之后的效果。然后,你要掌握使用消息隊列的正確姿勢,以及關(guān)注消息隊列本身是如何保證消息盡快被存儲和投遞的。

接下來,我們先來看看第一點:如何監(jiān)控消息延遲。

如何監(jiān)控消息延遲

在我看來,監(jiān)控消息的延遲有兩種方式:

使用消息隊列提供的工具,通過監(jiān)控消息的堆積來完成;

通過生成監(jiān)控消息的方式來監(jiān)控消息的延遲情況。

接下來,我?guī)銓嶋H了解一下。

假設(shè)在開篇的場景之下,電商系統(tǒng)中的消息隊列已經(jīng)堆積了大量的消息,那么你要想監(jiān)控消息的堆積情況,首先需要從原理上了解,在消息隊列中消費者的消費進度是多少,因為這樣才方便計算當前的消費延遲是多少。比方說,生產(chǎn)者向隊列中一共生產(chǎn)了 1000 條消息,某一個消費者消費進度是 900 條,那么這個消費者的消費延遲就是 100 條消息。

在 Kafka 中,消費者的消費進度在不同的版本上是不同的。

在 Kafka0.9 之前的版本中,消費進度是存儲在 ZooKeeper 中的,消費者在消費消息的時候,先要從 ZooKeeper 中獲取最新的消費進度,再從這個進度的基礎(chǔ)上消費后面的消息。

在 Kafka0.9 版本之后,消費進度被遷入到 Kakfa 的一個專門的 topic 叫“__consumer_offsets”里面。所以,如果你了解 kafka 的原理,你可以依據(jù)不同的版本,從不同的位置,獲取到這個消費進度的信息。

當然,作為一個成熟的組件,Kafka 也提供了一些工具來獲取這個消費進度的信息,幫助你實現(xiàn)自己的監(jiān)控,這個工具主要有兩個:

首先,Kafka 提供了工具叫做“kafka-consumer-groups.sh”(它在 Kafka 安裝包的 bin 目錄下)。

為了幫助你理解,我簡單地搭建了一個 Kafka 節(jié)點,并且寫入和消費了一些信息,然后我來使用命令看看消息累積情況,具體的命令如下:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-consumer-group

結(jié)果如下:


image.png

圖中的前兩列是隊列的基本信息,包括話題名和分區(qū)名;

第三列是當前消費者的消費進度;

第四列是當前生產(chǎn)消息的總數(shù);

第五列就是消費消息的堆積數(shù)(也就是第四列與第三列的差值)。

通過這個命令你可以很方便地了解消費者的消費情況。

其次,第二個工具是 JMX。

Kafka 通過 JMX 暴露了消息堆積的數(shù)據(jù),我在本地啟動了一個 console consumer,然后使用 jconsole 連接這個 consumer,你就可以看到這個 consumer 的堆積數(shù)據(jù)了(就是下圖中紅框里的數(shù)據(jù))。這些數(shù)據(jù)你可以寫代碼來獲取,這樣也可以方便地輸出到監(jiān)控系統(tǒng)中,我比較推薦這種方式。

image.png

除了使用消息隊列提供的工具以外,你還可以通過生成監(jiān)控消息的方式,來監(jiān)控消息的延遲。具體怎么做呢?

你先定義一種特殊的消息,然后啟動一個監(jiān)控程序,將這個消息定時地循環(huán)寫入到消息隊列中,消息的內(nèi)容可以是生成消息的時間戳,并且也會作為隊列的消費者消費數(shù)據(jù)。業(yè)務(wù)處理程序消費到這個消息時直接丟棄掉,而監(jiān)控程序在消費到這個消息時,就可以和這個消息的生成時間做比較,如果時間差達到某一個閾值就可以向我們報警。

image.png

這兩種方式都可以監(jiān)控消息的消費延遲情況,而從我的經(jīng)驗出來,我比較推薦兩種方式結(jié)合來使用。比如在我的實際項目中,我會優(yōu)先在監(jiān)控程序中獲取 JMX 中的隊列堆積數(shù)據(jù),做到 dashboard 報表中,同時也會啟動探測進程,確認消息的延遲情況是怎樣的。

在我看來,消息的堆積是對于消息隊列的基礎(chǔ)監(jiān)控,這是你無論如何都要做的。但是,了解了消息的堆積情況,并不能很直觀地了解消息消費的延遲,你也只能利用經(jīng)驗來確定堆積的消息量到了多少才會影響到用戶的體驗;而第二種方式對于消費延遲的監(jiān)控則更加直觀,而且從時間的維度來做監(jiān)控也比較容易確定報警閾值。

了解了消息延遲的監(jiān)控方式之后,我們再來看看如何提升消息的寫入和消費性能,這樣才會讓異步的消息得到盡快的處理。

減少消息延遲的正確姿勢

想要減少消息的處理延遲,我們需要在消費端和消息隊列兩個層面來完成。

在消費端,我們的目標是提升消費者的消息處理能力,你能做的是:

優(yōu)化消費代碼提升性能;

增加消費者的數(shù)量(這個方式比較簡單)。

不過,第二種方式會受限于消息隊列的實現(xiàn)。比如說,如果消息隊列使用的是 Kafka 就無法通過增加消費者數(shù)量的方式,來提升消息處理能力。

因為在 Kafka 中,一個 Topic(話題)可以配置多個 Partition(分區(qū)),數(shù)據(jù)會被平均或者按照生產(chǎn)者指定的方式,寫入到多個分區(qū)中,那么在消費的時候,Kafka 約定一個分區(qū)只能被一個消費者消費,為什么要這么設(shè)計呢?在我看來,如果有多個 consumer(消費者)可以消費一個分區(qū)的數(shù)據(jù),那么在操作這個消費進度的時候就需要加鎖,可能會對性能有一定的影響。

所以說,話題的分區(qū)數(shù)量決定了消費的并行度,增加多余的消費者也是沒有用處的,那么你可以通過增加分區(qū)來提高消費者的處理能力。


image.png

那么,如何在不增加分區(qū)的前提下提升消費能力呢?

既然不能增加 consumer,那么你可以在一個 consumer 中提升處理消息的并行度,所以可以考慮使用多線程的方式來增加處理能力:你可以預先創(chuàng)建一個或者多個線程池,在接收到消息之后,把消息丟到線程池中來異步地處理,這樣,原本串行的消費消息的流程就變成了并行的消費,可以提高消息消費的吞吐量,在并行處理的前提下,我們就可以在一次和消息隊列的交互中多拉取幾條數(shù)據(jù),然后分配給多個線程來處理。


image.png

另外,你在消費隊列中數(shù)據(jù)的時候還需要注意消費線程空轉(zhuǎn)的問題。

我是最初在測試自己寫的一個消息中間件的時候發(fā)現(xiàn)的。當時,我發(fā)現(xiàn)運行消費客戶端的進程會偶發(fā)地出現(xiàn) CPU 跑滿的情況,于是打印了 JVM 線程堆棧,找到了那個跑滿 CPU 的線程。這個時候才發(fā)現(xiàn),原來是消息隊列中,有一段時間沒有新的消息,于是消費客戶端拉取不到新的消息就會不間斷地輪詢拉取消息,這個線程就把 CPU 跑滿了。

所以,你在寫消費客戶端的時候要考慮這種場景,拉取不到消息可以等待一段時間再來拉取,等待的時間不宜過長,否則會增加消息的延遲。我一般建議固定的 10ms~100ms,也可以按照一定步長遞增,比如第一次拉取不到消息等待 10ms,第二次 20ms,最長可以到 100ms,直到拉取到消息再回到 10ms。

說完了消費端的做法之后,再來說說消息隊列本身在讀取性能優(yōu)化方面做了哪些事情。

我曾經(jīng)也做過一個消息中間件,在最初設(shè)計中間件的時候,我主要從兩方面考慮讀取性能問題:

消息的存儲;

零拷貝技術(shù)。

針對第一點,我最初在設(shè)計的時候為了實現(xiàn)簡單,使用了普通的數(shù)據(jù)庫來存儲消息,但是受限于數(shù)據(jù)庫的性能瓶頸,讀取 QPS 只能到 2000,后面我重構(gòu)了存儲模塊,使用本地磁盤作為存儲介質(zhì)。Page Cache 的存在就可以提升消息的讀取速度,即使要讀取磁盤中的數(shù)據(jù),由于消息的讀取是順序的,并且不需要跨網(wǎng)絡(luò)讀取數(shù)據(jù),所以讀取消息的 QPS 提升了一個數(shù)量級。

另外一個優(yōu)化點是零拷貝技術(shù),說是零拷貝,其實,我們不可能消滅數(shù)據(jù)的拷貝,只是盡量減少拷貝的次數(shù)。在讀取消息隊列的數(shù)據(jù)的時候,其實就是把磁盤中的數(shù)據(jù)通過網(wǎng)絡(luò)發(fā)送給消費客戶端,在實現(xiàn)上會有四次數(shù)據(jù)拷貝的步驟:

  1. 數(shù)據(jù)從磁盤拷貝到內(nèi)核緩沖區(qū);
  2. 系統(tǒng)調(diào)用將內(nèi)核緩存區(qū)的數(shù)據(jù)拷貝到用戶緩沖區(qū);
  3. 用戶緩沖區(qū)的數(shù)據(jù)被寫入到 Socket 緩沖區(qū)中;
  4. 操作系統(tǒng)再將 Socket 緩沖區(qū)的數(shù)據(jù)拷貝到網(wǎng)卡的緩沖區(qū)中。
image.png

操作系統(tǒng)提供了 Sendfile 函數(shù),可以減少數(shù)據(jù)被拷貝的次數(shù)。使用了 Sendfile 之后,在內(nèi)核緩沖區(qū)的數(shù)據(jù)不會被拷貝到用戶緩沖區(qū),而是直接被拷貝到 Socket 緩沖區(qū),節(jié)省了一次拷貝的過程,提升了消息發(fā)送的性能。高級語言中對于 Sendfile 函數(shù)有封裝,比如說在 Java 里面的 java.nio.channels.FileChannel 類就提供了 transferTo 方法提供了 Sendfile 的功能。


image.png

課程小結(jié)

本節(jié)課我?guī)懔私饬耍绾翁嵘㈥犃械男阅軄斫档拖⑾M的延遲,這里我想讓你明確的重點是:

我們可以使用消息隊列提供的工具,或者通過發(fā)送監(jiān)控消息的方式,來監(jiān)控消息的延遲情況;

橫向擴展消費者是提升消費處理能力的重要方式;

選擇高性能的數(shù)據(jù)存儲方式,配合零拷貝技術(shù),可以提升消息的消費性能。

其實,隊列是一種常用的組件,只要涉及到隊列,任務(wù)的堆積就是一個不可忽視的問題,我遇到過的很多故障都是源于此。

比如說,前一段時間處理的一個故障,前期只是因為數(shù)據(jù)庫性能衰減有少量的慢請求,結(jié)果這些慢請求占滿了 Tomcat 線程池,導致整體服務(wù)的不可用。如果我們能對 Tomcat 線程池的任務(wù)堆積情況有實時地監(jiān)控,或者說對線程池有一些保護策略,比方說線程全部使用之后丟棄請求,也許就會避免故障的發(fā)生。在此,我希望你在實際的工作中能夠引以為戒,只要有隊列就要監(jiān)控它的堆積情況,把問題消滅在萌芽之中。

轉(zhuǎn)自:19 消息隊列:如何降低消息隊列系統(tǒng)中消息的延遲?

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容