Kafka設(shè)計

全局架構(gòu)圖

全局架構(gòu)圖

磁盤結(jié)構(gòu)

記錄格式

type VarInt int // 變長整型,使用Varints和ZigZag編碼的整型

type RecordBatch struct {
    FirstOffset          int64 // 起始偏移
    Length               int32 // 從PartitionLeaderEpoch開始的長度
    PartitionLeaderEpoch int32 // 分區(qū)Leader紀(jì)元
    Magic                int8  // 消息版本號,當(dāng)前為2,表示V2
    Crc32                int32 // crc校驗(yàn)和
    Attributes           int16 // [0-2]壓縮格式, 4時間戳類型, 5是否出于事務(wù)中, 6控制消息
    LastOffsetDelta      int32 // 最后一個Record的offset與FirstOffset的差值,用于保證消息組裝的正確性
    FirstTimestamp       int64 // 第一個Record的時間戳
    MaxTimestamp         int64 // 最后一個Record的時間戳,用于保證消息組裝的正確性
    ProducerID           int64 // 用于支持冪等性和事務(wù)
    ProducerEpoch        int32 // 用于支持冪等性和事務(wù)
    FirstSequence        int32 // 用于支持冪等性和事務(wù)
    RecordsCount         int32 // RecordsCount數(shù)組元素個數(shù)
    Records              []Record
}

type Record struct {
    Length         VarInt   // Record長度
    Attributions   int8     // 屬性,暫時沒用
    TimestampDelta VarInt   // 相對于RecordBatch的FirstTimestamp的偏移量
    OffsetDelta    VarInt   // 相對于RecordBatch的FirstOffset的偏移量
    KeyLength      VarInt   // key長度
    Key            []byte   // key內(nèi)容
    ValueLength    VarInt   // value長度
    Value          []byte   // value內(nèi)容
    HeadersCount   VarInt   // Headers數(shù)組元素個數(shù)
    Headers        []Header // Headers數(shù)組,用于支持應(yīng)用級別擴(kuò)展
}

type Header struct {
    HeaderKeyLength   VarInt
    HeaderKey         string
    HeaderValueLength VarInt
    HeaderValue       string
}

日志文件存儲

disk

使用時間戳查找消息

  1. 通過時間戳日志分段索引文件名查找對應(yīng)的日志分段文件
  2. 在該日志分段中通過二分法查找到最近的偏移量
  3. 通過該偏移量在偏移量日志分段索引文件中查找對應(yīng)的消息位置
  4. 從該位置開始,向后查找,直到找到不小于指定時間戳的消息

日志清理

日志刪除

  • 基于時間:rog.retention.hours/minutes/ms
  • 基于日志大小: log.retention.bytes
  • 基于起始偏移量: DelectRecordsRequest.logStartOffset

日志壓縮/合并

對于相同的key的不同value值,只保留最后一個版本。當(dāng)應(yīng)用僅關(guān)心消息的最新value時,可以開啟日志合并功能。

  • 線程會選擇污濁率最高的日志文件來進(jìn)行清理,污濁率=dirtyBytes/(cleanBytes+dirtyBytes)。
  • log.cleaner.min.compaction.lag.ms,消息在被清理前的最小保留時間。
  • 實(shí)現(xiàn)方式:第一次遍歷日志來構(gòu)建key和最后的offset的映射關(guān)系,第二次遍歷判斷是否需要保留,如果不需要,就刪除。SkimpyOffsetMap使用md5來進(jìn)計算key的哈希值,在映射時僅考慮md5,如果不同的key哈希到了同一個md5,會導(dǎo)致某個key對應(yīng)的消息丟失,丟失率取決于md5的沖突率,沖突時用線性探測法來處理。
  • 合并時,是對整個日志進(jìn)行合并,所以清理之后,可能會將多個日志分段合并為一個段。

消費(fèi)位移

  1. 保存在_comsumer_offset主題中
  2. 可以通過offset或者時間戳進(jìn)行定位
  3. 利用seek功能,我們可以將消費(fèi)位移保存在外部存儲中

消費(fèi)者重均衡

消費(fèi)組分區(qū)分配策略

RangeAssignor

  • 原理:對于每一個訂閱的主題,按照消費(fèi)者總數(shù)和主題分區(qū)數(shù)進(jìn)行整除運(yùn)算來獲得一個跨度,然后將分區(qū)按照跨度進(jìn)行平均分配
  • 例子:C0[T0P0,T0P1;T1P1,T1P2],C1[T0P2;T1P2]
  • 缺點(diǎn):在一個消費(fèi)者訂閱多個主題的情況下且主題分區(qū)無法整除消費(fèi)者數(shù)時,會導(dǎo)致不均衡
  • 評價:適合消費(fèi)者和主題分區(qū)數(shù)能夠確定且不變時,不實(shí)用,對擴(kuò)容不友好,建議不要用

RoundRobinAssignor

  • 原理:將消費(fèi)組內(nèi)所有消費(fèi)者及消費(fèi)者訂閱的所有主題的分區(qū)按照字典序排序,然后通過輪詢方式逐個將分區(qū)依次分配給每個消費(fèi)者
  • 例子:C0[T0P0;T0P2;T1P1],C2[T0P1;T1P0;T1P2]
  • 缺點(diǎn):在消費(fèi)者訂閱的主題不一樣時,會導(dǎo)致不均衡
  • 評價:一般情況下同一個消費(fèi)組會訂閱相同的主題信息,可以使用

StickyAssignor

  • 原理:很復(fù)雜
  • 目的:要分區(qū)的分配要盡可能均勻;分區(qū)的分配盡可能與上次分配的保持相同。
  • 評價:比上面兩種都好,建議使用

