kafka生產(chǎn)者生產(chǎn)的消息需要存儲在服務(wù)端,那么服務(wù)端就需要保證消息的健壯性,需要保證其線性擴展,負(fù)載均衡,故障容錯等。那么kafka是怎么來做的喃?
分區(qū):指消息按照分區(qū)分布在kafka集群中所有的節(jié)點上;復(fù)制:指分區(qū)都會有多個副本存儲在不同的節(jié)點上。消息存儲:指新的消息總是以追加的方式進行文件存儲。分區(qū)可以做到線性擴展和負(fù)載均衡。復(fù)制可以做到故障容錯。消息追加的方式進行信息存儲可以提供很高的寫效率。
分區(qū) 副本
消息按照主題分類,為了提高消息的并行處理能力,每個主題會有多個分區(qū),為了保證消息的可用性,每個分區(qū)都會有多個副本。主題以分區(qū)的形式存儲在多個代理節(jié)點上,ZK記錄了主題和分區(qū)的對應(yīng)關(guān)系,集群中每個代理節(jié)點都會管理多個主題的多個分區(qū)。
主題采用多個分區(qū),可控制消息寫往不同的節(jié)點,從而分散每個節(jié)點的壓力??蛻舳艘苑謪^(qū)作為最小的處理單位,生產(chǎn)者將消息同時寫入不同的節(jié)點,多個消費者可以同時讀取不同節(jié)點的不同分區(qū)數(shù)據(jù),加快消費消息,降低消息的延遲。
為了保證分區(qū)的可用性,采用副本機制為一個分區(qū)備份多個副本,一個分區(qū)只有一個主副本(Leader),多個備份副本(Follower)。主副本負(fù)責(zé)客戶端的讀寫,備份副本負(fù)責(zé)同步主副本的數(shù)據(jù)。當(dāng)主副本掛掉之后,在多個備份副本中選擇一個作為主副本,繼續(xù)為客戶端提供讀寫服務(wù)。分區(qū)有兩個重要的集合:AR(分區(qū)的所有副本集合)和ISR(和主副本正在同步的副本集合)。副本是真正存儲在消息代理節(jié)點上,持有日志文件對象。客戶端訪問分區(qū),先獲取分區(qū)的主副本,然后獲取主副本所在的消息代理節(jié)點編號,最后從消息代理節(jié)點讀寫主副本對應(yīng)的日志文件。
將分區(qū)數(shù)據(jù)存儲到日志文件上時,每個分區(qū)對應(yīng)一個目錄,目錄下有多個日志分段。同一個目錄下的所有日志分段都屬于同一個分區(qū)。每個日志分段在物理上由一個數(shù)據(jù)文件和一個索引文件組成。數(shù)據(jù)文件存儲的是消息的真正內(nèi)容,索引文件存儲的是數(shù)據(jù)文件的索引信息。為數(shù)據(jù)文件建立索引文件目的是更快的訪問數(shù)據(jù)文件。生產(chǎn)者采用追加的方式將消息寫入日志文件,順序?qū)懕P性能很高,同樣消費者也是順序讀盤,但是消費者在異常恢復(fù)情況下可能需要重新處理消息,這個時候可以利用索引文件重新定位到數(shù)據(jù)文件中的消息。
消息寫入日志文件
每個分區(qū)都有一個日志對象管理分區(qū)的所有日志分段。
生產(chǎn)者在發(fā)送消息時,會在客戶端將屬于同一個分區(qū)的一批消息作為一個生產(chǎn)請求發(fā)送給服務(wù)端。java版本的生產(chǎn)者產(chǎn)生的消息內(nèi)容是字節(jié)緩沖區(qū)(ByteBuffer)。
消息集
消息集中的每條消息都會被分配一個相對偏移量,每一批消息集中消息的相對偏移量都從0開始:第一批消息:[0,1,2,3]。第二批消息:[0,1,2]。
消息集中的每條消息有三個部分組成:偏移量,大小,消息內(nèi)容。消息內(nèi)容包含:鍵值,鍵值的長度,校驗值等數(shù)據(jù)(Record)。
kafka服務(wù)端在存儲消息時,會為每條消息都指定一個唯一的偏移量。同一個分區(qū)的所有日志分段中消息的偏移量從0開始不斷遞增,不同分區(qū)的偏移量直接沒關(guān)系。這也就是kafka只能保證同一個分區(qū)的消息有序性,不能保證跨分區(qū)消息的有序性。
客戶端創(chuàng)建消息集中每條消息的偏移量,都是相對于本批次消息集的偏移量,每批消息的偏移量都是從0開始,但是這個偏移量不能直接存儲在日志文件中。在服務(wù)端要對此偏移量進行轉(zhuǎn)換,計算消息的偏移量時,采用下一個偏移量(nextOffsetMetadata)的值來替換消息中的相對偏移量,這時消息中偏移量就是保存在文件中的絕對偏移量。
疑問:既然要在服務(wù)端進行替換偏移量,為什么還要在客戶端進行設(shè)置,而不在服務(wù)端直接設(shè)置喃?
說明:客戶端生產(chǎn)的消息傳到服務(wù)端時都是轉(zhuǎn)化成了二進制內(nèi)容保存在字節(jié)緩沖區(qū)中,假設(shè)我們在客戶端沒有設(shè)置偏移量寫入字節(jié)緩沖區(qū)中,那么在服務(wù)端存儲消息時,就需要在字節(jié)緩沖區(qū)每條消息前面添加偏移量,這樣就需要修改字節(jié)緩沖區(qū)的大小。不能直接使用原來的緩沖區(qū)。假如我們在客戶端設(shè)置了偏移量,消息格式是固定了。在服務(wù)端存儲消息時,直接修改字節(jié)緩沖區(qū)中每條消息的偏移量的值就行了,其他數(shù)據(jù)內(nèi)容不變,字節(jié)緩沖區(qū)的大小也不會發(fā)生變化。
服務(wù)端將每個分區(qū)的消息追加到日志中是以日志分段為單位的。日志分段中數(shù)據(jù)文件的大小的閥值1G,數(shù)據(jù)文件中存儲的消息達到閥值后會創(chuàng)建一個新的日志分段文件來存儲信息,分區(qū)的消息總是追加到最新的日志分段中,也就是說,一個分區(qū)的日志文件,在任何時刻都只有一個活動的日志分段。每個日志分段都有一個基準(zhǔn)偏移量,在一個日志分段中是固定值,用它來計算出每條消息在當(dāng)前這個日志分段中的絕對偏移量,最后把消息寫到日志分段中。更新日志的下一個偏移量的值(用來設(shè)置下一個消息的絕對偏移量),滿足條件時調(diào)用flush()把消息刷寫到磁盤。
下一個偏移量(nextOffsetMetadata)聲明類型是volatile,volatile類型的變量被修改時,其他所有使用到此變量的線程都能立即看到。服務(wù)端為每條消息指定絕對偏移量,會用nextOffsetMetadata的值作為起始偏移量,將消息寫入日志分段中,獲取到這批消息中最后一條消息的偏移量加上一后更新nextOffsetMetadata。消費者或者備份副本會根據(jù)nextOffsetMetadata最新的值拉取到新寫入的消息。
為消息集分配絕對偏移量時(更新每條消息的偏移量數(shù)據(jù):offset),以nextOffsetMetadata的偏移量作為起始偏移量。分配完后還要更新nextOffsetMetadata的偏移量值,因此獲取nextOffsetMetadata的偏移量值并加一是一個原子操作。
消息的大小和消息內(nèi)容都不變動,如何在字節(jié)緩沖區(qū)中定位到每條消息的偏移量所在的位置:先讀取出消息大小的值,然后計算出下一條消息的起始偏移量,使用字節(jié)緩沖區(qū)提供的定位方法(position())直接定位到下一條消息的起始位置。并不需要按照順序完整的讀取每條消息的實際內(nèi)容,這樣代價太大。????每條消息的長度:8個字節(jié)+4個字節(jié)+消息大小。

