簡單總結(jié):
消費端重復(fù)消費:建立去重表
消費端丟失數(shù)據(jù):關(guān)閉自動提交offset,處理完之后受到移位,enable.auto.commit=false 關(guān)閉自動提交位移
生產(chǎn)端重復(fù)發(fā)送:消費端消費之前從去重表中判重
生產(chǎn)端丟失數(shù)據(jù):這個是最麻煩的情況
解決策略:
1.異步方式緩沖區(qū)滿了,就阻塞在那,等著緩沖區(qū)可用,不能清空緩沖區(qū)
2.發(fā)送消息之后回調(diào)函數(shù),發(fā)送成功就發(fā)送下一條,發(fā)送失敗就記在日志中,等著定時腳本來掃描(發(fā)送失敗可能并不真的發(fā)送失敗,只是沒收到反饋,定時腳本可能會重發(fā))
數(shù)據(jù)丟失情況:
1)使用同步模式的時候,有3種狀態(tài)保證消息被安全生產(chǎn),在配置為1(只保證寫入leader成功)的話,如果剛好leader partition掛了,數(shù)據(jù)就會丟失。
2)還有一種情況可能會丟失消息,就是使用異步模式的時候,當(dāng)緩沖區(qū)滿了,如果配置為0(還沒有收到確認的情況下,緩沖池一滿,就清空緩沖池里的消息),數(shù)據(jù)就會被立即丟棄掉。
只要能避免上述兩種情況,那么就可以保證消息不會被丟失。
1)就是說在同步模式的時候,確認機制設(shè)置為-1,也就是讓消息寫入leader和所有的副本。
2)還有,在異步模式下,如果消息發(fā)出去了,但還沒有收到確認的時候,緩沖池滿了,在配置文件中設(shè)置成不限制阻塞超時的時間,也就說讓生產(chǎn)端一直阻塞,這樣也能保證數(shù)據(jù)不會丟失。
ack:
ack確認機制設(shè)置為0,表示不等待響應(yīng),不等待borker的確認信息,最小延遲,producer無法知道消息是否發(fā)生成功,消息可能丟失,但具有最大吞吐量。
ack確認機制設(shè)置為-1,也就是讓消息寫入leader和所有的副本,ISR列表中的所有replica都返回確認消息。
ack確認機制設(shè)置為1,leader已經(jīng)接收了數(shù)據(jù)的確認信息,replica異步拉取消息,比較折中。
ack確認機制設(shè)置為2,表示producer寫partition leader和其他一個follower成功的時候,broker就返回成功,無論其他的partition follower是否寫成功。
ack確認機制設(shè)置為 "all" 即所有副本都同步到數(shù)據(jù)時send方法才返回, 以此來完全判斷數(shù)據(jù)是否發(fā)送成功, 理論上來講數(shù)據(jù)不會丟失。
min.insync.replicas=1 意思是至少有1個replica返回成功,否則product異常
總結(jié):
消息的完整性和系統(tǒng)的吞吐量是互斥的,為了確保消息不丟失就必然會損失系統(tǒng)的吞吐量
producer:
1、ack設(shè)置-1
2、設(shè)置副本同步成功的最小同步個數(shù)為副本數(shù)-1
3、加大重試次數(shù)
4、同步發(fā)送
5、對于單條數(shù)據(jù)過大,要設(shè)置可接收的單條數(shù)據(jù)的大小
6、對于異步發(fā)送,通過回調(diào)函數(shù)來感知丟消息,使用KafkaProducer.send(record, callback)方法而不是send(record)方法
7、配置不允許非ISR(In-Sync Replicas,副本同步隊列)集合中的副本當(dāng)leader。所有的副本(replicas)統(tǒng)稱為 Assigned Replicas,即 AR
8、客戶端緩沖區(qū)滿了也可能會丟消息;或者異步情況下消息在客戶端緩沖區(qū)還未發(fā)送,客戶端就宕機
9、block.on.buffer.full = true
consumer:
1、enable.auto.commit=false 關(guān)閉自動提交位移
unclean.leader.election.enable 設(shè)置為 false(默認參數(shù)為 true),意思是,當(dāng)存有你最新一條記錄的 replication 宕機的時候,Kafka 自己會選舉出一個主節(jié)點,如果默認允許還未同步你最新數(shù)據(jù)的 replication 所在的節(jié)點被選舉為主節(jié)點的話,你的數(shù)據(jù)將會丟失,因此這里應(yīng)該按需將參數(shù)調(diào)控為 false;
retries設(shè)置大一些。設(shè)置大于0的值將使客戶端重新發(fā)送任何數(shù)據(jù),一旦這些數(shù)據(jù)發(fā)送失敗。注意,這些重試與客戶端接收到發(fā)送錯誤時的重試沒有什么不同。允許重試將潛在的改變數(shù)據(jù)的順序,如果這兩個消息記錄都是發(fā)送到同一個partition,則第一個消息失敗第二個發(fā)送成功,則第二條消息會比第一條消息出現(xiàn)要早。
replication.factor > min.insync.replicas。如果兩者相等,當(dāng)一個副本掛掉了分區(qū)也就沒法正常工作了。通常設(shè)置replication.factor = min.insync.replicas + 1即可。
同一分區(qū)消息亂序:
假設(shè)a,b兩條消息,a先發(fā)送后由于發(fā)送失敗重試,這時順序就會在b的消息后面,可以設(shè)置max.in.flight.requests.per.connection=1來避免
max.in.flight.requests.per.connection:限制客戶端在單個連接上能夠發(fā)送的未響應(yīng)請求的個數(shù)。設(shè)置此值是1表示kafka broker在響應(yīng)請求之前client不能再向同一個broker發(fā)送請求,但吞吐量會下降
0.11.0之后的版本:
冪等性發(fā)送:
引入了Producer ID(PID)和Sequence Number實現(xiàn)Producer的冪等語義。
- Producer ID:每個新的Producer在初始化的時候會被分配一個唯一的PID
- Sequence Number:對于每個PID,該Producer發(fā)送數(shù)據(jù)的每個<Topic, Partition>都對應(yīng)一個從0開始單調(diào)遞增的Sequence Number。
Broker端也會為每個<PID, Topic, Partition>維護一個序號,并且每次Commit一條消息時將其對應(yīng)序號遞增。對于接收的每條消息,如果其序號比Broker維護的序號(即最后一次Commit的消息的序號)大一,則Broker會接受它,否則將其丟棄:
- 如果消息序號比Broker維護的序號大一以上,說明中間有數(shù)據(jù)尚未寫入,也即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
- 如果消息序號小于等于Broker維護的序號,說明該消息已被保存,即為重復(fù)消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
這種機制很好的解決了數(shù)據(jù)重復(fù)和數(shù)據(jù)亂序的問題。
事務(wù)機制:
多個操作要么全部成功要么全部失敗。Kafka事務(wù)的本質(zhì)是,將一組寫操作(如果有)對應(yīng)的消息與一組讀操作(如果有)對應(yīng)的Offset的更新進行同樣的標(biāo)記(即Transaction Marker)來實現(xiàn)事務(wù)中涉及的所有讀寫操作同時對外可見或同時對外不可見。
補充ISR:
HW 俗稱高水位,HighWatermark 的縮寫,取一個 partition 對應(yīng)的 ISR 中最小的 LEO 作為 HW,consumer 最多只能消費到 HW 所在的位置。另外每個 replica 都有 HW,leader 和 follower 各自負責(zé)更新自己的 HW 的狀態(tài)。對于 leader 新寫入的消息,consumer 不能立刻消費,leader 會等待該消息被所有 ISR 中的 replicas 同步后更新 HW,此時消息才能被 consumer 消費。這樣就保證了如果 leader 所在的 broker 失效,該消息仍然可以從新選舉的 leader 中獲取。對于來自內(nèi)部 broKer 的讀取請求,沒有 HW 的限制。
下圖詳細的說明了當(dāng) producer 生產(chǎn)消息至 broker 后,ISR 以及 HW 和 LEO 的流轉(zhuǎn)過程:

由此可見,Kafka 的復(fù)制機制既不是完全的同步復(fù)制,也不是單純的異步復(fù)制。事實上,同步復(fù)制要求所有能工作的 follower 都復(fù)制完,這條消息才會被 commit,這種復(fù)制方式極大的影響了吞吐率。而異步復(fù)制方式下,follower 異步的從 leader 復(fù)制數(shù)據(jù),數(shù)據(jù)只要被 leader 寫入 log 就被認為已經(jīng) commit,這種情況下如果 follower 都還沒有復(fù)制完,落后于 leader 時,突然 leader 宕機,則會丟失數(shù)據(jù)。而 Kafka 的這種使用 ISR 的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。
Kafka 的 ISR 的管理最終都會反饋到 Zookeeper 節(jié)點上。具體位置為:/brokers/topics/[topic]/partitions/[partition]/state。目前有兩個地方會對這個 Zookeeper 的節(jié)點進行維護:
1.Controller 來維護:Kafka 集群中的其中一個 Broker 會被選舉為 Controller,主要負責(zé) Partition 管理和副本狀態(tài)管理,也會執(zhí)行類似于重分配 partition 之類的管理任務(wù)。在符合某些特定條件下,Controller 下的 LeaderSelector 會選舉新的 leader,ISR 和新的 leader_epoch 及 controller_epoch 寫入 Zookeeper 的相關(guān)節(jié)點中。同時發(fā)起 LeaderAndIsrRequest 通知所有的 replicas。
2.leader 來維護:leader 有單獨的線程定期檢測 ISR 中 follower 是否脫離 ISR, 如果發(fā)現(xiàn) ISR 變化,則會將新的 ISR 的信息返回到 Zookeeper 的相關(guān)節(jié)點中。
參考文獻:
https://www.infoq.cn/article/depth-interpretation-of-kafka-data-reliability