Kafka的重復(fù)、丟數(shù)據(jù)及順序消費(fèi)等問(wèn)題

順序消費(fèi)

①、kafka的順序消息僅僅是通過(guò)partitionKey,將某類消息寫入同一個(gè)partition,一個(gè)partition只能對(duì)應(yīng)一個(gè)消費(fèi)線程,以保證數(shù)據(jù)有序。

②、除了發(fā)送消息需要指定partitionKey外,producer和consumer實(shí)例化無(wú)區(qū)別。

③、kafka broker宕機(jī),kafka會(huì)有自選擇,所以宕機(jī)不會(huì)減少partition數(shù)量,也就不會(huì)影響partitionKey的sharding。

  • 但是消費(fèi)者里可能會(huì)有多個(gè)線程來(lái)并發(fā)來(lái)處理消息。因?yàn)槿绻M(fèi)者是單線程消費(fèi)數(shù)據(jù),那么這個(gè)吞吐量太低了。而多個(gè)線程并發(fā)的話,順序可能就亂掉了
  • 寫N個(gè)queue,將具有相同key的數(shù)據(jù)都存儲(chǔ)在同一個(gè)queue,然后對(duì)于N個(gè)線程,每個(gè)線程分別消費(fèi)一個(gè)queue即可。

丟數(shù)據(jù)

acks設(shè)置為0:broker接收消息立即返回,還沒(méi)寫入磁盤,容易丟失數(shù)據(jù)

acks設(shè)置為1:等待broker的ack,如果leader落盤了就返回ack,如果follower同步完成前l(fā)eader掛了就會(huì)丟失未同步的數(shù)據(jù)(follower選舉)

acks設(shè)置為-1:等待所有l(wèi)eader和follower都落盤后返回ack,如果follower已同步,但是broker返回ack前l(fā)eader掛了,則會(huì)重復(fù)發(fā)送消息。

consumer自動(dòng)提交offset,但其實(shí)未處理好消息,容易丟數(shù)據(jù)??梢赃x擇手動(dòng)提交,處理完后再提交offset

手動(dòng)提交 offset 的方法有兩種:分別是 commitSync(同步提交)和 commitAsync(異步 提交)。兩者的相同點(diǎn)是,都會(huì)將本次 poll 的一批數(shù)據(jù)最高的偏移量提交;不同點(diǎn)是, commitSync 阻塞當(dāng)前線程,一直到提交成功,并且會(huì)自動(dòng)失敗重試(由不可控因素導(dǎo)致, 也會(huì)出現(xiàn)提交失敗);而 commitAsync 則沒(méi)有失敗重試機(jī)制,故有可能提交失敗。

  • 給 topic設(shè)置 replication.factor ,這個(gè)值必須大于 1,保證每個(gè) partition 必須至少有 2 個(gè)副本
  • 在 kafka 服務(wù)端設(shè)置 min.insync.replicas 參數(shù),這個(gè)值必須大于 1,這個(gè)是要求一個(gè)leader至少感知到有至少一個(gè)follower還跟自己保持聯(lián)系,沒(méi)掉隊(duì),這樣才能確保 leader掛了還有一個(gè)follower,保證至少一個(gè) follower能和leader保持正常的數(shù)據(jù)同步。

0.9版本的kafka改進(jìn)了coordinator的設(shè)計(jì),提出了group coordinator——每個(gè)consumer group都會(huì)被分配一個(gè)這樣的coordinator用于組管理和位移管理。這個(gè)group coordinator比原來(lái)承擔(dān)了更多的責(zé)任,比如組成員管理、位移提交保護(hù)機(jī)制等。當(dāng)新版本consumer group的第一個(gè)consumer啟動(dòng)的時(shí)候,它會(huì)去和kafka server確定誰(shuí)是它們組的coordinator。之后該group內(nèi)的所有成員都會(huì)和該coordinator進(jìn)行協(xié)調(diào)通信。顯而易見(jiàn),這種coordinator設(shè)計(jì)不再需要zookeeper了,性能上可以得到很大的提升。

