07 | Kakfa_生產(chǎn)者消息分區(qū)機(jī)制原理剖析

我們?cè)谑褂?Apache Kafka 生產(chǎn)和消費(fèi)消息的時(shí)候,肯定是希望能夠?qū)?shù)據(jù)均勻地分配到所有服務(wù)器上。比如很多公司使用 Kafka 收集應(yīng)用服務(wù)器的日志數(shù)據(jù),這種數(shù)據(jù)都是很多的,特別是對(duì)于那種大批量機(jī)器組成的集群環(huán)境,每分鐘產(chǎn)生的日志量都能以 GB 數(shù),因此如何將這么大的數(shù)據(jù)量均勻地分配到 Kafka 的各個(gè) Broker 上,就成為一個(gè)非常重要的問(wèn)題。

今天我就來(lái)和你說(shuō)說(shuō) Kafka 生產(chǎn)者如何實(shí)現(xiàn)這個(gè)需求,我會(huì)以 Java API 為例進(jìn)行分析,但實(shí)際上其他語(yǔ)言的實(shí)現(xiàn)邏輯也是類似的。

為什么分區(qū)?

如果你對(duì) Kafka 分區(qū)(Partition)的概念還不熟悉。前面我說(shuō)過(guò) Kafka 有主題(Topic)的概念,它是承載真實(shí)數(shù)據(jù)的邏輯容器,而在主題之下還分為若干個(gè)分區(qū),也就是說(shuō) Kafka 的消息組織方式實(shí)際上是三級(jí)結(jié)構(gòu):主題 - 分區(qū) - 消息。主題下的每條消息只會(huì)保存在某一個(gè)分區(qū)中,而不會(huì)在多個(gè)分區(qū)中被保存多份。官網(wǎng)上的這張圖非常清晰地展示了 Kafka 的三級(jí)結(jié)構(gòu),如下所示:


現(xiàn)在我拋出一個(gè)問(wèn)題你可以先思考一下:你覺(jué)得為什么 Kafka 要做這樣的設(shè)計(jì)?為什么使用分區(qū)的概念而不是直接使用多個(gè)主題呢?

其實(shí)分區(qū)的作用就是提供負(fù)載均衡的能力,或者說(shuō)對(duì)數(shù)據(jù)進(jìn)行分區(qū)的主要原因,就是為了實(shí)現(xiàn)系統(tǒng)的高伸縮性(Scalability)。不同的分區(qū)能夠被放置到不同節(jié)點(diǎn)的機(jī)器上,而數(shù)據(jù)的讀寫操作也都是針對(duì)分區(qū)這個(gè)粒度而進(jìn)行的,這樣每個(gè)節(jié)點(diǎn)的機(jī)器都能獨(dú)立地執(zhí)行各自分區(qū)的讀寫請(qǐng)求處理。并且,我們還可以通過(guò)添加新的節(jié)點(diǎn)機(jī)器來(lái)增加整體系統(tǒng)的吞吐量。

實(shí)際上分區(qū)的概念以及分區(qū)數(shù)據(jù)庫(kù)早在 1980 年就已經(jīng)有大牛們?cè)谧隽?,比如那時(shí)候有個(gè)叫 Teradata 的數(shù)據(jù)庫(kù)就引入了分區(qū)的概念。

值得注意的是,不同的分布式系統(tǒng)對(duì)分區(qū)的叫法也不盡相同。比如在 Kafka 中叫分區(qū),在 MongoDB 和 Elasticsearch 中就叫分片 Shard,而在 HBase 中則叫 Region,在 Cassandra 中又被稱作 vnode。從表面看起來(lái)它們實(shí)現(xiàn)原理可能不盡相同,但對(duì)底層分區(qū)(Partitioning)的整體思想?yún)s從未改變。

除了提供負(fù)載均衡這種最核心的功能之外,利用分區(qū)也可以實(shí)現(xiàn)其他一些業(yè)務(wù)級(jí)別的需求,比如實(shí)現(xiàn)業(yè)務(wù)級(jí)別的消息順序的問(wèn)題,這一點(diǎn)我今天也會(huì)分享一個(gè)具體的案例來(lái)說(shuō)明。

