日志存儲
基本概念

為了提高寫入的性能,同一個分區(qū)中的消息是順序寫入的,這就避免了隨機寫入帶來的性能問題。一個Topic可以劃分成多個f分區(qū),而每個分區(qū)又有多個副本。當一個分區(qū)的副本(無論是Leader 副本還是Follower 副本)被劃分到某個Broker上時,Kafka就要在此Broker上為此分區(qū)建立相應的Log,而生產者發(fā)送的消息會存儲在Log中,供消費者拉取后消費。
Kafka中存儲的一般都是海量消息數據,為了避免日志文件太大,Log并不是直接對應于磁盤上的一個日志文件,而是對應磁盤上的一個目錄,這個目錄的命名規(guī)則是<topic_name>_<partition_id>,Log與分區(qū)之間的關系是一一對應的,對應分區(qū)中的全部消息都存儲在此目錄下的日志文件中。
Kafka通過分段的方式將Log分為多個LogSegment,LogSegment是一個邏輯上的概念,一個LogSegment對應磁盤上的一個日志文件和一個索引文件,其中日志文件用于記錄消息,索引文件中保存了消息的索引。隨著消息的不斷寫入,日志文件的大小到達一個閾值時,就創(chuàng)建新的日志文件和索引文件繼續(xù)寫入后續(xù)的消息和索引信息。日志文件的文件名的命名規(guī)則是[baseOffset].log,baseOffset是日志文件中第一條消息的offset。圖4-13展示了一個Log的結構。

為了提高查詢消息的效率,每個日志文件都對應一個索引文件,這個索引文件并沒有為每條消息都建立索引項,而是使用稀疏索引方式為日志文件中的部分消息建立了索引。圖4-14展示了索引文件與日志文件之間的對應關系。

介紹完Kafka日志存儲的基本概念之后,下面來分析實現日志存儲功能的相關代碼。
FileMessageSet
在Kafka中使用FileMessageSet管理上文介紹的日志文件,它對應磁盤上的一個真正的日志文件。FileMessageSet繼承了MessageSet抽象類,如圖4-15(a)所示。MessageSet中保存的數據格式分為三部分,如圖4-15(b)所示:8字節(jié)的offset值,4字節(jié)的size表示message data大小,這兩部分組合成為LogOverhead,message data部分保存了消息的數據,邏輯上對應一個Message對象。

Kafka使用Message類表示消息,Message使用ByteBuffer保存數據,其格式及各個部分的含義如圖4-16所示。

·CRC32:4個字節(jié),消息的校驗碼。
·magic:1字節(jié),魔數標識,與消息格式有關,取值為0或1。當magic為0時,消息的offset使用絕對offset且消息格式中沒有timestamp部分;當magic為1時,消息的offset使用相對offset且消息格式中存在timestamp部分。所以,magic值不同,消息的長度是不同的。
·attributes:1字節(jié),消息的屬性。其中第0~2位的組合表示消息使用的壓縮類型,0表示無壓縮,1表示gzip壓縮,2表示snappy壓縮,3表示lz4壓縮。第3位表示時間戳類型,0表示創(chuàng)建時間,1表示追加時間。
·timestamp:時間戳,其含義由attribute的第3位確定。
·key length:消息key的長度。
·key:消息的key。
·value length:消息的value長度。
·value:消息的value。
MessageSet抽象類中定義了兩個比較關鍵的方法:

這兩個方法說明MessageSet具有順序寫入消息和順序讀取的特性。在后面對FileMessageSet和ByteBufferMessageSet的介紹過程中會介紹這兩個方法的實現。
了解了MessageSet抽象類以及其中保存消息的格式,我們開始分析FileMessageSet實現類。FileMessageSet的核心字段如下所述。
- ·file:java.io.File類型,指向磁盤上對應的日志文件。
- ·channel:FileChannel類型,用于讀寫對應的日志文件。
- ·start和end:FileMessageSet對象除了表示一個完整的日志文件,還可以表示日志文件分片(Slice),start和end表示分片的起始位置和結束位置。
- ·isSlice:Boolean類型,表示當前FileMessageSet是否為日志文件的分片。
- ·_size:FileMessa_geSet大小,單位是字節(jié)。如果FileMessageSet是日志文件的分片,則表示分片大?。磂nd-start的值);如果不是分片,則表示整個日志文件的大小。注意,因為可能有多個Handler線程并發(fā)向同一個分區(qū)寫入消息,所有_size是AtomicInteger類型。
在FileMessageSet中有多個重載的構造方法,這里選擇一個比較重要的方法進行介紹。此構造方法會創(chuàng)建一個非分片的FileMessageSet對象。在Window NTFS文件系統(tǒng)以及老版本的Linux文件系統(tǒng)上,進行文件的預分配會提高后續(xù)寫操作的性能,為此FileMessageSet提供了preallocate的選項,決定是否開啟預分配的功能。我們也可以通過FileMessageSet構造函數的mutable參數決定是否創(chuàng)建只讀的FileMessageSet。

