kafka是一個分布式、支持分區(qū)(partition)、多副本的(replica),基于zookeeper協(xié)調(diào)的分布式消息系統(tǒng)。

topic
Topic是一個類別的名稱,同類消息發(fā)送到同一個Topic下面。對于每一個Topic,下面可以有多個分區(qū) (Partition)日志文件:

offset
每個consumer是基于自己在partition的消費進度(offset)來進行工作的.在kafka中,消費offset由consumer自 己來維護;一般情況下我們按照順序逐條消費commit log中的消息,當然我可以通過指定offset來重復消費某些消息, 或者跳過某些消息。 這意味kafka中的consumer對集群的影響是非常小的,添加一個或者減少一個consumer,對于集群或者其他consumer 來說,都是沒有影響的,因為每個consumer維護各自的offset。所以說kafka集群是無狀態(tài)的,性能不會因為 consumer數(shù)量受太多影響。kafka還將很多關鍵信息記錄在zookeeper里,保證自己的無狀態(tài),從而在水平擴容時非常方便。
producers
生產(chǎn)者將消息發(fā)送到topic中去,同時負責選擇將message發(fā)送到topic的哪一個partition中。通過round-robin做簡單的 負載均衡。也可以根據(jù)消息中的某一個關鍵字來進行區(qū)分。通常第二種方式使用的更多。
Consumers

一個partition同一個時刻在一個consumer group中只有一個consumer instance在消費,從而保證順序。 consumer group中的consumer instance的數(shù)量不能比一個Topic中的partition的數(shù)量多,否則,多出來的 consumer消費不到消息。
server.properties


Kafka核心總控制器Controller
在Kafka集群中會有一個或者多個broker,其中有一個broker會被選舉為控制器(Kafka Controller),它負責管理整個 集群中所有分區(qū)和副本的狀態(tài)。
當某個分區(qū)的leader副本出現(xiàn)故障時,由控制器負責為該分區(qū)選舉新的leader副本。
當檢測到某個分區(qū)的ISR集合發(fā)生變化時,由控制器負責通知所有broker更新其元數(shù)據(jù)信息。
當使用kafka-topics.sh腳本為某個topic增加分區(qū)數(shù)量時,同樣還是由控制器負責分區(qū)的重新分配。
Controller選舉機制
在kafka集群啟動的時候,會自動選舉一臺broker作為controller來管理整個集群,選舉的過程是集群中每個broker都會 嘗試在zookeeper上創(chuàng)建一個 /controller 臨時節(jié)點,zookeeper會保證有且僅有一個broker能創(chuàng)建成功,這個broker 就會成為集群的總控器controller。
當這個controller角色的broker宕機了,此時zookeeper臨時節(jié)點會消失,集群里其他broker會一直監(jiān)聽這個臨時節(jié) 點,發(fā)現(xiàn)臨時節(jié)點消失了,就競爭再次創(chuàng)建臨時節(jié)點,就是我們上面說的選舉機制,zookeeper又會保證有一個broker 成為新的controller。
具備控制器身份的broker需要比其他普通的broker多一份職責,具體細節(jié)如下:
- 監(jiān)聽broker相關的變化。為Zookeeper中的/brokers/ids/節(jié)點添加BrokerChangeListener,用來處理broker 增減的變化。
- 監(jiān)聽topic相關的變化。為Zookeeper中的/brokers/topics節(jié)點添加TopicChangeListener,用來處理topic增減 的變化;為Zookeeper中的/admin/delete_topics節(jié)點添加TopicDeletionListener,用來處理刪除topic的動作。
- 從Zookeeper中讀取獲取當前所有與topic、partition以及broker有關的信息并進行相應的管理。對于所有topic 所對應的Zookeeper中的/brokers/topics/[topic]節(jié)點添加PartitionModificationsListener,用來監(jiān)聽topic中的 分區(qū)分配變化。
- 更新集群的元數(shù)據(jù)信息,同步到其他普通的broker節(jié)點中。
Partition副本選舉Leader機制
controller感知到分區(qū)leader所在的broker掛了(controller監(jiān)聽了很多zk節(jié)點可以感知到broker存活),controller會從每 個parititon的 replicas 副本列表中取出第一個broker作為leader,當然這個broker需要也同時在ISR列表里。
消費者消費消息的offset記錄機制
每個consumer會定期將自己消費分區(qū)的offset提交給kafka內(nèi)部topic:__consumer_offsets,提交過去的時候,key是 consumerGroupId+topic+分區(qū)號,value就是當前offset的值,kafka會定期清理topic里的消息,最后就保留最新的那條數(shù)據(jù)。
因為__consumer_offsets可能會接收高并發(fā)的請求,kafka默認給其分配50個分區(qū)(可以通過 offsets.topic.num.partitions設置),這樣可以通過加機器的方式抗大并發(fā)。

