Kafka Note(五)生產(chǎn) & 消費(fèi)問(wèn)題

(五)kafka生產(chǎn) & 消費(fèi)問(wèn)題

A. 重平衡rebalance

1. 基礎(chǔ)定義

  • 觸發(fā)機(jī)制
    • 消費(fèi)組consumer group內(nèi)成員數(shù)發(fā)生變化
    • 分區(qū)partition數(shù)目發(fā)生變化
    • coordinator節(jié)點(diǎn)故障(消費(fèi)者會(huì)在所有broker中選一個(gè)作為consumer group的coordinator,用于保存consumer提交的offset)
  • 重平衡rebalance實(shí)現(xiàn)消費(fèi)組內(nèi)成員的故障轉(zhuǎn)移
    1. 消費(fèi)組內(nèi)有a,b,c三個(gè)消費(fèi)者,分別消費(fèi)partition 0,1,2;
    2. 若消費(fèi)者a故障則partition 0就不會(huì)被消費(fèi),這不可被接受,此時(shí)需要使用rebalance來(lái)解決該問(wèn)題;
    3. rebalance會(huì)將partition 0 分給剩下正常的消費(fèi)者(b c中的一個(gè));
    4. 重新分配后就完成了消費(fèi)者a的故障轉(zhuǎn)移。
  • 重平衡rebalance實(shí)現(xiàn)動(dòng)態(tài)分區(qū)分配
    1. 某個(gè)topic的partition數(shù)目新增(理論上partition不會(huì)減少),原本3臺(tái)brokers,該topic共計(jì)6個(gè)partitons,現(xiàn)在新增到12個(gè)partitions;
    2. 消費(fèi)組內(nèi)的成員和partition的對(duì)應(yīng)關(guān)系需相應(yīng)進(jìn)行變化,不然新增的6個(gè)partitions無(wú)人消費(fèi);
    3. 通過(guò)rebalance,將新增的6個(gè)partition分配給消費(fèi)者,也可通過(guò)新增組內(nèi)消費(fèi)者(新拉起線程)。

2. 重平衡具體流程

  • 重平衡rebalance的宏觀過(guò)程
    1. empty狀態(tài):該狀態(tài)即消費(fèi)組新建,內(nèi)部尚無(wú)消費(fèi)者,出現(xiàn)的第一個(gè)消費(fèi)者后會(huì)發(fā)送FIND_COORDINATOR以開(kāi)啟首次rebalance;
    2. preparing rebalance狀態(tài):消費(fèi)者客戶端確認(rèn)自己的coordinator后,向其發(fā)出JOIN_GROUP的請(qǐng)求,coordinator收到后將整體狀態(tài)從empty變?yōu)閜reparing rebalance;
    3. completing rebalance狀態(tài):當(dāng)組內(nèi)所有的消費(fèi)者都發(fā)送完JOIN_GROUP的請(qǐng)求后(或者rebalance超時(shí)),狀態(tài)轉(zhuǎn)化成completing rebalance;
    4. stable狀態(tài):kafka broker返回所有JOIN_GROUP的響應(yīng)后,leader客戶端收到后發(fā)送SYNC_GROUP。broker收到該請(qǐng)求后,狀態(tài)就設(shè)置成stable,完成reblance。
  • 重平衡rebalance的微觀過(guò)程
    1. FIND_COORDINATOR時(shí),會(huì)帶上自身的group.id,計(jì)算其coordinator在哪里(coordinatorId=groupId.hash % 50),獲得的是具體的分區(qū)號(hào);
    2. broker響應(yīng)消費(fèi)者客戶端的JOIN_GROUP請(qǐng)求時(shí),會(huì)選一個(gè)作為消費(fèi)者客戶端的leader并告知它就是leader,給它發(fā)送元數(shù)據(jù),讓leader進(jìn)行分配分區(qū);
    3. 發(fā)送SYNC_GROUP請(qǐng)求時(shí),leader會(huì)把分區(qū)分配方案告知broker,其余消費(fèi)者客戶端發(fā)送SYNC_GROUP時(shí),直接返回他們分配結(jié)果,告知對(duì)應(yīng)消費(fèi)內(nèi)容;
    4. 為何分四個(gè)狀態(tài):首次請(qǐng)求需要3次(find,join和sync),但是正常運(yùn)行的consumer僅需2次(join和sync,他們知道coorinator在哪里),除非是coordinator犧牲導(dǎo)致的reblance,那么老的consumer也要3次請(qǐng)求。