每個(gè) Group 都會(huì)選擇一個(gè) Coordinator 來(lái)完成自己組內(nèi)各 PartitionOffset 信息,選擇的規(guī)則如下: 1. 計(jì)算 Group 對(duì)應(yīng)在 __consumer_offsets 上的 Partition 2. 根據(jù)對(duì)應(yīng)的Partition尋找該P(yáng)artition的leader所對(duì)應(yīng)的Broker,該Broker上的Group Coordinator即就是該Group的Coordinator

Rebalance

RangeAssignor:

//計(jì)算每個(gè)consumer分到的partition數(shù)量
                int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
                //計(jì)算平均以后剩余partition數(shù)量
                int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

                //從0開始作為Partition Index, 構(gòu)造TopicPartition對(duì)象
                List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
                for (int i = 0, n = consumersForTopic.size(); i < n; i++) {//對(duì)于當(dāng)前這個(gè)topic的每一個(gè)consumer
                    //一定是前面幾個(gè)consumer會(huì)被分配一個(gè)額外的TopicPartitiion
                    int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                    int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                    assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
                }

numPartitionsPerConsumer=counsumer/partitions——》5/3=1,每個(gè)消費(fèi)者至少被分配一個(gè)partition

consumersWithExtraPartition=counsumer%partitions——》5%3=2

i=0,start=0,length=2;

i=1,start=2,length=2;

i=2,start=4,length=1;

如果是4個(gè)partitions和3個(gè)consumer

i=0,start=0,length=2;

i=1,start=2,length=1;

i=2,start=3,length=1;

RoundRobin:

for(每一個(gè)TopicPartition)

? 以RoundRobin的方式選擇一個(gè)訂閱了這個(gè)Topic的Consumer,將這個(gè)TopicPartition分派給這個(gè)Consumer end

StickyAssignor分配策略

“sticky”這個(gè)單詞可以翻譯為“粘性的”,Kafka從0.11.x版本開始引入這種分配策略,它主要有兩個(gè)目的:

  1. 分區(qū)的分配要盡可能的均勻;
  2. 分區(qū)的分配盡可能的與上次分配的保持相同。
    當(dāng)兩者發(fā)生沖突時(shí),第一個(gè)目標(biāo)優(yōu)先于第二個(gè)目標(biāo)。鑒于這兩個(gè)目標(biāo),StickyAssignor策略的具體實(shí)現(xiàn)要比RangeAssignor和RoundRobinAssignor這兩種分配策略要復(fù)雜很多。
image-20200909225735741
image-20200909230725291

為什么會(huì)重復(fù)消費(fèi):第一種可能是生產(chǎn)者重復(fù)發(fā)送消息。第二種可能是消費(fèi)者手動(dòng)提交時(shí)掛掉了,導(dǎo)致消費(fèi)了數(shù)據(jù)但是沒(méi)有提交offset。

  • 拿數(shù)據(jù)要寫庫(kù),首先檢查下主鍵,如果有數(shù)據(jù),則不插入,進(jìn)行一次update
  • 如果是寫 redis,就沒(méi)問(wèn)題,反正每次都是 set ,天然冪等性
  • 生產(chǎn)者發(fā)送消息的時(shí)候帶上一個(gè)全局唯一的id,消費(fèi)者拿到消息后,先根據(jù)這個(gè)id去 redis里查一下,之前有沒(méi)消費(fèi)過(guò),沒(méi)有消費(fèi)過(guò)就處理,并且寫入這個(gè) id 到 redis,如果消費(fèi)過(guò)了,則不處理。
  • 基于數(shù)據(jù)庫(kù)的唯一鍵

為什么會(huì)丟失數(shù)據(jù):第一種可能是ack非-1的情況下,follower未同步完全,leader掛了。第二種可能是消費(fèi)者自動(dòng)提交,但其實(shí)還沒(méi)完成消費(fèi)。