在FileMessageSet對象初始化的過程中,會移動FileChannel的position指針,這是為了實現每次寫入的消息都在日志文件的尾部,從而避免重啟服務后的寫入操作覆蓋之前的操作。對于新創(chuàng)建的且進行了預分配空間的日志文件,其end會初始化為0,所以也是從文件起始寫入數據的。

介紹完FileMessageSet的構造過程,下面來分析其讀寫過程。FileMessageSet.append()方法實現了寫日志文件的功能,需要注意的是其參數必須是ByteBufferMessageSet對象,ByteBufferMessageSet的內容后面介紹。下面是FileMessageSet.append()方法的代碼:

查找指定消息的功能在FileMessageSet.searchFor()方法中實現。searchFor()的邏輯是:從指定的startingPosition開始逐條遍歷FileMessageSet中的消息,并將每個消息的offset與targetOffset進行比較,直到offset大于等于targetOffset,最后返回查找到的offset。在整個遍歷過程中不會將消息的key和value讀取到內存,而是只讀取LogOverhead(即offset和size),并通過size定位到下一條消息的開始位置。FileMessageSet.searchFor()方法的代碼如下:

FileMessageSet.writeTo()方法是將FileMessageSet中的數據寫入指定的其他Channel中,這里先了解此方法的功能,具體實現會在后面介紹“零復制”的時候一起介紹。FileMessageSet.read*()方法是從FileMessageSet中讀取數據,可以將FileMessageSet中的數據讀入到別的ByteBuffer中返回,也可以按照指定位置和長度形成分片的FileMessageSet對象返回。FileMessageSet.delete()方法是將整個日志文件刪除。
FileMessageSet還有一個truncateTo()方法,主要負責將日志文件截斷到targetSize大小。此方法在后面介紹分區(qū)中Leader副本切換時還會提到。下面是truncateTo()方法的具體實現:

FileMessageSet還實現了iterator()方法,返回一個迭代器。FileMessageSet迭代器讀取消息的邏輯是:先讀取消息的LogOverhead部分,然后按照size分配合適的ByteBuffer,再讀取message data部分,最后將message data和offset封裝成MessageOffset對象返回。此迭代器的實現與searchFor()方法類似。
ByteBufferMessageSet
MessageSet的另一個子類是ByteBufferMessageSet,FileMessageSet.append()方法的參數就是此類的對象。為什么必須append()方法的參數是ByteBufferMessageSet,而不是直接使用ByteBuffer呢?
在介紹MemoryRecords時提到,向MemoryRecords寫入消息時,可以使用Compressor對消息批量進行壓縮,然后將壓縮后的消息發(fā)送給服務端。
在有些設計中,將每個請求的負載單獨壓縮后再進行傳輸,這種設計雖然可以減小傳輸的數據量,但是存在一個小問題,我們常見壓縮算法是數據量越大壓縮率越高,一般情況下每個請求的負載不會特別大,這就導致壓縮率比較低。Kafka實現的壓縮方式是將多個消息一起進行壓縮,這樣就可以保證較高的壓縮率。而且在一般情況下,生產者發(fā)送的壓縮數據在服務端也是以保持壓縮狀態(tài)進行存儲的,消費者從服務端獲取的也是壓縮消息,消費者在處理消息之前才會解壓消息,這也就實現了“端到端的壓縮”。
壓縮后的消息格式與非壓縮的消息格式類似,但是分為兩層,如圖4-17所示。

壓縮消息的key為null,所以圖4-17沒有畫出key這部分;value中保存的是多條消息壓縮數據。
創(chuàng)建壓縮消息
在開始先回頭看第2章介紹的Compressor的構造函數的代碼:

通過MemoryRecords.append()方法不斷寫入消息并壓縮的過程在第2章已經分析過了,這里不再贅述。當MemoryRecords寫滿,則會調用Compressor.close()方法,完成offset、size、CRC32等字段的寫入,之后就可以發(fā)送到服務端了。Compressor.close()的實現如下:

細心的讀者可能會問,服務端需要為每個消息分配offset,要對消息解壓縮嗎?這里的設計很巧妙,原理如下:
1)當生產者產生創(chuàng)建壓縮信息的時候,對壓縮消息設置的offset是內部offset(inner offset),即分配給每個消息的offset分別是0、1、2……請讀者回顧第2章介紹的RecordBatch.tryAppend()方法。
2)在Kafka服務端為消息分配offset時,會根據外層消息中記錄的內層壓縮消息的個數為外層消息分配offset,為外層消息分配的offset是內層壓縮消息中最后一個消息的offset值,如圖4-18所示。

