Q&A-11 Kafka

參考鏈接:

為什么要使用 kafka?

緩沖和削峰:上游數(shù)據(jù)時有突發(fā)流量,下游可能扛不住,或者下游沒有足夠多的機(jī)器來保證冗余,kafka在中間可以起到一個緩沖的作用,把消息暫存在kafka中,下游服務(wù)就可以按照自己的節(jié)奏進(jìn)行慢慢處理。

解耦和擴(kuò)展性:項目開始的時候,并不能確定具體需求。消息隊列可以作為一個接口層,解耦重要的業(yè)務(wù)流程。只需要遵守約定,針對數(shù)據(jù)編程即可獲取擴(kuò)展能力。

冗余:可以采用一對多的方式,一個生產(chǎn)者發(fā)布消息,可以被多個訂閱topic的服務(wù)消費(fèi)到,供多個毫無關(guān)聯(lián)的業(yè)務(wù)使用。

健壯性:消息隊列可以堆積請求,所以消費(fèi)端業(yè)務(wù)即使短時間死掉,也不會影響主要業(yè)務(wù)的正常進(jìn)行。

異步通信:很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機(jī)制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。

kafka和其他MQ的對比

kafka的優(yōu)勢:

  • 吞吐量更大,性能更高
  • 兼容性是最好的沒有之?,尤其在?數(shù)據(jù)和流計算領(lǐng)域。

kafka的數(shù)據(jù)可靠性怎么保證:ack應(yīng)答機(jī)制

為保證producer發(fā)送的數(shù)據(jù),能可靠的發(fā)送到指定的topic,topic的每個partition收到producer發(fā)送的數(shù)據(jù)后,都需要向producer發(fā)送ack(acknowledgement確認(rèn)收到),如果producer收到ack,就會進(jìn)行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)。所以引出ack機(jī)制。ack應(yīng)答機(jī)制Kafka為用戶提供了三種可靠性級別,用戶根據(jù)對可靠性和延遲的要求進(jìn)行權(quán)衡,選擇以下的配置。

acks參數(shù)配置:

  • 0:producer不等待broker的ack,這一操作提供了一個最低的延遲,broker一接收到還沒有寫入磁盤就已經(jīng)返回,當(dāng)broker故障時有可能丟失數(shù)據(jù)。

  • 1(默認(rèn)):producer等待broker的ack,partition的leader落盤成功后返回ack,如果在follower同步成功之前l(fā)eader故障,那么將會丟失數(shù)據(jù)。

  • -1(all):producer等待broker的ack,partition的leader和follower全部落盤成功后才返回ack。但是如果在follower同步完成后,broker發(fā)送ack之前,leader發(fā)生故障,那么會造成數(shù)據(jù)重復(fù)。

Kafka的數(shù)據(jù)是放在磁盤上還是內(nèi)存上,為什么速度會快?

  • 順序?qū)懭?/strong>
  • parition大文件分成多個小文件段,容易定期清除或刪除已經(jīng)消費(fèi)完文件,減少磁盤占用
  • 稀疏索引

kafka使用的是磁盤存儲。速度快是因為:順序?qū)懭耄阂驗橛脖P是機(jī)械結(jié)構(gòu),每次讀寫都會尋址->寫入,其中尋址是一個“機(jī)械動作”,它是耗時的。所以硬盤 “討厭”隨機(jī)I/O, 喜歡順序I/O。為了提高讀寫硬盤的速度,Kafka就是使用順序I/O。Memory Mapped Files(內(nèi)存映射文件):64位操作系統(tǒng)中一般可以表示20G的數(shù)據(jù)文件,它的工作原理是直接利用操作系統(tǒng)的Page來實現(xiàn)文件到物理內(nèi)存的直接映射。完成映射之后你對物理內(nèi)存的操作會被同步到硬盤上。

Kafka高效文件存儲設(shè)計:Kafka把topic中一個parition大文件分成多個小文件段,通過多個小文件段,就容易定期清除或刪除已經(jīng)消費(fèi)完文件,減少磁盤占用。通過索引信息可以快速定位 message和確定response的 大 小。通過index元數(shù)據(jù)全部映射到memory(內(nèi)存映射文件), 可以避免segment file的IO磁盤操作。通過索引文件稀疏存儲,可以大幅降低index文件元數(shù)據(jù)占用空間大小。

注:Kafka解決查詢效率的手段之一是將數(shù)據(jù)文件分段,比如有100條Message,它們的offset是從0到99。假設(shè)將數(shù)據(jù)文件分成5段,第一段為0-19,第二段為20-39,以此類推,每段放在一個單獨(dú)的數(shù)據(jù)文件里面,數(shù)據(jù)文件以該段中 小的offset命名。這樣在查找指定offset的Message的時候,用二分查找就可以定位到該Message在哪個段中。為數(shù)據(jù)文件建 索引數(shù)據(jù)文件分段 使得可以在一個較小的數(shù)據(jù)文件中查找對應(yīng)offset的Message 了,但是這依然需要順序掃描才能找到對應(yīng)offset的Message。

為了進(jìn)一步提高查找的效率,Kafka為每個分段后的數(shù)據(jù)文件建立了索引文件,文件名與數(shù)據(jù)文件的名字是一樣的,只是文件擴(kuò)展名為.index。

LEO 和 HW

image.png

LEO:log end offset,指的是每個副本最大的offset。

HW:high water,ISR隊列中最小的LEO,指的是消費(fèi)者能見到的最大的offset。

Kafka為什么不支持讀寫分離?

kafka 是主寫主讀模式。

  • 主寫從讀的話,主從數(shù)據(jù)不一致
  • kafka 主從節(jié)點數(shù)據(jù)同步更耗時

在 Kafka 中,生產(chǎn)者寫入消息、消費(fèi)者讀取消息的操作都是與 leader 副本進(jìn)行交互的,從 而實現(xiàn)的是一種主寫主讀的生產(chǎn)消費(fèi)模型。Kafka 并不支持主寫從讀,因為主寫從讀有 2 個很明顯的缺點:數(shù)據(jù)一致性問題:數(shù)據(jù)從主節(jié)點轉(zhuǎn)到從節(jié)點必然會有一個延時的時間窗口,這個時間 窗口會導(dǎo)致主從節(jié)點之間的數(shù)據(jù)不一致。某一時刻,在主節(jié)點和從節(jié)點中 A 數(shù)據(jù)的值都為 X, 之后將主節(jié)點中 A 的值修改為 Y,那么在這個變更通知到從節(jié)點之前,應(yīng)用讀取從節(jié)點中的 A 數(shù)據(jù)的值并不為最新的 Y,由此便產(chǎn)生了數(shù)據(jù)不一致的問題。

延時問題:類似 Redis 這種組件,數(shù)據(jù)從寫入主節(jié)點到同步至從節(jié)點中的過程需要經(jīng)歷 網(wǎng)絡(luò)→主節(jié)點內(nèi)存→網(wǎng)絡(luò)→從節(jié)點內(nèi)存 這幾個階段,整個過程會耗費(fèi)一定的時間。而在 Kafka 中,主從同步會比 Redis 更加耗時,它需要經(jīng)歷 網(wǎng)絡(luò)→主節(jié)點內(nèi)存→主節(jié)點磁盤→網(wǎng)絡(luò)→從節(jié) 點內(nèi)存→從節(jié)點磁盤 這幾個階段。對延時敏感的應(yīng)用而言,主寫從讀的功能并不太適用。

