Kafka設(shè)計(jì)解析(四)- Kafka Consumer設(shè)計(jì)解析

Kafka設(shè)計(jì)解析(四)- Kafka Consumer設(shè)計(jì)解析

原創(chuàng)文章,轉(zhuǎn)載請(qǐng)務(wù)必將下面這段話置于文章開頭處。(已授權(quán)InfoQ中文站發(fā)布
本文轉(zhuǎn)發(fā)自技術(shù)世界,原文鏈接 http://www.jasongj.com/2015/08/09/KafkaColumn4

摘要

本文主要介紹了Kafka High Level Consumer,Consumer Group,Consumer Rebalance,Low Level Consumer實(shí)現(xiàn)的語(yǔ)義,以及適用場(chǎng)景。以及未來(lái)版本中對(duì)High Level Consumer的重新設(shè)計(jì)–使用Consumer Coordinator解決Split Brain和Herd等問題。

1 High Level Consumer

很多時(shí)候,客戶程序只是希望從Kafka讀取數(shù)據(jù),不太關(guān)心消息offset的處理。同時(shí)也希望提供一些語(yǔ)義,例如同一條消息只被某一個(gè)Consumer消費(fèi)(單播)或被所有Consumer消費(fèi)(廣播)。因此,Kafka Hight Level Consumer提供了一個(gè)從Kafka消費(fèi)數(shù)據(jù)的高層抽象,從而屏蔽掉其中的細(xì)節(jié)并提供豐富的語(yǔ)義。

給用戶提供消費(fèi)數(shù)據(jù)的高層抽象,屏蔽其中的細(xì)節(jié),提供豐富的語(yǔ)義

2 Consumer Group

High Level Consumer將從某個(gè)Partition讀取的最后一條消息的offset存于Zookeeper中(Kafka從0.8.2版本開始同時(shí)支持將offset存于Zookeeper中與將offset存于專用的Kafka Topic中)。這個(gè)offset基于客戶程序提供給Kafka的名字來(lái)保存,這個(gè)名字被稱為Consumer Group。Consumer Group是整個(gè)Kafka集群全局的,而非某個(gè)Topic的。每一個(gè)High Level Consumer實(shí)例都屬于一個(gè)Consumer Group,若不指定則屬于默認(rèn)的Group。
  Zookeeper中Consumer相關(guān)節(jié)點(diǎn)如下圖所示

Consumer Zookeeper Structure

  
  很多傳統(tǒng)的Message Queue都會(huì)在消息被消費(fèi)完后將消息刪除,一方面避免重復(fù)消費(fèi),另一方面可以保證Queue的長(zhǎng)度比較短,提高效率。而如上文所述,Kafka并不刪除已消費(fèi)的消息,為了實(shí)現(xiàn)傳統(tǒng)Message Queue消息只被消費(fèi)一次的語(yǔ)義,Kafka保證每條消息在同一個(gè)Consumer Group里只會(huì)被某一個(gè)Consumer消費(fèi)。與傳統(tǒng)Message Queue不同的是,Kafka還允許不同Consumer Group同時(shí)消費(fèi)同一條消息,這一特性可以為消息的多元化處理提供支持。

kafka有別去傳統(tǒng)的Message Queue,消費(fèi)后不刪除,允許被不同的Consumer Group同時(shí)消費(fèi)(多元化處理)
kafka consumer group

  
  實(shí)際上,Kafka的設(shè)計(jì)理念之一就是同時(shí)提供離線處理和實(shí)時(shí)處理。根據(jù)這一特性,可以使用Storm這種實(shí)時(shí)流處理系統(tǒng)對(duì)消息進(jìn)行實(shí)時(shí)在線處理,同時(shí)使用Hadoop這種批處理系統(tǒng)進(jìn)行離線處理,還可以同時(shí)將數(shù)據(jù)實(shí)時(shí)備份到另一個(gè)數(shù)據(jù)中心,只需要保證這三個(gè)操作所使用的Consumer在不同的Consumer Group即可。下圖展示了Kafka在LinkedIn的一種簡(jiǎn)化部署模型。

離線處理(Hadoop) 實(shí)時(shí)處理(Storm)
kafka sample deployment in linkedin

  
  為了更清晰展示Kafka Consumer Group的特性,筆者進(jìn)行了一項(xiàng)測(cè)試。創(chuàng)建一個(gè)Topic (名為topic1),再創(chuàng)建一個(gè)屬于group1的Consumer實(shí)例,并創(chuàng)建三個(gè)屬于group2的Consumer實(shí)例,然后通過(guò)Producer向topic1發(fā)送Key分別為1,2,3的消息。結(jié)果發(fā)現(xiàn)屬于group1的Consumer收到了所有的這三條消息,同時(shí)group2中的3個(gè)Consumer分別收到了Key為1,2,3的消息,如下圖所示。


kafka consumer group

  注:上圖中每個(gè)黑色區(qū)域代表一個(gè)Consumer實(shí)例,每個(gè)實(shí)例只創(chuàng)建一個(gè)MessageStream。實(shí)際上,本實(shí)驗(yàn)將Consumer應(yīng)用程序打成jar包,并在4個(gè)不同的命令行終端中傳入不同的參數(shù)運(yùn)行。

3 High Level Consumer Rebalance

關(guān)鍵詞:Consumer Group Consumer Partition

(本節(jié)所講述Rebalance相關(guān)內(nèi)容均基于Kafka High Level Consumer)
  Kafka保證同一Consumer Group中只有一個(gè)Consumer會(huì)消費(fèi)某條消息,實(shí)際上,Kafka保證的是穩(wěn)定狀態(tài)下每一個(gè)Consumer實(shí)例只會(huì)消費(fèi)某一個(gè)或多個(gè)特定Partition的數(shù)據(jù),而某個(gè)Partition的數(shù)據(jù)只會(huì)被某一個(gè)特定的Consumer實(shí)例所消費(fèi)。也就是說(shuō)Kafka對(duì)消息的分配是以Partition為單位分配的,而非以每一條消息作為分配單元。這樣設(shè)計(jì)的劣勢(shì)是無(wú)法保證同一個(gè)Consumer Group里的Consumer均勻消費(fèi)數(shù)據(jù),優(yōu)勢(shì)是每個(gè)Consumer不用都跟大量的Broker通信,減少通信開銷,同時(shí)也降低了分配難度,實(shí)現(xiàn)也更簡(jiǎn)單。另外,因?yàn)橥粋€(gè)Partition里的數(shù)據(jù)是有序的,這種設(shè)計(jì)可以保證每個(gè)Partition里的數(shù)據(jù)可以被有序消費(fèi)。
  如果某Consumer Group中Consumer(每個(gè)Consumer只創(chuàng)建1個(gè)MessageStream)數(shù)量少于Partition數(shù)量,則至少有一個(gè)Consumer會(huì)消費(fèi)多個(gè)Partition的數(shù)據(jù),如果Consumer的數(shù)量與Partition數(shù)量相同,則正好一個(gè)Consumer消費(fèi)一個(gè)Partition的數(shù)據(jù)。而如果Consumer的數(shù)量多于Partition的數(shù)量時(shí),會(huì)有部分Consumer無(wú)法消費(fèi)該Topic下任何一條消息。

