Kafka基礎(chǔ)運維

在軟件項目的生命周期中,開發(fā)只占開始的一小部分,大部分時間我們要對項目進(jìn)行運行維護(hù),Kafka相關(guān)的項目也不例外。所以這里介紹了一些Kafka服務(wù)器基礎(chǔ)的運維方法。

關(guān)于Kafka的日志

日志的英語是“l(fā)og”,但Kafka的數(shù)據(jù)文件也被稱為log,所以很多時候會造成一定的歧義。在Kafka中,日志分為兩種:

  • 數(shù)據(jù)日志
  • 操作日志

數(shù)據(jù)日志是指Kafka的topic中存儲的數(shù)據(jù),這種日志的路徑是在$KAFKA_HOME/config/server.properties文件中配置,配置項為log.dirs。如果此項沒有被配置,默認(rèn)會使用配置項 log.dir(請仔細(xì)觀察,兩個配置項最后差了一個s)。log.dir的默認(rèn)路徑為/tmp/kafka-logs,大家知道,/tmp路徑下的文件在計算機(jī)重啟的時候是會被刪除的,因此,強(qiáng)烈推薦將文件目錄設(shè)置在其他可以永久保存的路徑。另一種日志是操作日志,類似于我們在自己開發(fā)的程序中輸出的log日志(log4j),這種日志的路徑是在啟動Kafka的路徑下。比如一般我們在KAFKA_HOME路徑下啟動Kafka服務(wù),那么操作日志的路徑為KAFKA_HOME/logs。

數(shù)據(jù)日志清理

數(shù)據(jù)日志有兩種類型的清理方式,一種是按照日志被發(fā)布的時間來刪除,另一種是按照日志文件的size來刪除。有專門的配置項可以配置這個刪除策略:

按時間刪除:

Kafka提供了配置項讓我們可以按照日志被發(fā)布的時間來刪除。它們分別是:

  • log.retention.ms
  • log.retention.minutes
  • log.retention.hours

根據(jù)配置項的名稱很容易理解它們的含義。log.retention.ms表示日志會被保留多少毫秒,如果為null,則Kafka會使用使用log.retention.minutes配置項。log.retention.minutes表示日志會保留多少分鐘,如果為null,則Kafka會使用log.retention.hours選項。默認(rèn)情況下,log.retention.ms和log.retention.minutes均為null,log.retention.hours為168,即Kafka的數(shù)據(jù)日志默認(rèn)會被保留7天。如果想修改Kafka中數(shù)據(jù)日志被保留的時間長度,可以通過修改這三個選項來實現(xiàn)。

按size刪除

Kafka除了提供了按時間刪除的配置項外,也提供了按照日志文件的size來刪除的配置項:

  • log.retention.bytes

即日志文件到達(dá)多少byte后再刪除日志文件。默認(rèn)為-1,即無限制。需要注意的是,這個選項的值如果小于segment文件大小的話是不起作用的。segment文件的大小取決于log.segment.bytes配置項,默認(rèn)為1G。
另外,Kafka的日志刪除策略并不是非常嚴(yán)格的(比如如果log.retention.bytes設(shè)置了10G的話,并不是超過10G的部分就會立刻刪除,只是被標(biāo)記為待刪除,Kafka會在恰當(dāng)?shù)臅r候再真正刪除),所以請預(yù)留足夠的磁盤空間。當(dāng)磁盤空間剩余量為0時,Kafka服務(wù)會被kill掉。

操作日志清理

目前Kafka的操作日志暫時不提供自動清理的機(jī)制,需要運維人員手動干預(yù),比如使用shell和crontab命令進(jìn)行定時備份、清理等。

Kafka配置IP訪問

在默認(rèn)情況下,訪問Kafka集群需要在/etc/hosts文件下配置所有Kafka集群的hostname,這樣比較麻煩。可以通過配置advertised.listeners配置項來使用ip來訪問Kafka集群,比如單臺Kafka broker的ip地址為192.168.1.100,那么可以使用如下配置來使客戶端來使用ip訪問:

advertised.listeners=PLAINTEXT://192.168.1.100:9092

Kafka的基本操作

接下來簡單介紹一下關(guān)于Kafka集群的基本操作。所有這些用到的工具全部在Kafka目錄下的/bin目錄下可以找到。如果不添加任何參數(shù)運行這些工具腳本,控制臺上會打印使用的提示。

