Flink kafka source源碼解析(二)

offset提交模式(非checkpoint)

消費(fèi)kafka topic最為重要的部分就是對(duì)offset的管理,對(duì)于kafka提交offset的機(jī)制,可以參考kafka官方網(wǎng)。

而在flink kafka source中offset的提交模式有3種:

public enum OffsetCommitMode {

   /** Completely disable offset committing. */
   DISABLED,

   /** Commit offsets back to Kafka only when checkpoints are completed. */
   ON_CHECKPOINTS,

   /** Commit offsets periodically back to Kafka, using the auto commit functionality of internal Kafka clients. */
   KAFKA_PERIODIC;
}

初始化offsetCommitMode

FlinkKafkaConsumerBase#open方法中初始化offsetCommitMode

// determine the offset commit mode
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
                getIsAutoCommitEnabled(),
                enableCommitOnCheckpoints,
        ((StreamingRuntimeContext)getRuntimeContext()).isCheckpointingEnabled());
  • 方法getIsAutoCommitEnabled()的實(shí)現(xiàn)如下:

    protected boolean getIsAutoCommitEnabled() {
       return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&
          PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;
    }
    

    也就是說(shuō)只有enable.auto.commit=true并且auto.commit.interval.ms>0這個(gè)方法才會(huì)返回true

  • 變量enableCommitOnCheckpoints默認(rèn)是true,可以調(diào)用setCommitOffsetsOnCheckpoints改變這個(gè)值

  • 當(dāng)代碼中調(diào)用了env.enableCheckpointing方法,isCheckpointingEnabled才會(huì)返回true

通過(guò)下面的代碼返回真正的提交模式

/**
 * Determine the offset commit mode using several configuration values.
 *
 * @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.
 * @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.
 * @param enableCheckpointing whether or not checkpoint is enabled for the consumer.
 *
 * @return the offset commit mode to use, based on the configuration values.
 */
public static OffsetCommitMode fromConfiguration(
      boolean enableAutoCommit,
      boolean enableCommitOnCheckpoint,
      boolean enableCheckpointing) {

   if (enableCheckpointing) {
      // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
      return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
   } else {
      // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
      return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
   }
}

本文暫時(shí)不考慮checkpoint的場(chǎng)景,所以只考慮(enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;

也就是如果客戶端設(shè)置了enable.auto.commit=true那么就是KAFKA_PERIODIC,否則就是DISABLED。

offset的提交

自動(dòng)提交

這種方式完全依靠kafka自身的特性進(jìn)行提交,如下方式指定參數(shù)即可

Properties properties = new Properties();
properties.put("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "1000");
new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties)

非自動(dòng)提交

通過(guò)上面的分析,如果enable.auto.commit=false,那么offsetCommitMode就是DISABLED
kafka官方文檔中,提到當(dāng)enable.auto.commit=false時(shí)候需要手動(dòng)提交offset,也就是需要調(diào)用consumer.commitSync();方法提交。
但是在flink中,非checkpoint模式下,不會(huì)調(diào)用consumer.commitSync();,一旦關(guān)閉自動(dòng)提交,意味著kafka不知道當(dāng)前的consumer group每次消費(fèi)到了哪。
可以從兩方面證實(shí)這個(gè)問(wèn)題:

  • 源碼
    KafkaConsumerThread#run方法中是有consumer.commitSync();,但是只有當(dāng)commitOffsetsAndCallback != null的時(shí)候才會(huì)調(diào)用。只有開啟了checkpoint功能才會(huì)不為null,這個(gè)變量會(huì)在后續(xù)的文章中詳細(xì)分析。

  • 測(cè)試

    1. 可以通過(guò)消費(fèi)__consumer_offsets觀察是否有offset的提交
    2. 重啟程序,還是會(huì)重復(fù)消費(fèi)之前消費(fèi)過(guò)的數(shù)據(jù)

總結(jié)

本文介紹了在非checkpoint模式下,flink kafka source提交offset的方式,后續(xù)會(huì)重點(diǎn)介紹checkpoint模式下提交offset的流程。

注:本文基于flink 1.9.0和kafka 2.3

參考

Flink kafka source源碼解析(一)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容