雖然自動提交offset十分簡潔便利,但由于其是基于時間提交的,開發(fā)人員難以把握offset提交的時機(jī)。因此Kafka還提供了手動提交offset的API。
手動提交的offset的方法有兩種:分別是commitSync(同步提交)和commitAsync(異步提交)。兩者的相同點是,都會將本次poll的一批數(shù)據(jù)最高的偏移量提交;不同點是 commitSync阻塞當(dāng)前線程,一直到提交成功,并且會自動失敗重試(由不可控因素導(dǎo)致,也會出現(xiàn)提交失?。?;而commitAsync則沒有失敗重試機(jī)制,故有可能提交失敗。
自定義存儲offset
Kafka0.9版本之前,offset存儲在zookeeper,0.9版本之及之后,默認(rèn)將offset存儲在Kafka的一個內(nèi)置的topic中。除此之外,Kafka還可以選擇自定義存儲offset。
offset的維護(hù)是相當(dāng)繁瑣的,因為需要考慮到消費者的Rebalance。
當(dāng)有新的消費者加入消費者組、已有的消費者推出消費者組或者所訂閱的主題的分區(qū)發(fā)生變化,就會觸發(fā)到分區(qū)的重新分配,重新分配的過程叫做Rebalance。
消費者發(fā)生Rebalance之后,每個消費者消費的分區(qū)就會發(fā)生變化。因此消費者要首先獲取到自己被重新分配到的分區(qū),并且定位到每個分區(qū)最近提交的offset位置繼續(xù)消費。
要實現(xiàn)自定義存儲offset,需要借助ConsumerRebalanceListener。