都有哪些分區(qū)策略?

下面我們說(shuō)說(shuō) Kafka 生產(chǎn)者的分區(qū)策略。所謂分區(qū)策略是決定生產(chǎn)者將消息發(fā)送到哪個(gè)分區(qū)的算法。Kafka 為我們提供了默認(rèn)的分區(qū)策略,同時(shí)它也支持你自定義分區(qū)策略。

如果要自定義分區(qū)策略,你需要顯式地配置生產(chǎn)者端的參數(shù)partitioner.class。這個(gè)參數(shù)該怎么設(shè)定呢?方法很簡(jiǎn)單,在編寫生產(chǎn)者程序時(shí),你可以編寫一個(gè)具體的類實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner接口。這個(gè)接口也很簡(jiǎn)單,只定義了兩個(gè)方法:partition()close(),通常你只需要實(shí)現(xiàn)最重要的 partition 方法。我們來(lái)看看這個(gè)方法的方法簽名:


int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

這里的topic、key、keyBytes、valuevalueBytes都屬于消息數(shù)據(jù),cluster則是集群信息(比如當(dāng)前 Kafka 集群共有多少主題、多少 Broker 等)。Kafka 給你這么多信息,就是希望讓你能夠充分地利用這些信息對(duì)消息進(jìn)行分區(qū),計(jì)算出它要被發(fā)送到哪個(gè)分區(qū)中。只要你自己的實(shí)現(xiàn)類定義好了 partition 方法,同時(shí)設(shè)置partitioner.class參數(shù)為你自己實(shí)現(xiàn)類的 Full Qualified Name,那么生產(chǎn)者程序就會(huì)按照你的代碼邏輯對(duì)消息進(jìn)行分區(qū)。雖說(shuō)可以有無(wú)數(shù)種分區(qū)的可能,但比較常見(jiàn)的分區(qū)策略也就那么幾種,下面我來(lái)詳細(xì)介紹一下。

輪詢策略

也稱 Round-robin 策略,即順序分配。比如一個(gè)主題下有 3 個(gè)分區(qū),那么第一條消息被發(fā)送到分區(qū) 0,第二條被發(fā)送到分區(qū) 1,第三條被發(fā)送到分區(qū) 2,以此類推。當(dāng)生產(chǎn)第 4 條消息時(shí)又會(huì)重新開(kāi)始,即將其分配到分區(qū) 0,就像下面這張圖展示的那樣。

這就是所謂的輪詢策略。輪詢策略是 Kafka Java 生產(chǎn)者 API 默認(rèn)提供的分區(qū)策略。如果你未指定partitioner.class參數(shù),那么你的生產(chǎn)者程序會(huì)按照輪詢的方式在主題的所有分區(qū)間均勻地“碼放”消息。

輪詢策略有非常優(yōu)秀的負(fù)載均衡表現(xiàn),它總是能保證消息最大限度地被平均分配到所有分區(qū)上,故默認(rèn)情況下它是最合理的分區(qū)策略,也是我們最常用的分區(qū)策略之一。

隨機(jī)策略

也稱 Randomness 策略。所謂隨機(jī)就是我們隨意地將消息放置到任意一個(gè)分區(qū)上,如下面這張圖所示。


如果要實(shí)現(xiàn)隨機(jī)策略版的 partition 方法,很簡(jiǎn)單,只需要兩行代碼即可:


List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

return ThreadLocalRandom.current().nextInt(partitions.size());

先計(jì)算出該主題總的分區(qū)數(shù),然后隨機(jī)地返回一個(gè)小于它的正整數(shù)。

本質(zhì)上看隨機(jī)策略也是力求將數(shù)據(jù)均勻地打散到各個(gè)分區(qū),但從實(shí)際表現(xiàn)來(lái)看,它要遜于輪詢策略,所以如果追求數(shù)據(jù)的均勻分布,還是使用輪詢策略比較好。事實(shí)上,隨機(jī)策略是老版本生產(chǎn)者使用的分區(qū)策略,在新版本中已經(jīng)改為輪詢了。

按消息鍵保序策略

