Kafka內(nèi)部提供了許多管理腳本,這些腳本都放在$KAFKA_HOME/bin目錄下,CDH版本放在/opt/cloudera/parcels/KAFKA-3.1.0-1.3.1.0.p0.35/lib/kafka/bin目錄

企業(yè)生產(chǎn)中,我們可以通過 bin/kafka-topics.sh、bin/kafka-consumer-groups.sh、bin/kafka-run-class.sh 內(nèi)部管理執(zhí)行腳本監(jiān)控Consumer的Group、Topic、Pid、消費(fèi)的Offset、總logSize大小和未消費(fèi)堆積的Lag等信息,結(jié)合shell或python調(diào)度腳本可以大有用途。
0. 進(jìn)入kafka的執(zhí)行目錄
cd /opt/cloudera/parcels/KAFKA-3.1.0-1.3.1.0.p0.35/lib/kafka/
1. 查看所有的kafka topic列表:
bin/kafka-topics.sh -zookeeper hadoop-5:2181,hadoop-6:2181 -list
2. 查看kafka特定topic詳情,使用--topic與--describe參數(shù)
bin/kafka-topics.sh -zookeeper hadoop-5:2181,hadoop-6:2181 --topic ztjy.dt.log.adv --describe
3. 查看 consumer group列表:
3.1.新版本 (信息保存在broker中)? > 0.9.0 version? 故 需要指定bootstrap--server參數(shù)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server hadoop-4:9092,hadoop-5:9092,hadoop-6:9092 --list
3.2.老版本 (信息保存在zookeeper中) <= 0.9.0? ? ? ? 故 需要指定zookeeper參數(shù)
bin/kafka-consumer-groups.sh --zookeeper hadoop-5:2181,hadoop-6:2181,hadoop-4:2181 --list
4. 查看 consumer group詳情:? ? 相同的
4.1.新版本 (信息保存在broker中)? > 0.9.0 version? 故 需要指定bootstrap--server參數(shù)? ?
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server hadoop-4:9092,hadoop-5:9092,hadoop-6:9092 --group kafka-hdfs --describe
4.2.老版本 (信息保存在zookeeper中) <= 0.9.0? ? ? ? 故 需要指定zookeeper參數(shù)
bin/kafka-consumer-groups.sh --zookeeper hadoop-5:2181,hadoop-6:2181,hadoop-4:2181 --group kafka-hdfs --describe
報(bào)錯: Error: The consumer group 'kafka-hdfs' does not exist.
因?yàn)?--group kafka-hdfs? 組是保存在broker中的,而不是在zookeeper中,所以用上面2方式查詢會報(bào)錯。
5.列出所有消費(fèi)者組的所有信息:? ? 老版本 <= 0.9.0
包括Group(消費(fèi)者組)、Topic、Pid(分區(qū)id)、Offset(當(dāng)前已消費(fèi)的條數(shù))、LogSize(總條數(shù))、Lag(未消費(fèi)的條數(shù))、Owner等信息。
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper hadoop-5:2181,hadoop-6:2181,hadoop-4:2181 --group kafka-hdfs --topic topic.log.uv
WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)

