一、kafka簡介
kafka是分布式消息隊列,具有高性能、持久化、多副本備份、橫向擴展能力,其最大的特性是高吞吐量、低延遲,可以幾乎實時的處理大量數(shù)據(jù)。這和其文件存儲機制的設(shè)計有關(guān),簡單來說寫入時為順序?qū)懭耄矣蠱MFile的機制,而在檢索時,也做了分段和稀疏索引。同時,在整體架構(gòu)上,對于同一個topic中的消息會均勻的分布在不同的partition上,也使得分布式消費成為可能。
二、鳥瞰kafka

總體來講,kafka的數(shù)據(jù)流向如上圖所示,簡單描述流程大概是:
2.1 producer往topic上寫消息,此時kafka會均勻的把message分布在不同的partition上(發(fā)送時不需要指定partition,kafka會自動的loan balance),同時寫入時只寫入到了leader上
2.2 leader發(fā)送給其他的follower
2.3 各個consumer線程可以組成一個consumer group,每個consumer group 均可以從頭開始消費一個topic下的各個partition,即同一個topic的message,允許多個consumer group進行消費
2.4 在某一個consumer group 進行消費時,partition中每一條message只能被其中的一個consumer線程消費
2.5 消費時可是從leader進行的消費
2.6 Consumer處理partition里面的message的時候是順序讀取的。所以必須維護著上一次讀到哪里的offsite信息
2.7 broker、topics、partitions的一些元信息用zk來存,監(jiān)控和路由啥的也都會用到zk
好了,到這里一定會產(chǎn)生一些疑問,比如:
1.partition是干嘛的?
2.為什么不同的consumer group 可以同時消費同一個topic?如何確保消費順序?
3.leader 和 follower的設(shè)計是有什么用?
4.消息的可靠性?
5.為什么說kafka的性能很好,讀寫很快?
一個個來看。
三、Topic & partition
Topic相當(dāng)于傳統(tǒng)消息系統(tǒng)MQ中的一個隊列queue,producer端發(fā)送的message必須指定是發(fā)送到哪個topic,但是不需要指定topic下的哪個partition,因為kafka會把收到的message進行l(wèi)oad balance,均勻的分布在這個topic下的不同的partition上( hash(message) % [broker數(shù)量] )。物理上存儲上,這個topic會分成一個或多個partition,每個partiton相當(dāng)于是一個子queue。在物理結(jié)構(gòu)上,每個partition對應(yīng)一個物理的目錄(文件夾),文件夾命名是[topicname][partition][序號],一個topic可以有無數(shù)多的partition,根據(jù)業(yè)務(wù)需求和數(shù)據(jù)量來設(shè)置。在kafka配置文件中可隨時更高num.partitions參數(shù)來配置更改topic的partition數(shù)量,在創(chuàng)建Topic時通過參數(shù)指定parittion數(shù)量。Topic創(chuàng)建之后通過Kafka提供的工具也可以修改partiton數(shù)量。
一般來說,(1)一個Topic的Partition數(shù)量大于等于Broker的數(shù)量,可以提高吞吐率。(2)同一個Partition的Replica盡量分散到不同的機器,高可用。
當(dāng)add a new partition的時候,partition里面的message不會重新進行分配,原來的partition里面的message數(shù)據(jù)不會變,新加的這個partition剛開始是空的,隨后進入這個topic的message就會重新參與所有partition的load balance。
四、消息寫入及順序消費
kafka中的Message是以topic為基本單位組織的,不同的topic之間是相互獨立的。每個topic又可以分成幾個不同的partition(每個topic有幾個partition是在創(chuàng)建topic時指定的),每個partition存儲一部分Message。
另外,因為硬盤是機械結(jié)構(gòu),每次讀寫都會尋址->寫入,其中尋址是一個“機械動作”,它是最耗時的。所以硬盤最討厭隨機I/O,最喜歡順序I/O。為了提高讀寫硬盤的速度,kafka就是使用順序I/O。
也就是說,當(dāng)有消息寫入時,實際的寫入會類似下圖所示

可以看到,message在partition中是順序存儲的,這種方式有一個很明顯的缺陷——沒有辦法刪除數(shù)據(jù),所以kafka是不會刪除數(shù)據(jù)的,它會把所有的數(shù)據(jù)都保留下來。所以消費時需要consumer通過offset來標(biāo)記消費到了那一條數(shù)據(jù),通過的consumer group中的consumer消費的offset可能不一樣,如下圖所示

五、leader選舉機制
從前文的鳥瞰圖我們可以知道,kafka的每個partition可以在其他的kafka broker節(jié)點上存副本,即對于同一個partition可以有一個leader多個follower,這是為了確保當(dāng)某個broker宕機的時候不會影響這個kafka集群,如果是leader所在的broker宕機了,也會有新的follower站出來成為leader,也就是leader的重新選舉機制。那kafka是如何進行l(wèi)eader的重新選舉的呢?
1.kafka會在所有的broker中選出一個controller,所有partition的leader選舉都由controller決定,controller在zk注冊Watch,一旦有broker宕機,其在zk對應(yīng)的znode會自動被刪除,zk會fire controller注冊的watch
2.此時controller會去重新讀取一遍最新的幸存的broker,并決定partition集合(set_p),這個集合包含了宕機的broker上所有的partition
3.對于集合中的每個partition,controller會去讀取該partition的ISR(副本同步隊列),如果當(dāng)前ISR中有至少一個replica(副本)還幸存,則選擇其中一個作為新leader,新的ISR則包含當(dāng)前ISR中所有幸存的replica(選舉算法的實現(xiàn)類似于微軟的PacificA)。如果該partition的所有replica都宕機了,則將新的leader設(shè)置為-1。
4.將新的leader,ISR和新的leader_epoch及controller_epoch寫入/brokers/topics/[topic]/partitions/[partition]/state。
5.直接通過RPC向set_p相關(guān)的broker發(fā)送LeaderAndISRRequest命令。
如下圖所示

