全局架構(gòu)圖

全局架構(gòu)圖
磁盤結(jié)構(gòu)
記錄格式
type VarInt int // 變長整型,使用Varints和ZigZag編碼的整型
type RecordBatch struct {
FirstOffset int64 // 起始偏移
Length int32 // 從PartitionLeaderEpoch開始的長度
PartitionLeaderEpoch int32 // 分區(qū)Leader紀(jì)元
Magic int8 // 消息版本號,當(dāng)前為2,表示V2
Crc32 int32 // crc校驗(yàn)和
Attributes int16 // [0-2]壓縮格式, 4時間戳類型, 5是否出于事務(wù)中, 6控制消息
LastOffsetDelta int32 // 最后一個Record的offset與FirstOffset的差值,用于保證消息組裝的正確性
FirstTimestamp int64 // 第一個Record的時間戳
MaxTimestamp int64 // 最后一個Record的時間戳,用于保證消息組裝的正確性
ProducerID int64 // 用于支持冪等性和事務(wù)
ProducerEpoch int32 // 用于支持冪等性和事務(wù)
FirstSequence int32 // 用于支持冪等性和事務(wù)
RecordsCount int32 // RecordsCount數(shù)組元素個數(shù)
Records []Record
}
type Record struct {
Length VarInt // Record長度
Attributions int8 // 屬性,暫時沒用
TimestampDelta VarInt // 相對于RecordBatch的FirstTimestamp的偏移量
OffsetDelta VarInt // 相對于RecordBatch的FirstOffset的偏移量
KeyLength VarInt // key長度
Key []byte // key內(nèi)容
ValueLength VarInt // value長度
Value []byte // value內(nèi)容
HeadersCount VarInt // Headers數(shù)組元素個數(shù)
Headers []Header // Headers數(shù)組,用于支持應(yīng)用級別擴(kuò)展
}
type Header struct {
HeaderKeyLength VarInt
HeaderKey string
HeaderValueLength VarInt
HeaderValue string
}
日志文件存儲

disk
使用時間戳查找消息
- 通過時間戳日志分段索引文件名查找對應(yīng)的日志分段文件
- 在該日志分段中通過二分法查找到最近的偏移量
- 通過該偏移量在偏移量日志分段索引文件中查找對應(yīng)的消息位置
- 從該位置開始,向后查找,直到找到不小于指定時間戳的消息
日志清理
日志刪除
- 基于時間:rog.retention.hours/minutes/ms
- 基于日志大小: log.retention.bytes
- 基于起始偏移量: DelectRecordsRequest.logStartOffset
日志壓縮/合并
對于相同的key的不同value值,只保留最后一個版本。當(dāng)應(yīng)用僅關(guān)心消息的最新value時,可以開啟日志合并功能。
- 線程會選擇污濁率最高的日志文件來進(jìn)行清理,污濁率=dirtyBytes/(cleanBytes+dirtyBytes)。
- log.cleaner.min.compaction.lag.ms,消息在被清理前的最小保留時間。
- 實(shí)現(xiàn)方式:第一次遍歷日志來構(gòu)建key和最后的offset的映射關(guān)系,第二次遍歷判斷是否需要保留,如果不需要,就刪除。SkimpyOffsetMap使用md5來進(jìn)計算key的哈希值,在映射時僅考慮md5,如果不同的key哈希到了同一個md5,會導(dǎo)致某個key對應(yīng)的消息丟失,丟失率取決于md5的沖突率,沖突時用線性探測法來處理。
- 合并時,是對整個日志進(jìn)行合并,所以清理之后,可能會將多個日志分段合并為一個段。
消費(fèi)位移
- 保存在_comsumer_offset主題中
- 可以通過offset或者時間戳進(jìn)行定位
- 利用seek功能,我們可以將消費(fèi)位移保存在外部存儲中
消費(fèi)者重均衡
消費(fèi)組分區(qū)分配策略
RangeAssignor
- 原理:對于每一個訂閱的主題,按照消費(fèi)者總數(shù)和主題分區(qū)數(shù)進(jìn)行整除運(yùn)算來獲得一個跨度,然后將分區(qū)按照跨度進(jìn)行平均分配
- 例子:C0[T0P0,T0P1;T1P1,T1P2],C1[T0P2;T1P2]
- 缺點(diǎn):在一個消費(fèi)者訂閱多個主題的情況下且主題分區(qū)無法整除消費(fèi)者數(shù)時,會導(dǎo)致不均衡
- 評價:適合消費(fèi)者和主題分區(qū)數(shù)能夠確定且不變時,不實(shí)用,對擴(kuò)容不友好,建議不要用
RoundRobinAssignor
- 原理:將消費(fèi)組內(nèi)所有消費(fèi)者及消費(fèi)者訂閱的所有主題的分區(qū)按照字典序排序,然后通過輪詢方式逐個將分區(qū)依次分配給每個消費(fèi)者
- 例子:C0[T0P0;T0P2;T1P1],C2[T0P1;T1P0;T1P2]
- 缺點(diǎn):在消費(fèi)者訂閱的主題不一樣時,會導(dǎo)致不均衡
- 評價:一般情況下同一個消費(fèi)組會訂閱相同的主題信息,可以使用
StickyAssignor
- 原理:很復(fù)雜
- 目的:要分區(qū)的分配要盡可能均勻;分區(qū)的分配盡可能與上次分配的保持相同。
- 評價:比上面兩種都好,建議使用
自定義Assignor
- 原理:實(shí)現(xiàn)PartitionAssignor接口
- 評價:不建議
發(fā)生時機(jī)
- 組成員數(shù)發(fā)生變更:加入組或者離開組或者被剔出組。
- 訂閱主題數(shù)發(fā)生變更:正則訂閱或者手動更改訂閱主題數(shù)。
- 訂閱主題的分區(qū)數(shù)發(fā)生變更:分區(qū)重分配。
流程