Kafka 的多分區(qū)(Partition)以及多副本(Replica)機(jī)制有什么好處呢?

  1. 并發(fā)(負(fù)載均衡):Kafka 通過給特定 Topic 指定多個 Partition, ?各個 Partition 可以分布在不同的 Broker 上,這樣便能提供?較好的并發(fā)能?(負(fù)載均衡)。
  2. 安全(容災(zāi)能力):Partition 可以指定對應(yīng)的 Replica 數(shù), 這也極?地提?了消息存儲的安全性, 提?了容災(zāi)能?,不過也相應(yīng)的增加了所需要的存儲空間。

核心概念

  • zookeeper
  • Producer 生產(chǎn)者
  • Consumer 消費(fèi)者, ConsumerGroup 消費(fèi)組
  • Broker 代理
    kafka以集群的方式運(yùn)行,可以由一個或多個服務(wù)組成,每個服務(wù)叫做一個 broker
  • Topic 主題
  • Partition 分區(qū) => 副本集,leader 和 follower
image.png

Zookeeper 在 Kafka 中的作?知道嗎?

image.png

ZooKeeper 主要為 Kafka 提供元數(shù)據(jù)的管理的功能。

從圖中我們可以看出,Zookeeper 主要為 Kafka 做了下?這些事情:

  1. Broker 注冊 :在 Zookeeper 上會有?個專??來進(jìn)? Broker 服務(wù)器列表記錄的節(jié)點。每個 Broker 在啟動時,都會到 Zookeeper 上進(jìn)?注冊,即到/brokers/ids 下創(chuàng)建屬于??的節(jié)點。每個 Broker 就會將??的 IP 地址和端?等信息記錄到該節(jié)點中去
  2. Topic 注冊 : 在 Kafka 中,同?個Topic 的消息會被分成多個分區(qū)并將其分布在多個Broker 上,這些分區(qū)信息及與 Broker 的對應(yīng)關(guān)系也都是由 Zookeeper 在維護(hù)。?如我創(chuàng)建了?個名字為 my-topic 的主題并且它有兩個分區(qū),對應(yīng)到 zookeeper 中會創(chuàng)建這些?件夾: /brokers/topics/my-topic/Partitions/0 、 /brokers/topics/my-topic/Partitions/1
  3. 負(fù)載均衡 :上?也說過了 Kafka 通過給特定 Topic 指定多個 Partition, ?各個 Partition 可以分布在不同的 Broker 上, 這樣便能提供??好的并發(fā)能?。 對于同?個 Topic 的不同Partition,Kafka 會盡?將這些 Partition 分布到不同的 Broker 服務(wù)器上。當(dāng)?產(chǎn)者產(chǎn)?消息后也會盡量投遞到不同 Broker 的 Partition ??。當(dāng) Consumer 消費(fèi)的時候,Zookeeper 可以根據(jù)當(dāng)前的 Partition 數(shù)量以及 Consumer 數(shù)量來實現(xiàn)動態(tài)負(fù)載均衡。
  4. ......

生產(chǎn)者 push & 消費(fèi)者 pull

生產(chǎn)者 push

消費(fèi)者 pull:

  • 如果是push的話,不同消費(fèi)速率的 consumer 不好處理。
  • consumer 可以自主決定是否批量的從 broker 拉取數(shù)據(jù)。
  • 缺點:如果 broker 沒有可供消費(fèi)的消息,將導(dǎo)致 consumer 不斷在循環(huán)中輪詢,直到新消息到 t 達(dá)。為了避免這點,Kafka 有個參數(shù)可以讓 consumer 阻塞直到新消息到達(dá)。(當(dāng)然也可以阻塞直到消息的數(shù)量達(dá)到某個特定的量這樣就可以批量發(fā)。)

Kafka 判斷一個節(jié)點是否還活著有那兩個條件?

  1. 節(jié)點必須可以維護(hù)和 ZooKeeper 的連接,Zookeeper 通過心跳機(jī)制檢查每個節(jié)點的連接
  2. 如果節(jié)點是個 follower,他必須能及時的同步 leader 的寫操作,延時不能太久

topic, partition 和 offset

每個 Topic 可以劃分多個分區(qū)(每個 Topic 至少有一個分區(qū)),同一 Topic 下的不同分區(qū)包含的消息是不同的。

每個消息在被添加到分區(qū)時,都會被分配一個 offset,它是消息在此分區(qū)中的唯一編號,Kafka 通過 offset 保證消息在分區(qū)內(nèi)的順序,offset 的順序不跨分區(qū),即 Kafka 只保證在同一個分區(qū)內(nèi)的消息是有序的。

image.png

Kafka 分區(qū)數(shù)可以增加或減少嗎?為什么?

我們可以使用 bin/kafka-topics.sh 命令對 Kafka 增加 Kafka 的分區(qū)數(shù)據(jù),但是 Kafka 不支持減少分區(qū)數(shù)。

Kafka 分區(qū)數(shù)據(jù)不支持減少是由很多原因的,比如減少的分區(qū)其數(shù)據(jù)放到哪里去?是刪除,還是保留?刪除的話,那么這些沒消費(fèi)的消息不就丟了。如果保留這些消息如何放到其他分區(qū)里面?追加到其他分區(qū)后面的話那么就破壞了 Kafka 單個分區(qū)的有序性。如果要保證刪除分區(qū)數(shù)據(jù)插入到其他分區(qū)保證有序性,那么實現(xiàn)起來邏輯就會非常復(fù)雜。

參考鏈接:https://blog.csdn.net/weixin_39860755/article/details/112076339

producer

Kafka 中最基本的數(shù)據(jù)單元就是消息,而一條消息其實是由 Key + Value 組成的(Key 是可選項,可傳空值,Value 也可以傳空值),這也是與 ActiveMQ 不同的一個地方。

每個消息在被添加到分區(qū)時,都會被分配一個 offset,它是消息在此分區(qū)中的唯一編號,Kafka 通過 offset 保證消息在分區(qū)內(nèi)的順序,offset 的順序不跨分區(qū),即 Kafka 只保證在同一個分區(qū)內(nèi)的消息是有序的。

producer 直接將數(shù)據(jù)發(fā)送到 partition 的 leader(主節(jié)點),然后 follower 才能從 leader 中拉取消息進(jìn)?同步。

producer 消息分發(fā):

  • 可以指定 Key,不指定 partition
    那么 Producer 會根據(jù) Key 和 partition 機(jī)制來判斷當(dāng)前這條消息應(yīng)該發(fā)送并存儲到哪個 partition 中(這個就跟分片機(jī)制類似)。默認(rèn)情況下,Kafka 采用的是 hash 取 % 的分區(qū)算法。
    如果 Key 為 null,則會隨機(jī)分配一個分區(qū)。這個隨機(jī)是在這個參數(shù)“metadata.max.age.ms“的時間范圍內(nèi)隨機(jī)選擇一個。對于這個時間段內(nèi),如果 Key 為 null,則只會發(fā)送到唯一的分區(qū)。這個值默認(rèn)情況下是 10 分鐘更新一次(因為 partition 狀態(tài)可能會發(fā)生變化)。
  • 可以同時指定 Key 和 partition
  • 可以根據(jù)需要進(jìn)行擴(kuò)展 Producer 的 partition 機(jī)制

消息在被追加到 Partition(分區(qū))的時候都會分配?個特定的偏移量(offset)。Kafka 通過偏移量(offset)來保證消息在分區(qū)內(nèi)的順序性。
每次添加消息到 Partition(分區(qū)) 的時候都會采?尾加法,如上圖所示。Kafka 只能為我們保證Partition(分區(qū)) 中的消息有序,?不能保證 Topic(主題) 中的 Partition(分區(qū)) 的有序。

