Kafka Consumer自動(dòng)提交機(jī)制

Consumer消費(fèi)消息之后不需要手動(dòng)提交,consumer客戶端會(huì)自動(dòng)提交已經(jīng)消費(fèi)的消息的offset。

相關(guān)參數(shù)設(shè)置:

// 是否自動(dòng)提交偏移量
props.put("enable.auto.commit", "true");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自動(dòng)提交偏移量
props.put("auto.commit.interval.ms", "5000");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");  // 默認(rèn)為5000ms

自動(dòng)提交可能執(zhí)行的時(shí)機(jī)

1、消費(fèi)者手動(dòng)指定自己需要消費(fèi)的分區(qū)(此處是異步提交)

調(diào)用棧為:
KafkaConsumer#assign
ConsumerCoordinator#maybeAutoCommitOffsetsNow

    public void maybeAutoCommitOffsetsNow() {
        // 必須要設(shè)置自動(dòng)提交且已經(jīng)和服務(wù)端的協(xié)調(diào)者建立連接
        // 1、如果消費(fèi)者還沒(méi)有開始消費(fèi)指定分區(qū)是不會(huì)觸發(fā)自動(dòng)提交位移
        // 2、如果消費(fèi)者在消費(fèi)的過(guò)程中受到一條KafkaConsumer#assign的指令,此時(shí)消
        //    費(fèi)訂閱的分區(qū)極有可能發(fā)生改變,所以一定要將之前訂閱的分區(qū)相關(guān)信息提交
        //    給服務(wù)端的協(xié)調(diào)者。
        if (autoCommitEnabled && !coordinatorUnknown())
            doAutoCommitOffsetsAsync();
        }
    }
2、消費(fèi)者拉取消息的時(shí)候(此處是異步提交)

調(diào)用棧為:
KafkaConsumer#poll
KafkaConsumer#pollOnce
ConsumerCoordinator#poll
ConsumerCoordinator#maybeAutoCommitOffsetsAsync

private void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        if (coordinatorUnknown()) {
            this.nextAutoCommitDeadline = now + retryBackoffMs;
        // 并不是每次poll的時(shí)候會(huì)調(diào)用自動(dòng)提交位移
        // 條件為:now > oldNow + auto.commit.interval.ms
        // 觸發(fā)條件和用戶設(shè)置的auto.commit.interval.ms有關(guān),設(shè)置時(shí)間長(zhǎng)
        // 則觸發(fā)的次數(shù)少,設(shè)置時(shí)間短則觸發(fā)次數(shù)多
        } else if (now >= nextAutoCommitDeadline) {
            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
            doAutoCommitOffsetsAsync();
        }
    }
}
3、消費(fèi)者以消費(fèi)者組模式啟動(dòng),加入組重新rebalance之前(此處是同步提交)

調(diào)用棧為:
KafkaConsumer#poll
KafkaConsumer#pollOnce
ConsumerCoordinator#poll
AbstractCoordinator#ensureActiveGroup
AbstractCoordinator#joinGroupIfNeeded
AbstractCoordinator#onJoinPrepare
ConsumerCoordinator#maybeAutoCommitOffsetsSync
只要開啟了自動(dòng)提交,此處是一定會(huì)向協(xié)調(diào)者同步提交位移,因?yàn)樾枰匦聄ebalance,消費(fèi)者組中各個(gè)消費(fèi)者的分區(qū)既有可能會(huì)發(fā)生改變,重新消費(fèi)之前一定要獲取最新的唯一,盡最大努力避免重復(fù)消費(fèi)。

4、消費(fèi)者關(guān)閉的時(shí)候(此處是同步提交)

調(diào)用棧為:
KafkaConsumer#close
ConsumerCoordinator#close
關(guān)閉的時(shí)候肯定是要同步提交消費(fèi)位移的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容