kafka重均衡
分區(qū)重分配
基本原理:先通過控制器為每個分區(qū)添加新副本(增加副本因子),待復(fù)制完成后,將舊的副本從副本清單中刪除(恢復(fù)為原先的副本因子)
事務(wù)
冪等性
實(shí)現(xiàn)原理:
Kafka 的冪等只能保證單個生產(chǎn)者會話(session)中單分區(qū)的冪等。對于每一個生產(chǎn)者,kafka會為其分配一個pid,每一對<pid,partiton>都對應(yīng)一個序列號,在生產(chǎn)者發(fā)送消息的時候,序列號遞增。當(dāng)kafka收到新消息時,如果序列號sn<so+1,則說明發(fā)生了重復(fù)寫入,則丟棄;如果序列號sn>so+1,說明出現(xiàn)了消息亂序,拋出異常OutOfOrderSequenceException。
事務(wù)
概念:kafka的事務(wù)可以保證應(yīng)用程序?qū)⒍鄠€的消費(fèi)消息、生產(chǎn)消息、提交消費(fèi)位移當(dāng)作原子操作來處理,同時成功或失敗,即使該生產(chǎn)或消費(fèi)會跨多個分區(qū)。
應(yīng)用場景:Consume-Transform-Produce,以支持流失計算

事務(wù)流程
- 使用transactionID獲取計算獲取TransactionCoordinator的broker地址。
- 使用transactionID請求得到PID信息,TC在收到該請求后會將transaction和pid保存到__transaction_state中,以進(jìn)行持久化。
- 生產(chǎn)者使用beginTransaction()開啟一個事務(wù)。
- 消費(fèi)-轉(zhuǎn)換-生產(chǎn)
- 應(yīng)用程序通過消費(fèi)者消費(fèi)到消息,轉(zhuǎn)換完成后,在生產(chǎn)者向新的分區(qū)寫入消息之前,先通過AddPartitionsToTxnRequest將新的分區(qū)記錄到__transaction_state中,包括<transactionID,pid,topic-partitions>。
- 生產(chǎn)者向?qū)?yīng)的分區(qū)所在的broker發(fā)送消息,消息中會包含<pid,seq_num>,注意由于寫入的消息的事務(wù)控制字段都是1,所以在read_commited級別下對應(yīng)用程序是不可見的。
- 通過AddOffsetsToTxnRequest將所有要提交的分區(qū)的offset的信息和group_id寫入__transaction_state中,TC可以通過對應(yīng)的group_id來計算出GC,GC也會保存在__transaction_state中,從而在生產(chǎn)者宕機(jī)后,支持后續(xù)TC的崩潰恢復(fù)。
- 生產(chǎn)者通過TxnOffsetCommitRequest將所有分區(qū)的偏移量條,寫入到__consumer_offsets中,注意由于寫入的消息的事務(wù)控制字段都是1,所以在read_commited級別下對應(yīng)用程序是不可見的。
- 生產(chǎn)者通過EndTxnRequest向TC提交或者中止事務(wù),TC會將PREPARE_COMMIT或PREPARE_ABORT信息寫入到__transaction_state中,然后在通過WriteTxnMarkersRequest請求向分區(qū)(GC和生產(chǎn)者寫入的分區(qū))寫入COMMIT或ABORT消息,再之后將COMPLETE_COMMIT或COMPLETE_ABORT寫入到__transaction_state中。
復(fù)制

復(fù)制
如上一主三從
- 其中2個follower在ISR集合中,1個失效follower在OSR集合中,其中min.insync.replicas=2,當(dāng)ISR集合中的broker數(shù)少于2個時,該分區(qū)將禁止寫入。
- ISR集合中的所有follower中最小的LEO為HW,每次follower向leader進(jìn)行fetch時,會帶上自身的leo,leader會計算出hw進(jìn)行更新,并返回給follower。
- 每當(dāng)OSR中的一個follower追上最小的LEO即HW時,該集合將會進(jìn)入ISR集合中;每次follower請求拉取到leader副本leo前最新的消息時,則認(rèn)為是一次caughUp,leader副本將會更新對應(yīng)follower的lastCaughUpTime時間,在每replicaMaxLagTime/2一次的isr-expiration后臺周期任務(wù)中,如果檢查到某個follower滿足now-lastCaughUpTime>replicaMaxLagTime,則將該follower將到OSR集合中。
- 每次ISR集合的變更都會被集合到isrChangeSet中,2.5s一周期的isr-changge-propagation任務(wù)會將ISR變更信息寫入ZK中的/isr_change_notification/isr_change_*中,controller會通過Watcher監(jiān)聽到該消息,進(jìn)而更新自身的元數(shù)據(jù)并向其它broker發(fā)送更新元數(shù)據(jù)的請求,然后刪除isr_change節(jié)點(diǎn)。
- 當(dāng)producer發(fā)送消息時,攜帶的acks參數(shù)會告訴leader需要幾個節(jié)點(diǎn)的確認(rèn)才能響應(yīng)成功,leader副本寫入數(shù)據(jù)到本地日志后會hold,等待其它follower將這條消息復(fù)制走,當(dāng)acks-1個follower復(fù)制后,才會解除hold,響應(yīng)成功。