深入kafka

(一)kafka集群

1、集群成員關(guān)系

(1)broker如何將自己注冊到zookeeper上:broker啟動的時候通過在zookeeper上創(chuàng)建臨時節(jié)點的方式將自己的brokerid(唯一)注冊到zookeeper上

(2)zookeeper用于注冊broker的路徑:/brokers/ids

(3)broker取消注冊:kafka組件訂閱以上路徑,當某個broker和zookeeper斷開連接導(dǎo)致臨時節(jié)點被刪除時,其他broker可以收到通知。

2、控制器

(1)什么是控制器:一個特殊的broker

(2)控制器的作用:負責分區(qū)首領(lǐng)(如果一個分區(qū)在一個broker上,那么這個broker稱為這個分區(qū)的首領(lǐng))的選舉。

(3)控制器是怎么產(chǎn)生的:集群里第一個啟動的broker在對應(yīng)zookeeper上創(chuàng)建一個臨時節(jié)點/controller讓自己成為控制器,其他broker再去創(chuàng)建的時候,收到節(jié)點已存在的反饋,保證集群中只有一個broker是控制器。

(4)控制器宕機怎么辦:控制器被關(guān)閉后,對應(yīng)的臨時節(jié)點刪除。其他broker收到通知后嘗試創(chuàng)建臨時節(jié)點,成功創(chuàng)建的節(jié)點成為控制器

(5)控制器是如何進行分區(qū)選舉的呢:kafka會在zk上針對每個topic維護一個ISR集合(in-sync-replica同步副本集)。

a、某個分區(qū)的leader不可用,控制器從isr集合中選擇一個新的副本作為新的leader。再通知leader及其他相關(guān)broker,告知其讀取數(shù)據(jù)或者從何處進行進行數(shù)據(jù)同步

(6)某一個topic分區(qū)的分配:控制器從所有broker中隨機選擇一個作為起點,然后順位分配。新增partition也是控制器利用這種算法進行重新分配

(二)kafka如何進行復(fù)制

1、副本分類:

(1)首領(lǐng)副本:處理生產(chǎn)者和消費者請求的副本,為保證一致性,所有請求都會經(jīng)過這個副本

????a、當前首領(lǐng):當前的首領(lǐng)

????b、首選首領(lǐng):創(chuàng)建主題時選定的首領(lǐng)

(2)跟隨者副本:首領(lǐng)副本以外的副本成為跟隨者副本。跟隨者副本不關(guān)心請求來源,只是為保證自己和首領(lǐng)副本的一致性不斷從首領(lǐng)副本同步消息

2、工作模式:

(1)跟隨者副本從首領(lǐng)副本同步消息,同步方式和消費者消費消息完全一致。跟隨者副本告知消費的偏移量和消費數(shù)據(jù)量,首領(lǐng)副本將消息返回給跟隨者副本。

(2)首領(lǐng)副本通過查看每個跟隨者請求的最新偏移量,首領(lǐng)就會知道跟隨者的進度。通過replica.lag.time.max.ms配置最長不從首領(lǐng)同步數(shù)據(jù)的時間。超過這個時間或者每次請求的偏移量不變則認為該跟隨者副本是不同步副本,反之則是同步副本。

(3)如果首領(lǐng)副本失效,只有同步副本才有可能成為下一個首領(lǐng)副本

(三)kafka如何處理生產(chǎn)者和消費者的請求

1、kafka定義的基于TCP的二進制傳輸協(xié)議:

(1)標準消息頭:

2、處理請求

(1)請求來源:客戶端、分區(qū)副本、控制器

(2)kafka處理請求流程

????a、Processor線程:網(wǎng)絡(luò)線程,接受來自客戶端的請求放入隊列并從響應(yīng)隊列獲取響應(yīng)返回給請求發(fā)送者

????b、IO線程:負責處理真正的消息寫入和讀取

(3)請求必須發(fā)送給分區(qū)首領(lǐng),那么如何確定分區(qū)的首領(lǐng)在哪里呢----元數(shù)據(jù)請求

