一:flink kafka offset配置
1. setStartFromGroupOffsets(默認的):
example:
Map specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
查看partition offset
kafka-consumer-groups --bootstrap-server xxx:9092 --group groupId --describe
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
xxx 0 13949 13949 0 xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439 /xxxx xxx-1
xxx 1 13871 13871 0 xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439 /xxxx xxx-1
xxx 2 13974 13974 0 xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439 /xxxx xxx-1
xxx 3 14192 14192 0 xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439 /xxxx xxx-1
xxx 4 14036 14036 0 xxx-1-25efc288-c534-4b1b-a57b-4cfdce853439 /xxxx xxx-1
1.1、消費者組在kafka中提交的offsets開始讀取partition;
1.2、如果分區(qū)中offset沒有找到,則使用kafka properties中的auto.offset.reset配置(比如:latest、earliest)
2. setStartFromEarliest()
從最早的記錄開始,使用此配置,在kafka中已經(jīng)提交的offset將被忽略,不會被使用
3. setStartFromLatest()
從最新的開始,使用此配置,在kafka中已經(jīng)提交的offset將被忽略,不會被使用
4. setStartFromTimestamp(long)
- 從指定的時間開始消費;
- 對于每個partition,記錄的時間大于等于指定的時間將作為起始消費點;
- 如果partition的記錄時間早于指定時間,則從最近的數(shù)據(jù)記錄開始消費;
- 此模式下,在kafka中已經(jīng)提交的offset將被忽略不會作為消費起點。
5. properties配置offset
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
解釋:
- earliest
當各partition有消費者組已提交的offset時,從提交的offset開始消費;無提交的offset時,從起始開始消費 - latest
當各partition下有消費者組已提交的offset時,從提交的offset開始消費;無提交的offset時,消費最新的該partition下的數(shù)據(jù) - none
topic各partition都存在已提交的offset時,從offset后開始消費;只要有一個分區(qū)不存在已提交的offset,則拋出異常
二:kafka消費offset提交配置:
1. checkpoint禁用:
flink kafka消費依賴于內(nèi)部kafka客戶端自動定期的offset提交
配置:enable.auto.commit / auto.commit.interval.ms
2. checkpoint啟用:
flink kafka consumer在checkpoint完成時自動提交offset在checkpoint state中;
配置:setCommitOffsetsOnCheckpoints(boolean) 來啟用關(guān)閉;默認情況下,是開啟的true
此模式下,配置在properties中自動周期性的offset提交將被忽略;