分區(qū)日志、日志段、日志段索引、日志段時(shí)間索引、日志段位移索引、稀疏索引文件
消息設(shè)計(jì)
JMM要求Java對(duì)象必須按照8字節(jié)對(duì)齊,未對(duì)齊部分會(huì)填充空白字節(jié)進(jìn)行補(bǔ)齊,該操作稱為padding。
Kafka使用Java NIO的ByteBuffer來(lái)保存消息,同時(shí)依賴文件系統(tǒng)的頁(yè)緩存機(jī)制,而非Java的堆緩存。ByteBuffer是緊湊的二進(jìn)制字節(jié)結(jié)構(gòu),不需要padding操作。
Kafka消息格式有3個(gè)版本:V0、V1和V2。
消息層次分為兩層:消息集合(message set)和消息。
一個(gè)消息集合包含若干個(gè)日志項(xiàng),日志項(xiàng)封裝實(shí)際的消息和一組元數(shù)據(jù)信息。Kafka日志文件是一系列消息集合日志項(xiàng)構(gòu)成的。Kafka不會(huì)在消息層面上直接操作,而是在消息集合上進(jìn)行寫入。
V2之前的版本使用的是日志項(xiàng)(log entry),V2版本則使用消息批次(record batch)。
V2版本借鑒了Google ProtoBuffer中的Zig-zag編碼方式。
PID、producer epoch和序列號(hào)等信息都是0.11.0.0版本為了實(shí)現(xiàn)冪等性producer和支持事務(wù)而引入的。PID標(biāo)識(shí)一個(gè)冪等性producer的ID值,producer epoch標(biāo)識(shí)某個(gè)PID攜帶的當(dāng)前版本號(hào),broker使用PID和epoch來(lái)確定當(dāng)前合法的producer實(shí)例,并以此阻止過(guò)期producer向broker生產(chǎn)消息。序列號(hào)主要實(shí)現(xiàn)消息生產(chǎn)的冪等性。Kafka依靠序列號(hào)來(lái)辨別消息是否已經(jīng)提交成功。
副本與ISR設(shè)計(jì)
一個(gè)Kafka分區(qū)本質(zhì)上是一個(gè)備份日志,即利用多份相同的備份共同提供冗余機(jī)制來(lái)保證系統(tǒng)高可用性。這些備份被稱為副本(replica)。Kafka把分區(qū)的所有副本均勻的分配到所有broker上,并從中選一個(gè)作為leader副本對(duì)外提供服務(wù);其余的作為follow副本,被動(dòng)的向leader請(qǐng)求數(shù)據(jù),從而保持與leader副本的同步。
ISR,是Kafka集群動(dòng)態(tài)維護(hù)的同步副本集合(in-sync replicas)。每個(gè)topic分區(qū)都有自己的ISR列表,ISR中的所有副本都與leader保持同步狀態(tài)。只有ISR中的副本才能被選舉為leader。
producer寫入的一條消息只有被ISR中的所有副本都接收到,才被視為“已提交”狀態(tài)。
0.9.0.0版本之前,參數(shù)replica.lag.max.messages用于控制follower副本落后leader副本的消息數(shù)。超過(guò)這個(gè)消息數(shù),則該follower被視為“不同步”狀態(tài),從而被踢出ISR。
下列原因?qū)е耭olllower與leader不同步:
- 請(qǐng)求速度追不上。
- 進(jìn)程卡住。
- 新創(chuàng)建的副本。
在0.9.0.0版本之前,提供參數(shù)replica.lag.time.max.ms用于檢測(cè)后面兩種情況。前面的參數(shù)只檢測(cè)第一種情況。
自0.9.0.0版本之后,Kafka去掉了replica.lag.max.messages參數(shù),改用replica.lag.time.max.ms參數(shù)同時(shí)檢測(cè)由于慢以及進(jìn)程卡殼而導(dǎo)致的滯后(lagging)---follower副本落后leader副本的時(shí)間間隔。默認(rèn)是10秒。對(duì)于“請(qǐng)求速度追不上”的情況,檢測(cè)機(jī)制也發(fā)生了變化---如果一個(gè)follower副本落后leader的時(shí)間持續(xù)性的超過(guò)這個(gè)參數(shù)值,則該follower副本被認(rèn)為是“不同步”的。
在Kafka中,水印表示的是位置信息,即位移(offset)。
任意時(shí)刻,HW指向的是實(shí)實(shí)在在的消息位置,而LEO指向下一條待寫入的消息位置。消費(fèi)者無(wú)法消費(fèi)分區(qū)leader副本上位移大于分區(qū)HW的消息。分區(qū)HW就是leader副本的HW值。
Kafka使用HW值來(lái)決定副本備份的進(jìn)度,而HW值的更新需要另一輪FETCH請(qǐng)求才能完成,該設(shè)計(jì)會(huì)導(dǎo)致下面問(wèn)題:
- 備份數(shù)據(jù)丟失。
- 備份數(shù)據(jù)不一致。
Kafka在0.11.0.0版本中引入leader epoch值解決上面的基于水印備份機(jī)制的兩個(gè)問(wèn)題。
造成上面兩個(gè)問(wèn)題的根本原因在于HW值被用于衡量副本備份的成功與否,以及在出現(xiàn)崩潰時(shí)作為日志截?cái)嗟囊罁?jù)。但HW值的更新是異步延遲,特別是需要額外的FETCH請(qǐng)求流程才能更新,故這中間發(fā)生任何崩潰都可能導(dǎo)致HW值的過(guò)期。
leader端用一段內(nèi)存區(qū)域保持leader的epoch消息。所謂領(lǐng)導(dǎo)者epoch,實(shí)際是一對(duì)值(epoch,offset)。epoch表示leader的版本號(hào),從0開始,當(dāng)leader變更時(shí),epoch自動(dòng)加1,offset對(duì)應(yīng)該epoch版本的leader第一條消息的位移。
每個(gè)副本引入新的狀態(tài)來(lái)保持自己當(dāng)leader時(shí)開始寫入的第一條消息的offset以及l(fā)eader版本,這樣恢復(fù)的時(shí)候使用這些信息而非水位來(lái)判斷是否需要截?cái)嗳罩尽?/p>
日志存儲(chǔ)設(shè)計(jì)
Kafka不會(huì)直接將原生消息寫入日志文件,而是將消息和必要的元數(shù)據(jù)信息打包在一起封裝成一個(gè)record寫入日志。也就是消息集合或消息batch。
Kafka自定義了消息格式并且在寫入日志前序列化成緊湊的二進(jìn)制字節(jié)數(shù)組來(lái)保存日志。
Kafka的日志是以分區(qū)為單位,每個(gè)分區(qū)都有自己的日志,該日志被稱為分區(qū)日志(partition log)。具體對(duì)每個(gè)日志而言,細(xì)分為日志段文件(log segment file)以及日志段索引文件。
日志段索引文件包括位移索引文件(.index)和時(shí)間索引文件(.timeindex)。它們都屬于稀疏索引文件(sparse index file),Kafka不會(huì)為每條消息保存對(duì)應(yīng)的索引項(xiàng)。而是寫入若干條記錄后才增加一個(gè)索引項(xiàng)。由broker端的log.index.interval.bytes參數(shù)控制。默認(rèn)值是4KB。索引文件默認(rèn)是10MB。
通過(guò)時(shí)間索引查找后,還需要到位移索引文件中定位真正的物理文件位置。
底層文件系統(tǒng)
Kafka使用日志段文件的第一條記錄對(duì)應(yīng)的offset來(lái)命名.log文件。默認(rèn)大小是1GB。日志段文件填滿后,會(huì)自動(dòng)創(chuàng)建一組新的日志段文件和索引文件---這被稱為日志切分(log rolling)。
- 日志留存。默認(rèn)清除7天前的日志信息?;蛘呋谌罩疚募笮。J(rèn)不清除)。
- 日志壓實(shí)。確保topic每個(gè)分區(qū)下的每個(gè)相同key的消息都保存最新value。日志壓實(shí)是topic級(jí)別的。
clients端請(qǐng)求處理流程
- 確認(rèn)目標(biāo)broker。
- 創(chuàng)建與broker的TCP連接并一直保持
JAVA版本clients采有了類似于Linux select和epoll的實(shí)現(xiàn)機(jī)制,在底層把I/O操作完全托管給JAVA NIO的Selector,故在輪詢這一步中clients會(huì)檢查有無(wú)真正的I/O事件發(fā)生。比如發(fā)送請(qǐng)求或獲取請(qǐng)求甚至是連接重建或斷開。若不是連接斷開,clients會(huì)對(duì)接收到的響應(yīng)指向?qū)?yīng)的回調(diào)邏輯;如果是連接斷開,則clients會(huì)自動(dòng)重建連接。
broker端請(qǐng)求處理流程
broker啟動(dòng)時(shí)會(huì)創(chuàng)建一個(gè)請(qǐng)求阻塞隊(duì)列,用于接收從clients端發(fā)送的請(qǐng)求。創(chuàng)建若干個(gè)請(qǐng)求處理線程來(lái)獲取并處理該阻塞隊(duì)列中的請(qǐng)求。
controller設(shè)計(jì)
在Kafka集群中,某個(gè)broker作為控制器。用于管理集群中所有分區(qū)的狀態(tài)并執(zhí)行相應(yīng)的管理操作。
controller維護(hù)兩類狀態(tài):每臺(tái)broker上的分區(qū)副本和每個(gè)分區(qū)的leader副本信息。從維度上看,這些狀態(tài)分為副本狀態(tài)和分區(qū)狀態(tài)。
broker請(qǐng)求處理流程
Kafka broker處理請(qǐng)求的模式是reactor設(shè)計(jì)模式。reactor設(shè)計(jì)模式是一種事件處理模式,旨在處理多個(gè)輸入源同時(shí)發(fā)送過(guò)來(lái)的請(qǐng)求。reactor模式中的服務(wù)請(qǐng)求器或分發(fā)器將入站請(qǐng)求按照多路復(fù)用的方式分發(fā)到對(duì)應(yīng)的請(qǐng)求處理器。
在Kafka中對(duì)應(yīng)于reactor模式的“事件”實(shí)際上是連向broker的Socket連接通道,而不是clients端發(fā)送過(guò)來(lái)的真實(shí)請(qǐng)求。
多路復(fù)用(multiplexing),用很少的線程維護(hù)多個(gè)連接。clients端通常會(huì)保存與broker的長(zhǎng)連接,因此不需要頻繁的重建socket連接,故broker端固定使用一個(gè)acceptor線程來(lái)唯一的監(jiān)聽入站連接。
目前,broker以線程組而非線程池的方式來(lái)實(shí)現(xiàn)process線程組,之后使用數(shù)組索引輪詢方式依次給每個(gè)process線程分配任務(wù),實(shí)現(xiàn)了負(fù)載均衡。
Kafka創(chuàng)建KafkaRequestHandler線程池專門處理真正的請(qǐng)求。process只是將socket連接上接收到的請(qǐng)求放入請(qǐng)求隊(duì)列中。這個(gè)請(qǐng)求隊(duì)列默認(rèn)大小是500,超過(guò)后會(huì)阻塞。
broker還會(huì)創(chuàng)建與process線程數(shù)等量的響應(yīng)隊(duì)列。
Kafka在設(shè)計(jì)上使用了JAVA NIO的Selector + Channel + Buffer的思想,在每個(gè)process線程中維護(hù)一個(gè)Selector實(shí)例,并通過(guò)這個(gè)Selector管理多個(gè)通道上的數(shù)據(jù)交互,這就是多路復(fù)用在process線程上的應(yīng)用。
實(shí)現(xiàn)精確一次處理語(yǔ)義
冪等性producer(idempotent producer)
若一個(gè)操作執(zhí)行多次的結(jié)果與只執(zhí)行一次的結(jié)果是相同的,那么稱該操作為冪等操作。
如果要啟用冪等性producer以及獲取其提供的EOS語(yǔ)義,需要設(shè)置producer端的參數(shù)enable.idempotent為true。
冪等性producer的設(shè)計(jì)思路類似于TCP的工作方式。發(fā)送到broker端的每批消息都會(huì)被賦予一個(gè)序列號(hào)用于消息去重。與TCP不同的是,這個(gè)序列號(hào)不會(huì)被丟棄,而是會(huì)被保存在底層日志中,這樣即使分區(qū)的leader副本掛掉,新選出來(lái)的leader broker也能執(zhí)行消息去重。
除了序列號(hào),Kafka還為每個(gè)producer實(shí)例分配一個(gè)producer id(PID)。
對(duì)于PID、分區(qū)和序列號(hào)的關(guān)系,設(shè)想一個(gè)Map,key是(PID,分區(qū)號(hào)),value就是序列號(hào)。即每對(duì)(PID,分區(qū)號(hào))都有對(duì)應(yīng)的序列號(hào)值。若發(fā)送消息的序列號(hào)小于或等于broker端保存的序列號(hào),那么broker會(huì)拒絕這條消息的寫入。
由于每個(gè)新的producer實(shí)例都會(huì)被分配不同的PID。當(dāng)前設(shè)計(jì)只能保證單個(gè)producer實(shí)例的EOS語(yǔ)義,而無(wú)法實(shí)現(xiàn)多個(gè)producer實(shí)例一起提供EOS語(yǔ)義。
事務(wù)
對(duì)事務(wù)的支持是Kafka實(shí)現(xiàn)EOS的第二個(gè)利器。事務(wù)使得clients端程序能夠?qū)⒁唤M消息放入一個(gè)原子性單元中統(tǒng)一處理。
Kakfa Broker Leader的選舉
Kakfa Broker集群受zk管理。broker節(jié)點(diǎn)去zk注冊(cè)一個(gè)臨時(shí)節(jié)點(diǎn),因?yàn)橹挥幸粋€(gè)broker會(huì)注冊(cè)成功,這個(gè)成功注冊(cè)臨時(shí)節(jié)點(diǎn)的Broker會(huì)成為Broker Controller,其他的broker角色是follower。(這個(gè)過(guò)程叫Controller在ZooKeeper注冊(cè)Watch)。
這個(gè)Controller會(huì)監(jiān)聽其他的Broker的所有信息,如果broker controller宕機(jī)了,在zookeeper上面的那個(gè)臨時(shí)節(jié)點(diǎn)就會(huì)消失,此時(shí)所有的broker又會(huì)一起去zk上注冊(cè)一個(gè)臨時(shí)節(jié)點(diǎn),因?yàn)橹挥幸粋€(gè)broker會(huì)注冊(cè)成功。
例如:一旦有某個(gè)broker宕機(jī),broker controller會(huì)讀取該宕機(jī)broker上所有的partition在zookeeper上的狀態(tài),并選取ISR列表中的一個(gè)replica作為partition leader(如果ISR列表中的replica全掛,選一個(gè)幸存的replica作為leader; 如果該partition的所有的replica都宕機(jī)了,則將新的leader設(shè)置為-1,等待恢復(fù),等待ISR中的任一個(gè)replica“活”過(guò)來(lái),并且選它作為leader;或選擇第一個(gè)“活”過(guò)來(lái)的Replica(不一定是ISR中的)作為L(zhǎng)eader),這個(gè)broker宕機(jī)的事情,broker controller也會(huì)通知zk,zk就會(huì)通知其他的kafka broker。
Broker Leader的任務(wù)
(1)傳送消息:producer先把message發(fā)送到partition leader,再由leader發(fā)送給其他partition follower。(如果讓producer發(fā)送給每個(gè)replica那就太慢了)
(2)在向Producer發(fā)送ACK前需要保證有多少個(gè)Replica已經(jīng)收到該消息:根據(jù)ack配的個(gè)數(shù)而定。
(3)處理某個(gè)Replica不工作的情況:如果這個(gè)不工作的partition replica不在ack列表中,就是producer在發(fā)送消息到partition leader上,partition leader向partition follower發(fā)送message沒(méi)有響應(yīng)而已,這個(gè)不會(huì)影響整個(gè)系統(tǒng),也不會(huì)有什么問(wèn)題。如果這個(gè)不工作的partition replica在ack列表中的話,producer發(fā)送的message的時(shí)候會(huì)等待這個(gè)不工作的partition replca寫message成功,但是會(huì)等到time out,然后返回失敗因?yàn)槟硞€(gè)ack列表中的partition replica沒(méi)有響應(yīng),此時(shí)kafka會(huì)自動(dòng)的把這個(gè)不工作的partition replica從ack列表中移除,以后的producer發(fā)送message的時(shí)候就不會(huì)有這個(gè)ack列表下的這個(gè)部工作的partition replica了。
(4)處理Failed Replica恢復(fù)回來(lái)的情況:如果partition replica之前在ack列表中,重啟后,需要把這個(gè)partition replica手動(dòng)加到ack列表中。(ack列表是手動(dòng)添加的,出現(xiàn)某個(gè)不工作的partition replica的時(shí)候自動(dòng)從ack列表中移除的)
Partition leader與follower
partition也有l(wèi)eader和follower之分。leader是主partition,producer寫kafka的時(shí)候先寫partition leader,再由partition leader push給其他的partition follower。partition leader與follower的信息受zk控制,一旦partition leader所在的broker節(jié)點(diǎn)宕機(jī),zookeeper會(huì)從其他的broker的partition follower上選擇follower變?yōu)閜arition leader。
集群管理
ZK臨時(shí)節(jié)點(diǎn)的生命周期和客戶端會(huì)話綁定,從而用ZK臨時(shí)節(jié)點(diǎn)來(lái)管理broker生命周期。
通信協(xié)議
Kafka的通信協(xié)議是基于TCP的二進(jìn)制協(xié)議。
跨機(jī)房備份
MirroMaker可以實(shí)現(xiàn)在兩個(gè)Kafka集群間拷貝數(shù)據(jù)。從而實(shí)現(xiàn)跨機(jī)房的數(shù)據(jù)傳輸。
參考
《Apache Kafka實(shí)戰(zhàn)》
http://www.itdecent.cn/p/3d2bbbeea14f