3. 線上問(wèn)題排查 & 修復(fù)

  • 線上排查重平衡rebalance
    • 概述:觸發(fā)只有三個(gè)可能,需線上排查的場(chǎng)景一般為消費(fèi)者減少導(dǎo)致的rebalance,其他兩個(gè)較容易發(fā)現(xiàn)(新增partiton是人為的,coordinator down的話會(huì)有告警)。
    • 消費(fèi)者減少 ———— 心跳超時(shí)
      • 認(rèn)為消費(fèi)者犧牲了,一般是因?yàn)橄M(fèi)者機(jī)器負(fù)載過(guò)高,其cpu無(wú)暇發(fā)送心跳;
      • 心跳超時(shí)有onExpireHeartbeat的日志打印
    • 消費(fèi)者減少 ———— poll間隔超過(guò)配置
      • poll即為拉取數(shù)據(jù)頻率,間隔超出配置是因?yàn)閜oll出數(shù)據(jù)后業(yè)務(wù)處理過(guò)慢,要么優(yōu)化業(yè)務(wù)邏輯,要么多線程消費(fèi)、異步處理;
      • 較依賴(lài)排除法判斷,若消費(fèi)者減少且沒(méi)看到是心跳超時(shí)日志,基本就是poll間隔超過(guò)配置。
  • 消費(fèi)者如何感知重平衡rebalance
    • 概述:消費(fèi)組有關(guān)的metadata都在coordinator中,消費(fèi)者的心跳(heart beat)和消費(fèi)位置(offset commit)都會(huì)和coordinator交互。
    • 心跳:每次提交心跳都會(huì)帶上group的紀(jì)元值(類(lèi)似epoch),每次kafka broker認(rèn)定完成一次rebalance紀(jì)元值都會(huì)+1,若果“心跳計(jì)數(shù)器”對(duì)不上就會(huì)開(kāi)啟重平衡。
    • 提交消費(fèi): 過(guò)程和心跳是一樣的。主要是依靠狀態(tài)和紀(jì)元來(lái)進(jìn)行感知,狀態(tài)生效于rebalance過(guò)程中,紀(jì)元?jiǎng)t在join完成階段后生效。
  • 如何減小重平衡rebalance的副作用
    • 概述:rebalance類(lèi)似于java中的stw,消費(fèi)者發(fā)送join group時(shí)是停止消費(fèi)的。
    • 降低影響面的手段
      • 一個(gè)consumer group中避免有過(guò)多的consumers,做好隔離分類(lèi)工作;
      • 避免消費(fèi)者consumer變動(dòng)過(guò)于頻繁;
      • 對(duì)consumer做好cpu的監(jiān)控,避免高負(fù)載導(dǎo)致心跳丟失;
      • 設(shè)置合適的poll間隔,建議使用異步處理poll出來(lái)的信息;
      • 配置靜態(tài)消費(fèi)者(static member),降低rebalance頻次(具體參考博文)。

B. 生產(chǎn)者producer客戶端框架

1. 基礎(chǔ)定義

  • 生產(chǎn)者結(jié)構(gòu)和模式
    • producer僅有兩個(gè)類(lèi)別的線程組成,業(yè)務(wù)線程(主線程) & sender線程(網(wǎng)絡(luò)線程)。
    • producer把消息推送至broker,其默認(rèn)發(fā)送的方式是同步發(fā)送。
  • producer初始化過(guò)程
    1. 代入公共參數(shù),例如maxRequestSize, totalMemorySize, compressionType等;
    2. 設(shè)置主線程的三個(gè)組件——攔截器、序列化器 & 分區(qū)器;
    3. 啟動(dòng)網(wǎng)絡(luò)線程sender;
    4. 注意:初始化階段producer不會(huì)同broker建立連接,在首次調(diào)用send方法時(shí)才會(huì)檢查對(duì)應(yīng)的partition的metadata,從metadata中獲取目標(biāo)partition的leader副本的broker地址,開(kāi)始建立通信。

