問題描述
從kafka的架構(gòu)中,嚴(yán)格的一次消費(fèi)據(jù)我所知應(yīng)該是比較困難的,特別是在各種異常情況下。我們在工作中,由于性能原因,要求消費(fèi)者是多進(jìn)程+多線程的方式。我們發(fā)現(xiàn)在單個(gè)進(jìn)程的情況下,程序一般不會(huì)重復(fù)消費(fèi),但是在多進(jìn)程,哪怕是2個(gè)進(jìn)程,然后每個(gè)進(jìn)程有幾個(gè)線程情況下,幾乎每次都有重復(fù)消費(fèi)的問題。
這個(gè)問題我們查了比較久,問題主要出現(xiàn)在消費(fèi)者平衡的時(shí)候,我們設(shè)置的起始o(jì)ffset為開始位置,如果不設(shè)置初始位置顯然有問題,這個(gè)沒辦法改。
我們采用的是高級API,采用的版本是0.8.1版本,我們在消費(fèi)kafka的數(shù)據(jù)的時(shí)候,有個(gè)批量值,如果讀不到這么多,就等待一定時(shí)間自動(dòng)超時(shí)。在這個(gè)超時(shí)的時(shí)間內(nèi),如果另外的線程再讀這個(gè)topic(當(dāng)然本文說的多線程+多進(jìn)程消費(fèi)的是同一個(gè)topic且是同一個(gè)組)。那么顯然原來線程假設(shè)為A線程,讀到數(shù)據(jù)由于還在等待超時(shí)中,那么顯然還未處理,未處理數(shù)據(jù)我們是不提交是必須處理完成再提交,以保障至少處理一次,所以新來讀這個(gè)topic的線程從這個(gè)topic的此partition中讀取數(shù)據(jù),會(huì)從開始位置讀取。
解決辦法
問題根源在于Kafka的平衡機(jī)制,Kafka什么時(shí)候平衡我們無從知曉,而消費(fèi)又是沒平衡好就開始消費(fèi)了,所以解決也從這個(gè)角度來解決。
和網(wǎng)友交流了下,了解到,新版本的API在平衡的時(shí)候可以注冊一個(gè)對象,在平衡前和后可以調(diào)用這個(gè)對象的方法,我們在這個(gè)方法里面將此topic的stream提交(這可能會(huì)造成數(shù)據(jù)丟失,因?yàn)檫@些數(shù)據(jù)很可能還沒處理),這個(gè)新API測試了下,基本沒什么問題。
高級API如何解決?用類分布式鎖最終解決了這個(gè)問題,實(shí)現(xiàn)思路比較簡單,就是通過ZK來實(shí)現(xiàn),程序啟動(dòng)前先定義好需要啟動(dòng)的消費(fèi)者數(shù)量,如果還沒達(dá)到這個(gè)量,線程都不能啟動(dòng),達(dá)到這個(gè)線程數(shù)后,休眠幾秒后啟動(dòng),在啟動(dòng)的時(shí)候,消費(fèi)者線程已經(jīng)得到了平衡,除非線程死掉否則不會(huì)發(fā)生平衡了,所以暫時(shí)解決了這個(gè)問題。
思路共享出來,希望對大家有所幫助。