也稱 Key-ordering 策略。有點(diǎn)尷尬的是,這個(gè)名詞是我自己編的,Kafka 官網(wǎng)上并無(wú)這樣的提法。

Kafka 允許為每條消息定義消息鍵,簡(jiǎn)稱為 Key。這個(gè) Key 的作用非常大,它可以是一個(gè)有著明確業(yè)務(wù)含義的字符串,比如客戶代碼、部門編號(hào)或是業(yè)務(wù) ID 等;也可以用來(lái)表征消息元數(shù)據(jù)。特別是在 Kafka 不支持時(shí)間戳的年代,在一些場(chǎng)景中,工程師們都是直接將消息創(chuàng)建時(shí)間封裝進(jìn) Key 里面的。一旦消息被定義了 Key,那么你就可以保證同一個(gè) Key 的所有消息都進(jìn)入到相同的分區(qū)里面,由于每個(gè)分區(qū)下的消息處理都是有順序的,故這個(gè)策略被稱為按消息鍵保序策略,如下圖所示。

實(shí)現(xiàn)這個(gè)策略的 partition 方法同樣簡(jiǎn)單,只需要下面兩行代碼即可:


List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

return Math.abs(key.hashCode()) % partitions.size();

前面提到的 Kafka 默認(rèn)分區(qū)策略實(shí)際上同時(shí)實(shí)現(xiàn)了兩種策略:如果指定了 Key,那么默認(rèn)實(shí)現(xiàn)按消息鍵保序策略;如果沒(méi)有指定 Key,則使用輪詢策略。

在你了解了 Kafka 默認(rèn)的分區(qū)策略之后,我來(lái)給你講一個(gè)真實(shí)的案例,希望能加強(qiáng)你對(duì)分區(qū)策略重要性的理解。

我曾經(jīng)給一個(gè)國(guó)企進(jìn)行過(guò) Kafka 培訓(xùn),當(dāng)時(shí)碰到的一個(gè)問(wèn)題就是如何實(shí)現(xiàn)消息的順序問(wèn)題。這家企業(yè)發(fā)送的 Kafka 的消息是有因果關(guān)系的,故處理因果關(guān)系也必須要保證有序性,否則先處理了“果”后處理“因”必然造成業(yè)務(wù)上的混亂。

當(dāng)時(shí)那家企業(yè)的做法是給 Kafka 主題設(shè)置單分區(qū),也就是 1 個(gè)分區(qū)。這樣所有的消息都只在這一個(gè)分區(qū)內(nèi)讀寫,因此保證了全局的順序性。這樣做雖然實(shí)現(xiàn)了因果關(guān)系的順序性,但也喪失了 Kafka 多分區(qū)帶來(lái)的高吞吐量和負(fù)載均衡的優(yōu)勢(shì)。

后來(lái)經(jīng)過(guò)了解和調(diào)研,我發(fā)現(xiàn)這種具有因果關(guān)系的消息都有一定的特點(diǎn),比如在消息體中都封裝了固定的標(biāo)志位,后來(lái)我就建議他們對(duì)此標(biāo)志位設(shè)定專門的分區(qū)策略,保證同一標(biāo)志位的所有消息都發(fā)送到同一分區(qū),這樣既可以保證分區(qū)內(nèi)的消息順序,也可以享受到多分區(qū)帶來(lái)的性能紅利。

這種基于個(gè)別字段的分區(qū)策略本質(zhì)上就是按消息鍵保序的思想,其實(shí)更加合適的做法是把標(biāo)志位數(shù)據(jù)提取出來(lái)統(tǒng)一放到 Key 中,這樣更加符合 Kafka 的設(shè)計(jì)思想。經(jīng)過(guò)改造之后,這個(gè)企業(yè)的消息處理吞吐量一下提升了 40 多倍,從這個(gè)案例你也可以看到自定制分區(qū)策略的效果可見(jiàn)一斑。

其他分區(qū)策略