2. 生產(chǎn)者Producer具體工作流程

  • 主線程
    • 概述:三個(gè)組件攔截器、分區(qū)器 & 序列化器承擔(dān)其主要功能,負(fù)責(zé)處理要發(fā)送的業(yè)務(wù)數(shù)據(jù),同時(shí)承擔(dān)batch批處理和RecordAccumulator緩沖區(qū)。
    • 攔截器:處理業(yè)務(wù)數(shù)據(jù)過(guò)程中,通過(guò)實(shí)現(xiàn)producerInterceptor方法定制需要的攔截效果。
    • 序列化器:把需要發(fā)送的數(shù)據(jù)按照指定的序列化器進(jìn)行序列化。
    • 分區(qū)器:將發(fā)送的數(shù)據(jù)進(jìn)行分區(qū)分配(消費(fèi)者的分區(qū)分配策略主要應(yīng)用于rebalance場(chǎng)景下,與該分區(qū)分配場(chǎng)景不一致),一般情況下會(huì)根據(jù)默認(rèn)分區(qū)器的算法決定該消息數(shù)據(jù)發(fā)往哪個(gè)partition,主要分為是否指定key的場(chǎng)景。
      • 指定key:根據(jù)hash計(jì)算key后確定發(fā)往哪個(gè)partition,潛在破壞kafka負(fù)載均衡的可能性,致使topic的分區(qū)不平均,broker承載量差異偏大。
      • 未指定key:根據(jù)當(dāng)前存活的分區(qū)進(jìn)行輪詢,屬默認(rèn)情況下的方案。
    • batch:順序完成上方三個(gè)步驟后消息會(huì)按照不同的分區(qū)打包,以batch形式發(fā)送以提升網(wǎng)絡(luò)利用率。
      • linger.ms:等待組包的最大時(shí)長(zhǎng),默認(rèn)值為0,即不等待直接發(fā)送,若對(duì)時(shí)延不敏感可設(shè)置為10ms。
      • batch.size:組包的大小限制,默認(rèn)是16kb。注意,必須調(diào)節(jié)linger.ms的默認(rèn)值以后再調(diào)整batch.size才有意義。
    • RecordAccumulator:本質(zhì)就是一個(gè)內(nèi)存隊(duì)列,是線程安全的map,將上一步獲得的batch暫存一段時(shí)間等待后續(xù)sender線程來(lái)調(diào)用,默認(rèn)參數(shù)buffer.memory為32M。
  • 網(wǎng)絡(luò)線程sender
    • 工作流程
      1. 拉取數(shù)據(jù):從RecordAccumulator緩沖區(qū)拉取batch,根據(jù)partition把batch封裝成request;
      2. 查詢目標(biāo)broker:查詢metadata確認(rèn)去哪個(gè)broker,通過(guò)socketChannel將request發(fā)送至目標(biāo)broker,broker由網(wǎng)絡(luò)模型中的accpetor接收request,通過(guò)processor線程一路往下(細(xì)節(jié)參考第四章節(jié)的kafka服務(wù)端網(wǎng)絡(luò)通信模型);
      3. 得到request的結(jié)果(success, fail, timeout)后,將結(jié)果回調(diào)告知producer;
    • 關(guān)鍵參數(shù)
      • request.timeout.ms:sender每次獲取RecordAccumulator中的首個(gè)batch,其余保持等待,若超過(guò)request.timeout.ms就被清除。
      • max.in.flight:設(shè)置異步發(fā)送時(shí)生效,request發(fā)送給socketChannel前先送至inFlightRequsts隊(duì)列中,同時(shí)把request給到selector去發(fā)送,inFlightRequsts用于記錄給哪些broker發(fā)送了多少request。該隊(duì)列本質(zhì)是hashmap的結(jié)構(gòu),且sender是單線程,不用在意線程安全問(wèn)題。參數(shù)max.in.flight.requests.per.connection會(huì)限制producer發(fā)出的請(qǐng)求,默認(rèn)為5。當(dāng)5個(gè)消息發(fā)出后,producer沒(méi)有收到broker的響應(yīng),那么就會(huì)拋出異常。
      • selector:nio中的多路復(fù)用器,kafka改進(jìn)后進(jìn)行了封裝,其功能是將request通過(guò)send方法最終發(fā)送出去。
      • 回調(diào):數(shù)據(jù)發(fā)送后,等待響應(yīng)。max.in.flight中超時(shí)的請(qǐng)求,也會(huì)被處理成response。