????a、客戶端向集群發(fā)送元數(shù)據(jù)請求,元數(shù)據(jù)中包含了對應(yīng)主題所包含的分區(qū),每個分區(qū)都有哪些副本以及哪個副本是首領(lǐng)。(元數(shù)據(jù)請求可以發(fā)送給任意一個broker,因為broker都緩存了這些信息)

????b、一般客戶端會緩存這些信息,然后隔一段時間從broker拉取。metadata.max.age.ms參數(shù)指定metadata的刷新間隔

(4)客戶端發(fā)送請求的過程

注:如果新加入了broker或者其他原因?qū)е路謪^(qū)副本重新分配,客戶端收到“非首領(lǐng)”的錯誤,將會重新刷新metadata信息

(5)處理生產(chǎn)者具體請求

????a、消息驗證:發(fā)送數(shù)據(jù)的用戶是否有主題寫權(quán)限,acks值是否有效,如果是acks=all,是否有足夠的同步副本保證消息被完全寫入

????b、將消息寫入文件系統(tǒng)緩存,并不是馬上被寫入磁盤。并不保證何時被寫入磁盤。首領(lǐng)寫入消息后,檢查acks的值,如果是0或者1直接返回,如果是all,直到所有副本都復(fù)制了消息,響應(yīng)才會返回

(6)處理消費者具體請求

????a、消息驗證:驗證想要獲取的偏移量在指定的分區(qū)是否存在

????b、從文件系統(tǒng)緩存讀取消息到客戶端。(不需要管理本地緩存,不需要進行消息的復(fù)制)

????c、只有被所有復(fù)制者同步的消息可以被消費者讀取,如下圖所示:

(四)kafka存儲

前言:kafka基本存儲單元是分區(qū),一個分區(qū)無法再在各個broker之間分配,也無法分配到同一個broker的不同磁盤目錄上

1、分區(qū)分配

(1)目標:

????a、broker之間分區(qū)分配平均;

????b、確保每個分區(qū)的副本在不同的broker上

(2)分配方法:

????a、隨機選擇一個broker(假設(shè)是4),然后使用輪詢的方式給每個broker分區(qū)確定首領(lǐng)分區(qū)的位置。即:首領(lǐng)分區(qū)0在broker4,首領(lǐng)分區(qū)1在broker5,首領(lǐng)分區(qū)2在broker0(只有6個broker),以此類推。

????b、從首領(lǐng)分區(qū)0開始,依次分配跟隨者副本。如果分區(qū)0的首領(lǐng)在broker4,那么它的每一個跟隨者副本在broker5,以此類推。

????c、如果配置了機架信息(如broker0/1/2在一臺機架上),那么分配順序就會變成0、3、1、4、2、5。

????d、為分區(qū)選擇合適的磁盤目錄,計算每個目錄上分區(qū)的數(shù)量,選擇分區(qū)最少的目錄,將分區(qū)添加到該目錄下面。(不會考慮具體目錄的負載)

2、文件管理

(1)kafka為每個主題的數(shù)據(jù)保留一定時間(可配置),不會一直保留,也不會等到全部被消費

(2)考慮到一個大文件讀寫數(shù)據(jù)很費時,將分區(qū)分成若干個片段(默認一個片段包含1G或者一周的數(shù)據(jù),以較小的那個為準)。寫入文件時,如果片段達到上限,則關(guān)閉當前片段,打開一個新的文件。

注:當前正在寫入的片段成為活躍片段,活躍片段永遠不會被刪除。對于不活躍的片段,broker會為其打開一個文件句柄。

3、文件格式:

????鍵,值,偏移量,消息大小,消息格式版本號,壓縮算法和時間戳。批量發(fā)送的消息格式如圖(批量發(fā)送的消息中每個消息都有自己的偏移量):

4、索引

(1)目的:為了能夠快速讀取任意偏移量的數(shù)據(jù),kafka為每個分區(qū)維護了一個索引。

(2)工作模式:將偏移量映射到片段文件和偏移量在文件中的位置。

(3)保障:如果索引出現(xiàn)損壞,kafka會通過重新讀取這些消息的方式重新錄制偏移量和位置來重新生成索引。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容