上面這幾種分區(qū)策略都是比較基礎(chǔ)的策略,除此之外你還能想到哪些有實(shí)際用途的分區(qū)策略?其實(shí)還有一種比較常見(jiàn)的,即所謂的基于地理位置的分區(qū)策略。當(dāng)然這種策略一般只針對(duì)那些大規(guī)模的 Kafka 集群,特別是跨城市、跨國(guó)家甚至是跨大洲的集群。

我就拿“極客時(shí)間”舉個(gè)例子吧,假設(shè)極客時(shí)間的所有服務(wù)都部署在北京的一個(gè)機(jī)房(這里我假設(shè)它是自建機(jī)房,不考慮公有云方案。其實(shí)即使是公有云,實(shí)現(xiàn)邏輯也差不多),現(xiàn)在極客時(shí)間考慮在南方找個(gè)城市(比如廣州)再創(chuàng)建一個(gè)機(jī)房;另外從兩個(gè)機(jī)房中選取一部分機(jī)器共同組成一個(gè)大的 Kafka 集群。顯然,這個(gè)集群中必然有一部分機(jī)器在北京,另外一部分機(jī)器在廣州。

假設(shè)極客時(shí)間計(jì)劃為每個(gè)新注冊(cè)用戶提供一份注冊(cè)禮品,比如南方的用戶注冊(cè)極客時(shí)間可以免費(fèi)得到一碗“甜豆腐腦”,而北方的新注冊(cè)用戶可以得到一碗“咸豆腐腦”。如果用 Kafka 來(lái)實(shí)現(xiàn)則很簡(jiǎn)單,只需要?jiǎng)?chuàng)建一個(gè)雙分區(qū)的主題,然后再創(chuàng)建兩個(gè)消費(fèi)者程序分別處理南北方注冊(cè)用戶邏輯即可。

但問(wèn)題是你需要把南北方注冊(cè)用戶的注冊(cè)消息正確地發(fā)送到位于南北方的不同機(jī)房中,因?yàn)樘幚磉@些消息的消費(fèi)者程序只可能在某一個(gè)機(jī)房中啟動(dòng)著。換句話說(shuō),送甜豆腐腦的消費(fèi)者程序只在廣州機(jī)房啟動(dòng)著,而送咸豆腐腦的程序只在北京的機(jī)房中,如果你向廣州機(jī)房中的 Broker 發(fā)送北方注冊(cè)用戶的消息,那么這個(gè)用戶將無(wú)法得到禮品!

此時(shí)我們就可以根據(jù) Broker 所在的 IP 地址實(shí)現(xiàn)定制化的分區(qū)策略。比如下面這段代碼:


List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();

我們可以從所有分區(qū)中找出那些 Leader 副本在南方的所有分區(qū),然后隨機(jī)挑選一個(gè)進(jìn)行消息發(fā)送。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文目標(biāo) Topic&Partition 消息分發(fā)策略 消息消費(fèi)原理 消息的存儲(chǔ)策略 Partition 副本機(jī)制...
    JavaEdge閱讀 1,205評(píng)論 1 3
  • 簡(jiǎn)介 ? Kafka起初是由LinkedIn公司采用Scala語(yǔ)言開(kāi)發(fā)的一個(gè)多分區(qū)、多副本且基于Zookeep...
    四夕_y閱讀 889評(píng)論 0 1
  • 本章我們將會(huì)討論Kafka生產(chǎn)者是如何發(fā)送消息到Kafka的。Kafka項(xiàng)目有一個(gè)生產(chǎn)者客戶端,我們可以通過(guò)這個(gè)客...
    printf200閱讀 8,231評(píng)論 0 3
  • 本章我們將會(huì)討論Kafka生產(chǎn)者是如何發(fā)送消息到Kafka的。Kafka項(xiàng)目有一個(gè)生產(chǎn)者客戶端,我們可以通過(guò)這個(gè)客...
    zwb_jianshu閱讀 503評(píng)論 0 0
  • 媽媽,我生日的時(shí)候,你送我禮物嗎?一聽(tīng)這話,我怎么有被挖陷阱的感覺(jué)呢?立刻打起十二萬(wàn)分的精神,目不轉(zhuǎn)睛地盯著女兒。...
    碎碎妖閱讀 395評(píng)論 2 6

友情鏈接更多精彩內(nèi)容