日志分段
服務(wù)端處理每批追加到日志分段中的消息集,都是以nextOffsetMetadata作為起始的絕對偏移量。同一個分區(qū)的所有日志分段中,所有消息的偏移量都是遞增的。
消息代理節(jié)點上的一個主題分區(qū)(TopicPartition)對應(yīng)一個日志(Log)。每個日志有多個日志分段(LogSegment),一個日志管理該分區(qū)的所有日志分段。
多個日志分段中,只有一個活動日志分段(activeSegment)來存儲當(dāng)前接收的消息集。其中有幾個重要的方法和變量:
1,activeSegment():獲取segments(所有日志分段集合)中最后一個元素,作為日志最新的活動分段。如果有新的日志分段產(chǎn)生就會加入到segments的最后一個。
2,nextOffsetMetadata:下一個偏移量元數(shù)據(jù),LogOffsetMetadata對象的實例,nextOffsetMetadata的構(gòu)造方法中的值來源于活動日志分段:下一個偏移量的值(nextOffset),基準(zhǔn)偏移量(baseOffset),活動日志分段的大小(size)。
3,logEndOffset:日志的最新偏移量表示下一條消息的偏移量,它的值是nextOffsetMetadata.nextOffset的值。實際上logEndOffset就是當(dāng)前活動日志分段的下一個偏移量的值。
客戶端對消息的讀寫操作會用到日志偏移量元數(shù)據(jù)(LogOffsetMetadata),寫入消息集到日志,下一個偏移量元數(shù)據(jù)(nextOffsetMetadata)中的消息偏移量(nextOffset)會作為消息集的起始偏移量;從日志中讀取消息時,不能超過日志結(jié)束偏移量元數(shù)據(jù)(logEndOffsetMetadata)中的結(jié)束偏移量(logEndOffset)或日志最高水位偏移量元數(shù)據(jù)(highWatermarkMetadata)的最高水位(highWatermark)。
日志偏移量元數(shù)據(jù)(LogOffsetMetadata)包含:消息偏移量(messageOffset),日志分段的基準(zhǔn)偏移量(segmentBaseOffset),消息在日志分段中的物理位置(relativePositionInSement)。
下一個偏移量元數(shù)據(jù)(nextOffsetMetadata),日志結(jié)束偏移量元數(shù)據(jù)(logEndOffsetMetadata),日志最高水位偏移量元數(shù)據(jù)(highWatermarkMetadata)都繼承自日志偏移量元數(shù)據(jù)(LogOffsetMetadata)。
nextOffsetMetadata其實和logEndOffsetMetadata的值是一樣的。但是他們面向的使用對象不一樣,nextOffsetMetadata是寫入時使用面向生產(chǎn)者;logEndOffsetMetadata是讀取時,但是是備份副本使用;highWatermarkMetadata讀取時,但是是消費者讀取使用。
追加消息集到活動日志分段,如果當(dāng)前活動日志分段放不下新追加的消息集,日志會采用滾動方式創(chuàng)建一個新的日志分段,并將消息集追加到新的日志分段。activeSegment指向新創(chuàng)建的日志分段。
達到以下條件會創(chuàng)建新的日志分段
1,追加的消息集的大小加上原有消息集的大小超過了日志分段的閥值,也就是放不下新追加的消息集了。
2,離上次創(chuàng)建日志分段時間到達一定需要滾動創(chuàng)建的時間。
3,索引文件滿了
新創(chuàng)建的日志分段的基準(zhǔn)偏移量的值是nextOffsetMetadata中的消息偏移量的值,也是當(dāng)前活動日志分段的下一個偏移量的值。
每個日志分段由數(shù)據(jù)文件和索引文件組成,數(shù)據(jù)文件保存消息集的具體內(nèi)容,索引文件保存了消息偏移量到物理位置的索引。追加一批消息集到日志分段,數(shù)據(jù)文件實時保存消息集內(nèi)容,而間隔一定的字節(jié)大小才會寫入一個索引條目到索引文件中,索引文件的目的就是:快速定位指定偏移量消息在數(shù)據(jù)文件中的物理位置?;舅悸肥牵航⑾⒔^對偏移量到消息在數(shù)據(jù)文件中的物理位置的映射關(guān)系。
索引文件存儲的形式和特點:
1,不會為每條消息都建立索引。稀疏索引
2,索引條目的偏移量存儲的是相對于基準(zhǔn)偏移量的相對偏移量,不是消息的絕對偏移量。
3,索引條目的相對偏移量和物理位置各占4字節(jié),即一個索引條目占用8個字節(jié)(消息集中的消息絕對偏移量占用8個字節(jié))。
4,索引條目中的相對偏移量是有序的,查詢指定偏移量時,使用二分法查找可以快速確定偏移量的位置。
5,如果指定的偏移量在索引文件中不存在,可以找小于指定偏移量的最大偏移量。
6,稀疏索引可以使用內(nèi)存映射方式,將整個索引文件放入內(nèi)存中,加快偏移量的查詢。
總結(jié):
1,一個日志由多個日志分段組成,日志管理了所有的日志分段。
2,日志用segments保存每個日志分段的基準(zhǔn)偏移量到日志分段的映射關(guān)系。
3,日志分段的基準(zhǔn)偏移量是分區(qū)級別的絕對偏移量。
4,日志分段中第一條消息的絕對偏移量等于日志分段的基準(zhǔn)偏移量。
5,每個日志分段都由一個索引文件和一個數(shù)據(jù)文件組成。
6,日志分段的數(shù)據(jù)文件和索引文件的文件名稱以基準(zhǔn)偏移量命名。
7,數(shù)據(jù)文件保存每天消息的格式是:消息的絕對偏移量,消息的大小,消息的內(nèi)容。
8,索引文件保存消息偏移量和消息在數(shù)據(jù)文件中的物理位置。
9,索引文件中索引條目的鍵存儲的值是:消息的絕對偏移量減去基準(zhǔn)偏移量。
10,索引文件通過內(nèi)存映射方式,將整個索引文件加載到內(nèi)存中,加快文件的讀取。
讀取日志
客戶端拉取分區(qū)主副本的消息集,一定會指定拉取偏移量,發(fā)送拉取請求給服務(wù)端。服務(wù)端處理客戶端的拉取請求,會返回從客戶端指定偏移量開始讀取的消息集,當(dāng)然不能有多少消息集就返回多少消息集,客戶端會指定拉取的消息量,默認(rèn)大小為1M(max.partition.fetch.bytes),而日志分段對應(yīng)的數(shù)據(jù)文件大小默認(rèn)是1G,所以服務(wù)端在讀取消息集的時候只需要選擇其中一個分段,就可以滿足客戶端的一次拉取請求。
選擇日志分段要結(jié)合客戶端指定的偏移量和日志分段的基準(zhǔn)偏移量,因為日志分段的基準(zhǔn)偏移量是日志分段中第一條消息的偏移量,所以可以根據(jù)客戶端指定的拉取偏移量從所有基準(zhǔn)偏移量中選擇出小于等于拉取偏移量的最大值的日志分段,這樣的選擇叫做floor。
讀日志分段應(yīng)該先讀取索引文件再讀取數(shù)據(jù)文件:根據(jù)起始偏移量(startOffset)到讀取索引文件中對應(yīng)的最近的物理位置(startPosition),根據(jù)起始位置直接定位到數(shù)據(jù)文件,開始讀取數(shù)據(jù)文件消息,最多只能讀到數(shù)據(jù)文件的結(jié)束位置(maxPosition)。以下圖片來自于《Kafka技術(shù)內(nèi)幕:圖文詳解Kafka源碼設(shè)計與實現(xiàn)》