consumer 消費(fèi)消息

當(dāng)消費(fèi)者拉取到了分區(qū)的某個消息之后,消費(fèi)者會?動提交 offset。

單個consumer

單個 consumer 消費(fèi)消息:

  • 可以指定topic
  • 可以指定topic和partition

消費(fèi)組 consumer group

image.png

同一個topic / partition也可以由多個Consumer Group并發(fā)消費(fèi)。
同一個Consumer Group可以并發(fā)地消費(fèi)多個topic的消息。
同一個Consumer Group中,一個partition只能由一個consumer消費(fèi)。
同一個Consumer Group中,不同consumer可以訂閱不同topic。

對同一個 Group 來說,其中的 Consumer 可以消費(fèi)指定分區(qū)也可以消費(fèi)自動分配的分區(qū)。

  • Consumer 數(shù)量和 partition 數(shù)量一致:均勻分配
  • Consumer 數(shù)量大于 partition 數(shù)量:consumer浪費(fèi),應(yīng)該避免這種情況
  • Consumer 數(shù)量小于 partition 數(shù)量:Kafka 分區(qū)分配策略

Consumer Group 消費(fèi) partition:Kafka 分區(qū)分配策略

可以通過partition.assignment.strategy參數(shù)選擇 range 或 roundrobin。
partition.assignment.strategy參數(shù)默認(rèn)的值是range。

Kafka 對于分配策略這塊,提供了可插拔的實現(xiàn)方式,也就是說,除了以下這3種之外,我們還可以創(chuàng)建自己的分配機(jī)制。

  • Range strategy(范圍分區(qū))
  • RoundRobin strategy(輪詢分區(qū))
  • StickyAssignor(粘性分區(qū))

1. Range strategy(范圍分區(qū))

Range 策略是對每個主題而言的,首先對同一個主題里面的分區(qū)按照序號進(jìn)行排序,并對消費(fèi)者按照字母順序進(jìn)行排序。假設(shè)我們有10個分區(qū),3個消費(fèi)者,排完序的分區(qū)將會是0,1,2,3,4,5,6,7,8,9;消費(fèi)者線程排完序?qū)荂1-0, C2-0, C3-0。然后將 partitions 的個數(shù)除于消費(fèi)者線程的總數(shù)來決定每個消費(fèi)者線程消費(fèi)幾個分區(qū)。如果除不盡,那么前面幾個消費(fèi)者線程將會多消費(fèi)一個分區(qū)。

假如在 Topic1 中有 10 個分區(qū),3 個消費(fèi)者線程,10/3 = 3,而且除不盡,那么消費(fèi)者線程 C1-0 將會多消費(fèi)一個分區(qū),所以最后分區(qū)分配的結(jié)果是這樣的:

C1-0 將消費(fèi) 0,1,2,3 分區(qū)
C2-0 將消費(fèi) 4,5,6 分區(qū)
C3-0 將消費(fèi) 7,8,9 分區(qū)

假如在 Topic1 中有 11 個分區(qū),那么最后分區(qū)分配的結(jié)果看起來是這樣的:

C1-0 將消費(fèi) 0,1,2,3 分區(qū)
C2-0 將消費(fèi) 4, 5, 6, 7 分區(qū)
C3-0 將消費(fèi) 8,9,10 分區(qū)

假如有兩個 Topic:Topic1 和 Topic2,都有 10 個分區(qū),那么最后分區(qū)分配的結(jié)果看起來是這樣的:

C1-0 將消費(fèi) Topic1 的 0,1,2,3 分區(qū)和 Topic1 的 0,1,2,3 分區(qū)
C2-0 將消費(fèi) Topic1 的 4,5,6 分區(qū)和Topic2 的 4,5,6 分區(qū)
C3-0 將消費(fèi) Topic1 的 7,8,9 分區(qū)和Topic2 的 7,8,9 分區(qū)

其實這樣就會有一個問題,C1-0 就會多消費(fèi)兩個分區(qū),這就是一個很明顯的弊端。

2. RoundRobin strategy(輪詢分區(qū))

輪詢分區(qū)策略是把所有 partition 和所有 Consumer 線程都列出來,然后按照 hashcode 進(jìn)行排序。最后通過輪詢算法分配partition 給消費(fèi)線程。如果所有 Consumer 實例的訂閱是相同的,那么 partition 會均勻分布。

假如按照 hashCode 排序完的 Topic / partitions組依次為T1一5, T1一3, T1-0, T1-8, T1-2, T1-1, T1-4,T1-7,T1-6,T1-9,消費(fèi)者線程排序為 C1-0,C1-1,C2-0,C2-1,最后的分區(qū)分配的結(jié)果為:

C1-0 將消費(fèi) T1-5, T1-2, T1-6分區(qū)
C1-1 將消費(fèi) T1-3, T1-1, T1-9分區(qū)
C2-0 將消費(fèi) T1-0, T1-4分區(qū)
C2-1 將消費(fèi) T1-8, T1-7分區(qū)

使用輪詢分區(qū)策略必須滿足兩個條件

  • 同一個消費(fèi)者組里面的所有消費(fèi)者的num.streams(消費(fèi)者消費(fèi)線程數(shù))必須相等;
  • 每個消費(fèi)者訂閱的主題必須相同。

3. StickyAssignor(粘性分區(qū))

我們再來看一下StickyAssignor策略,“sticky”這個單詞可以翻譯為“粘性的”,Kafka從0.11.x版本開始引入這種分配策略,它主要有兩個目的:

  1. 分區(qū)的分配要盡可能的均勻,分配給消費(fèi)者的主題分區(qū)數(shù)最多相差一個;
  2. 分區(qū)的分配盡可能的與上次分配的保持相同。

當(dāng)兩者發(fā)生沖突時,第一個目標(biāo)優(yōu)先于第二個目標(biāo)。

Rebalance 再均衡

Rebalance(再均衡:在同一個消費(fèi)者組當(dāng)中,分區(qū)的所有權(quán)從一個消費(fèi)者轉(zhuǎn)移到另外一個消費(fèi)者)機(jī)制,Rebalance顧名思義就是重新均衡消費(fèi)者消費(fèi)。

當(dāng)出現(xiàn)以下幾種情況時,Kafka 會進(jìn)行一次分區(qū)分配操作,也就是 Kafka Consumer 的 Rebalance:
(1) 一個 Consumer group 內(nèi) Consumer 的數(shù)量增加或減少
(3) Topic 新增了分區(qū)

消費(fèi)者消費(fèi)消息時,會記錄消費(fèi)者offset(注意不是分區(qū)的offset,不同的上下文環(huán)境一定要區(qū)分),這個消費(fèi)者的offset,也是保存在一個特殊的topic,叫做__consumer_offsets,它就一個作用,那就是保存消費(fèi)組里消費(fèi)者的offset。默認(rèn)創(chuàng)建時會生成50個分區(qū)(offsets.topic.num.partitions設(shè)置),一個副本,如果50個分區(qū)分布在50臺服務(wù)器上,將大大緩解消費(fèi)者提交offset的壓力。可以在創(chuàng)建消費(fèi)者的時候產(chǎn)生這個特殊消費(fèi)組。

如果只啟動了hadoop03一個broker,則所有的50個分區(qū)都會在這上面生成

[root@hadoop03 /home/software/kafka-2/bin]# sh kafka-console-consumer.sh --bootstrap-server hadoop03:9092 --topic football --from-beginning --new-consumer