Consumer Group 比較 Topic 分配結(jié)果
Consumer 少于 Partition 至少有一個(gè)Consumer會(huì)消費(fèi)多個(gè)Partition的數(shù)據(jù)
Consumer 相同 Partition 正好一個(gè)Consumer消費(fèi)一個(gè)Partition的數(shù)據(jù)
Consumer 多于 Partition 部分Consumer無(wú)法消費(fèi)該Topic下任何一條消息

實(shí)例:

如下例所示,如果topic1有0,1,2共三個(gè)Partition,當(dāng)group1只有一個(gè)Consumer(名為consumer1)時(shí),該 Consumer可消費(fèi)這3個(gè)Partition的所有數(shù)據(jù)。

  
kafka rebalance 3 partition 1 consumer

  增加一個(gè)Consumer(consumer2)后,其中一個(gè)Consumer(consumer1)可消費(fèi)2個(gè)Partition的數(shù)據(jù)(Partition 0和Partition 1),另外一個(gè)Consumer(consumer2)可消費(fèi)另外一個(gè)Partition(Partition 2)的數(shù)據(jù)。

  
kafka rebalance 3 partitin 2 consumer

  再增加一個(gè)Consumer(consumer3)后,每個(gè)Consumer可消費(fèi)一個(gè)Partition的數(shù)據(jù)。consumer1消費(fèi)partition0,consumer2消費(fèi)partition1,consumer3消費(fèi)partition2。
  
kafka rebalance 3 partition 3 consumer

  再增加一個(gè)Consumer(consumer4)后,其中3個(gè)Consumer可分別消費(fèi)一個(gè)Partition的數(shù)據(jù),另外一個(gè)Consumer(consumer4)不能消費(fèi)topic1的任何數(shù)據(jù)。
  
kafka rebalance 3 partition 4 consumer

  此時(shí)關(guān)閉consumer1,其余3個(gè)Consumer可分別消費(fèi)一個(gè)Partition的數(shù)據(jù)。

  
kafka rebalance 3 partition 3 consumer

  接著關(guān)閉consumer2,consumer3可消費(fèi)2個(gè)Partition,consumer4可消費(fèi)1個(gè)Partition。
  
kafka rebalance 3 partition 2 consumer

  再關(guān)閉consumer3,僅存的consumer4可同時(shí)消費(fèi)topic1的3個(gè)Partition。
  
kafka rebalance 3 partition 1 consumer