由于索引文件采用稀疏索引方式存儲,所有給定一個偏移量到索引文件中去查找不一定能找到對應(yīng)的索引條目。但是又因為索引文件中所有索引條目的偏移量都是遞增的,可以找到離目標(biāo)偏移量最近的索引條目偏移量(小于目標(biāo)偏移量的最大值,和找日志分段類似)
從索引文件到數(shù)據(jù)文件的步驟:
1,起始偏移量為350,找到索引文件中偏移量為345的索引條目,對應(yīng)的物理位置是328.
2,根據(jù)物理位置直接定位到數(shù)據(jù)文件的328文件位置。
3,讀取每條消息的偏移量,但是不讀取消息內(nèi)容。
4,步驟3最終會找到偏移量為350的消息,得到物理位置448。
5,定位到數(shù)據(jù)文件的448位置,開始真正的讀取起始偏移量為350的消息內(nèi)容。
追加消息時,索引條目的偏移量是基于日志分段基準(zhǔn)偏移量的相對偏移盤。 由于客戶端讀取消息給的是絕對偏移量,因此在查詢索引文件之前,要先將絕對偏移量減去日志分段的基準(zhǔn)偏移盤,轉(zhuǎn)換為相對偏移量。 另外,數(shù)據(jù)文件每條消息的偏移量存儲的是絕對偏移量,查找索引文件返回值也應(yīng)該是絕對偏移量 。 但索引條目存儲的是相對偏移量,最后返回的偏移量還要再加上基準(zhǔn)偏移量 。
根據(jù)給定的起始偏移量,先調(diào)用索引文件的lookup()查詢方法,獲得離起始偏移量最近的物理位置,然后再調(diào)用數(shù)據(jù)文件的searchFor()方法,從指定的物理位置開始讀取每條消息,直到找到起始偏移量對應(yīng)的物理位置。
對索引文件進行二分查找會返回兩個信息:偏移量和物理位置。偏移量不一定和起始偏移量對應(yīng),物理位置也不會和起始位置對應(yīng)。還需要搜索數(shù)據(jù)文件才能確定起始偏移量對應(yīng)的起始位置。索引文件中每個索引條目占用8字節(jié),索引條目中偏移量和物理位置各向占用4字節(jié)。在查找過程中,需要讀取出偏移量的值,然后和目標(biāo)偏移量進行比較。
由于消息集中的每條消息都是由:絕對偏移量,消息大小,消息內(nèi)容組成。要查詢數(shù)據(jù)文件中的消息偏移量等于目標(biāo)偏移量的消息,我們只需讀取每條消息的前兩部分?jǐn)?shù)據(jù),因為它們占用的字節(jié)緩沖區(qū)是固定的12個字節(jié)(8個字節(jié)的絕對偏移量+4個字節(jié)的消息大?。_@種方式不會讀取消息內(nèi)容,跳躍式的查找。讀出絕對偏移量是為了和目標(biāo)偏移量進行比較,讀出消息大小是為了當(dāng)絕對偏移量是為了和目標(biāo)偏移量不相等的時候,需要跳到下一個消息,再進行比較。
文件消息集是數(shù)據(jù)文件的實現(xiàn)類,文件消息集的讀取方法根據(jù)起始位置和讀取大小,創(chuàng)建一個新的文件消息集視圖,每次調(diào)用讀取方法都會生成一個新的文件消息集對象。如果客戶端每次拉取消息都在同一個日志分段中,數(shù)據(jù)文件是同一個,說明同一個文件消息集會調(diào)用多次讀取方法,每次都會創(chuàng)建一個新的文件消息集,但所有的文件消息集都共用同一個文件和文件通道。和日志分段相關(guān)的文件消息集叫做原始文件消息集,調(diào)用原始文件消息集生成的新文件消息集叫做文件消息集視圖。兩種消息集有關(guān)的操作:
1,生產(chǎn)者生產(chǎn)的字節(jié)緩沖區(qū)消息集會追加到日志分段對應(yīng)的原始文件消息集。
2,原始文件消息集會將自己緩沖區(qū)消息集寫入到數(shù)據(jù)文件底層的文件通道中。
3,服務(wù)端處理客戶端的拉取請求,讀取日志分段,調(diào)用原始文件消息集的讀取方法。
4,原始文件消息集讀取方法會生成一個局部的文件消息集視圖,和數(shù)據(jù)文件底層的文件通道相關(guān)。
5,局部文件消息集視圖發(fā)送拉取響應(yīng)結(jié)果給客戶端,會將文件通道的字節(jié)直接傳輸給網(wǎng)絡(luò)通道。
全局的可變的原始文件消息集可以接受消息集的追加,會在每次處理客戶端的拉取請求時,生成不可變的局部的文件消息集視圖。文件消息集視圖的消息集表示一次拉取請求的分區(qū)數(shù)據(jù),最終被封裝到拉取響應(yīng)中,通過服務(wù)端的網(wǎng)絡(luò)通道發(fā)送給客戶端。
參考資料:
Kafka技術(shù)內(nèi)幕:圖文詳解Kafka源碼設(shè)計與實現(xiàn)