3)當消費者獲取壓縮消息后進行解壓縮,就可以根據內部消息的、相對的offset和外層消息的offset計算出每個消息的offset值了。
ByteBufferMessageSet分析
介紹完生產者和消費者對壓縮消息的處理過程,我們回到服務端,開始對ByteBufferMessageSet的分析,它底層使用ByteBuffer保存消息數據。ByteBufferMessageSet的角色和功能與MemoryRecords類似。ByteBufferMessageSet提供了三個方面的功能:
(1)將Message集合按照指定的壓縮類型進行壓縮,此功能主要用于構建ByteBufferMessageSet對象,通過ByteBufferMessageSet.create()方法完成。
提供迭代器,實現深層迭代和淺層迭代兩種迭代方式。
3)提供了消息驗證和offset分配的功能。
在ByteBufferMessageSet.create()方法中實現了消息的壓縮以及offset分配,其步驟如下所示。
(1)如果傳入的Message集合為空,則返回空ByteBuffer。
(2)如果要求不對消息進行壓縮,則通過OffsetAssigner分配每個消息的offset,在將消息寫入到ByteBuffer之后,返回ByteBuffer。OffsetAssigner的功能是存儲一串offset值,并像迭代器那樣逐個返回,OffsetAssigner的實現很簡單,讀者可參看源碼學習。
(3)如果要求對消息進行壓縮,則先將Message集合按照指定的壓縮方式進行壓縮并保存到緩沖區(qū),同時也會完成offset的分配,然后按照壓縮消息的格式寫入外層消息,最后將整個外層消息所在的ByteBuffer返回。
我們回到本節(jié)開始的那個問題,FileMessageSet.append()方法會將ByteBufferMessageSet中的全部數據追加到日志文件中,對于壓縮消息來說,多條壓縮消息就以一個外層消息的狀態(tài)存在于分區(qū)日志文件中了。當消費者獲取消息時也會得到壓縮的消息,從而實現“端到端壓縮”。
OffsetIndex
為了提高查找消息的性能,從Kafka 0.8版本開始,為每個日志文件添加了對應的索引文件。OffsetIndex對象對應管理磁盤上的一個索引文件,與上一節(jié)分析的FileMessageSet共同構成一個LogSegment對象。
首先來介紹索引文件中索引項的格式:每個索引項為8字節(jié),分為兩部分,第一部分是相對offset,占4個字節(jié);第二部分是物理地址,也就是其索引消息在日志文件中對應的position位置,占4個字節(jié)。這樣就實現了offset與物理地址之間的映射。相對offset表示的是消息相對于baseOffset的偏移量。例如,分段后的一個日志文件的baseOffset是20,當然,它的文件名就是20.log,那么offset為23的Message在索引文件中的相對offset就是23-20 = 3。消息的offset是Long類型,4個字節(jié)可能無法直接存儲消息的offset,所以使用相對offset,這樣可以減小索引文件占用的空間。
Kafka使用稀疏索引的方式構造消息的索引,它不保證每個消息在索引文件中都有對應的索引項,這算是磁盤空間、內存空間、查找時間等多方面的折中。不斷減小索引文件大小的目的是為了將索引文件映射到內存,在OffsetIndex中會使用MappedByteBuffer將索引文件映射到內存中。
介紹完了索引文件的相關概念后,我們來介紹OffsetIndex的字段。
- _file:指向磁盤上的索引文件。
- baseOffset:對應日志文件中第一個消息的offset
- ·mmap:用來操作索引文件的MappedByteBuffer。
- ·lock:ReentrantLock對象,在對mmap進行操作時,需要加鎖保護。
- ·_entries:當前索引文件中的索引項個數。
- ·_maxEntries:當前索引文件中最多能夠保存的索引項個數。
- ·_lastOffset:保存最后一個索引項的offset。
在OffsetIndex初始化的過程中會初始化上述字段,因為會有多個Handler線程并發(fā)寫入索引文件,所以這些字段使用@volatile修飾,保證線程之間的可見性。初始化代碼如下:


OffsetIndex提供了向索引文件中添加索引項的append()方法,將索引文件截斷到某個位置的truncateTo()方法和truncateToEntries()方法,進行文件擴容的resize()方法。這些方法實際上都是通過mmap字段的相關操作完成的
OffsetIndex中最常用的還是查找相關的方法,使用的是二分查找,涉及的方法是indexSlotFor()和lookup()。值得注意的地方是,查找的目標是小于targetOffset的最大offset對應的物理地址(position)。下面是lookup()方法的代碼:


OffsetIndexIndex的實現就介紹到這里。下一小節(jié)來分析LogSegment這個類。
LogSegment
為了防止Log文件過大,將Log切分成多個日志文件,每個日志文件對應一個LogSegment。在LogSegment中封裝了一個FileMessageSet和一個OffsetIndex對象,提供日志文件和索引文件的讀寫功能以及其他輔助功能。
下面先來看LogSegment的核心字段。
- log:用于操作對應日志文件的FileMessageSet對象。
- ·index:用于操作對應索引文件的OffsetIndex對象。
- ·baseOffset:LogSegment中第一條消息的offset值。
- ·indexIntervalBytes:索引項之間間隔的最小字節(jié)數。
- ·bytesSinceLastIndexEntry:記錄自從上次添加索引項之后,在日志文件中累計加入的Message集合的字節(jié)數,用于判斷下次索引項添加的時機。
- ·created:標識LogSegment對象創(chuàng)建時間,當調用truncateTo()方法將整個日志文件清空時,會將此字段重置為當前時間
在LogSegment.append()方法中實現了追加消息的功能,可能有多個Handler線程并發(fā)寫入同一個LogSegment,所以調用此方法時必須保證線程安全,在后面分析Log類時會看到相應的同步代碼。另外,注意append()方法的參數,其第二個參數messages表示的是待追加的消息集合,第一個參數offset表示messages中的第一條消息的offset,如果是壓縮消息,則是第一條內層消息的offset,如圖4-20所示。


讀取消息的功能由LogSegment.read()方法實現,它有四個參數。
- ·startOffset:指定讀取的起始消息的offset。
- ·maxOffset:指定讀取結束的offset,可以為空。
- ·maxSize:指定讀取的最大字節(jié)數。
- ·maxPosition:指定讀取的最大物理地址,可選參數,默認值是日志文件的大小。
在讀取日志文件之前,需要將startOffset和maxOffset轉化為對應的物理地址才能使用。這個轉換在translateOffset()方法中實現,我們先通過一個例子來介紹其功能。現假設startOffset是1017,圖4-21展示了將1017這個offset轉換成對應的物理地址的過程。
(1)我們將absoluteOffset轉換成Index File中使用的相對offset,得到17。通過OffsetIndex.lookup()方法查找Index File,得到(7,700)這個索引項,如圖4-21中步驟①所示。
(2)根據(7,700)索引項,我們從MessageSet File中position=700處開始查找absoluteOffset為1017的消息,如圖4-21中步驟②所示。
(3)通過FileMessageSet.searchFor()方法遍歷查找FileMessageSet,得到(1018,800)這個位置信息,如圖4-21中步驟③所示。

讀者可能會問,我們的目標offset是1017,為什么會返回1018呢?在這個示例中,offset=1017的消息與其他的消息被壓縮后一起構成了offset=1018這條外層消息,并存入了日志文件中。translateOffset()方法的代碼如下:

了解了offset與物理地址之間的轉換后,再來看read()方法就比較簡單了。注意讀取的結束位置由maxOffset、maxSize、maxPosition共同決定。LogSegment.read()方法的代碼如下,其中省略了一些邊界檢查的代碼:


LogSegment中還有一個值得注意的方法是recover()方法,其主要功能是根據日志文件重建索引文件,同時驗證日志文件中消息的合法性。在重建索引文件過程中,如果遇到了壓縮消息需要進行解壓,主要原因是因為索引項中保存的相對offset是第一條消息的offset,而外層消息的offset是壓縮消息集合中的最后一條消息的offset。
Log
Log是對多個LogSegment對象的順序組合,形成一個邏輯的日志。為了實現快速定位LogSegment,Log使用跳表(SkipList)對LogSegment進行管理。

跳表是一種隨機化的數據結構,它的查找效率和紅黑樹差不多,但是插入/刪除操作卻比紅黑樹簡單很多。目前在Redis等開源軟件中都能看到它的身影,在JDK中也提供了跳表的實現——ConcurrentSkipListMap,而且ConcurrentSkipListMap還是一個線程安全的實現,有興趣的讀者可以參考其源碼。
在Log中,將每個LogSegment的baseOffset作為key,LogSegment對象作為value,放入segments這個跳表中管理,如圖4-22所示。