添加topic

在Kafka中添加topic有兩種方式,一種是手動添加,另一種是在數(shù)據(jù)第一次寫入的時候動態(tài)添加。一般情況下還是推薦手動添加topic。
使用topic工具創(chuàng)建topic:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y

replication-factor參數(shù)用來指定需要多少個副本(連同leader在內(nèi)),一般比較推薦設(shè)置為2或3。如果設(shè)置太少(比如1)導(dǎo)致可用性下降,如果設(shè)置太大會影響Kafka的性能。

partitions參數(shù)用來設(shè)置topic有多少個partition。首先,每個partition必須存在于單臺服務(wù)器上,就是說如果某個topic有20個partition,那么它們將最多分布在20臺服務(wù)器上(不算replicas)。另外很重要的一點是,partition的數(shù)量將直接影響消費端程序的并行度。所以創(chuàng)建topic時要根據(jù)實際情況決定partition數(shù)量。

修改topic

修改topic主要是為了修改topic的partition數(shù)量和topic層面的配置。修改partition數(shù)量可以使用此命令:

bin/kafka-topics.sh --zookeeper zk_host:port --alter --topic my_topic_name --partitions 40

關(guān)于修改partition數(shù)量,有兩點需要注意:

  1. partition只能增加,不支持減少
  2. 新增的partition只會對將來寫入的數(shù)據(jù)起作用,以前存在的數(shù)據(jù)不會被移動到新的partition中

除了修改partition數(shù)量,我們還可以修改topic層面的配置,增加配置項:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y

刪除配置項:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --delete-config x

刪除topic

Kafka刪除topic的方法也很簡單。首先,需要在config/server.properties文件中配置

delete.topic.enable=true

這個配置項默認(rèn)是關(guān)閉的,也就是不支持刪除,我們需要手動配置為true。然后使用命令:

./bin/kafka-topics.sh --zookeeper zk_host:port --delete --topic my_topic_name

就可以刪除topic了。

查看消費者位置

有些時候我們需要查看某個消費者組的消費情況,我們可以使用工具kafka-consumer-groups.sh來查看。

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test

查看當(dāng)前正在使用的消費者組

自從kafka 0.9.0.0版本,Kafka引入了新的Java consumer模塊代替了原來的Scala consumer模塊。消費者組的元數(shù)據(jù)從zookeeper遷移到了broker中,所以使用新的API查看消費者組列表:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查看topic的每個partition狀態(tài)

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1

擴(kuò)展Kafka集群

本節(jié)基本翻譯自官網(wǎng):Expanding your cluster
向Kafka集群添加服務(wù)器是很簡單的,分配給它們一個唯一的broker id并且在新服務(wù)器上啟動Kafka服務(wù)器就可以了。但是這些新服務(wù)器卻不能被自動分配數(shù)據(jù)分區(qū)(partition),所以除非partition被移動到這些服務(wù)器上,否者這些服務(wù)器直到新建topic的時候才會發(fā)揮作用。所以,通常情況下當(dāng)你添加服務(wù)器到你的集群中時,你都希望遷移一些已經(jīng)存在的數(shù)據(jù)到這些服務(wù)器上。

遷移數(shù)據(jù)的的過程是手動啟動的,但是執(zhí)行是自動化的。內(nèi)在過程就是Kafka將這些新服務(wù)器添加作為它們正在遷移的partition的follower并且允許它全部復(fù)制這個分區(qū)中已存在的數(shù)據(jù)。當(dāng)新服務(wù)器完全復(fù)制此分區(qū)的內(nèi)容并加入同步副本時,現(xiàn)有副本之一將刪除其分區(qū)的數(shù)據(jù)。

重新分配分區(qū)工具(partition reassignment tool)可以被用來在brokers間移動partition。理想的partition分配會確??鏱roker的均勻的數(shù)據(jù)負(fù)載和分區(qū)大小。重新分配分區(qū)工具并沒有自動獲取Kafka集群中的數(shù)據(jù)分布信息并均衡負(fù)載的能力。因此,管理員必須清楚要移動哪些topic或partition。