Consumer Rebalance的算法如下:

  • 將目標(biāo)Topic下的所有Partirtion排序,存于PTPT
  • 對(duì)某Consumer Group下所有Consumer排序,存于CG于CG,第ii個(gè)Consumer記為CiCi
  • N=size(PT)/size(CG)N=size(PT)/size(CG),向上取整
  • 解除CiCi對(duì)原來(lái)分配的Partition的消費(fèi)權(quán)(i從0開始)
  • 將第i?Ni?N到(i+1)?N?1(i+1)?N?1個(gè)Partition分配給CiCi

目前,最新版(0.8.2.1)Kafka的Consumer Rebalance的控制策略是由每一個(gè)Consumer通過(guò)在Zookeeper上注冊(cè)Watch完成的。每個(gè)Consumer被創(chuàng)建時(shí)會(huì)觸發(fā)Consumer Group的Rebalance,具體啟動(dòng)流程如下:

  • High Level Consumer啟動(dòng)時(shí)將其ID注冊(cè)到其Consumer Group下,在Zookeeper上的路徑為/consumers/[consumer group]/ids/[consumer id]
  • /consumers/[consumer group]/ids上注冊(cè)Watch
  • /brokers/ids上注冊(cè)Watch
  • 如果Consumer通過(guò)Topic Filter創(chuàng)建消息流,則它會(huì)同時(shí)在/brokers/topics上也創(chuàng)建Watch
  • 強(qiáng)制自己在其Consumer Group內(nèi)啟動(dòng)Rebalance流程

在這種策略下,每一個(gè)Consumer或者Broker的增加或者減少都會(huì)觸發(fā)Consumer Rebalance。因?yàn)槊總€(gè)Consumer只負(fù)責(zé)調(diào)整自己所消費(fèi)的Partition,為了保證整個(gè)Consumer Group的一致性,當(dāng)一個(gè)Consumer觸發(fā)了Rebalance時(shí),該Consumer Group內(nèi)的其它所有其它Consumer也應(yīng)該同時(shí)觸發(fā)Rebalance。

該方式有如下缺陷:

根據(jù)Kafka社區(qū)wiki,Kafka作者正在考慮在還未發(fā)布的0.9.x版本中使用中心協(xié)調(diào)器(Coordinator)。大體思想是為所有Consumer Group的子集選舉出一個(gè)Broker作為Coordinator,由它Watch Zookeeper,從而判斷是否有Partition或者Consumer的增減,然后生成Rebalance命令,并檢查是否這些Rebalance在所有相關(guān)的Consumer中被執(zhí)行成功,如果不成功則重試,若成功則認(rèn)為此次Rebalance成功(這個(gè)過(guò)程跟Replication Controller非常類似)。具體方案將在后文中詳細(xì)闡述。

Low Level Consumer

使用Low Level Consumer (Simple Consumer)的主要原因是,用戶希望比Consumer Group更好的控制數(shù)據(jù)的消費(fèi)。比如:

  • 同一條消息讀多次
  • 只讀取某個(gè)Topic的部分Partition
  • 管理事務(wù),從而確保每條消息被處理一次,且僅被處理一次

與Consumer Group相比,Low Level Consumer要求用戶做大量的額外工作。

  • 必須在應(yīng)用程序中跟蹤offset,從而確定下一條應(yīng)該消費(fèi)哪條消息
  • 應(yīng)用程序需要通過(guò)程序獲知每個(gè)Partition的Leader是誰(shuí)
  • 必須處理Leader的變化

使用Low Level Consumer的一般流程如下

  • 查找到一個(gè)“活著”的Broker,并且找出每個(gè)Partition的Leader
  • 找出每個(gè)Partition的Follower
  • 定義好請(qǐng)求,該請(qǐng)求應(yīng)該能描述應(yīng)用程序需要哪些數(shù)據(jù)
  • Fetch數(shù)據(jù)
  • 識(shí)別Leader的變化,并對(duì)之作出必要的響應(yīng)

Consumer重新設(shè)計(jì)

設(shè)計(jì)方向

1 簡(jiǎn)化消費(fèi)者客戶端
2 中心Coordinator
3 允許手工管理offset
4 Rebalance后觸發(fā)用戶指定的回調(diào)
5 非阻塞式Consumer API :Consumer  Consumer狀態(tài)機(jī)
6 故障檢測(cè)機(jī)制  Coordinator  Coordinator狀態(tài)機(jī)  Coordinator Failover

Kafka系列文章

Kafka設(shè)計(jì)解析(一)- Kafka簡(jiǎn)介及架構(gòu)介紹
Kafka設(shè)計(jì)解析(二)- Kafka High Availability (上)
Kafka設(shè)計(jì)解析(三)- Kafka High Availability (下)
Kafka設(shè)計(jì)解析(四)- Kafka Consumer設(shè)計(jì)解析
Kafka設(shè)計(jì)解析(五)- Kafka性能測(cè)試方法及Benchmark報(bào)告
Kafka設(shè)計(jì)解析(六)- Kafka高性能架構(gòu)之道
Kafka設(shè)計(jì)解析(七)- Kafka Stream

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容