例如,我們現在要查找offset大于6570的消息,可以首先通過segments快速定位到消息所在的LogSegment對象,定位過程如圖4-22中的虛線所示。之后使用前面介紹的LogSegment.read()方法,先按照OffsetIndex進行索引,然后從日志文件中進行讀取。
向Log中追加消息時是順序寫入的,那么只有最后一個LogSegment能夠進行寫入操作,在其之前的所有LogSegment都不能寫入數據。最后一個LogSegment使用Log.activeSegment()方法獲取,即segments集合中最后一個元素,為了描述方便,我們將此Segment對象稱為“activeSegment”。隨著數據的不斷寫入,當activeSegment的日志文件大小到達一定閾值時,就需要創(chuàng)建新的activeSegment,之后追加的消息將寫入新的activeSegment。
介紹完了Log的基本原理后,來看一下Log類中的關鍵字段。
- ·dir:Log對應的磁盤目錄,此目錄下存放了每個LogSegment對應的日志文件和索引文件。
- ·lock:可能存在多個Handler線程并發(fā)向同一個Log追加消息,所以對Log的修改操作需要進行同步
- ·segments:用于管理LogSegment集合的跳表。
- ·config:Log相關的配置信息,具體配置項在具體代碼中分析。
- ·recoveryPoint:指定恢復操作的起始offset,recoveryPoint之前的Message已經刷新到磁盤上持久存儲,而其后的消息則不一定,出現宕機時可能會丟失。所以只需要恢復recoveryPoint之后的消息即可。
- ·nextOffsetMetadata:LogOffsetMetadata對象。主要用于產生分配給消息的offset,同時也是當前副本的LEO(LogEndOffset)。LEO的相關介紹請讀者參考第1章。它的messageOffset字段記錄了Log中最后一個offset值,segmentBaseOffset字段記錄了activeSegment的baseOffset,relativePositionInSegment字段記錄了activeSegment的大小。需要注意的是,為了保證在線程間的可見性,使用@volatile 修飾nextOffsetMetadata字段。
append()方法
向Log追加消息的功能是在Log.append()方法中實現的。Kafka服務端在處理生產者發(fā)來的ProducerRequest時,會將請求解析成ByteBufferMessageSet,并最終調用Log.append()方法完成追加消息,圖4-23展示了這一調用關系。

Log.append()方法的大致流程如下:
(1)首先調用Log.analyzeAndValidateMessageSet()方法,對ByteBufferMessageSet中的Message數據進行驗證,并返回LogAppendInfo對象。在LogAppendInfo中封裝了ByteBufferMessageSet中第一個消息的offset、最后一個消息的offset、生產者采用的壓縮方式、追加到Log的時間戳、服務端用的壓縮方式、外層消息的個數、通過驗證的總字節(jié)數等信息。
(2)調用Log.trimInvalidBytes()方法,清除未驗證通過的Message。
(3)調用ByteBufferMessageSet.validateMessagesAndAssignOffsets()方法,進行內部壓縮消息做進一步驗證、消息格式轉換、調整Magic值、修改時間戳等操作,并為Message分配offset。在ByteBufferMessageSet小節(jié)介紹過了,這里就不再贅述了。
(4)如果在validateMessagesAndAssignOffsets()方法中修改了ByteBufferMessageSet的長度,則重新驗證Message的長度是否合法。
5)調用Log.maybeRoll()方法獲取activeSegment,此過程可能分配新的activeSegment。
(6)將ByteBufferMessageSetSet中的消息追加到activeSegment中,通過調用LogSegment.append()方法的實現。
(7)更新當前副本的LEO,也就是Log.nextOffsetMetadata字段。
(8)執(zhí)行flush()操作,將LEO之前的全部Message刷新到磁盤。
Log.append()方法的代碼:


介紹了append()方法的骨架代碼,下面具體分析其中每個方法的具體實現。步驟1中的Log.analyzeAndValidateMessageSet()方法主要功能是驗證消息的長度、CRC32校驗碼、內部offset是否單調遞增,這些驗證都是對外層消息進行的,并不會解壓內部的壓縮消息。在append()方法的代碼中我們也可以看到,如果需要進行offset分配,analyzeAndValidateMessageSet()方法返回的LogAppendInfo對象記錄中的firstOffset、lastOffset甚至時間戳都會被修改。
Log.analyzeAndValidateMessageSet()方法的代碼如下:


Log.maybeRoll()方法會檢測是否滿足創(chuàng)建新activeSegment的條件,如果滿足則創(chuàng)建新activeSegment,然后返回當前的activeSegment。創(chuàng)建新activeSegment的條件有下面幾個:
- 當前activeSegment的日志大小加上本次待追加的消息集合大小,超過配置的LogSegment的最大長度。
- ·當前activeSegment的壽命超過了配置的LogSegment最長存活時間
- 索引文件滿了
maybeRoll()方法的代碼如下,其中創(chuàng)建新activeSegment的邏輯在roll()方法中實現:


在Log.roll()方法最后使用KafkaScheduler線程池執(zhí)行flush()方法,它是在JDK提供的ScheduledThreadPoolExecutor之上進行的封裝和配置。在Kafka服務端有一部分定時任務是交由KafkaScheduler線程池執(zhí)行的。KafkaScheduler線程的實現如下:


回到append()方法繼續(xù)分析,步驟6調用了LogSegment.append(),請參考LogSegment小節(jié)的介紹。
最后來看flush()方法的原理,如圖4-25所示,flush()方法會將recoverPoint~LEO之間的消息數據刷新到磁盤上,并修改recoverPoint值。

flush()方法的代碼如下:

整個Log.append()方法的功能和實現到這里就分析完了。
read()方法
Log.read()方法實現了讀取消息的功能,它實現的邏輯是:通過segments跳表,快速定位到讀取的起始LogSegment并從中讀取消息。注意,在Log.append()方法中通過加鎖進行同步控制,在read()方法中并沒有加鎖操作,它在開始查詢消息之前會將nextOffsetMetadata字段(@volatile修飾)保存成方法的局部變量,從而避免線程安全的問題。讀者可以回顧updateLogEndOffset()方法的代碼會發(fā)現每次更新updateLogEndOffset的時候,都是創(chuàng)建新的LogOffsetMetadata對象,而且LogOffsetMetadata中也沒有提供任何修改屬性的方法,可見LogOffsetMetadata對象是個不可變對象。


這里著重介紹(1)處的代碼,為什么需要針對activeSegment的讀取做特殊的處理呢?在Kafka的Bug列表中的[KAFKA-2477]描述了此Bug,下面介紹造成這個問題的主要原因。在Kafka 0.8版本中未修復這個問題,前面(1)處對應的代碼如下:


簡單描述這種場景:
(1)前面分析了append()方法的邏輯,在寫線程調用append()方法時,會加鎖寫入,不會出現多個寫線程的并發(fā)?,F在按照append()方法的執(zhí)行順序將其分為兩個操作:一是先分配offset并將Message追加到日志文件中,二是更新nextOffsetMetadata。
現在假設寫線程在執(zhí)行完第一步寫入offset為18的消息后,CPU時間片到期,線程掛起,導致未對nextOffsetMetadata.messageOffset進行更新。
(2)其他的Follower 副本發(fā)來Fetch請求,讀取offset為17以及之后的Message。按照Kafka 0.8版本的代碼,Leader 副本會將offset為17、18的Message全部返回給Follower。
(3)Follower處理完offset為17、18兩條Message,會繼續(xù)請求offset為19的Message,此時,請求的startOffset>nextOffsetMetadata.messageOffset,Follower就會得到OffsetOutOfRangeException。Follower認為自己的Log出現了問題,會將此Log全部刪除,并請求從Leader重新同步一份過來。
(4)之后,寫線程重新執(zhí)行,更新nextOfsetMetadata。
在海量數據的情況下,Kafka中的每個Log都很大的(在筆者的實踐場景中,一個Log大約有15GB左右),如果多個Follower出現上述重新同步整個Log的情況,Leader副本所在的服服務器的I/O很快就會被占滿,整個服務器都變得不可用。為了處理這個問題,就有了我們看到的對activeSegment的特殊處理,依然是上述場景,由于nextOffsetMetadata未更新,nextOffsetMetadata.relativePositionInSegment依然指向offset為17的Message的尾部,限制了Leader返回給Follower的消息。當Follower請求offset為18的消息時,返回的是消息集合是空。
LogManager
在一個Broker上的所有Log都是由LogManager進行管理的。LogManager提供了加載Log、創(chuàng)建Log集合、刪除Log集合、查詢Log集合等功能,并且啟動了3個周期性的后臺任務以及Cleaner線程(可能不止一個線程),分別是:log-flusher(日志刷寫)任務、log-retention(日志保留)任務、recovery-point-checkpoint(檢查點刷新)任務以及Cleaner線程(日志清理)。
下面介紹LogManager中各個字段的功能。
- ·logDirs:log目錄集合,在server.properties配置文件中通過log.dirs項指定的多個目錄。每個log目錄下可以創(chuàng)建多個Log,每個Log都有自己對應的目錄,不要混淆。LogManager在創(chuàng)建Log時會選擇Log最少的log目錄創(chuàng)建Log。
- ·ioThreads:為完成Log加載的相關操作,每個log目錄下分配指定的線程執(zhí)行加載。
- ·scheduler:KafkaScheduler對象,用于執(zhí)行周期任務的線程池。與LogSegment小節(jié)介紹的執(zhí)行flush()操作的scheduler是同一個對象。
- ·logs:Pool[TopicAndPartition, Log]類型,用于管理TopicAndPartition與Log之間的對應關系。使用的是Kafka自定義的Pool類型對象,底層使用JDK提供的線程安全的HashMap——ConcurrentHashMap實現
- ·dirLocks:FileLock集合。這些FileLock用來在文件系統(tǒng)層面為每個log目錄加文件鎖。在LogManager對象初始化時,就會將所有l(wèi)og目錄加鎖。
- ·recoveryPointCheckpoints:Map[File, OffsetCheckpoint]類型,用于管理每個log目錄與其下的RecoveryPointCheckpoint文件之間的映射關系。在LogManager對象初始化時,會在每個log目錄下創(chuàng)建一個對應的RecoveryPointCheckpoint文件。此Map的value是OffsetCheckpoint類型的對象,其中封裝了對應log目錄下的RecoveryPointCheckpoint文件,并提供對RecoveryPointCheckpoint文件的讀寫操作。RecoveryPointCheckpoint文件中則記錄了該log目錄下的所有Log的recoveryPoint。
- ·logCreationOrDeletionLock:創(chuàng)建或刪除Log時需要加鎖進行同步。
LogManager中的定時任務
在LogManager.startup()方法中,將三個周期性任務提交到scheduler中定時執(zhí)行,并啟動LogCleaner線程。LogManager.startup()方法的實現如下:

三個周期性任務的功能如表4-1所示。

log-retention任務通過周期性地調用LogManager.cleanupLogs()方法完成對符合條件的LogSegment的刪除。cleanupLogs()方法的代碼如下:

LogManager.cleanupExpiredSegments()方法會根據LogSegment的存活時長判斷是否要刪除LogSegment。

最后的Log.deleteSegment()方法完成了刪除LogSegment的功能,其主要操作是清除segments跳表中的LogSegment對象,然后將日志文件和索引文件的后綴名改成“.deleted”,并創(chuàng)建一個刪除這兩個文件的任務,提交到scheduler線程池中異步執(zhí)行。Log. deleteSegment()方法的代碼如下:

介紹完LogManager.cleanupExpiredSegments()方法之后,再回來看LogManager.cleanupSegmentsToMaintainSize()方法,它會根據retention.bytes配置項的值與當前Log的大小判斷是否刪除LogSegment。

log-flusher任務會周期性地執(zhí)行flush操作,其執(zhí)行flush()方法的條件只有一個:Log未刷新時長是否大于此Log的flush.ms配置項指定的時長。

在每個log目錄下都有唯一的一個RecoveryPointCheckpoint文件 ,其中記錄此log目錄下的每個Log的recoveryPoint值。RecoveryPointCheckpoint文件會在Broker啟動時幫助Broker進行Log的恢復工作,具體恢復操作的流程后面會詳細介紹。
recovery-point-checkpoint任務會周期性地調用LogManager.checkpointRecoveryPointOffsets()方法完成RecoveryPointCheckpoint文件的更新。checkpointRecoveryPointOffsets()方法的代碼如下:

RecoveryPointCheckpoint文件的更新操作是在OffsetCheckpoint中實現的,其更新方式是:先將log目錄下的所有Log的recoveryPoint寫到tmp文件中,然后用tmp文件替換原來的RecoveryPointCheckpoint文件文件。OffsetCheckpoint.write()方法的是如下:

日志壓縮
通過上面介紹的log-retention任務,Kafka服務端可以避免出現大量日志占滿磁盤的情況。log-retention任務中配置的閾值非常靈活,可以對整個Broker設置全局配置值,也可以對某些特定的Topic配置特定值覆蓋全局配置。
Kafka還提供了 “日志壓縮”(Log Compaction)功能,通過此功能也可以有效地減小日志文件的大小,緩解磁盤緊張的情況。在很多實踐場景中,消息的key與value的值之間的對應關系是不斷變化的,就像數據庫中的數據記錄會不斷被修改一樣。如果消費者只關心key對應的最新value值,可以開啟Kafka的日志壓縮功能,服務端會在后臺啟動Cleaner線程池,定期將相同key的消息進行合并,只保留最新的value值。日志壓縮的原理如圖4-27所示,這里以key值為key3的消息為例進行說明,offset為3、6、10的三條消息按時間順序依次被追加到Log中,在進行日志壓縮時,只會保留key值為key3的最新消息(offset=10)。

