kafka基本操作

? ? ? ?kafka的基本管理命令都在安裝目錄下面的bin文件夾。跳轉(zhuǎn)到安裝目錄,可以執(zhí)行一下基本的管理命令,比如創(chuàng)建topic,管理消費(fèi)組,消費(fèi)組消費(fèi)進(jìn)度查詢等

1. 創(chuàng)建topic

bin/kafka-topics.sh --zookeeper zk_host:port/chroot--create --topic my_topic_name? --partitions 20 --replication-factor 3

partitions為分區(qū)數(shù),分區(qū)越多,并發(fā)能力越強(qiáng)。replication_factor為副本數(shù),避免單點(diǎn)故障

2.?修改topic

bin/kafka-topics.sh --zookeeper zk_host:port/chroot--alter --topic my_topic_name? --partitions 40

partitions數(shù)量改為40,可能因?yàn)橄M(fèi)端能力不夠,需要增加消費(fèi)者

3.?優(yōu)雅的關(guān)機(jī)

? ? ? broker意外關(guān)閉的時(shí)候,沒有所謂的優(yōu)雅關(guān)機(jī),只能夠等待重新選舉出leader。如果broker主動(dòng)關(guān)閉,可以設(shè)置一個(gè)參數(shù)controlled.shutdown.enable=true來控制是否優(yōu)雅的關(guān)閉。如果參數(shù)為true,那么broker關(guān)閉前,會(huì)將自己節(jié)點(diǎn)的為leader的partition的領(lǐng)導(dǎo)權(quán)限轉(zhuǎn)交給其他節(jié)點(diǎn)。這樣,減少了重新選舉造成topic不可用的時(shí)間。

4.?broker自動(dòng)平衡

? ? ? 當(dāng)一個(gè)broker主動(dòng)關(guān)閉或者crash的時(shí)候,partition的領(lǐng)導(dǎo)權(quán)限將會(huì)轉(zhuǎn)交給其他broker。如果crash的broker重新啟動(dòng),它將只會(huì)是其他broker的follower,不會(huì)被客戶端使用。這樣會(huì)導(dǎo)致每個(gè)broker的壓力不平衡。這個(gè)時(shí)候,我們可以設(shè)置一些參數(shù),讓broker們自動(dòng)平衡:

# 是否自動(dòng)平衡broker之間的分配策略
auto.leader.rebalance.enable = true
# leader的不平衡比例,若是超過這個(gè)數(shù)值,會(huì)對(duì)分區(qū)進(jìn)行重新的平衡leader.imbalance.per.broker.percentage = 10
# 檢查leader是否不平衡的時(shí)間間隔
leader.imbalance.check.interval.seconds = 300

5. 查看消費(fèi)進(jìn)度

? ? ? 當(dāng)一個(gè)topic被很多消費(fèi)者消費(fèi)的時(shí)候,我們可能需要知道某一個(gè)消費(fèi)者的消費(fèi)進(jìn)度情況,可以運(yùn)行以下命令進(jìn)行查看:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

這條命令是查看消費(fèi)組my_group的消費(fèi)進(jìn)度。如果是老版本的kafka,需要通過如下命令查看

bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group my-group

因?yàn)樾掳姹镜膋afka,offset才存放在broker,老版本0.8及以下存放在zookeeper。

6. 增加副本

當(dāng)我們新建topic沒有指定副本的時(shí)候,后續(xù)如果認(rèn)為不安全,需要增加副本的話,需要如下操作

cat increase-replication-factor.json

{"version":1,

"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

第一步,新建一個(gè)json文件,表示要修改哪個(gè)topic,副本存放到哪幾個(gè)broker。比如上面的例子是leader在5號(hào)broker,需要在6,7號(hào)broker新增2個(gè)副本。

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute

第二步,執(zhí)行定義好的分配方案,會(huì)有提示

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify

第三步,驗(yàn)證

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容