kafka學(xué)習(xí)筆記

  • kafka屬于消息引擎系統(tǒng), 主要用于系統(tǒng)間傳輸消息, 可以做到系統(tǒng)業(yè)務(wù)上的解耦, 緩沖系統(tǒng)上下游瞬時(shí)突發(fā)流量,使其更平滑(削峰填谷)。

kafka系統(tǒng)里各種概念

  • 消息:Record。Kafka 是消息引擎嘛,這里的消息就是指 Kafka 處理的主要對(duì)象。
  • 主題:Topic。主題是承載消息的邏輯容器,在實(shí)際使用中多用來區(qū)分具體的業(yè)務(wù)。
  • 分區(qū):Partition。一個(gè)有序不變的消息序列。每個(gè)主題下可以有多個(gè)分區(qū)。
  • 消息位移:Offset。表示分區(qū)中每條消息的位置信息,是一個(gè)單調(diào)遞增且不變的值。
  • 副本:Replica。Kafka 中同一條消息能夠被拷貝到多個(gè)地方以提供數(shù)據(jù)冗余,這些地方就是所謂的副本。副本還分為領(lǐng)導(dǎo)者副本和追隨者副本,各自有不同的角色劃分。副本是在分區(qū)層級(jí)下的,即每個(gè)分區(qū)可配置多個(gè)副本實(shí)現(xiàn)高可用。
  • 生產(chǎn)者:Producer。向主題發(fā)布新消息的應(yīng)用程序。
  • 消費(fèi)者:Consumer。從主題訂閱新消息的應(yīng)用程序。
  • 消費(fèi)者位移:Consumer Offset。表征消費(fèi)者消費(fèi)進(jìn)度,每個(gè)消費(fèi)者都有自己的消費(fèi)者位移。
  • 消費(fèi)者組:Consumer Group。多個(gè)消費(fèi)者實(shí)例共同組成的一個(gè)組,同時(shí)消費(fèi)多個(gè)分區(qū)以實(shí)現(xiàn)高吞吐。
  • 重平衡:Rebalance。消費(fèi)者組內(nèi)某個(gè)消費(fèi)者實(shí)例掛掉后,其他消費(fèi)者實(shí)例自動(dòng)重新分配訂閱主題分區(qū)的過程。Rebalance 是 Kafka 消費(fèi)者端實(shí)現(xiàn)高可用的重要手段。
  • kafka的各種概念如下圖所示:
    kafka相關(guān)概念圖

    重點(diǎn): kafka里的副本針對(duì)的是分區(qū)來做的, 副本不提供對(duì)外的服務(wù),只記錄消息數(shù)據(jù),kafka通過對(duì)topic分區(qū)來實(shí)現(xiàn)消息系統(tǒng)的負(fù)載。

其他

kafka版本

生產(chǎn)者

生產(chǎn)者發(fā)送數(shù)據(jù)流程
  • 如果想指定生產(chǎn)者發(fā)消息的分區(qū)策略, 可以在生產(chǎn)端配置參數(shù): partitioner.class, 對(duì)應(yīng)的class需要實(shí)現(xiàn): org.apache.kafka.clients.producer.Partitioner 這個(gè)接口。
  • 生產(chǎn)者默認(rèn)的分區(qū)策略是根據(jù)消息指定的key發(fā)送到指定的分區(qū)(這也是生產(chǎn)者保證消息有序性的要點(diǎn)),如果消息沒有指定key, 采用的是輪詢策略。具體可以看 DefaultPartitioner這個(gè)類的實(shí)現(xiàn)
  • 為了提高生產(chǎn)者的發(fā)送效率, 在發(fā)送消息的時(shí)候, 可以對(duì)要發(fā)送的消息做壓縮處理。配置參數(shù)為: "compression.type"。 啟用壓縮需要在生產(chǎn)端的cpu資源有多余的情況下(一般業(yè)務(wù)系統(tǒng)都是I/O密集型的)。
  • kafka發(fā)送的消息, 在發(fā)送的時(shí)候, 會(huì)把多條消息放在一起, 組成消息集合,在Broker端存的消息是發(fā)送端發(fā)送的"消息集合"
  • 避免在Broker配置compression.type, 防止Broker端配置的compression.type跟生產(chǎn)端配置的不一樣, 如果配置的不一樣, Broker需要對(duì)消息集合做解壓縮, 讓后用Broker配置的壓縮算法重新壓縮消息, 對(duì)Broker的性能有極大的影響。
  • 解壓縮發(fā)生在Consumer端, 壓縮算法在消息集合里。
  • 壓縮算法的對(duì)吧吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在壓縮比方面,zstd > LZ4 > GZIP > Snappy。
  • 發(fā)送消息的時(shí)候, 一定要用通過回調(diào)方法驗(yàn)證消息是否發(fā)送成功, 不然發(fā)送端有可能會(huì)有丟消息的風(fēng)險(xiǎn)。
  • 設(shè)置 retries 為一個(gè)較大的值,當(dāng)出現(xiàn)網(wǎng)絡(luò)的瞬時(shí)抖動(dòng)時(shí),消息發(fā)送可能會(huì)失敗,此時(shí)配置了 retries > 0 的 Producer 能夠自動(dòng)重試消息發(fā)送,避免消息丟失。