我們已經知道,Log在寫入消息時其實就是將消息追加到activeSegment的日志文件末尾。為了避免activeSegment成為熱點,activeSegment不會參與日志壓縮操作,而是只壓縮其余的只讀的LogSegment。在日志壓縮過程中啟動多條Cleaner線程,我們可以通過調整Cleaner線程池中的線程數量,優(yōu)化并發(fā)壓縮的性能,減少對整個服務端性能的影響。一般情況下,Log的數據量很大,為了避免Cleaner線程與其他業(yè)務線程長時間競爭CPU,并不會將除activeSegment之外的所有LogSegment在一次壓縮操作中全部處理掉,而是將這些LogSegment分批進行壓縮。
每個Log都可以通過cleaner checkpoint切分成clean和dirty兩部分,clean部分表示的是之前已經被壓縮過的部分,而dirty部分則表示未壓縮的部分,如圖4-28所示?,F在假設Log中所有的消息都是非壓縮消息,所有消息的offset都是連續(xù)的。日志壓縮操作完成后,dirty部分消息的offset依然是連續(xù)遞增的,而clean部分消息的offset是斷斷續(xù)續(xù)的。cleaner checkpoint與前面介紹的Log.recoveryPoint類似,保存在每個log目錄對應一個的cleaner-offset-checkpoint文件中,由OffsetCheckpoint完成相應的讀寫操作。

每個Log需要進行日志壓縮的迫切程度也不同,每個Cleaner線程只選取最迫切需要被壓縮的Log進行處理。這里的“迫切程度”是通過cleanableRatio(dirty部分占整個Log的比例)決定的。
Cleaner線程在選定需要清理的Log后,首先為dirty部分的消息建立key與其last_offset(此key出現的最大offset)的映射關系,該映射通過SkimpyOffsetMap維護,后面會詳細介紹SkimpyOffsetMap。然后重新復制LogSegment,只保留SkimpyOffsetMap中記錄的消息,拋棄掉其他消息。經過日志壓縮后,日志文件和索引文件會不斷減小,Cleaner線程還會對相鄰的LogSegment進行合并,避免出現過小的日志文件和索引文件。
最后值得注意的是,在日志壓縮時,value為空的消息會被認為是刪除此key對應的消息的標志,此標志消息會被保留一段時間,超時后會在下一次日志壓縮操作中刪除。
介紹完日志壓縮的基本概念,來看一下日志壓縮相關的實現類之間的依賴關系,如圖4-29所示。

在LogCleaner中使用cleaners字段管理CleanerThread線程,通過startup()方法和shutdown()方法完成CleanerThread線程的啟動和停止。

LogCleaner中的其他方法都直接委托給了LogCleanerManager對應的方法,代碼不貼出來了。
LogCleanerManager主要負責每個Log的壓縮狀態(tài)管理以及cleaner checkpoint信息維護和更新。LogCleanerManager中各個字段的含義如下所述。
·checkpoints:Map[File, OffsetCheckpoint]類型,用來維護data數據目錄與cleaneroffset-checkpoint文件之間的對應關系,與LogManager.recoveryPointCheckpoints 集合類似。
·inProgress:HashMap[TopicAndPartition, LogCleaningState]類型,用于記錄正在進行清理的TopicAndPartition的壓縮狀態(tài)
...
到這里,Kafka的日志壓縮功能以及其相關實現就介紹完了。下一節(jié)將回到LogManager中繼續(xù)分析其初始化過程。
LogManager初始化
在LogManager的初始化過程中,除了初始化上述三個定時任務日志壓縮的組件,還會完成相關的恢復操作和Log加載。首先調用LogManager.createAndValidateLogDirs()方法,保證每個log目錄都存在并且可讀,代碼比較簡單,就不貼出來了。之后會調用LogManager.loadLogs()方法加載log目錄下的所有Log。這是LogManager初始化的重要過程,其步驟大致如下:
(1)為每個log目錄分配一個有ioThreads條線程的線程池,用來執(zhí)行恢復操作。
2)檢測Broker上次關閉是否正常,并設置Broker的狀態(tài)。在Broker正常關閉時,會創(chuàng)建一個“.kafka_cleanshutdown”的文件,這里就是通過此文件進行判斷的。
(3)載入每個Log的recoveryPoint。
(4)為每個Log創(chuàng)建一個恢復任務,交給線程池處理。
(5)主線程阻塞等待所有的恢復任務完成。
(6)關閉所有在步驟1中創(chuàng)建的線程池。
LogManager.loadLogs()方法的代碼如下:


從LogManager.loadLogs()方法的代碼來看,只是創(chuàng)建了Log對象,并存入LogManager.logs集合進行管理。但是Log的初始化過程并不僅僅是創(chuàng)建一個對象而已,它會調用Log.loadSegments()方法.