怎么保證生產(chǎn)者消息不重復(fù),0.11后,生產(chǎn)者會(huì)生成pid,和一個(gè)sequence number,通過(guò)pid sequence number brokerid作為key,如果在partition中已經(jīng)存在,則只持久化一條。且Producer重啟可以通過(guò)TransactionID拿到原來(lái)的pid,所以可以跨會(huì)話的保持一致

保證順序消費(fèi):需要保證順序的消息發(fā)到同一個(gè)partition中,consumer會(huì)自己根據(jù)順序消費(fèi)

ISR:

0.9.0.0 版本之前判斷副本之間是否同步,主要是靠參數(shù) replica.lag.max.messages 決定的,即允許 follower 副本落后 leader 副本的消息數(shù)量,超過(guò)這個(gè)數(shù)量后,follower 會(huì)被踢出 ISR。

replica.lag.max.messages 也很難在生產(chǎn)上給出一個(gè)合理值,如果給的小,會(huì)導(dǎo)致 follower 頻繁被踢出 ISR,如果給的大,broker 發(fā)生宕機(jī)導(dǎo)致 leader 變更時(shí),肯能會(huì)發(fā)生日志截?cái)?,?dǎo)致消息嚴(yán)重丟失的問(wèn)題。

在 0.9.0.0 版本之后,Kafka 給出了一個(gè)更好的解決方案,去除了 replica.lag.max.messages,,用 replica.lag.time.max.ms 參數(shù)來(lái)代替,該參數(shù)的意思指的是允許 follower 副本不同步消息的最大時(shí)間值,即只要在 replica.lag.time.max.ms 時(shí)間內(nèi) follower 有同步消息,即認(rèn)為該 follower 處于 ISR 中,這就很好地避免了在某個(gè)瞬間生產(chǎn)者一下子發(fā)送大量消息到 leader 副本導(dǎo)致該分區(qū) ISR 頻繁收縮與擴(kuò)張的問(wèn)題了。

controller

Kafka集群中多個(gè)broker,有一個(gè)會(huì)被選舉為controller leader,負(fù)責(zé)管理整個(gè)集群中分區(qū)和副本的狀態(tài),比如partition的leader 副本故障,由controller 負(fù)責(zé)為該partition重新選舉新的leader 副本;當(dāng)檢測(cè)到ISR列表發(fā)生變化,有controller通知集群中所有broker更新其MetadataCache信息;或者增加某個(gè)topic分區(qū)的時(shí)候也會(huì)由controller管理分區(qū)的重新分配工作

實(shí)際上,Broker 在啟動(dòng)時(shí),會(huì)嘗試去 ZooKeeper 中創(chuàng)建 /controller 節(jié)點(diǎn)。Kafka 當(dāng)前選舉控制器的規(guī)則是:第一個(gè)成功創(chuàng)建 /controller 節(jié)點(diǎn)的 Broker 會(huì)被指定為控制器。

  • 主題管理(創(chuàng)建、刪除、增加分區(qū))
  • 分區(qū)重分配
  • 所有主題信息。包括具體的分區(qū)信息,比如領(lǐng)導(dǎo)者副本是誰(shuí),ISR 集合中有哪些副本等,所有 Broker 信息。包括當(dāng)前都有哪些運(yùn)行中的 Broker,哪些正在關(guān)閉中的 Broker 等。所有涉及運(yùn)維任務(wù)的分區(qū)。包括當(dāng)前正在進(jìn)行 Preferred 領(lǐng)導(dǎo)者選舉以及分區(qū)重分配的分區(qū)列表。

故障轉(zhuǎn)移

? 當(dāng) Broker 0 宕機(jī)后,ZooKeeper 通過(guò) Watch 機(jī)制感知到并刪除了 /controller 臨時(shí)節(jié)點(diǎn)。之后,所有存活的 Broker 開始競(jìng)選新的控制器身份。Broker 3 最終贏得了選舉,成功地在 ZooKeeper 上重建了 /controller 節(jié)點(diǎn)。之后,Broker 3 會(huì)從 ZooKeeper 中讀取集群元數(shù)據(jù)信息,并初始化到自己的緩存中。

img
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容