3. 增加Producer吞吐量手段

  • 單對(duì)象:kafka-java-producer的客戶端(即主線程部分)是線程安全的,所以使用時(shí)只創(chuàng)建一個(gè)producer。
  • 異步發(fā)送:若沒(méi)有嚴(yán)格的消息丟失、消息順序要求,可以選擇異步發(fā)送(同步的話max.in.flight這個(gè)參數(shù)是不起效的),并且ack選擇1降低等待成本(僅需確認(rèn)leader收到即可)。
  • 調(diào)整緩沖區(qū)大?。簉ecordAccumulator的buffer.memory可以從默認(rèn)的32m調(diào)整到64或者128m,讓更多的消息組包成batch后可以被塞入這個(gè)內(nèi)存隊(duì)列。
  • 優(yōu)化batch:首先修改linger.ms,默認(rèn)是0;其次可以修改batch.size確保個(gè)別較大的消息可以塞入,該操作可以增加吞吐量但會(huì)增加時(shí)延,業(yè)務(wù)對(duì)時(shí)延敏感的不建議調(diào)整。
  • 回調(diào)處理優(yōu)化:sender發(fā)送完畢后會(huì)執(zhí)行業(yè)務(wù)的回調(diào)以獲取發(fā)送成功與否的反饋,這部分回調(diào)函數(shù)的邏輯是開(kāi)發(fā)優(yōu)化重點(diǎn)。
  • jvm參數(shù)優(yōu)化:使用jstat查看jvm gc情況,根據(jù)實(shí)際調(diào)整,防止頻繁fullGC。
  • 排查順序:排查吞吐量偏低的優(yōu)先級(jí)為回調(diào)函數(shù) > 上方羅列的生產(chǎn)者參數(shù) > jvm參數(shù) > 網(wǎng)絡(luò)問(wèn)題 > kafka broker配置

C. 消費(fèi)者consumer客戶端框架

1. 基礎(chǔ)定義

  • 消費(fèi)者結(jié)構(gòu)概述
    • 作為broker的下游,其重要功能是保障及時(shí)消費(fèi)、rebalance分區(qū)分配 & 保證coordinator交互的leader完成這一動(dòng)作等。
    • 核心鏈路涉及四個(gè)步驟:初始化、消息消費(fèi)、位移提交 & 心跳。
  • 消費(fèi)者consumer初始化
    1. 基礎(chǔ)參數(shù)代入設(shè)置,例如clientID & groupID等。
    2. 攔截器 & 序列化模塊的初始化。
    3. 網(wǎng)絡(luò)化模塊初始化。
    4. 協(xié)調(diào)器(coordinator)初始化。
    5. 消息拉取組件(fetcher)初始化。

