Kafka學(xué)習(xí)之怎么保證不丟,不重復(fù)消費(fèi)數(shù)據(jù)
1
消費(fèi)者pull數(shù)據(jù)時(shí),出現(xiàn)數(shù)據(jù)丟失?
- 自動(dòng)提交offset:可能出現(xiàn)數(shù)據(jù)丟失,當(dāng)消費(fèi)者的數(shù)量>partition數(shù)量時(shí),出現(xiàn)提交異常。。。多個(gè)consumer group 同時(shí)消費(fèi)一個(gè)分區(qū)的數(shù)據(jù),其中一個(gè)先提交了,另一個(gè)就丟失了。
- 手動(dòng)提交offset : 防止線程安全問題。
如果希望能夠嚴(yán)格的不丟數(shù)據(jù),解決辦法有兩個(gè):
手動(dòng)commit offset,并針對(duì)partition_num啟同樣數(shù)目的consumer進(jìn)程,這樣就能保證一個(gè)consumer進(jìn)程占有一個(gè)partition,commit offset的時(shí)候不會(huì)影響別的partition的offset。但這個(gè)方法比較局限,因?yàn)閜artition和consumer進(jìn)程的數(shù)目必須嚴(yán)格對(duì)應(yīng)。
另一個(gè)方法同樣需要手動(dòng)commit offset,另外在consumer端再將所有fetch到的數(shù)據(jù)緩存到queue里,當(dāng)把queue里所有的數(shù)據(jù)處理完之后,再批量提交offset,這樣就能保證只有處理完的數(shù)據(jù)才被commit。當(dāng)然這只是基本思路,實(shí)際上操作起來不是這么簡單,具體做法以后我再另開一篇。
2
afka作為當(dāng)下流行的高并發(fā)消息中間件,大量用于數(shù)據(jù)采集,實(shí)時(shí)處理等場(chǎng)景,我們?cè)谙硎芩母卟l(fā),高可靠時(shí),還是不得不面對(duì)可能存在的問題,最常見的就是丟包,重發(fā)問題。
2.1 丟包問題
丟包問題:消息推送服務(wù),每天早上,手機(jī)上各終端都會(huì)給用戶推送消息,這時(shí)候流量劇增,可能會(huì)出現(xiàn)kafka發(fā)送數(shù)據(jù)過快,導(dǎo)致服務(wù)器網(wǎng)卡爆滿,或者磁盤處于繁忙狀態(tài),可能會(huì)出現(xiàn)丟包現(xiàn)象。
解決辦法:
限速,啟用重試機(jī)制,重試間隔時(shí)間設(shè)置長一些,Kafka設(shè)置acks=all
首先對(duì)kafka進(jìn)行限速, 其次啟用重試機(jī)制,重試間隔時(shí)間設(shè)置長一些,最后Kafka設(shè)置acks=all,即需要相應(yīng)的所有處于ISR的分區(qū)都確認(rèn)收到該消息后,才算發(fā)送成功。
檢測(cè)方法:使用重放機(jī)制,查看問題所在。
kafka配置如下:
2.2 重發(fā)問題
重發(fā)問題:當(dāng)消費(fèi)者重新分配partition的時(shí)候,可能出現(xiàn)從頭開始消費(fèi)的情況,導(dǎo)致重發(fā)問題。當(dāng)消費(fèi)者消費(fèi)的速度很慢的時(shí)候,可能在一個(gè)session周期內(nèi)還未完成,導(dǎo)致心跳機(jī)制檢測(cè)報(bào)告出問題。
底層根本原因:已經(jīng)消費(fèi)了數(shù)據(jù),但是offset沒提交。
配置問題:設(shè)置了offset自動(dòng)提交
解決辦法:至少發(fā)一次+去重操作(冪等性)
問題場(chǎng)景:
1.設(shè)置offset為自動(dòng)提交,正在消費(fèi)數(shù)據(jù),kill消費(fèi)者線程;
2.設(shè)置offset為自動(dòng)提交,關(guān)閉kafka時(shí),如果在close之前,調(diào)用 consumer.unsubscribe() 則有可能部分offset沒提交,下次重啟會(huì)重復(fù)消費(fèi);
3.消費(fèi)kafka與業(yè)務(wù)邏輯在一個(gè)線程中處理,可能出現(xiàn)消費(fèi)程序業(yè)務(wù)處理邏輯阻塞超時(shí),導(dǎo)致一個(gè)周期內(nèi),offset還未提交;繼而重復(fù)消費(fèi),但是業(yè)務(wù)邏輯可能采用發(fā)送kafka或者其他無法回滾的方式;
重復(fù)消費(fèi)最常見的原因:
re-balance問題,通常會(huì)遇到消費(fèi)的數(shù)據(jù),處理很耗時(shí),導(dǎo)致超過了Kafka的session timeout時(shí)間(0.10.x版本默認(rèn)是30秒),那么就會(huì)re-balance重平衡,此時(shí)有一定幾率offset沒提交,會(huì)導(dǎo)致重平衡后重復(fù)消費(fèi)。
去重問題:消息可以使用唯一id標(biāo)識(shí)
保證不丟失消息:
生產(chǎn)者(ack=all 代表至少成功發(fā)送一次)
消費(fèi)者 (offset手動(dòng)提交,業(yè)務(wù)邏輯成功處理后,提交offset)
保證不重復(fù)消費(fèi):
落表(主鍵或者唯一索引的方式,避免重復(fù)數(shù)據(jù))
業(yè)務(wù)邏輯處理(選擇唯一主鍵存儲(chǔ)到Redis或者mongdb中,先查詢是否存在,若存在則不處理;若不存在,先插入Redis或Mongdb,再進(jìn)行業(yè)務(wù)邏輯處理)