那么問題來了,消費(fèi)者的offset到底保存到哪個分區(qū)呢,kafka中是按照消費(fèi)組group.id來確定的,使用Math.abs(groupId.hashCode())%50,來計算分區(qū)號,這樣就可以確定一個消費(fèi)組下的所有的消費(fèi)者的offset,都會保存到哪個分區(qū)了.

那么問題又來了,既然一個消費(fèi)組內(nèi)的所有消費(fèi)者都把offset提交到了__consumer_offsets下的同一個分區(qū),如何區(qū)分不同消費(fèi)者的offset呢?原來提交到這個分區(qū)下的消息,key是groupId+topic+分區(qū)號,value是消費(fèi)者offset。這個key里有分區(qū)號,注意這個分區(qū)號是消費(fèi)組里消費(fèi)者消費(fèi)topic的分區(qū)號。由于實際情況下一個topic下的一個分區(qū),只能被一個消費(fèi)組里的一個消費(fèi)者消費(fèi),這就不擔(dān)心offset混亂的問題了。

實際上,topic下多個分區(qū)均勻分布給一個消費(fèi)組下的消費(fèi)者消費(fèi),是由coordinator來完成的,它會監(jiān)聽消費(fèi)者,如果有消費(fèi)者宕機(jī)或添加新的消費(fèi)者,就會rebalance,使用一定的策略讓分區(qū)重新分配給消費(fèi)者。如下圖所示,消費(fèi)組會通過offset保存的位置在哪個broker,就選舉它作為這個消費(fèi)組的coordinator,負(fù)責(zé)監(jiān)聽各個消費(fèi)者心跳了解其健康狀況,并且將topic對應(yīng)的leader分區(qū),盡可能平均的分給消費(fèi)組里的消費(fèi)者,根據(jù)消費(fèi)者的變動,如新增一個消費(fèi)者,會觸發(fā)coordinator進(jìn)行rebalance。

Rebalance的過程如下:

  1. 所有consumer向coordinator發(fā)送請求,請求入組。一旦所有成員都發(fā)送了請求,coordinator會從中選擇一個consumer擔(dān)任leader的角色,并把組成員信息以及訂閱信息發(fā)給leader。
  2. leader開始分配消費(fèi)方案,指明具體哪個consumer負(fù)責(zé)消費(fèi)哪些topic的哪些partition。一旦完成分配,leader會將這個方案發(fā)給coordinator。
  3. coordinator接收到分配方案之后會把方案發(fā)給各個consumer,這樣組內(nèi)的所有成員就都知道自己應(yīng)該消費(fèi)哪些分區(qū)了。所以對于Rebalance來說,Coordinator起著至關(guān)重要的作用。

Kafka Consuemr 的 Rebalance 機(jī)制規(guī)定了一個 Consumer group 下的所有 Consumer 如何達(dá)成一致來分配訂閱 Topic 的每個分區(qū)。而具體如何執(zhí)行分區(qū)策略,就是前面提到過的分區(qū)策略。

參考資料:Kafka消費(fèi)者組三種分區(qū)分配策略roundrobin,range,StickyAssignor

如何保證 Kafka 中消息消費(fèi)的順序?

  1. 1 個 Topic 只對應(yīng)?個 Partition。
  2. (推薦)producer發(fā)送消息和consumer消費(fèi)消息的時候指定 key/Partition

Kafka 的多分區(qū)(Partition)以及多副本(Replica)機(jī)制有什么好處呢?

  1. Kafka 通過給特定 Topic 指定多個 Partition, ?各個 Partition 可以分布在不同的 Broker 上,這樣便能提供比較好的并發(fā)能?(負(fù)載均衡)。
  2. Partition 可以指定對應(yīng)的 Replica 數(shù), 這也極?地提?了消息存儲的安全性, 提?了容災(zāi)能?,不過也相應(yīng)的增加了所需要的存儲空間。

Kafka集群partitions/replicas默認(rèn)分配解析

副本分配算法如下:

  1. 將所有n個Broker和待分配的I個Partition排序.
  2. 將第i個Partition分配到第(i mod n)個Broker上.
  3. 將第i個Partition的第j個副本分配到第((i + j) mod n)個Broker上.

例:4個Broker,1個topic包含4個Partition,2 Replication:

image.png

例:6個Broker,1個topic包含6個Partition,2 Replication:

image.png

參考鏈接:https://blog.csdn.net/lizhitao/article/details/41778193

防止消息丟失

  1. 生產(chǎn)者丟失消息
  2. 消費(fèi)者丟失消息
  3. kafka丟失消息

生產(chǎn)者丟失消息

?產(chǎn)者(Producer) 調(diào)? send ?法發(fā)送消息之后,消息可能因為?絡(luò)問題并沒有發(fā)送過去。
所以,我們不能默認(rèn)在調(diào)? send ?法發(fā)送消息之后消息消息發(fā)送成功了。為了確定消息是發(fā)送成功,我們要判斷消息發(fā)送的結(jié)果。但是要注意的是 Kafka ?產(chǎn)者(Producer) 使? send ?法發(fā)送消息實際上是異步的操作。

  1. 如果消息發(fā)送失敗的話,我們檢查失敗的原因之后重新發(fā)送
  2. 推薦為 Producer 的 retries (重試次數(shù))設(shè)置?個?較合理的值,?般是 3 ,但是為了保證消息不丟失的話?般會設(shè)置?較??點。設(shè)置完成之后,當(dāng)出現(xiàn)?絡(luò)問題之后能夠?動重試消息發(fā)送,避免消息丟失。另外,建議還要設(shè)置重試間隔,因為間隔太?的話重試的效果就不明顯了,?絡(luò)波動?次你3次?下?就重試完了。

同步:

SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
 logger.info("?產(chǎn)者成功發(fā)送消息到" + sendResult.getProducerRecord().topic() +
"-> " + sendRe
 sult.getProducerRecord().value().toString());
}

異步:

ListenableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(topic, o);
 future.addCallback(result -> logger.info("?產(chǎn)者成功發(fā)送消息到topic:{}
partition:{}的消息", result.getRecordMetadata().topic(),
result.getRecordMetadata().partition()),
 ex -> logger.error("?產(chǎn)者發(fā)送消失敗,原因:{}",
ex.getMessage()));

消費(fèi)者丟失消息

當(dāng)消費(fèi)者拉取到了分區(qū)的某個消息之后,消費(fèi)者會?動提交了 offset。?動提交的話會有?個問題,試想?下,當(dāng)消費(fèi)者剛拿到這個消息準(zhǔn)備進(jìn)?真正消費(fèi)的時候,突然掛掉了,消息實際上并沒有被消費(fèi),但是 offset 卻被?動提交了。

解決辦法也?較粗暴,我們?動關(guān)閉閉?動提交 offset,每次在真正消費(fèi)完消息之后之后再???動提交 offset 。

但是,細(xì)?的朋友?定會發(fā)現(xiàn),這樣會帶來消息被重新消費(fèi)的問題。?如你剛剛消費(fèi)完消息之后,還沒提交 offset,結(jié)果??掛掉了,那么這個消息理論上就會被消費(fèi)兩次。

kafka丟失消息

  1. request.required.acks
  2. min.insync.replicas
  3. replication.factor
  4. replica.lag.time.max.ms
  5. unclean.leader.election.enable

request.required.acks

