在進(jìn)入主題之前,我們先思考一個問題。
問題
kafka消費者使用自動提交的模式,提交間隔為2s,消費者在獲取數(shù)據(jù)的時候處理0.5s,從kafka拉取過來的數(shù)據(jù)只夠處理1秒。那么消費者下次拉取過來的數(shù)據(jù)是否是已經(jīng)消費完的數(shù)據(jù)?或者說由于數(shù)據(jù)已經(jīng)消費,但是偏移量沒有被提交,是否會造成下次獲取的數(shù)據(jù)是從舊的偏移量開始拉???
答案
不會是舊數(shù)據(jù),kafka的消費者也有自己偏移量,這個偏移量是從kafka中讀取的量,和kafka提交的偏移量不一樣。假設(shè)變成自動提交偏移量,而且沒有寫提交的邏輯,同一個消費者,除了第一次或者rebalance會根據(jù)已提交的offset來獲取數(shù)據(jù),剩下的時候都是根據(jù)自己本地的偏移量來獲取的。這個模式有點類似于用桶取水,用瓢來喝水。消費者就是桶的角色,poll就是瓢的角色。
重復(fù)消費的情況
我們把重復(fù)消費的情況分為2種,一種是想避免的,一種是故意如此的。
想避免的場景
- 消費者使用了自動提交模式,當(dāng)還沒有提交的時候,有新的消費者加入或者移除,發(fā)生了rebalance。再次消費的時候,消費者會根據(jù)提交的偏移量來,于是重復(fù)消費了數(shù)據(jù)。
- 使用異步提交,并且在callback里寫了失敗重試,但是沒有注意順序。例如提交5的時候,發(fā)送網(wǎng)絡(luò)故障,由于是異步,程序繼續(xù)運行,再次提交10的時候,提交成功,此時正好運行到5的重試,并且成功。當(dāng)發(fā)生了rebalance,又會重復(fù)消費了數(shù)據(jù)。
注:這里不討論那個消費者提交的offset的作用。
故意的場景
- 使用不同的組消費同一個topic。改個 group.id屬性即可。
- 自己手動提交偏移量。
這里的麻煩的地方就是需要理解開頭的問題,并不是說你提交完就可以了。你得想個辦法去讀取那個偏移量再次消費。下面提供一個暴力的手段,關(guān)閉消費者,然后再次開啟新的。
public static void consumer(Properties properties,String info) {
System.out.println(info);
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);
kafkaConsumer.subscribe(Arrays.asList(new String[]{"hello"}));
boolean flag = false;
while (true) {
ConsumerRecords<String, String> poll = kafkaConsumer.poll(100);
if (!poll.isEmpty()) {
for (ConsumerRecord<String, String> o : poll) {
System.out.println(o.value() + o.offset());
//假設(shè)場景為重復(fù)消費3,這里需要根據(jù)業(yè)務(wù)來提交便宜量
if (o.offset() == 3) {
//手動提交偏移量
Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<TopicPartition, OffsetAndMetadata>();
//提交的偏移量,這個偏移量就是下次消費的第一條數(shù)據(jù)
currentOffset.put(new TopicPartition(o.topic(), o.partition()), new OffsetAndMetadata(o.offset()+1, ""));
kafkaConsumer.commitSync(currentOffset);
flag = true;
break;
}
}
}
if(flag){
kafkaConsumer.close();
break;
}
}
}
這里也必須注意,kafka并不是數(shù)據(jù)庫,他保存的數(shù)據(jù)有持久化的時間和大小的限制,可能你提交的偏移量的數(shù)據(jù)已經(jīng)被kafka清理掉了。