2. 消費(fèi)者Consumer具體工作流程

  • 消息消費(fèi)步驟拆分
    • 訂閱:初始化consumer完成后,在消息消費(fèi)前需要訂閱topic list,具體有三種方式。
      • 手動(dòng)指定一個(gè)topic列表給消費(fèi)者,使用consumer.subscribe(Collections.singletonList(this.topic))
      • 強(qiáng)制指定目標(biāo)消費(fèi)分區(qū),喪失消費(fèi)組特性,使用consumer.assign(partitions),此舉會(huì)影響rebalance故障轉(zhuǎn)移的效果,不建議使用。
      • 使用正則匹配topic并可設(shè)置rebalance的回調(diào)方法,一般用于靈活匹配topic列表時(shí),具體方法為consumer.subscribe(pattern, callback)。
    • 消費(fèi) —— poll整體流程
      • 整體邏輯就是基于while(true)這個(gè)循環(huán)進(jìn)行消費(fèi),poll(timeout)來(lái)控制具體細(xì)節(jié),即在poll過(guò)程中若所有對(duì)應(yīng)的分區(qū)沒(méi)有未消費(fèi)的消息,則broker最多等待時(shí)長(zhǎng)為timeout的數(shù)值;
      • poll若等待達(dá)到了timeout時(shí)長(zhǎng),仍未有新消息寫(xiě)入則返回一個(gè)空值并進(jìn)去下輪poll;若正常讀取到新數(shù)據(jù),則立刻拿到數(shù)據(jù)返回并準(zhǔn)備下輪拉?。?/li>
      • 一般poll的timeout設(shè)置在100ms - 1000ms之間,不建議太短,可能導(dǎo)致broker高頻處理消費(fèi)者請(qǐng)求(實(shí)際都是空值,因?yàn)閜roducer沒(méi)寫(xiě)入),建議設(shè)置為500ms。
    • 消費(fèi) —— 核心函數(shù)pollOnce(remaining)
      • 激活coordinator:利用Find_Coordinator方法確認(rèn)自身對(duì)應(yīng)的coordinator,隨后激活消費(fèi)組(即啟動(dòng)心跳與coordinator交互,發(fā)送join_group),最后判斷是否執(zhí)行自動(dòng)提交。
      • 從緩存隊(duì)列completeFetchs拉取數(shù)據(jù)并返回。
      • 緩存中若無(wú)數(shù)據(jù)則發(fā)送fetch請(qǐng)求,后續(xù)會(huì)將數(shù)據(jù)存在緩存隊(duì)列中。
  • 位移提交:消費(fèi)完需要讓broker知道,通過(guò)commit offset(具體入口也在pollOnce),具體有三種方式。
    • 自動(dòng)提交:保證高吞吐但可能出現(xiàn)重復(fù)消息,開(kāi)啟enable.auto.commit=true,每5s提交一次。
    • 異步提交:兼顧可靠性和吞吐量,是kafka consumer默認(rèn)的提交位移方式,每消費(fèi)完就發(fā)送一條commit_offset請(qǐng)求,但不等待broker的響應(yīng)。
    • 同步提交:保障消息不丟失但吞吐量降低,每消費(fèi)完就發(fā)送一條commit_offset請(qǐng)求,需等待broker回復(fù)后才可繼續(xù)消費(fèi)。
  • 心跳:心跳不啟動(dòng)于初始化,啟動(dòng)于poolOnce階段的首次poll方法。該線程是守護(hù)線程故priority較低,cpu負(fù)載高時(shí)可能會(huì)搶占不到資源導(dǎo)致心跳中斷。心跳線程具體工作內(nèi)容如下:
    1. 檢測(cè)心跳是否超時(shí),若超時(shí)改coordinator為unknown,下一輪poll開(kāi)啟rebalance。
    2. 檢測(cè)兩次poll的間隔是否大于max.poll.interval.ms,默認(rèn)是5分鐘,超過(guò)了也會(huì)發(fā)出leaveGroup請(qǐng)求,于下輪poll開(kāi)啟rebalance。
    3. 檢測(cè)是否到了心跳間隔時(shí)間,若沒(méi)到就設(shè)置重試間隔為100ms。
    4. 時(shí)間到后,即發(fā)送心跳請(qǐng)求,并設(shè)置對(duì)應(yīng)listener。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容