為保證producer發(fā)送的數(shù)據(jù),能可靠的發(fā)送到指定的topic,topic的每個partition收到producer發(fā)送的數(shù)據(jù)后,都需要向producer發(fā)送ack(acknowledgement確認(rèn)收到),如果producer收到ack,就會進(jìn)行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)。所以引出ack機(jī)制。ack應(yīng)答機(jī)制Kafka為用戶提供了三種可靠性級別,用戶根據(jù)對可靠性和延遲的要求進(jìn)行權(quán)衡,選擇以下的配置。

request.required.acks 有三個值 0 1 -1

0:生產(chǎn)者不會等待 broker 的 ack,這個延遲最低但是存儲的保證最弱當(dāng) server 掛掉的時候就會丟數(shù)據(jù)

1(默認(rèn)):服務(wù)端會等待 ack 值 leader 副本確認(rèn)接收到消息后發(fā)送 ack 但是如果 leader 掛掉后他不確保是否復(fù)制完成新 leader 也會導(dǎo)致數(shù)據(jù)丟失

-1 或 all:同樣在 1 的基礎(chǔ)上 服務(wù)端會等所有的 follower 的副本受到數(shù)據(jù)后才會受到 leader 發(fā)出的 ack,這樣數(shù)據(jù)不會丟失

replication.factor

設(shè)置 replication.factor >= 3
為了保證 leader 副本能有 follower 副本能同步消息,我們?般會為 topic 設(shè)置 replication.factor >= 3。這樣就可以保證每個 分區(qū)(partition) ?少有 3 個副本。雖然造成了數(shù)據(jù)冗余,但是帶來了數(shù)據(jù)的安全性。

min.insync.replicas

設(shè)置 min.insync.replicas > 1。
?般情況下我們還需要設(shè)置 min.insync.replicas> 1 ,這樣配置代表消息?少要被寫?到 2 個副本才算是被成功發(fā)送。min.insync.replicas 的默認(rèn)值為 1 ,在實際?產(chǎn)中應(yīng)盡量避免默認(rèn)值1。
但是,為了保證整個 Kafka 服務(wù)的?可?性,你需要確保 replication.factor > min.insync.replicas 。為什么呢?設(shè)想?下加?兩者相等的話,只要是有?個副本掛掉,整個分區(qū)就?法正常?作了。這明顯違反?可?性!?般推薦設(shè)置成 replication.factor = min.insync.replicas + 1。

ISR 和 replica.lag.time.max.ms

ack同步機(jī)制中,如果采用全部完成同步,才發(fā)送ack的副本的同步策略的話:提出問題:leader收到數(shù)據(jù),所有follower都開始同步數(shù)據(jù),但有一個follower,因為某種故障,遲遲不能與leader進(jìn)行同步,那leader就要一直等下去,直到它完成同步,才能發(fā)送ack。為了解決這個問題,就有了ISR。

ISR(in-sync replicas):同步副本列表。每個分區(qū)的 Leader 維護(hù)了一個動態(tài)的in-sync replica set (ISR),意為和leader保持同步的follower集合,ISR 列表里面就是這些 follower 副本的 Borker 編號。當(dāng)ISR中的follower完成數(shù)據(jù)的同步之后,leader就會給follower發(fā)送ack。如果follower長時間未向leader同步數(shù)據(jù),則該follower將被踢出ISR,該時間閾值由replica.lag.time.max.ms參數(shù)設(shè)定。Leader發(fā)生故障之后,就會從ISR中選舉新的leader。只有 ISR 里的成員才有被選為 leader 的可能。

replica.lag.time.max.ms :延遲時間,指定了副本在復(fù)制消息時可被允許的最大延遲時間。

unclean.leader.election.enable

所以當(dāng)Leader掛掉了,而且 unclean.leader.election.enable=false 的情況下,Kafka 會從 ISR 列表中選擇第一個follower作為新的Leader,因為這個分區(qū)擁有最新的已經(jīng)committed的消息。通過這個可以保證已經(jīng)committed的消息的數(shù)據(jù)可靠性。

設(shè)置不清潔選舉 unclean.leader.election.enable = false
Kafka 0.11.0.0版本開始 unclean.leader.election.enable 參數(shù)的默認(rèn)值由原來的true 改為false。
我們最開始也說了我們發(fā)送的消息會被發(fā)送到 leader 副本,然后 follower 副本才能從 leader 副本中拉取消息進(jìn)?同步。多個 follower 副本之間的消息同步情況不?樣,當(dāng)我們配置了unclean.leader.election.enable = false 的話,當(dāng) leader 副本發(fā)?故障時就不會從 follower 副本中和 leader 同步程度達(dá)不到要求的副本中選擇出 leader ,這樣降低了消息丟失的可能性。

參考鏈接:https://blog.csdn.net/weixin_39860755/article/details/112076339

Kafka 新建的分區(qū)會在哪個目錄下

創(chuàng)建在啟動 Kafka 集群之前,我們需要配置好 broker配置中的log.dirs 參數(shù),其值是 Kafka的存放目錄,這個參數(shù)可以配置多個目錄,目錄之間使用逗號分隔,通常這些目錄是分布在不同的磁盤上用于提高讀寫性能。
當(dāng)然我們也可以配置 log.dir 參數(shù),含義一樣。只需要設(shè)置其中一個即可。如果 log.dirs 參數(shù)只配置了一個目錄,那么分配到各個 Broker 上的分區(qū)肯定只能在這個目錄下創(chuàng)建文件夾用于存放數(shù)據(jù)。

假設(shè) Kafka 消息文件存儲目錄

log.dirs=/tmp/kafka-logs

假設(shè) partition 數(shù)量為 4

/bin/kafka-topics.sh –create –zookeeper localhost:2181 –partitions 4 –topic mytopic –replication-factor 4

然后就能在 /tmp/kafka-logs 目錄中看到 4 個目錄。

但是如果 log.dirs 參數(shù)配置了多個目錄,那么創(chuàng)建新的分區(qū)目錄時,Kafka 會在哪個目錄中創(chuàng)建分區(qū)目錄呢?
答案是:Kafka 會在含有 partition 文件夾(分區(qū)目錄)數(shù)量最少的 Broker 文件夾中創(chuàng)建新的分區(qū)目錄,分區(qū)目錄名為 Topic名+分區(qū) ID。注意,是 partition 文件夾(分區(qū)目錄)數(shù)量最少的目錄,而不是磁盤使用量最少的目錄!也就是說,如果你給 log.dirs 參數(shù)新增了一個新的磁盤,新的分區(qū)目錄肯定是先在這個新的磁盤上創(chuàng)建直到這個新的磁盤目錄擁有的partition 文件夾(分區(qū)目錄)不是最少為止。
(即:某個broker,log.dirs配了多個目錄,在該broker新增一個partition,該partition的文件夾存放在哪個目錄中?)

Kafka 文件存儲