從前文的鳥瞰圖我們可以知道,producer發(fā)送消息時是先寫到leader上,再由leader同步到follower,如果寫到leader后還沒同步時正好broker宕機了,此時重新選舉后follower充當(dāng)新的leader,則剛剛那條消息的數(shù)據(jù)就丟失了,也就是說kafka在一些少量場景下存在丟失數(shù)據(jù)的可能(消息隊列消費時也同樣存在丟失數(shù)據(jù)或者重復(fù)消費的場景),接下來就來看下kafka在消息可靠性上的設(shè)置
六、消息的可靠性
6.1生產(chǎn)者生產(chǎn)消息的可靠性

6.2消費者消費時可能存在的異常場景
6.2.1 consumer的delivery gurarantee,默認(rèn)是讀完message先commmit再處理message,autocommit默認(rèn)是true,這時候先commit就會更新offsite+1,一旦處理失敗,offsite已經(jīng)+1,這個時候就會丟message
6.2.2 如前文所述,同一個topic支持被不同的consumer group消費,如果同樣的業(yè)務(wù)存在兩個consumer group(一般不會這樣),在消息消費時就需要考慮重復(fù)消費的場景
6.3由于MMFile導(dǎo)致的消息丟失
kafka為了像操作內(nèi)存一樣寫入數(shù)據(jù),在寫入數(shù)據(jù)時,除了前文提到的順序?qū)懭胪?,也會有?nèi)存映射文件(Memory Mapped Files ,MMFile)這樣的機制,它的工作原理是直接利用操作系統(tǒng)的Page來實現(xiàn)文件到物理內(nèi)存的直接映射。完成映射之后你對物理內(nèi)存的操作會被同步到硬盤上,如果在完成同步之前,服務(wù)器宕機了,則也會丟失一部分?jǐn)?shù)據(jù)。
七、kafka如何做到快速寫入和讀取的
7.1 快速寫入
關(guān)于寫入的部分前文已經(jīng)提到,主要是依靠順序?qū)懭氡苊饬俗詈臅r的機械動作——尋址,以及上面剛剛提到的MMFile機制可以使kafka如同操作內(nèi)存一樣的寫入。
7.2 快速讀取
7.2.1 parition數(shù)據(jù)文件
為了弄明白kafka如何做到快速讀取某一條消息的,首先要看一下partition文件的結(jié)構(gòu)。

partition是以文件的形式存儲在文件系統(tǒng)中。
Partition中的每條Message由offset來表示它在這個partition中的偏移量,這個offset不是該Message在partition數(shù)據(jù)文件中的實際存儲位置,而是邏輯上一個值,它唯一確定了partition中的一條Message。因此,可以認(rèn)為offset是partition中Message的id。partition中的每條Message包含了以下三個屬性:
offset
MessageSize
data
其中offset為long型,MessageSize為int32,表示data有多大,data為message的具體內(nèi)容
7.2.2 分段
由于consumer消費消息時一定是根據(jù)偏移量(offset)順序消費的,弄清楚了partition文件的結(jié)構(gòu)后,所以很容易想到kafka解決查詢效率的手段之一是將數(shù)據(jù)文件分段。
比如有100條Message,它們的offset是從0到99。假設(shè)將數(shù)據(jù)文件分成5段,第一段為0-19,第二段為20-39,以此類推,每段放在一個單獨的數(shù)據(jù)文件里面,數(shù)據(jù)文件以該段中最小的offset命名。這樣在查找指定offset的Message的時候,用二分查找就可以定位到該Message在哪個段中。
7.2.3 索引
數(shù)據(jù)文件分段使得可以在一個較小的數(shù)據(jù)文件中查找對應(yīng)offset的message了,但是這依然需要順序掃描才能找到對應(yīng)offset的message。為了進一步提高查找的效率,kafka為每個分段后的數(shù)據(jù)文件建立了索引文件。
索引包含兩個部分(均為4個字節(jié)的數(shù)字),分別為相對offset和position。
相對offset:因為數(shù)據(jù)文件分段以后,每個數(shù)據(jù)文件的起始o(jì)ffset不為0,相對offset表示這條message相對于其所屬分段的數(shù)據(jù)文件中最小的offset的大小。舉例,分段后的一個數(shù)據(jù)文件的offset是從20開始,那么offset為25的message在index文件中的相對offset就是25-20 = 5。
position:表示該條message在數(shù)據(jù)文件中的絕對位置。只要打開文件并移動文件指針到這個position就可以讀取對應(yīng)的message了。
值得一提的是,index文件中并沒有為數(shù)據(jù)文件中的每條message建立索引,而是采用了稀疏存儲的方式,每隔一定字節(jié)的數(shù)據(jù)建立一條索引。這樣避免了索引文件占用過多的空間,從而可以將索引文件保留在內(nèi)存中。但缺點是沒有建立索引的message也不能一次定位到其在數(shù)據(jù)文件的位置,從而需要做一次順序掃描,但是這次順序掃描的范圍就很小了。
八、參考及引用的資料
8.1 Kafka史上最詳細原理總結(jié) ----看完絕對不后悔 https://blog.csdn.net/lingbo229/article/details/80761778
8.2 震驚了!原來這才是kafka! http://www.itdecent.cn/p/d3e963ff8b70
8.3 kafka leader選舉機制原理 http://www.itdecent.cn/p/1f02328a4f2e