kafka文件存儲(chǔ)機(jī)制

一,kafka簡介 ? ??

Kafka最初由Linkedin公司開發(fā)的分布式、分區(qū)的、多副本的、多訂閱者的消息系統(tǒng)。它提供了類似于JMS的特性,但是在設(shè)計(jì)實(shí)現(xiàn)上完全不同,此外它并不是JMS規(guī)范的實(shí)現(xiàn)。kafka對(duì)消息保存是根據(jù)Topic進(jìn)行歸類,發(fā)送消息者稱為Producer;消息接受者稱為Consumer;此外kafka集群有多個(gè)kafka實(shí)例組成,每個(gè)實(shí)例(server)稱為broker。無論是kafka集群,還是producer和consumer都依賴于zookeeper來保證系統(tǒng)可用性集群保存一些meta信息(kafka的0.8版本之后,producer不在依賴zookeeper保存meta信息,而是producer自己保存meta信息)。本文不打算對(duì)Apache Kafka的原理和實(shí)現(xiàn)進(jìn)行介紹,而在編程的角度上介紹如何使用Apache Kafka。我們分別介紹如何編寫Producer、Consumer以及Partitioner等。

二、producer以及consumer是如何知道該去哪個(gè)broker里傳送以及消費(fèi)對(duì)應(yīng)的數(shù)據(jù)

? ? ? ? 是因?yàn)閗afka從0.8版本開始,kafka的producer開始不用從ZK獲取broker的元信息。之前的版本是需要的。0.8版本后,producer可以指定一個(gè)或者多個(gè)broker的信息(ip:port),來獲取kafka集群的元信息。(比如集群有50個(gè)broker,但是producer只需要指定至少1個(gè)就可以獲取到整個(gè)broker的集群的活動(dòng)的列表(但是最好多指定幾個(gè),否則這個(gè)broker連接不上了,就over了),每個(gè)broker,topic有多少partition,每個(gè)partition在哪個(gè)broker上,該信息會(huì)存儲(chǔ)到broker的內(nèi)存之中進(jìn)行維護(hù))。而consumer是通過連接ZK,發(fā)現(xiàn)kafka集群的元信息(broker的集群的活動(dòng)的列表,每個(gè)broker,topic有多少partition,每個(gè)partition在哪個(gè)broker上)。從而找到了對(duì)應(yīng)的數(shù)據(jù)位置。

? ? 介于producer保存著kafka的元數(shù)據(jù)信息,系統(tǒng)會(huì)去刷新元數(shù)據(jù)信息的。(1、定期刷新元數(shù)據(jù)信息(通過producer的配置文件里設(shè)置);2、傳送數(shù)據(jù)失敗后,會(huì)去刷新broker元數(shù)據(jù)信息)

三,kafka存儲(chǔ)文件

? ? ? ?1,在kafka集群中,每個(gè)broker(一個(gè)kafka實(shí)例稱為一個(gè)broker)中有多個(gè)topic,topic數(shù)量可以自己設(shè)定。在每個(gè)topic中又有多個(gè)partition,每個(gè)partition為一個(gè)分區(qū)。kafka的分區(qū)有自己的命名的規(guī)則,它的命名規(guī)則為topic的名稱+有序序號(hào),這個(gè)序號(hào)從0開始依次增加。

? ? ? 2,在每個(gè)partition中有可以分為多個(gè)segment file。當(dāng)生產(chǎn)者往partition中存儲(chǔ)數(shù)據(jù)時(shí),內(nèi)存中存不下了,就會(huì)往segment file里面存儲(chǔ)。kafka默認(rèn)每個(gè)segment file的大小是500M,在存儲(chǔ)數(shù)據(jù)時(shí),會(huì)先生成一個(gè)segment file,當(dāng)這個(gè)segment file到500M之后,再生成第二個(gè)segment file 以此類推。每個(gè)segment file對(duì)應(yīng)兩個(gè)文件,分別是以.log結(jié)尾的數(shù)據(jù)文件和以.index結(jié)尾的索引文件。在服務(wù)器上,每個(gè)partition是一個(gè)文件夾,每個(gè)segment是一個(gè)文件。

? ? ? 每個(gè)segment file也有自己的命名規(guī)則,每個(gè)名字有20個(gè)字符,不夠用0填充。每個(gè)名字從0開始命名,下一個(gè)segment file文件的名字就是,上一個(gè)segment file中最后一條消息的索引值。在.index文件中,存儲(chǔ)的是key-value格式的,key代表在.log中按順序開始第條消息,value代表該消息的位置偏移。但是在.index中不是對(duì)每條消息都做記錄,它是每隔一些消息記錄一次,避免占用太多內(nèi)存。即使消息不在index記錄中,在已有的記錄中查找,范圍也大大縮小了。


四,consumer如何消費(fèi)數(shù)據(jù)

? ? ? ?消費(fèi)者需要先指定消費(fèi)哪個(gè)topic。在kafka中各個(gè)partition中已經(jīng)存儲(chǔ)了數(shù)據(jù)。由于之前存儲(chǔ)是按照順序存儲(chǔ)的,各個(gè)segment file文件命名也時(shí)有一定規(guī)則的,這種存儲(chǔ)規(guī)則使得查找文件會(huì)很快速。