image.png
  • topic: 可以理解為一個消息隊列的名字
  • partition:為了實現(xiàn)擴(kuò)展性,一個非常大的topic可以分布到多個 broker(即服務(wù)器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列
  • segment:partition物理上由多個segment組成
  • message:每個segment文件中實際存儲的一條條數(shù)據(jù)就是message
  • offset:每個partition都由一系列有序的、不可變的消息組成,這些消息被連續(xù)的追加到partition中,partition中的每個消息都有一個連續(xù)的序列號叫做offset,用于partition唯一標(biāo)識一條消息

topic-partition

topic和partition創(chuàng)建成功后存儲在log.dirs指定的目錄下,默認(rèn)的存儲位置在:/tmp/kafka-logs下。
目錄名稱:topic的名稱+有序序號,這個序號從0開始依次增加,如:

test-topic-0  
test-topic-1  
test-topic-2

segment

當(dāng)生產(chǎn)者往partition中存儲數(shù)據(jù)時,內(nèi)存中存不下了,就會往segment file里面存儲。
在每個partition文件夾中有可以分為多個segment file。每個segment file對應(yīng)3個文件,分別是.log數(shù)據(jù)文件、.index偏移量索引文件、 .timeindex時間戳索引文件。在服務(wù)器上,每個partition是一個文件夾,每個segment是一個文件。
kafka默認(rèn)每個segment file的大小是500M,在存儲數(shù)據(jù)時,會先生成一個segment file,當(dāng)這個segment file到500M之后,再生成第二個segment file 以此類推。

segment file也有自己的命名規(guī)則,每個名字有20個字符,不夠用0填充。每個名字從0開始命名,下一個segment file文件的名字就是,上一個segment file中最后一條消息的offset。

test-topic-0  
├── 00000000000000000001.index  
├── 00000000000000000001.log  
├── 00000000000000000001.timeindex  
├── 00000000000000001018.index  
├── 00000000000000001018.log  
├── 00000000000000001018.timeindex  
├── 00000000000000002042.index  
├── 00000000000000002042.log  
├── 00000000000000002042.timeindex

.index文件
.index偏移量索引文件中保存了消息的offset,和position(表示具體消息存儲在log中的物理地址。)
用來建立消息偏移量(offset)到物理地址之間的映射關(guān)系,方便快速定位消息所在的物理文件位置。
偏移量是單調(diào)遞增的,查詢指定偏移量時,使用二分查找法來快速定位偏移量的位置。

.timeindex文件
.timeindex 時間戳索引文件,它一條數(shù)據(jù)的結(jié)構(gòu)是時間戳(8byte)+相對offset(4byte),如果要使用這個索引文件,首先需要通過時間范圍,找到對應(yīng)的相對offset,然后再去對應(yīng)的index文件找到position信息,然后才能遍歷log文件,它也是需要使用上面說的index文件的。
它的作用是可以讓用戶查詢某個時間段內(nèi)的消息,根據(jù)指定的時間戳(timestamp)來查找對應(yīng)的偏移量信息。
時間戳也保持嚴(yán)格的單調(diào)遞增,查詢指定時間戳?xí)r,也根據(jù)二分查找法來查找。

稀疏存儲

log日志默認(rèn)每寫入4K(log.index.interval.bytes設(shè)定的),會寫入一條索引信息到index文件中。
.index文件和.timeindex文件中并沒有為數(shù)據(jù)文件中的每條消息都建立索引,而是采用了稀疏存儲的方式,每隔一定字節(jié)的數(shù)據(jù)建立一條索引。這樣避免了索引文件占用過多的空間,從而可以將索引文件保留在內(nèi)存中。但缺點是沒有建立索引的Message也不能一次定位到其在數(shù)據(jù)文件的位置,從而需要做一次順序掃描,但是這次順序掃描的范圍就很小了。

kafka查找一條offset對應(yīng)的實際的消息時,可以通過index二分查找,獲取到最近的低位offset,然后從低位offset對應(yīng)的position開始,從實際的log文件中開始往后查找對應(yīng)的消息。如要查找offset=5的消息,先去索引文件中找到低位的3 4597這條數(shù)據(jù),然后通過4597這個字節(jié)偏移量,從log文件中從4597個字節(jié)開始讀取,直到讀取到offset=5的這條數(shù)據(jù),這比直接從log文件開始讀取要節(jié)省時間。二分查找的時間復(fù)雜度為O(lgN),如果從頭遍歷時間復(fù)雜度是O(N)。

image.png

.log文件
.log文件中并不是直接存儲數(shù)據(jù),而是通過許多的message組成。
message包含了實際的消息數(shù)據(jù),由一個固定長度的頭部和可變長度的字節(jié)數(shù)組組成:
偏移(offset)
消息長度
CRC32校驗碼
版本號
...
具體的消息: n bytes

參考鏈接:

Kafka 高效文件存儲設(shè)計特點

  1. Kafka 把 topic 中一個 parition 大文件分成多個小文件段,通過多個小文件段,就容易定期清除或刪除已經(jīng)消費(fèi)完文件,減少磁盤占用。
  2. 通過索引信息可以快速定位 message 和確定 response 的最大大小。
  3. 通過 index 元數(shù)據(jù)全部映射到 memory,可以避免 segment file 的 IO 磁盤操作。
  4. 通過索引文件稀疏存儲,可以大幅降低 index 文件元數(shù)據(jù)占用空間大小。

Kafka 與傳統(tǒng)消息系統(tǒng)之間有三個關(guān)鍵區(qū)別

  1. Kafka 持久化日志,這些日志可以被重復(fù)讀取和無限期保留
  2. Kafka 是一個分布式系統(tǒng):它以集群的方式運(yùn)行,可以靈活伸縮,在內(nèi)部通過復(fù)制數(shù)據(jù)提升容錯能力和高可用性
  3. Kafka 支持實時的流式處理

冪等性

Idempotence (UK: /??d?m?po?t?ns/, US: /?a?d?m-/) 冪等:重復(fù)執(zhí)行,獲得結(jié)果相同。

kafka可能出現(xiàn)非冪等性的情況

image

在Consumer端offset沒有提交的時候,Consumer重啟了,這時候就會出現(xiàn)重復(fù)消費(fèi)的情況

解決方案:

  • 唯一ID+指紋碼

整體實現(xiàn)相對簡單,需要進(jìn)行數(shù)據(jù)庫寫入,利用數(shù)據(jù)庫主鍵去重,使用ID進(jìn)行分庫分表算法路由,從單庫的冪等性到多庫的冪等性

  1. 這里唯一ID一般就是業(yè)務(wù)表的主鍵,比如商品ID
  2. 指紋碼:每次操作都要生成指紋碼,可以用時間戳+業(yè)務(wù)編號+...組成,目的是保證每次操作都是正常的
image

整體流程

  1. 需要一個統(tǒng)一ID生成服務(wù),為了保證可靠性,上游服務(wù)也要有個本地ID生成服務(wù),然后發(fā)送消息給Broker
  2. 需要ID規(guī)則路由組件去監(jiān)聽消息,先入庫,如果入庫成功,證明沒有重復(fù),然后發(fā)給下游,如果發(fā)現(xiàn)庫里面有了這條消息,就不發(fā)給下游

參考鏈接:http://www.itdecent.cn/p/c48075dc4395

kafka 消息送達(dá)語義

消息送達(dá)語義是消息系統(tǒng)中一個常見的問題,主要包含三種語義:

  • At most once:消息發(fā)送或消費(fèi)至多一次
  • At least once:消息發(fā)送或消費(fèi)至少一次
  • Exactly once:消息恰好只發(fā)送一次或只消費(fèi)一次

下面我們分別從發(fā)送者和消費(fèi)者的角度來闡述這三種消息送達(dá)語義。

Producer

從Producer的角度來看:

  • At least once

意味著Producer發(fā)送完一條消息后,會確認(rèn)消息是否發(fā)送成功。如果Producer沒有收到Broker的ack確認(rèn)消息,那么會不斷重試發(fā)送消息。這樣就意味著消息可能被發(fā)送不止一次,也就存在這消息重復(fù)的可能性。

acks=-1/all。

  • At most once

意味著Producer發(fā)送完一條消息后,不會確認(rèn)消息是否成功送達(dá)。這樣從Producer的角度來看,消息僅僅被發(fā)送一次,也就存在者丟失的可能性。

