(九)kafka常見操作
A. 基礎(chǔ)操作
-
創(chuàng)建主題
- 命令:
./kafka-topics.sh --create --zookeeper xxx.xxx.xxx.xxx:2181 --replication-factor 2 --partitions 3 --topic test - 注意事項:新版的kafka的kafka-topics腳本也用--bootstrap-server替代了--zookeeper,直接代入kafka的ip和端口(比如localhost:9092)。
- 命令:
-
查看主題
- 命令:
./kafka-topics.sh --describe --topic test --zookeeper xxx.xxx.xxx.xxx:2181
- 命令:
-
模擬生產(chǎn)者
- 命令:
./kafka-console-producer.sh --broker-list xxx.xxx.xxx.xxxx:9092 --topic test
- 命令:
-
模擬消費者
- 命令:
./kafka-console-consumer.sh --bootstrap-server xxx.xxx.xxx.xxxx:9092 --topic test --from-beginning - 注意事項:該腳本下,只支持新的--bootstrap-server字段。
- 命令:
B. 集群擴(kuò)容節(jié)點
-
背景概述
- kafka cluster若需要增加節(jié)點,必須將原有topic進(jìn)行重新分區(qū)分配(主要是原有分區(qū)的數(shù)據(jù)遷移),不然會導(dǎo)致broker端負(fù)載不均衡(具體內(nèi)容可參考第六章)。
- 原集群共有3臺brokers(0,1,2),現(xiàn)增加2臺brokers(3 & 4)。原有topics兩個,分別為push_event & third_event,replicas設(shè)置為2。
-
操作步驟
-
預(yù)準(zhǔn)備工作
- 需要重新分區(qū)分配的topic,將其寫入對應(yīng)的json文本,move.json的具體內(nèi)容為
{ "topics":[ { "topic": "push_event" }, { "topic": "third_event" } ] } -
執(zhí)行算法腳本獲取分配方案
- 使用kafka的kafka-reassign-partitions.sh腳本,將第一步的move.json作為參數(shù)獲取新的分區(qū)分配方案。
- 具體命令為
./bin/kafka-reassign-partitions.sh --zookeeper your_zk_address:2181 --topics-to-move-json-file move.json --broker-list "0,1,2,3,4" --generate,其中broker list中的內(nèi)容就是具體的broker id。
-
執(zhí)行分配方案
執(zhí)行完第二步后會返回分配結(jié)果,由兩部分組成current方案 & proposed方案。
-
current方案作為備份為回滾做準(zhǔn)備,存儲為backup.json(存儲時,第一行省略)。
Current partition replica assignment { "version":1, "partitions":[ {"topic":"open_push_event","partition":2,"replicas":[0,1]}, {"topic":"open_push_event","partition":4,"replicas":[2,1]}, {"topic":"open_push_event","partition":3,"replicas":[1,0]}, {"topic":"open_push_event","partition":0,"replicas":[1,2]}, {"topic":"open_push_event","partition":1,"replicas":[2,0]} ] } -
proposed方案則是本次更新分區(qū)的方案,存儲為update.json(存儲時,第一行省略)。
Proposed partition reassignment configuration { "version":1, "partitions":[ {"topic":"think_tank","partition":2,"replicas":[2,0]}, {"topic":"think_tank","partition":4,"replicas":[4,2]}, {"topic":"think_tank","partition":3,"replicas":[3,1]}, {"topic":"think_tank","partition":0,"replicas":[0,3]}, {"topic":"think_tank","partition":1,"replicas":[1,4]} ] } 使用kafka-reassign-partitions.sh執(zhí)行分區(qū)分配,傳入update.json作為具體參數(shù),命令為
./bin/kafka-reassign-partitions.sh --zookeeper your_zk_address:2181 --reassignment-json-file update.json --execute,若數(shù)據(jù)較大則該動作會持續(xù)較久。
-