? ? ? 1,假設(shè)需要查找offest=12345的消息,通過二分查找法可以很快速的定位到該文件所在的.index和.log文件。這一步就查找到該消息所在的segment file文件。

? ? ? 2,在該segment file文件中查找該消息。通過.index索引文件,快速的查找到該消息在.log中的位置,查找文件就完成了。

五、producer如何發(fā)送數(shù)據(jù)

? ? producer需要指定數(shù)據(jù)到哪個(gè)topic,并且指定的分區(qū),系統(tǒng)會(huì)自動(dòng)均勻分配到各個(gè)broker里,而不會(huì)都存在一個(gè)或某幾個(gè)broker里。那消息該發(fā)送到哪個(gè)broker上呢,一般有兩種情況(可以自定義class去設(shè)置分區(qū),在producer的實(shí)例,配置文件時(shí),partitioner.class這個(gè)配置項(xiàng)指定對(duì)應(yīng)的自己創(chuàng)建的分區(qū)類即可)。如下

Properties props = new Properties();

? ? props.put("metadata.broker.list", BROKER_LIST);

? ? props.put("serializer.class", StringEncoder.class.getName());

? ? props.put("partitioner.class", HashPartitioner.class.getName());

1、根據(jù)消息的key模與分區(qū)數(shù),去指定該消息該到哪個(gè)分區(qū)。

2、根據(jù)消息發(fā)送的順序,去指定該消息該到哪個(gè)分區(qū)。例如,第一條數(shù)據(jù)去topic-0,第二條數(shù)據(jù)去topic-1,以此類推。

producer還分為兩種

? ? 1同步producer

? 消息發(fā)送過去之后,只有成功之后才會(huì)發(fā)送下一條消息。不成功的話,會(huì)retry,3次不成功會(huì)catch異常,出現(xiàn)異常時(shí),可以忽略,也可以手動(dòng)操作,或者把這條數(shù)據(jù)存儲(chǔ)到另一個(gè)存儲(chǔ)里,下次再處理。

? ?2異步producer

不是馬上就發(fā),會(huì)先存儲(chǔ)在query,達(dá)到要求后,再把query發(fā)送個(gè)broker。如果query滿了,剛要發(fā)送時(shí),此時(shí)出現(xiàn)阻塞來不及發(fā)送,此時(shí)如果有新的數(shù)據(jù)進(jìn)來,會(huì)選擇把新的數(shù)據(jù)丟掉(這個(gè)是可以設(shè)置的,也可以設(shè)置為等待)。

六、過期日志的處理

? ?kafka作為一個(gè)消息中間件,是需要定期處理數(shù)據(jù)的,否則磁盤就爆了。

1、處理的機(jī)制

? ? ? 1)根據(jù)數(shù)據(jù)的時(shí)間長短進(jìn)行清理,例如數(shù)據(jù)在磁盤中超過多久會(huì)被清理(默認(rèn)是168個(gè)小時(shí))?

? ? ? ?2)根據(jù)文件大小的方式給進(jìn)行清理,例如數(shù)據(jù)大小超過多大時(shí),刪除數(shù)據(jù)(大小是按照每個(gè)partition的大小來界定的)。

2、刪除過期的日志的方式

直接刪除segment文件。后臺(tái)會(huì)周期性的掃描,當(dāng)滿足設(shè)定的條件的數(shù)據(jù)就執(zhí)行刪除。如果設(shè)置是按照大小的方式,刪除segment是按照segment存在順序進(jìn)行刪除,即先刪除存在最久的那個(gè)segment。

七、幾個(gè)重要的知識(shí)點(diǎn)

1、kafka里的數(shù)據(jù)只能追加和消費(fèi),不能進(jìn)行修改,即不可變數(shù)據(jù)集。?

2、consumer還可以同過設(shè)置topic的黑白名單,去設(shè)置自己想消費(fèi)以及不想消費(fèi)的topic。? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

3、producer是通過partition判定消息該到哪個(gè)broker的。通過HashPartiton以及RoundRobinPartition等去判定數(shù)據(jù)key屬于哪個(gè)partition,從而到對(duì)應(yīng)的broker中(每條消息僅屬于一個(gè)topic,每個(gè)topic的partition在不同的broker中是不同的)。

4、broker集群是會(huì)讓每個(gè)topic的partition均勻分配在每個(gè)broker之中。? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

5、producer到broker是通過push的方式。consumer是通過pull的方式(1、使用pull可以讓系統(tǒng)設(shè)計(jì)更為簡單,producer不用去感知地下consumer的狀態(tài),代碼設(shè)計(jì)上會(huì)簡單許多。2、通過pull,consumer可以進(jìn)行消息峰值的控制,避免數(shù)據(jù)量太大時(shí)候壓垮cunsumer。寧愿消息暫時(shí)性延遲也不愿意consumer宕機(jī)。3、但是PULL模型可能造成消費(fèi)者在沒有消息的情況下盲等,這種情況下可以通過long polling機(jī)制緩解(當(dāng)沒有數(shù)據(jù)時(shí),就等待,當(dāng)然會(huì)有超時(shí)時(shí)間),而對(duì)于幾乎每時(shí)每刻都有消息傳遞的流式系統(tǒng),這種影響可以忽略。)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容