生產(chǎn)端TCP連接相關(guān)

  • KafkaProducer 實(shí)例創(chuàng)建時(shí)啟動(dòng) Sender 線程,從而創(chuàng)建與 bootstrap.servers 中所有 Broker 的 TCP 連接。
  • KafkaProducer 實(shí)例首次更新元數(shù)據(jù)信息之后,還會(huì)再次創(chuàng)建與集群中所有 Broker 的 TCP 連接。
  • 如果 Producer 端發(fā)送消息到某臺(tái) Broker 時(shí)發(fā)現(xiàn)沒有與該 Broker 的 TCP 連接,那么也會(huì)立即創(chuàng)建連接。
  • 如果設(shè)置 Producer 端 connections.max.idle.ms 參數(shù)大于 0,則步驟 1 中創(chuàng)建的 TCP 連接會(huì)被自動(dòng)關(guān)閉;如果設(shè)置該參數(shù) =-1,那么步驟 1 中創(chuàng)建的 TCP 連接將無法被關(guān)閉,從而成為“僵尸”連接。

消費(fèi)者

消費(fèi)者總體工作流程
消費(fèi)者初始化流程
消費(fèi)者組消費(fèi)詳細(xì)流程
  • Consumer分區(qū)的分配策略是在消費(fèi)端來處理的, 并非在Broker端做的分配方案,
  • kafka中消費(fèi)者組是一個(gè)很重要的概念, 消費(fèi)者通過Group_Id來標(biāo)識(shí)自己屬于那一個(gè)消費(fèi)者組, 消費(fèi)者組整體消費(fèi)某一個(gè)Topic, 每個(gè)分區(qū)只會(huì)有一個(gè)消費(fèi)者組的消費(fèi)者來消費(fèi)。
  • Consumer端有個(gè)參數(shù)enable.auto.commit,把它設(shè)置成false,并采用手動(dòng)提交位移的方式。
  • partition.assignment.strategy:消費(fèi)者分區(qū)分配策略,默認(rèn)策略Range+CooperativeSticky。Kafka可以同時(shí)使用多個(gè)分區(qū)分配策略??梢赃x擇的策略包括:Range、RoundRobin、Sticky、CooperativeSticky
  • 注意消費(fèi)端如果掉線了, 或者執(zhí)行的任務(wù)過程, 會(huì)導(dǎo)致消費(fèi)端觸發(fā)“重平衡”, 重平衡是很重的操作, 需要盡量避免
  • __consumer_offsets 主題里面采用 key 和 value 的方式存儲(chǔ)數(shù)據(jù)。key 是group.id+topic+分區(qū)號(hào),value 就是當(dāng)前 offset 的值。每隔一段時(shí)間,kafka 內(nèi)部會(huì)對(duì)這個(gè) topic 進(jìn)行compact,也就是每個(gè) group.id+topic+分區(qū)號(hào)就保留最新數(shù)據(jù)。
  • Consumer offset是很重要的, 可以參考這篇文章: https://blog.csdn.net/warybee/article/details/121990020

Broker端

Zookeeper中存儲(chǔ)的Kafka 信息
  • 設(shè)置 unclean.leader.election.enable = false,它控制的是哪些 Broker 有資格競(jìng)選分區(qū)的 Leader。如果一個(gè) Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會(huì)造成消息的丟失。故一般都要將該參數(shù)設(shè)置成 false,即不允許這種情況的發(fā)生。
  • 設(shè)置 replication.factor >= 3, 最好將消息多保存幾份,畢竟目前防止消息丟失的主要機(jī)制就是冗余
  • 設(shè)置 min.insync.replicas > 1, 控制的是消息至少要被寫入到多少個(gè)副本才算是“已提交”。設(shè)置成大于 1 可以提升消息持久性。在實(shí)際環(huán)境中千萬不要使用默認(rèn)值 1。
  • 確保 replication.factor > min.insync.replicas。如果兩者相等,那么只要有一個(gè)副本掛機(jī),整個(gè)分區(qū)就無法正常工作了。我們不僅要改善消息的持久性,防止數(shù)據(jù)丟失,還要在不降低可用性的基礎(chǔ)上完成。推薦設(shè)置成 replication.factor = min.insync.replicas + 1。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容