一、組件
介紹一下kafka進(jìn)行數(shù)據(jù)復(fù)制時(shí)會(huì)涉及到的一些組件概念
zookeeper:維護(hù)集群信息,當(dāng)broker加入或退出時(shí),kafka通過訂閱zookeeper就能獲得通知
broker:一個(gè)獨(dú)立的kafka服務(wù)器稱為一個(gè)broker。broker接收來自生產(chǎn)者的消息,為消息設(shè)置位移,并將消息刷入到磁盤里。broker并且提供消費(fèi)者服務(wù),對(duì)讀取的分區(qū)數(shù)據(jù)提供響應(yīng)。
-
控制器/Controller:除了有一般broker的功能外,還會(huì)負(fù)責(zé)分區(qū)首領(lǐng)的選舉,使用epoch來控制“腦裂”。
集群里第一個(gè)啟動(dòng)的broker通過在Zookeeper里創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn)/controller使自己成為控制器,其他的broker節(jié)點(diǎn)在啟動(dòng)時(shí)也會(huì)嘗試創(chuàng)建這個(gè)節(jié)點(diǎn),但會(huì)提示失敗,因?yàn)橐呀?jīng)存在了,其它broker節(jié)點(diǎn)會(huì)在Zookeeper創(chuàng)建/watcher節(jié)點(diǎn)去感知控制器的狀態(tài),當(dāng)控制器被關(guān)閉或者離開集群了,他們會(huì)再次嘗試創(chuàng)建/controller節(jié)點(diǎn)重復(fù)同樣的操作。
新選舉出來的控制器,會(huì)得到一個(gè)遞增的controller epoch,其它broker在得知當(dāng)前的controller epoch后,會(huì)忽略舊控制器發(fā)出的消息,避免了腦裂的現(xiàn)象。
控制器可以進(jìn)行broker分區(qū)選舉。當(dāng)分區(qū)首領(lǐng)所在的broker離開集群時(shí),控制器遍歷這些分區(qū),并確定哪個(gè)副本會(huì)成為新的分區(qū)首領(lǐng),然后向所有broker發(fā)送請(qǐng)求,該請(qǐng)求包含誰是新leader誰是follower,隨后新首領(lǐng)開始處理來自生產(chǎn)者和消費(fèi)者的請(qǐng)求,而follower開始從leader處復(fù)制消息
-
分區(qū):kafka使用主題來組織數(shù)據(jù),每個(gè)主題被劃分為若干個(gè)分區(qū),每個(gè)分區(qū)可以有若干個(gè)副本,分區(qū)分配遵循同一分區(qū)副本均勻分布在不同broker上。
例如有4個(gè)broker,創(chuàng)建一個(gè)包含10個(gè)分區(qū)的主題,復(fù)制因子設(shè)置為2,那么總共有20個(gè)副本,可以按照如下方式分配給4個(gè)broker:
1、若未指定機(jī)架信息,隨機(jī)指定一個(gè)broker0,首領(lǐng)分區(qū)0分配給broker0,首領(lǐng)分區(qū)1分配給broker1,以此類推......隨后從分區(qū)首領(lǐng)后開始,依次分配跟隨者副本,例如分區(qū)0的首領(lǐng)在broker0,那么它的第一個(gè)跟隨者副本會(huì)分配給broker1......
2、若指定了機(jī)架信息,例如broker0和broker1在機(jī)架1,broker2和broker3分別在不同的機(jī)架,那么分區(qū)副本需要按照broker0,broker2,broker1,broker3進(jìn)行交替分配
-
副本:分為首領(lǐng)(leader)副本和跟隨者(follower)副本。
-
leader副本處理所有的寫入和訪問請(qǐng)求,另外會(huì)通過與follower保持狀態(tài)的交互,維護(hù)一個(gè)isr列表;
broker在處理請(qǐng)求時(shí),如果收到一個(gè)包含特定分區(qū)的生產(chǎn)和讀取請(qǐng)求,但是該分區(qū)的leader副本并不在該broker上,會(huì)導(dǎo)致報(bào)錯(cuò)。
客戶端會(huì)采用元數(shù)據(jù)請(qǐng)求方式,服務(wù)器會(huì)給出對(duì)應(yīng)的響應(yīng),響應(yīng)的消息會(huì)指明特定的主題,主題的分區(qū)、分區(qū)的副本以及副本leader信息,然后客戶端會(huì)緩存起來便于下次直接訪問。并會(huì)時(shí)不時(shí)更新元數(shù)據(jù)信息
follower的任務(wù)是復(fù)制leader的消息,保持與leader的一致性
-
ISR機(jī)制:每個(gè)分區(qū)都有一個(gè)ISR列表,用于維護(hù)所有的同步副本。leader副本必須是同步的,follower副本要滿足兩個(gè)條件才算是同步副本:
- 定時(shí)向zk發(fā)送心跳消息,保持與zk的活躍會(huì)話
- 持續(xù)向leader副本請(qǐng)求消息,在允許的消息量/時(shí)間延遲范圍內(nèi)保持與leader副本的消息同步(副本LEO落后于leader LEO的時(shí)長不大于replica.lag.time.max.ms參數(shù)值)
-
LEO:日志末端位移,記錄每個(gè)副本中下一條消息的偏移量
-
HW:水位值,記錄當(dāng)前topic已提交的偏移量。也即消費(fèi)者能消費(fèi)到的最大偏移量
Leader Epoch
二、消息的可靠性傳遞
-
broker有3個(gè)配置可影響消息存儲(chǔ)可靠性
復(fù)制系數(shù):主題級(jí)別的配置參數(shù)是replication.factor,broker級(jí)別可以通過default.replication.factor來配置自動(dòng)創(chuàng)建的主題;更高的復(fù)制系數(shù)可以帶來更高的可用性、可靠性,但是也需要消耗更多的存儲(chǔ)空間
-
不完全的首領(lǐng)選舉:unclean.leader.election只能在broker級(jí)別配置,默認(rèn)值是enable。
當(dāng)分區(qū)首領(lǐng)不可用時(shí),一個(gè)同步副本會(huì)被選為新首領(lǐng),如果在選舉過程中沒有出現(xiàn)數(shù)據(jù)丟失,那么這個(gè)選舉就是完全的。如果允許不同步的副本成為分區(qū)首領(lǐng),那么需要承擔(dān)丟失數(shù)據(jù)和數(shù)據(jù)不一致的風(fēng)險(xiǎn),如果不允許,那么就要接受較低的可靠性
最小同步副本:主題和broker級(jí)別上都可以配置參數(shù)min.insync.replicas參數(shù),如果當(dāng)前同步副本的個(gè)數(shù)小于這個(gè)參數(shù)時(shí),那么生產(chǎn)者將不能往主題分區(qū)寫入數(shù)據(jù),分區(qū)也變成了只讀狀態(tài)。
-
生產(chǎn)者配置
- 發(fā)送確認(rèn)配置:acks可配置3中不同的確認(rèn)模式
- acks=0:生產(chǎn)者能夠把消息發(fā)送出去,則認(rèn)為消息已成功寫入kafka,這種配置可以得到最大的吞吐量帶寬利用率,但是卻最不穩(wěn)定最有可能丟失數(shù)據(jù)
- acks=1:分區(qū)首領(lǐng)在收到數(shù)據(jù)后寫入分區(qū)數(shù)據(jù)文件時(shí)會(huì)返回確認(rèn)或者失敗的消息,如果生產(chǎn)者能正確處理錯(cuò)誤消息,會(huì)重試嘗試發(fā)送消息,最終消息會(huì)成功寫入到分區(qū)首領(lǐng)。這種配置方式也有造成丟失數(shù)據(jù)的風(fēng)險(xiǎn),當(dāng)消息寫入分區(qū)leader但是在follower復(fù)制時(shí)leader崩潰了
- acks=all:生產(chǎn)者在消息寫入分區(qū)首領(lǐng)和所有的副本后才確認(rèn)消息被寫入,這個(gè)參數(shù)會(huì)配合最小同步副本來使用,在確認(rèn)最小寫入副本數(shù)成功后就能返回繼續(xù)處理下一條消息的繼續(xù)寫入。這種配置可靠性最高,但是吞吐率最低
- 配置重試次數(shù):對(duì)于可重試解決錯(cuò)誤的事件,生產(chǎn)者可以嘗試重新發(fā)送消息;對(duì)于不可重試解決錯(cuò)誤的事件,多次重試已失去意義,可以直接丟棄或保存到磁盤再后續(xù)處理。重試次數(shù)的配置主要看重試的目的是什么。
- 額外的錯(cuò)誤處理:對(duì)于重試機(jī)制不能解決的錯(cuò)誤,例如消息序列化失敗,生產(chǎn)者重試次數(shù)達(dá)到上限,需要開發(fā)人員自行捕獲異常并處理。
- 發(fā)送確認(rèn)配置:acks可配置3中不同的確認(rèn)模式
-
消費(fèi)者可靠性配置
-
自動(dòng)提交偏移量
- enable.auto.commit(消費(fèi)者再均衡后會(huì)有消息重復(fù)消費(fèi)的情況)
- auto.commit.interval.ms(自動(dòng)提交開啟,默認(rèn)提交間隔是5s)
-
手動(dòng)提交偏移量
enable.auto.commit參數(shù)設(shè)置為false,手動(dòng)提交偏移量分兩類
- 手動(dòng)提交當(dāng)前輪訓(xùn)的最大偏移量
- 手動(dòng)提交固定偏移量
api分同步提交和異步提交兩類
-
同步提交:提交失敗消息后阻塞,消費(fèi)者進(jìn)行自動(dòng)重試,保證消息能夠最大限度地提交成功,但會(huì)降低吞吐量
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } /*同步提交*/ consumer.commitSync(); } -
異步提交:提交失敗后不能自動(dòng)重試,但是可以通過一個(gè)Map<TopicPartition, Integer> offsets對(duì)象來維護(hù)每個(gè)分區(qū)提交的偏移量,如果失敗的偏移量小于最后一次已提交的偏移量,則不需要重試
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); } /*異步提交并定義回調(diào)*/ consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n", x.topic(), x.partition(), y.offset())); } } }); }
-