消費者Rebalance機制
消費者rebalance就是說如果consumer group中某個消費者掛了,此時會自動把分配給他的分區(qū)交給其他的消費者,如果他又重啟了,那么又會把一些分區(qū)重新交還給他。
如下情況可能會觸發(fā)消費者rebalance
1、consumer所在服務重啟或宕機了
2、動態(tài)給topic增加了分區(qū)
3、消費組訂閱了更多的topic
rebalance過程如下
當有消費者加入消費組時,消費者、消費組及組協(xié)調(diào)者之間會經(jīng)歷以下幾個階段。
第一階段,選擇組協(xié)調(diào)器
每個consumer group都會選擇一個broker作為自己的組協(xié)調(diào)器coordinator,負責監(jiān)控這個消費組里的所有消費組的心跳,以及判斷是否宕機,然后開啟消費者rebalance。consumer group中的每個consumer啟動時會向kafka集群中的某個節(jié)點發(fā)送FindCoordinatorRequest請求來查找對應組協(xié)調(diào)器GroupCoordinator,并跟其建立網(wǎng)絡連接。
第二階段,加入消費組 join group
在成功找到消費組所對應的GroupCoordinator之后就進入加入消費組的階段,在此階段的消費者會向GroupCoordinator發(fā)送joinGroupRequest請求,并處理響應。然后GroupCoordinator 從一個consumer group中 選擇第一個加入group的consumer作為leader(消費組協(xié)調(diào)器),把consumer group情況發(fā)送給這個leader,接著這個leader會負責制定分區(qū)方案。
第三階段( SYNC GROUP)
consumer leader通過給GroupCoordinator發(fā)送SyncGroupRequest,接著GroupCoordinator就把分區(qū)方案下發(fā)給各 個consumer,他們會根據(jù)指定分區(qū)的leader broker進行網(wǎng)絡連接以及消息消費。
producer發(fā)布消息機制剖析
寫入方式
producer采用push模式將消息發(fā)布到broker,每條消息都被append到partition中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比)。
消息路由
producer發(fā)消息到broker時,會根據(jù)分區(qū)算法選擇將其存儲到哪一個partition。其路由機制為:
1.指定partition,則直接使用;
2.未指定partition但指定key,通過對key的value進行hash選出一個partition;
3.partition和key都未指定,使用輪詢選出一個partition;
寫入流程
1.producer先從zookeeper的"/brokers/.../state"節(jié)點找到該partition的leader
2.producer將消息發(fā)送給該leader
3.leader將消息寫入本地log
4.follows從leader pull消息,寫入本地log后向leader發(fā)送ACK
5.leader收到所有ISR中的replica的ACK后,增加HW(high watermark,最后commit的offset)并向producer發(fā)送ACK。

HW與LEO
HW俗稱高水位,HighWatermark的縮寫,取一個partition對應的ISR中最小的LEO(log-end-offset)作為HW,consumer最多只能消費到HW所在的位置。另外每個replica都有HW,leader和follow各自負責更新自己的HW的狀態(tài)。對于leader新寫入的消息,consumer不能立刻消費,leader會等待該消息被所有ISR中的replicas同步后更新HW,此時消息才能被consumer消費。這樣就保證了如果leader所在的broker失效,該消息仍然可以從新選舉的leader中獲取。對于來自內(nèi)部的broker的讀取請求,沒有HW的限制。
假設某分區(qū)的 ISR 集合中有 3 個副本,即一個 leader 副本和 2 個 follower 副本,此時分區(qū)的 LEO 和 HW 都分別為 3 。消息3和消息4從生產(chǎn)者出發(fā)之后先被存入leader副本。

在消息被寫入leader副本之后,follower副本會發(fā)送拉取請求來拉取消息3和消息4進行消息同步。

在同步過程中不同的副本同步的效率不盡相同,在某一時刻follower1完全跟上了leader副本而follower2只同步了消息3,如此leader副本的LEO為5,follower1的LEO為5,follower2的LEO 為4,那么當前分區(qū)的HW取最小值4,此時消費者可以消費到offset0至3之間的消息。


由此可見kafka的復制機制既不是完全的同步復制,也不是單純的異步復制。事實上,同步復制要求所有能工作的follower副本都復制完,這條消息才會被確認已成功提交,這種復制方式極大的影響了性能。而在異步復制的方式下,follower副本異步的從leader副本中復制數(shù)據(jù),數(shù)據(jù)只要被leader副本寫入就會被認為已經(jīng)成功提交。在這種情況下,如果follower副本都還沒有復制完而落后于leader副本,然后leader副本宕機,則會造成數(shù)據(jù)丟失。kafka使用這種ISR的方式有效的權(quán)衡了數(shù)據(jù)可靠性和性能之間的關系。
日志分段存儲
Kafka 一個分區(qū)的消息數(shù)據(jù)對應存儲在一個文件夾下,以topic名稱+分區(qū)號命名,kafka規(guī)定了一個分區(qū)內(nèi)的 .log 文件 最大為 1G,做這個限制目的是為了方便把 .log 加載到內(nèi)存去操作:

這個 9936472 之類的數(shù)字,就是代表了這個日志段文件里包含的起始 Offset,也就說明這個分區(qū)里至少都寫入了接近 1000 萬條數(shù)據(jù)了。
Kafka Broker 有一個參數(shù),log.segment.bytes,限定了每個日志段文件的大小,最大就是 1GB。 一個日志段文件滿了,就自動開一個新的日志段文件來寫入,避免單個文件過大,影響文件的讀寫性能,這個過程叫做 log rolling,正在被寫入的那個日志段文件,叫做 active log segment。
也正是由于這種文件按partition存儲的方式,導致kafka在topic增多時、partition分區(qū)過多時,每個broker上的log文件增多,consumer在讀取消息時,文件讀取方式由順序I/O開始接近于隨機I/O,導致性能降低。
kafka為每個分段后的數(shù)據(jù)文件建立了索引文件,文件名與數(shù)據(jù)文件的名字是一樣的,只是文件擴展名為.index。index文件中并沒有為數(shù)據(jù)文件中的每條message建立索引,而是采用了稀疏存儲的方式,每隔一定字節(jié)的數(shù)據(jù)建立一條索引。這樣避免了索引文件占用過多的空間,從而可以將索引文件保存在內(nèi)存中。
