Consumer消費(fèi)消息之后不需要手動(dòng)提交,consumer客戶端會(huì)自動(dòng)提交已經(jīng)消費(fèi)的消息的offset。
相關(guān)參數(shù)設(shè)置:
// 是否自動(dòng)提交偏移量
props.put("enable.auto.commit", "true");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自動(dòng)提交偏移量
props.put("auto.commit.interval.ms", "5000");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // 默認(rèn)為5000ms
自動(dòng)提交可能執(zhí)行的時(shí)機(jī)
1、消費(fèi)者手動(dòng)指定自己需要消費(fèi)的分區(qū)(此處是異步提交)
調(diào)用棧為:
KafkaConsumer#assign
ConsumerCoordinator#maybeAutoCommitOffsetsNow
public void maybeAutoCommitOffsetsNow() {
// 必須要設(shè)置自動(dòng)提交且已經(jīng)和服務(wù)端的協(xié)調(diào)者建立連接
// 1、如果消費(fèi)者還沒(méi)有開始消費(fèi)指定分區(qū)是不會(huì)觸發(fā)自動(dòng)提交位移
// 2、如果消費(fèi)者在消費(fèi)的過(guò)程中受到一條KafkaConsumer#assign的指令,此時(shí)消
// 費(fèi)訂閱的分區(qū)極有可能發(fā)生改變,所以一定要將之前訂閱的分區(qū)相關(guān)信息提交
// 給服務(wù)端的協(xié)調(diào)者。
if (autoCommitEnabled && !coordinatorUnknown())
doAutoCommitOffsetsAsync();
}
}
2、消費(fèi)者拉取消息的時(shí)候(此處是異步提交)
調(diào)用棧為:
KafkaConsumer#poll
KafkaConsumer#pollOnce
ConsumerCoordinator#poll
ConsumerCoordinator#maybeAutoCommitOffsetsAsync
private void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
if (coordinatorUnknown()) {
this.nextAutoCommitDeadline = now + retryBackoffMs;
// 并不是每次poll的時(shí)候會(huì)調(diào)用自動(dòng)提交位移
// 條件為:now > oldNow + auto.commit.interval.ms
// 觸發(fā)條件和用戶設(shè)置的auto.commit.interval.ms有關(guān),設(shè)置時(shí)間長(zhǎng)
// 則觸發(fā)的次數(shù)少,設(shè)置時(shí)間短則觸發(fā)次數(shù)多
} else if (now >= nextAutoCommitDeadline) {
this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
doAutoCommitOffsetsAsync();
}
}
}
3、消費(fèi)者以消費(fèi)者組模式啟動(dòng),加入組重新rebalance之前(此處是同步提交)
調(diào)用棧為:
KafkaConsumer#poll
KafkaConsumer#pollOnce
ConsumerCoordinator#poll
AbstractCoordinator#ensureActiveGroup
AbstractCoordinator#joinGroupIfNeeded
AbstractCoordinator#onJoinPrepare
ConsumerCoordinator#maybeAutoCommitOffsetsSync
只要開啟了自動(dòng)提交,此處是一定會(huì)向協(xié)調(diào)者同步提交位移,因?yàn)樾枰匦聄ebalance,消費(fèi)者組中各個(gè)消費(fèi)者的分區(qū)既有可能會(huì)發(fā)生改變,重新消費(fèi)之前一定要獲取最新的唯一,盡最大努力避免重復(fù)消費(fèi)。
4、消費(fèi)者關(guān)閉的時(shí)候(此處是同步提交)
調(diào)用棧為:
KafkaConsumer#close
ConsumerCoordinator#close
關(guān)閉的時(shí)候肯定是要同步提交消費(fèi)位移的。