參考:https://zhuanlan.zhihu.com/p/571040157
如果確認沒有使用 zk 或者其他外部存儲來保存消費者的 offset 信息,那么一般默認在 kafka 的 __consumer_offsets 這個 topic 中保存 offset。
這個系統(tǒng) topic 默認不能被用戶消費,那么就需要解讀它的數(shù)據(jù)文件了(當然可以在配置文件 config/consumer.properties 中添加 exclude.internal.topics=false,默認是 true,讓用戶可以消費系統(tǒng) topic):
# 共 50 個分區(qū),每臺機器的分區(qū)數(shù) = 50*replicas/brokers
$ ll data/ | grep -E "__consumer_offsets-" -c
# 分區(qū)的數(shù)據(jù)文件
$ ll data/__consumer_offsets-*/*.log
log 文件是二進制文件,需要 kafka 提供的方法 kafka.tools.DumpLogSegments 來讀取
假設(shè)業(yè)務(wù)的 topic 信息如下:
$ topicName=TEST
$ consumerGroupId=TEST_CG
1、計算 offset 信息保存位置
計算這個 topic 的消費組 offset 信息保存在哪個 __consumer_offsets 分區(qū)
- 計算公式:
Math.abs(groupID.hashCode()) % numPartitions
- Java 代碼:HashCode.java
public class HashCode {
public static void main(String args[]) {
String Str = new String("TEST");
System.out.println(Math.abs(Str.hashCode()%50));
}
}
- 執(zhí)行計算:
$ javac HashCode.java
$ java HashCode
11
結(jié)果為 11,就是說 offset 數(shù)據(jù)保存在 __consumer_offsets-11 分區(qū)里面
2、查看 __consumer_offsets 的分區(qū) 11 的信息
$ /app/kafka/pkg/kafka_2.12-2.5.1/bin/kafka-topics.sh --bootstrap-server 192.168.0.1:9092 --describe --topic __consumer_offsets | grep "Partition: 11"
Topic: __consumer_offsets Partition: 11 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
可知分區(qū) 11 在 broker.id = 2,1,3 的這3臺機器上,leader id 是 2,查看 broker.id:
$ grep broker.id conf/broker.properties
broker.id=2
其實定位 __consumer_offsets 分區(qū) 11 的方法挺多的:
- 上面闡述的 --describe --topic __consumer_offsets
- --describe 業(yè)務(wù) topic 的分區(qū)信息
-
ls data/ | grep __consumer_offsets-11或者ls data/ | grep ${topicName}-
因為 ACL 的原因,無法使用 describe 去獲取業(yè)務(wù) topic 的分區(qū)信息時,
--describe --topic __consumer_offsets獲或者ls data/還是可以幫得上忙的。
3、提取 offset 信息
因為 __consumer_offsets-11 的 leader 的 broker.id = 2,那么只需要看這臺機器的 log 文件即可,修改時間最新的那個就是我們想要的文件,這里是 00000000000004458198.log
$ ll -tr data/__consumer_offsets-11/*.log
-rw-r--r-- 1 test test 4804 Mar 6 21:20 data/__consumer_offsets-11/00000000000000000000.log
-rw-r--r-- 1 test test 315708 Mar 8 11:49 data/__consumer_offsets-11/00000000000004458198.log
開始讀取消費組的 offset 信息
$ logfile=$(ll -tr data/__consumer_offsets-11/*.log | tail -n 1 | awk '{print $9}') && echo $logfile
$ /app/kafka/pkg/kafka_2.12-2.5.1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
--deep-iteration \
--print-data-log \
--files ${logfile} | less
baseOffset: 4460036 lastOffset: 4460036 count: 1 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 335511 LogAppendTime: 1678255976868 size: 161 magic: 2 compresscodec: NONE crc: 4126368907 isvalid: true
| offset: 4460036 LogAppendTime: 1678255976868 keysize: 67 valuesize: 24 sequence: 0 headerKeys: [] key: ^@^A^@!TEST_CG^@^TEST^@^@^@^F payload: ^@^C^@^@^@^@^@ le????^@^@^@^@^A????