重新分配分區(qū)工具有以下三種互斥的運行模式:

  • generate:在這種模式下,給出topic列表和broker列表,分區(qū)工具將生成一個候選的分區(qū)方案用來將指定topic的所有分區(qū)移動到新broker上。這個選項僅僅提供了一種通過給出的topic列表和broker列表生成分區(qū)重新分配方案的簡便方式。
  • execute:這種模式下,分區(qū)工具將根據(jù)用戶提供的重新分區(qū)方案開始重新分區(qū)(使用 -- --reassignment-json-file )。這個選項既可以執(zhí)行管理員手寫的重新分區(qū)文件也可以執(zhí)行通過--generate命令生成的重新分區(qū)文件。
  • verify:這種模式下,分區(qū)工具驗證上次執(zhí)行-generate以來被重新分區(qū)的所有partition的狀態(tài)。這些狀態(tài)可能是successfully completed(成功完成), failed(失?。┗騣n progress(進(jìn)行中)

自動遷移數(shù)據(jù)到新的服務(wù)器上

重新分區(qū)工具可以將當(dāng)前brokers上的topics移動到新的brokers上。這在擴(kuò)展現(xiàn)有集群時很有用,因為將整個topic移動到新brokers上比一次移動一個partition更容易。然后,分區(qū)工具可以在新的brokers中均勻分配給定topic集合中的所有partition。在移動過程中,replication factor(副本數(shù))保持不變。

舉例來說,下面的例子將會把topics foo1、foo2的所有partition移動到新的brokers 5,6。當(dāng)移動行為結(jié)束以后,foo1和foo2的所有partition將僅存在于broker 5、6。

因為分區(qū)工具接受將topic列表寫入json文件的形式,所以你需要指定需要移動的topics并創(chuàng)建json文件,就像下面這樣:

cat topics-to-move.json
{ "topics": [{"topic": "foo1"}, {"topic": "foo2"}], "version":1 }

當(dāng)json文件準(zhǔn)備好 了以后,使用重新分區(qū)工具自動生成一個候選的重新分配計劃:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate</br>
Current partition replica assignment</br>
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]

}</br>
    Proposed partition reassignment configuration</br>
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
                {"topic":"foo1","partition":0,"replicas":[5,6]},
                {"topic":"foo2","partition":2,"replicas":[5,6]},
                {"topic":"foo2","partition":0,"replicas":[5,6]},
                {"topic":"foo1","partition":1,"replicas":[5,6]},
                {"topic":"foo2","partition":1,"replicas":[5,6]}]
  }

分區(qū)工具生成的候選計劃將會將topic foo1,foo2的所有分區(qū)移動到broker 5,6。記住,盡管到了這一步,partition的移動卻尚未開始,而僅僅告訴你當(dāng)前的分區(qū)情況(Current partition replica assignment)和建議的新分區(qū)方案(Proposed partition reassignment configuration)。當(dāng)前分區(qū)情況應(yīng)該保留起來,以防你想要回滾。新分區(qū)方案應(yīng)該被存入一個josn文件中,(比如:expand-cluster-reassignment.json),然后作為分區(qū)工具 --execute選項的輸入文件。就像下面這樣:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute</br>
Current partition replica assignment</br>
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}</br>
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}

最后,--verify選項可以用來檢查重新分配分區(qū)的狀態(tài)。注意此處應(yīng)該使用與執(zhí)行 --execute 選項時相同的expand-cluster-reassignment.json 文件:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully

自定義分區(qū)分配和移動

分區(qū)重新分配工具也可以被用來移動broker被選中的分區(qū)。當(dāng)使用這種方式的時候,我們假設(shè)用戶了解重新分配分區(qū)方案并且不需要使用分區(qū)工具去自動生成候選重新分區(qū)方案。為了高效,我們跳過了--generate步驟,直接來到--execute步驟

舉例來說,下面的例子將topic foo1的partition 0移動到 broker 5,6,將topic foo2的partition 1 移動到broker 2,3:
第一步來手寫自定義重新分區(qū)計劃的json文件:

cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}

然后,使用--execute選項和json文件來開始重新分配分區(qū)的操作:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment</br>
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
{"topic":"foo2","partition":1,"replicas":[3,4]}]
}</br>
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}

用--verify選項來查看被重新分配分區(qū)的狀態(tài)。注意此處應(yīng)該使用與執(zhí)行 --execute 選項時相同的expand-cluster-reassignment.json 文件:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
Status of partition reassignment:

  Reassignment of partition [foo1,0] completed successfully
  Reassignment of partition [foo2,1] completed successfully
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容