我們可以通過配置Producer的以下配置項來實現(xiàn)At most once語義:

acks=0。acks配置項表示Producer期望的Broker的確認(rèn)數(shù)。默認(rèn)值為1??蛇x項:[0,1,all]。如果設(shè)置為0,表示Producer發(fā)送完消息后不會等待任何Broker的確認(rèn);設(shè)置為1表示Producer會等待Broker集群中的leader的確認(rèn)寫入消息;設(shè)置為all表示Producer需要等待Broker集群中l(wèi)eader和其所有follower的確認(rèn)寫入消息。

retries=0。retires配置項表示當(dāng)消息發(fā)送失敗時,Producer重發(fā)消息的次數(shù)。默認(rèn)值為2147483647。當(dāng)配置了acks=0時,retries配置項就失去了作用,因此這兒可以不用配置。

當(dāng)配置了retires的值后,如果沒有將max.in.flight.requests.per.connection配置的值設(shè)置為1,有可能造成消息亂序的結(jié)果。max.in.flight.requests.per.connection配置代表著一個Producer同時可以發(fā)送的未收到確認(rèn)的消息數(shù)量。如果max.in.flight.requests.per.connection數(shù)量大于1,那么可能發(fā)送了message1后,在沒有收到確認(rèn)前就發(fā)送了message2,此時message1發(fā)送失敗后觸發(fā)重試,而message2直接發(fā)送成功,就造成了Broker上消息的亂序。max.in.flight.requests.per.connection的默認(rèn)值為5。

  • Exactly once = At Least Once + 冪等性

意味著Producer消息的發(fā)送是冪等的。這意味著不論消息重發(fā)多少遍,最終Broker上記錄的只有一條不重復(fù)的數(shù)據(jù)。

在0.11版本以前的Kafka,對此是無能為力的,只能保證數(shù)據(jù)不丟失,再在下游消費(fèi)者對數(shù)據(jù)做全局去重。

Exactly once是Kafka從版本0.11之后提供的高級特性。開啟冪等性:enable.idempotence=true。所謂的冪等性就是指Producer不論向Server發(fā)送多少次重復(fù)數(shù)據(jù),Server端都只會持久化一條。冪等性結(jié)合At Least Once語義,就構(gòu)成了Kafka的Exactly Once語義。即:At Least Once + 冪等性 = Exactly Once。Kafka的冪等性實現(xiàn)其實就是將原來下游需要做的去重放在了數(shù)據(jù)上游。

我們可以通過配置Producer的以下配置項來實現(xiàn)Exactly once語義:

acks=-1/all。

enable.idempotence=true。

Idempotence (UK: /??d?m?po?t?ns/, US: /?a?d?m-/)

enable.idempotence配置項表示是否使用冪等性。當(dāng)enable.idempotence配置為true時,acks必須配置為all。并且建議max.in.flight.requests.per.connection的值小于5。

為了實現(xiàn)消息發(fā)送的冪等性,Kafka引入了兩個新的概念:

  • PID。每個新的Producer在初始化的時候會被分配一個唯一的PID,這個PID對用戶是不可見的。
  • Sequence Numbler。對于每個PID,該P(yáng)roducer發(fā)送數(shù)據(jù)的每個<Topic, Partition>都對應(yīng)一個從0開始單調(diào)遞增的Sequence Number。Broker端在緩存中保存了這Sequence Numbler,對于接收的每條消息,如果其序號比Broker緩存中序號大于1則接受它,否則將其丟棄。這樣就可以實現(xiàn)了消息重復(fù)提交了。但是,只能保證單個Producer對于同一個<Topic, Partition>的Exactly Once語義。不能保證同一個Producer一個topic不同的partion冪等。

開啟冪等性的Producer在初始化的時候會被分配一個PID,發(fā)往同一Partition的消息會附帶Sequence Number。而Broker端會對< PID, Partition, SeqNumber>做緩存,當(dāng)具有相同主鍵的消息提交時,Broker只會持久化一條。但是PID重啟就會變化,同時不同的Partition也具有不同主鍵,所以冪等性無法保證跨分區(qū)跨會話的Exactly Once。

Consumer

從Consumer的角度來看,

  • At least once

Consumer對一條消息可能消費(fèi)多次??紤]下面的情況:Consumer首先讀取消息,然后處理這條消息,最后提交offset。在處理消息時成功后,Consumer宕機(jī)了,此時offset還未提交,下一次讀取消息時依舊是這條消息,那么處理消息的邏輯又將被執(zhí)行一遍,這就是At least once消費(fèi)。

enable.auto.commit=false。禁止后臺自動提交offset。
手動調(diào)用consumer.commitSync()來提交offset。手動調(diào)用保證了offset即時更新。
通過手動提交offset,就可以實現(xiàn)Consumer At least once語義。

  • At most once

著Consumer對一條消息最多消費(fèi)一次,因此有可能存在消息消費(fèi)失敗依舊提交offset的情況??紤]下面的情況:Consumer首先讀取消息,然后提交offset,最后處理這條消息。在處理消息時,Consumer宕機(jī)了,此時offset已經(jīng)提交,下一次讀取消息時讀到的是下一條消息了,這就是At most once消費(fèi)。

enable.auto.commit=true。后臺定時提交offset。
auto.commit.interval.ms配置為一個很小的數(shù)值。auto.commit.interval.ms表示后臺提交offset的時間間隔。
通過自動提交offset,并且將定時提交時間間隔設(shè)置的很小,就可以實現(xiàn)Consumer At most once語義。

  • Exactly once

isolation.level=read_committed

意味著消息的消費(fèi)處理邏輯和offset的提交是原子性的,即消息消費(fèi)成功后offset改變,消息消費(fèi)失敗offset也能回滾。

isolation.level表示何種類型的message對Consumer可見。

一個常見的Exactly once的的使用場景是:當(dāng)我們訂閱了一個topic,然后往另一個topic里寫入數(shù)據(jù)時,我們希望這兩個操作是原子性的,即如果寫入消息失敗,那么我們希望讀取消息的offset可以回滾。

此時可以通過Kafka的Transaction特性來實現(xiàn)。Kafka是在版本0.11之后開始提供事務(wù)特性的。我們可以將Consumer讀取數(shù)據(jù)和Producer寫入數(shù)據(jù)放進(jìn)一個同一個事務(wù)中,在事務(wù)沒有成功結(jié)束前,所有的這個事務(wù)中包含的消息都被標(biāo)記為uncommitted。只有事務(wù)執(zhí)行成功后,所有的消息才會被標(biāo)記為committed。

我們知道,offset信息是以消息的方式存儲在Broker的__consumer_offsets topic中的。因此在事務(wù)開始后,Consumer讀取消息后,所有的offset消息都是uncommitted狀態(tài)。所有的Producer寫入的消息也都是uncommitted狀態(tài)。

而Consumer可以通過配置isolation.level來決定uncommitted狀態(tài)的message是否對Consumer可見。isolation.level擁有兩個可選值:read_committed和read_uncommitted。默認(rèn)值為read_uncommitted。

當(dāng)我們將isolation.level配置為read_committed后,那么所有事務(wù)未提交的數(shù)據(jù)就都對Consumer不可見了,也就實現(xiàn)了Kafka的事務(wù)語義。

kafka 事務(wù)

