flink kafka offset配置/提交

一:flink kafka offset配置


1. setStartFromGroupOffsets(默認的):

example:
Map specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

查看partition offset

kafka-consumer-groups --bootstrap-server xxx:9092 --group groupId  --describe
TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID                                       HOST            CLIENT-ID
xxx         0          13949           13949           0     xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439         /xxxx           xxx-1
xxx         1          13871           13871           0     xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439         /xxxx           xxx-1
xxx         2          13974           13974           0     xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439         /xxxx           xxx-1
xxx         3          14192           14192           0     xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439         /xxxx           xxx-1
xxx         4          14036           14036           0     xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439         /xxxx           xxx-1
  • 1.1、消費者組在kafka中提交的offsets開始讀取partition;

  • 1.2、如果分區(qū)中offset沒有找到,則使用kafka properties中的auto.offset.reset配置(比如:latest、earliest)


2. setStartFromEarliest()

從最早的記錄開始,使用此配置,在kafka中已經(jīng)提交的offset將被忽略,不會被使用


3. setStartFromLatest()

從最新的開始,使用此配置,在kafka中已經(jīng)提交的offset將被忽略,不會被使用


4. setStartFromTimestamp(long)

  • 從指定的時間開始消費;
  • 對于每個partition,記錄的時間大于等于指定的時間將作為起始消費點;
  • 如果partition的記錄時間早于指定時間,則從最近的數(shù)據(jù)記錄開始消費;
  • 此模式下,在kafka中已經(jīng)提交的offset將被忽略不會作為消費起點。

5. properties配置offset

properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

解釋:

  • earliest
    當各partition有消費者組已提交的offset時,從提交的offset開始消費;無提交的offset時,從起始開始消費
  • latest
    當各partition下有消費者組已提交的offset時,從提交的offset開始消費;無提交的offset時,消費最新的該partition下的數(shù)據(jù)
  • none
    topic各partition都存在已提交的offset時,從offset后開始消費;只要有一個分區(qū)不存在已提交的offset,則拋出異常

二:kafka消費offset提交配置:


1. checkpoint禁用:

flink kafka消費依賴于內(nèi)部kafka客戶端自動定期的offset提交

配置:enable.auto.commit / auto.commit.interval.ms


2. checkpoint啟用:

flink kafka consumer在checkpoint完成時自動提交offset在checkpoint state中;

配置:setCommitOffsetsOnCheckpoints(boolean) 來啟用關(guān)閉;默認情況下,是開啟的true
此模式下,配置在properties中自動周期性的offset提交將被忽略;


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容