自定義Assignor

  • 原理:實(shí)現(xiàn)PartitionAssignor接口
  • 評價:不建議

發(fā)生時機(jī)

  • 組成員數(shù)發(fā)生變更:加入組或者離開組或者被剔出組。
  • 訂閱主題數(shù)發(fā)生變更:正則訂閱或者手動更改訂閱主題數(shù)。
  • 訂閱主題的分區(qū)數(shù)發(fā)生變更:分區(qū)重分配。

流程

kafka重均衡

分區(qū)重分配

基本原理:先通過控制器為每個分區(qū)添加新副本(增加副本因子),待復(fù)制完成后,將舊的副本從副本清單中刪除(恢復(fù)為原先的副本因子)

事務(wù)

冪等性

實(shí)現(xiàn)原理:
Kafka 的冪等只能保證單個生產(chǎn)者會話(session)中單分區(qū)的冪等。對于每一個生產(chǎn)者,kafka會為其分配一個pid,每一對<pid,partiton>都對應(yīng)一個序列號,在生產(chǎn)者發(fā)送消息的時候,序列號遞增。當(dāng)kafka收到新消息時,如果序列號sn<so+1,則說明發(fā)生了重復(fù)寫入,則丟棄;如果序列號sn>so+1,說明出現(xiàn)了消息亂序,拋出異常OutOfOrderSequenceException。

事務(wù)

概念:kafka的事務(wù)可以保證應(yīng)用程序?qū)⒍鄠€的消費(fèi)消息、生產(chǎn)消息、提交消費(fèi)位移當(dāng)作原子操作來處理,同時成功或失敗,即使該生產(chǎn)或消費(fèi)會跨多個分區(qū)。

應(yīng)用場景:Consume-Transform-Produce,以支持流失計算

事務(wù)流程
  1. 使用transactionID獲取計算獲取TransactionCoordinator的broker地址。
  2. 使用transactionID請求得到PID信息,TC在收到該請求后會將transaction和pid保存到__transaction_state中,以進(jìn)行持久化。
  3. 生產(chǎn)者使用beginTransaction()開啟一個事務(wù)。
  4. 消費(fèi)-轉(zhuǎn)換-生產(chǎn)
    1. 應(yīng)用程序通過消費(fèi)者消費(fèi)到消息,轉(zhuǎn)換完成后,在生產(chǎn)者向新的分區(qū)寫入消息之前,先通過AddPartitionsToTxnRequest將新的分區(qū)記錄到__transaction_state中,包括<transactionID,pid,topic-partitions>。
    2. 生產(chǎn)者向?qū)?yīng)的分區(qū)所在的broker發(fā)送消息,消息中會包含<pid,seq_num>,注意由于寫入的消息的事務(wù)控制字段都是1,所以在read_commited級別下對應(yīng)用程序是不可見的。
    3. 通過AddOffsetsToTxnRequest將所有要提交的分區(qū)的offset的信息和group_id寫入__transaction_state中,TC可以通過對應(yīng)的group_id來計算出GC,GC也會保存在__transaction_state中,從而在生產(chǎn)者宕機(jī)后,支持后續(xù)TC的崩潰恢復(fù)。
    4. 生產(chǎn)者通過TxnOffsetCommitRequest將所有分區(qū)的偏移量條,寫入到__consumer_offsets中,注意由于寫入的消息的事務(wù)控制字段都是1,所以在read_commited級別下對應(yīng)用程序是不可見的。
    5. 生產(chǎn)者通過EndTxnRequest向TC提交或者中止事務(wù),TC會將PREPARE_COMMIT或PREPARE_ABORT信息寫入到__transaction_state中,然后在通過WriteTxnMarkersRequest請求向分區(qū)(GC和生產(chǎn)者寫入的分區(qū))寫入COMMIT或ABORT消息,再之后將COMPLETE_COMMIT或COMPLETE_ABORT寫入到__transaction_state中。

復(fù)制

復(fù)制

如上一主三從

  • 其中2個follower在ISR集合中,1個失效follower在OSR集合中,其中min.insync.replicas=2,當(dāng)ISR集合中的broker數(shù)少于2個時,該分區(qū)將禁止寫入。
  • ISR集合中的所有follower中最小的LEO為HW,每次follower向leader進(jìn)行fetch時,會帶上自身的leo,leader會計算出hw進(jìn)行更新,并返回給follower。
  • 每當(dāng)OSR中的一個follower追上最小的LEO即HW時,該集合將會進(jìn)入ISR集合中;每次follower請求拉取到leader副本leo前最新的消息時,則認(rèn)為是一次caughUp,leader副本將會更新對應(yīng)follower的lastCaughUpTime時間,在每replicaMaxLagTime/2一次的isr-expiration后臺周期任務(wù)中,如果檢查到某個follower滿足now-lastCaughUpTime>replicaMaxLagTime,則將該follower將到OSR集合中。
  • 每次ISR集合的變更都會被集合到isrChangeSet中,2.5s一周期的isr-changge-propagation任務(wù)會將ISR變更信息寫入ZK中的/isr_change_notification/isr_change_*中,controller會通過Watcher監(jiān)聽到該消息,進(jìn)而更新自身的元數(shù)據(jù)并向其它broker發(fā)送更新元數(shù)據(jù)的請求,然后刪除isr_change節(jié)點(diǎn)。
  • 當(dāng)producer發(fā)送消息時,攜帶的acks參數(shù)會告訴leader需要幾個節(jié)點(diǎn)的確認(rèn)才能響應(yīng)成功,leader副本寫入數(shù)據(jù)到本地日志后會hold,等待其它follower將這條消息復(fù)制走,當(dāng)acks-1個follower復(fù)制后,才會解除hold,響應(yīng)成功。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容