Kafka事務(wù)的使用

  • 生產(chǎn)者發(fā)送多條消息可以封裝在一個事務(wù)中,形成一個原子操作。多條消息要么都發(fā)送成功,要么都發(fā)送失敗。
  • read-process-write模式:將消息消費(fèi)和生產(chǎn)封裝在一個事務(wù)中,形成一個原子操作。在一個流式處理的應(yīng)用中,常常一個服務(wù)需要從上游接收消息,然后經(jīng)過處理后送達(dá)到下游,這就對應(yīng)著消息的消費(fèi)和生成。

Kafka事務(wù)配置

  • 對于Producer,需要設(shè)置transactional.id屬性,這個屬性的作用下文會提到。設(shè)置了transactional.id屬性后,enable.idempotence屬性會自動設(shè)置為true。
  • 對于Consumer,需要設(shè)置isolation.level = read_committed,這樣Consumer只會讀取已經(jīng)提交了事務(wù)的消息。另外,需要設(shè)置enable.auto.commit = false來關(guān)閉自動提交Offset功能。

Kafka事務(wù)特性

Kafka的事務(wù)特性本質(zhì)上代表了三個功能:原子寫操作,拒絕僵尸實例(Zombie fencing)和讀事務(wù)消息。

  • 原子寫
    Kafka的事務(wù)特性本質(zhì)上是支持了Kafka跨分區(qū)和Topic的原子寫操作。在同一個事務(wù)中的消息要么同時寫入成功,要么同時寫入失敗。我們知道,Kafka中的Offset信息存儲在一個名為_consumed_offsets的Topic中,因此read-process-write模式,除了向目標(biāo)Topic寫入消息,還會向_consumed_offsets 這個Topic中寫入已經(jīng)消費(fèi)的Offsets數(shù)據(jù)。因此read-process-write本質(zhì)上就是跨分區(qū)和Topic的原子寫操作。Kafka的事務(wù)特性就是要確??绶謪^(qū)的多個寫操作的原子性。

  • 拒絕僵尸實例(Zombie fencing)
    在分布式系統(tǒng)中,一個instance的宕機(jī)或失聯(lián),集群往往會自動啟動一個新的實例來代替它的工作。此時若原實例恢復(fù)了,那么集群中就產(chǎn)生了兩個具有相同職責(zé)的實例,此時前一個instance就被稱為“僵尸實例(Zombie Instance)”。在Kafka中,兩個相同的producer同時處理消息并生產(chǎn)出重復(fù)的消息(read-process-write模式),這樣就嚴(yán)重違反了Exactly Once Processing的語義。這就是僵尸實例問題。

Kafka事務(wù)特性通過epoch屬性來解決僵尸實例問題。所有具有相同transaction-id的Producer都會被分配相同的pid,同時每一個Producer還會被分配一個遞增的epoch。Kafka收到事務(wù)提交請求時,如果檢查當(dāng)前事務(wù)提交者的epoch不是最新的,那么就會拒絕該P(yáng)roducer的請求。從而達(dá)成拒絕僵尸實例的目標(biāo)。

  • 讀事務(wù)消息
    為了保證事務(wù)特性,Consumer如果設(shè)置了isolation.level = read_committed,那么它只會讀取已經(jīng)提交了的消息。在Producer成功提交事務(wù)后,Kafka會將所有該事務(wù)中的消息的Transaction Marker從uncommitted標(biāo)記為committed狀態(tài),從而所有的Consumer都能夠消費(fèi)。

Kafka事務(wù)實現(xiàn)

Kafka為了支持事務(wù)特性,引入一個新的組件:Transaction Coordinator。主要負(fù)責(zé)分配pid,記錄事務(wù)狀態(tài)等操作。下面時Kafka開啟一個事務(wù)到提交一個事務(wù)的流程圖:

image

主要分為以下步驟:

1. 查找Tranaction Corordinator

Producer向任意一個brokers發(fā)送 FindCoordinatorRequest請求來獲取Transaction Coordinator的地址。

2. 初始化事務(wù) initTransaction

Producer發(fā)送InitpidRequest給Transaction Coordinator,獲取pid。Transaction Coordinator在Transaciton Log中記錄這<TransactionId,pid>的映射關(guān)系。另外,它還會做兩件事:

  • 恢復(fù)(Commit或Abort)之前的Producer未完成的事務(wù)
  • 對PID對應(yīng)的epoch進(jìn)行遞增,這樣可以保證同一個app的不同實例對應(yīng)的PID是一樣,而epoch是不同的。

只要開啟了冪等特性即必須執(zhí)行InitpidRequest,而無須考慮該P(yáng)roducer是否開啟了事務(wù)特性。

3. 開始事務(wù)beginTransaction

執(zhí)行Producer的beginTransacion(),它的作用是Producer在本地記錄下這個transaction的狀態(tài)為開始狀態(tài)。這個操作并沒有通知Transaction Coordinator,因為Transaction Coordinator只有在Producer發(fā)送第一條消息后才認(rèn)為事務(wù)已經(jīng)開啟。

4. read-process-write流程

一旦Producer開始發(fā)送消息,Transaction Coordinator會將該<Transaction, Topic, Partition>存于Transaction Log內(nèi),并將其狀態(tài)置為BEGIN。另外,如果該<Topic, Partition>為該事務(wù)中第一個<Topic, Partition>,Transaction Coordinator還會啟動對該事務(wù)的計時(每個事務(wù)都有自己的超時時間)。

在注冊<Transaction, Topic, Partition>到Transaction Log后,生產(chǎn)者發(fā)送數(shù)據(jù),雖然沒有還沒有執(zhí)行commit或者abort,但是此時消息已經(jīng)保存到Broker上了。即使后面執(zhí)行abort,消息也不會刪除,只是更改狀態(tài)字段標(biāo)識消息為abort狀態(tài)。

5. 事務(wù)提交或終結(jié) commitTransaction/abortTransaction

在Producer執(zhí)行commitTransaction/abortTransaction時,Transaction Coordinator會執(zhí)行一個兩階段提交:

  • 第一階段,將Transaction Log內(nèi)的該事務(wù)狀態(tài)設(shè)置為PREPARE_COMMITPREPARE_ABORT
  • 第二階段,將Transaction Marker寫入該事務(wù)涉及到的所有消息(即將消息標(biāo)記為committedaborted)。這一步驟Transaction Coordinator會發(fā)送給當(dāng)前事務(wù)涉及到的每個<Topic, Partition>的Leader,Broker收到該請求后,會將對應(yīng)的Transaction Marker控制信息寫入日志。

一旦Transaction Marker寫入完成,Transaction Coordinator會將最終的COMPLETE_COMMITCOMPLETE_ABORT狀態(tài)寫入Transaction Log中以標(biāo)明該事務(wù)結(jié)束。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 夜鶯2517閱讀 128,087評論 1 9
  • 版本:ios 1.2.1 亮點: 1.app角標(biāo)可以實時更新天氣溫度或選擇空氣質(zhì)量,建議處女座就不要選了,不然老想...
    我就是沉沉閱讀 7,361評論 1 6
  • 我是黑夜里大雨紛飛的人啊 1 “又到一年六月,有人笑有人哭,有人歡樂有人憂愁,有人驚喜有人失落,有的覺得收獲滿滿有...
    陌忘宇閱讀 8,814評論 28 54
  • 兔子雖然是枚小碩 但學(xué)校的碩士四人寢不夠 就被分到了博士樓里 兩人一間 在學(xué)校的最西邊 靠山 兔子的室友身體不好 ...
    待業(yè)的兔子閱讀 2,759評論 2 9

友情鏈接更多精彩內(nèi)容