Reference book: 《深入理解Kafka:核心設(shè)計與實踐原理》 (Understanding Kafka: Core Design and Practice)
JD purchase link: https://item.jd.com/12489649.html
Outline of this section:

Topic Management
Topic management is performed with the kafka-topics.sh script, whose entire body is the following line:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
1. Creating a Topic
- Creating a topic with Kafka's default replica-assignment logic. Example:
  bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test --partitions 4 --replication-factor 2
  Note: the ZooKeeper address passed via --zookeeper (here localhost:2181/kafka) must match the zookeeper.connect value configured in server.properties. If they differ, topic creation fails with:
  Error while executing topic command : Replication factor: 2 larger than available brokers: 0.
- Creating a topic with a manually specified replica assignment. The command-line help describes the option as follows:
  --replica-assignment <String:              A list of manual partition-to-broker
    broker_id_for_part1_replica1 :             assignments for the topic being
    broker_id_for_part1_replica2 ,             created or altered.
    broker_id_for_part2_replica1 :
    broker_id_for_part2_replica2 , ...>
  The assignment lists partitions in ascending order of partition number: partitions are separated by commas, and the replicas within a partition are separated by colons. When creating a topic with --replica-assignment, you do not specify the --partitions and --replication-factor parameters.
Example:
[root@slave2 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test02 --replica-assignment 0:1,1:2,2:1,2:0
Created topic test02.
Caveats:
1) Replicas within one partition must be distinct brokers; an assignment such as 0:0 that lists the same broker twice is invalid:
   bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test03 --replica-assignment 0:0,1:1,2:1,2:0
2) All partitions must have the same number of replicas, otherwise the command fails with "Partition 1 has different replication factor":
   bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test02 --replica-assignment 0:1,1,2:1,2:0
3) Skipping a partition (leaving an empty entry) also raises an exception:
   bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test02 --replica-assignment 0:1,,2:1,2:0
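The three failure cases above can be captured by a small validation routine. The following is an illustrative Python sketch, not Kafka's actual parser; the function name and error messages are invented for this example:

```python
def validate_replica_assignment(assignment: str) -> list[list[int]]:
    """Parse and validate a --replica-assignment string such as '0:1,1:2,2:1,2:0'."""
    parsed = []
    for i, part in enumerate(assignment.split(",")):
        if part == "":  # case 3: a skipped (empty) partition entry
            raise ValueError(f"Partition {i} is missing an assignment")
        replicas = [int(b) for b in part.split(":")]
        if len(set(replicas)) != len(replicas):  # case 1: duplicate broker within one partition
            raise ValueError(f"Partition {i} lists the same broker twice")
        if parsed and len(replicas) != len(parsed[0]):  # case 2: inconsistent replication factor
            raise ValueError(f"Partition {i} has different replication factor")
        parsed.append(replicas)
    return parsed
```

For the valid example, validate_replica_assignment("0:1,1:2,2:1,2:0") returns [[0, 1], [1, 2], [2, 1], [2, 0]], while each of the three invalid strings above raises a ValueError.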
- Creating a topic with configuration overrides:
  [root@slave2 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test-config --partitions 1 --replication-factor 1 --config cleanup.policy=compact --config max.message.bytes=10000
  Created topic test-config.
- Other notes on topic creation:
  - Creating a topic that already exists fails with an error unless the --if-not-exists parameter is passed, which makes the command skip the creation silently.
  - Topic names containing a period (.) or an underscore (_) deserve care: in metric names, periods are replaced with underscores, so names differing only in those characters can collide, and Kafka prints a warning.
  - Rack-aware topic creation relies on the broker-side configuration broker.rack.
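The period/underscore caveat stems from topic names being embedded in metric names with '.' mapped to '_'. A hypothetical Python sketch of that collision check (not Kafka's actual code):

```python
def may_collide(new_topic: str, existing_topics: set[str]) -> bool:
    """Return True when new_topic's metric name (periods mapped to underscores)
    would clash with the metric name of an existing topic."""
    normalize = lambda name: name.replace(".", "_")
    return normalize(new_topic) in {normalize(t) for t in existing_topics}
```

For example, creating "topic.1" while "topic_1" exists triggers the warning, because both normalize to "topic_1".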
2. Viewing Topics
- Listing all topics:
  [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --list
  test
  test-config
  test02
- Inspecting the log directories. Example: the test topic created earlier.
  On the master node:
  [root@master kafka-logs]# pwd
  /tmp/kafka-logs
  [root@master kafka-logs]# ls -al | grep test
  drwxr-xr-x 2 root root 141 Apr 29 09:15 test-0
  drwxr-xr-x 2 root root 141 Apr 29 09:15 test-1
  On the slave1 node:
  [root@slave1 kafka-logs]# ls -al | grep test
  drwxr-xr-x 2 root root 141 Apr 29 09:15 test-1
  drwxr-xr-x 2 root root 141 Apr 29 09:15 test-2
  drwxr-xr-x 2 root root 141 Apr 29 09:15 test-3
  On the slave2 node:
  [root@slave2 kafka-logs]# ls -al | grep test
  drwxr-xr-x 2 root root 141 Apr 29 09:15 test-0
  drwxr-xr-x 2 root root 141 Apr 29 09:15 test-2
  drwxr-xr-x 2 root root 141 Apr 29 09:15 test-3
  The three nodes hold 8 directories in total: 8 = 4 (partitions) × 2 (replicas per partition).
- Viewing through ZooKeeper:
  [zk: localhost:2181(CONNECTED) 6] get /kafka/brokers/topics/test
  {"version":1,"partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]}}
  cZxid = 0x300000137
  ctime = Mon Apr 29 09:15:46 EDT 2019
  mZxid = 0x300000137
  mtime = Mon Apr 29 09:15:46 EDT 2019
  pZxid = 0x300000139
  cversion = 1
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x0
  dataLength = 68
  numChildren = 1
  Here "partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]} describes the partition assignment: each key is a partition number and each value is the list of broker ids hosting that partition's replicas.
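The znode payload shown above is plain JSON, so the assignment can be decoded programmatically. A short sketch (the JSON string is copied verbatim from the output above):

```python
import json

payload = '{"version":1,"partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]}}'
assignment = json.loads(payload)["partitions"]

# Derive the topic's shape from the assignment map.
partition_count = len(assignment)                           # number of partitions
replication_factor = len(next(iter(assignment.values())))   # replicas per partition
total_log_dirs = sum(len(r) for r in assignment.values())   # directories across the cluster

# Which partitions does broker 2 host a replica of?
on_broker_2 = sorted(p for p, replicas in assignment.items() if 2 in replicas)
```

This reproduces the arithmetic from the log-directory listing: 4 partitions × 2 replicas = 8 directories.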
- Viewing with the describe command:
  [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test
  Topic:test  PartitionCount:4  ReplicationFactor:2  Configs:
      Topic: test  Partition: 0  Leader: 2  Replicas: 2,0  Isr: 2,0
      Topic: test  Partition: 1  Leader: 0  Replicas: 0,1  Isr: 0,1
      Topic: test  Partition: 2  Leader: 1  Replicas: 1,2  Isr: 1,2
      Topic: test  Partition: 3  Leader: 2  Replicas: 2,1  Isr: 2,1
- The --topics-with-overrides parameter lists the topics whose configuration overrides the cluster defaults:
  [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topics-with-overrides
  Topic:test-config  PartitionCount:1  ReplicationFactor:1  Configs:cleanup.policy=compact,max.message.bytes=10000
- The --under-replicated-partitions parameter finds all partitions that contain out-of-sync replicas.
  With node2 taken offline:
  [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test --under-replicated-partitions
      Topic: test  Partition: 0  Leader: 0  Replicas: 2,0  Isr: 0
      Topic: test  Partition: 2  Leader: 1  Replicas: 1,2  Isr: 1
      Topic: test  Partition: 3  Leader: 1  Replicas: 2,1  Isr: 1
  With node2 brought back online:
  [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test --under-replicated-partitions
  No output is produced.
- The --unavailable-partitions parameter shows the partitions of a topic that have no leader replica. Such partitions are offline and unavailable to external producers and consumers.
  With node1 and node2 taken offline:
  [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test --unavailable-partitions
      Topic: test  Partition: 2  Leader: -1  Replicas: 1,2  Isr: 1
      Topic: test  Partition: 3  Leader: -1  Replicas: 2,1  Isr: 1
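Both describe filters reduce to simple predicates over the Leader/Replicas/Isr columns. A sketch in Python (the function and parameter names are chosen for this example):

```python
from typing import Optional

def is_under_replicated(replicas: list[int], isr: list[int]) -> bool:
    """A partition is under-replicated when its ISR is smaller than its replica set."""
    return len(isr) < len(replicas)

def is_unavailable(leader: Optional[int]) -> bool:
    """A partition is unavailable when it has no leader (shown as -1 by describe)."""
    return leader is None or leader == -1
```

Applied to the output above: partition 0 with Replicas 2,0 and Isr 0 is under-replicated, and partition 2 with Leader -1 is unavailable.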
3. Topics, Partitions, Replicas, and Logs
One topic has multiple partitions; one partition has multiple replicas; one replica corresponds to one log.

4. Partition Replica Assignment
- Without rack awareness
- With rack awareness
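For the non-rack-aware case, Kafka spreads the first replica of each partition round-robin across the brokers and places the remaining replicas with an increasing shift, so that followers do not pile up on one broker. The sketch below follows the spirit of Kafka's AdminUtils assignment logic; the normally random start index and shift are exposed as parameters here for determinism, so treat it as an illustration rather than the exact implementation:

```python
import random

def assign_replicas(broker_ids, n_partitions, replication_factor,
                    start_index=None, next_replica_shift=None):
    """Round-robin replica assignment without rack awareness (illustrative sketch)."""
    n = len(broker_ids)
    start = random.randrange(n) if start_index is None else start_index
    shift = random.randrange(n) if next_replica_shift is None else next_replica_shift
    assignment = {}
    for p in range(n_partitions):
        if p > 0 and p % n == 0:      # after each full pass over the brokers, bump the shift
            shift += 1
        first = (p + start) % n       # leader replica moves round-robin
        replicas = [broker_ids[first]]
        for r in range(replication_factor - 1):
            # followers sit 1 + (shift + r) % (n - 1) slots after the leader
            replicas.append(broker_ids[(first + 1 + (shift + r) % (n - 1)) % n])
        assignment[p] = replicas
    return assignment
```

With brokers [0, 1, 2], 4 partitions, replication factor 2, start index 0 and shift 0, this yields {0: [0, 1], 1: [1, 2], 2: [2, 0], 3: [0, 2]}: a layout of the same shape as the test topic shown earlier, with leaders balanced across brokers.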
5. Modifying Partitions (the alter directive)
- Changing the number of partitions:
  ## First, describe topic test-config
  [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
  Topic:test-config  PartitionCount:1  ReplicationFactor:1  Configs:cleanup.policy=compact,max.message.bytes=10000
      Topic: test-config  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
  ## Increase the partition count of test-config to 3
  [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic test-config --partitions 3
  WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
  Adding partitions succeeded!
  ## Describe test-config again
  [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
  Topic:test-config  PartitionCount:3  ReplicationFactor:1  Configs:cleanup.policy=compact,max.message.bytes=10000
      Topic: test-config  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
      Topic: test-config  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
      Topic: test-config  Partition: 2  Leader: 2  Replicas: 2  Isr: 2
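The WARNING in the output matters for keyed topics: the default partitioner picks a partition as hash(key) mod partition-count, so increasing the partition count reroutes existing keys. Kafka's producer actually uses the murmur2 hash; the toy byte-sum hash below is only a stand-in to illustrate the effect:

```python
def toy_partition(key: str, num_partitions: int) -> int:
    """Stand-in for the default partitioner: hash(key) mod num_partitions.
    (Kafka's producer uses murmur2; a simple byte-sum hash is used here for clarity.)"""
    return sum(key.encode("utf-8")) % num_partitions

key = "user-7"
before = toy_partition(key, 1)   # with a single partition, every key maps to partition 0
after = toy_partition(key, 3)    # after growing to 3 partitions, the same key can land elsewhere
```

Here the key "user-7" moves from partition 0 to partition 1 when the count grows from 1 to 3, which is exactly why key-based ordering guarantees break.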
Changing topic configuration: the alter directive together with the --config parameter.
- Modifying a parameter that already overrides the default:
  ## Change --config max.message.bytes=20000
  [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic test-config --config max.message.bytes=20000
  WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
           Going forward, please use kafka-configs.sh for this functionality
  Updated config for topic test-config.
  ## Describe again
  [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
  Topic:test-config  PartitionCount:3  ReplicationFactor:1  Configs:max.message.bytes=20000,cleanup.policy=compact
      Topic: test-config  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
      Topic: test-config  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
      Topic: test-config  Partition: 2  Leader: 2  Replicas: 2  Isr: 2
- Setting a parameter that does not yet override the default:
  [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic test-config --config segment.bytes=1048577
  WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
           Going forward, please use kafka-configs.sh for this functionality
  Updated config for topic test-config.
  [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
  Topic:test-config  PartitionCount:3  ReplicationFactor:1  Configs:segment.bytes=1048577,cleanup.policy=compact,max.message.bytes=20000
      Topic: test-config  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
      Topic: test-config  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
      Topic: test-config  Partition: 2  Leader: 2  Replicas: 2  Isr: 2
- Removing a configuration override with --delete-config:
  [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic test-config --delete-config segment.bytes
  WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
           Going forward, please use kafka-configs.sh for this functionality
  Updated config for topic test-config.
  [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
  Topic:test-config  PartitionCount:3  ReplicationFactor:1  Configs:max.message.bytes=20000,cleanup.policy=compact
      Topic: test-config  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
      Topic: test-config  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
      Topic: test-config  Partition: 2  Leader: 2  Replicas: 2  Isr: 2
6. Configuration Management with the kafka-configs.sh Script
- Viewing the tool's usage and parameter descriptions (running it with no arguments prints the help):
  [root@master kafka_2.11-2.2.0]# bin/kafka-configs.sh
  This tool helps to manipulate and describe entity config for a topic, client, user or broker
  Option                        Description
  ------                        -----------
  --add-config <String>         Key/value pairs of configs to add. Square brackets can be used to group
                                values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'.
                                Valid configurations for entity-type 'topics':
                                cleanup.policy, compression.type, delete.retention.ms, file.delete.delay.ms,
                                flush.messages, flush.ms, follower.replication.throttled.replicas,
                                index.interval.bytes, leader.replication.throttled.replicas,
                                max.message.bytes, message.downconversion.enable, message.format.version,
                                message.timestamp.difference.max.ms, message.timestamp.type,
                                min.cleanable.dirty.ratio, min.compaction.lag.ms, min.insync.replicas,
                                preallocate, retention.bytes, retention.ms, segment.bytes,
                                segment.index.bytes, segment.jitter.ms, segment.ms,
                                unclean.leader.election.enable.
                                Valid configurations for entity-type 'brokers':
                                log.message.timestamp.type, ssl.client.auth, log.retention.ms,
                                sasl.login.refresh.window.jitter, sasl.kerberos.ticket.renew.window.factor,
                                log.preallocate, log.index.size.max.bytes, sasl.login.refresh.window.factor,
                                ssl.truststore.type, ssl.keymanager.algorithm,
                                log.cleaner.io.buffer.load.factor, sasl.login.refresh.min.period.seconds,
                                ssl.key.password, background.threads, log.retention.bytes,
                                ssl.trustmanager.algorithm, log.segment.bytes,
                                max.connections.per.ip.overrides, log.cleaner.delete.retention.ms,
                                log.segment.delete.delay.ms, min.insync.replicas, ssl.keystore.location,
                                ssl.cipher.suites, log.roll.jitter.ms, log.cleaner.backoff.ms,
                                sasl.jaas.config, principal.builder.class, log.flush.interval.ms,
                                log.cleaner.dedupe.buffer.size, log.flush.interval.messages,
                                advertised.listeners, num.io.threads, listener.security.protocol.map,
                                log.message.downconversion.enable, sasl.enabled.mechanisms,
                                sasl.login.refresh.buffer.seconds, ssl.truststore.password, listeners,
                                metric.reporters, ssl.protocol, sasl.kerberos.ticket.renew.jitter,
                                ssl.keystore.password, sasl.mechanism.inter.broker.protocol,
                                log.cleanup.policy, sasl.kerberos.principal.to.local.rules,
                                sasl.kerberos.min.time.before.relogin, num.recovery.threads.per.data.dir,
                                log.cleaner.io.max.bytes.per.second, log.roll.ms,
                                ssl.endpoint.identification.algorithm, unclean.leader.election.enable,
                                message.max.bytes, log.cleaner.threads, log.cleaner.io.buffer.size,
                                max.connections.per.ip, sasl.kerberos.service.name, ssl.provider,
                                follower.replication.throttled.rate, log.index.interval.bytes,
                                log.cleaner.min.compaction.lag.ms, log.message.timestamp.difference.max.ms,
                                ssl.enabled.protocols, log.cleaner.min.cleanable.ratio,
                                replica.alter.log.dirs.io.max.bytes.per.second, ssl.keystore.type,
                                ssl.secure.random.implementation, ssl.truststore.location,
                                sasl.kerberos.kinit.cmd, leader.replication.throttled.rate,
                                num.network.threads, compression.type, num.replica.fetchers.
                                Valid configurations for entity-type 'users':
                                request_percentage, producer_byte_rate, SCRAM-SHA-256, SCRAM-SHA-512,
                                consumer_byte_rate.
                                Valid configurations for entity-type 'clients':
                                request_percentage, producer_byte_rate, consumer_byte_rate.
                                Entity types 'users' and 'clients' may be specified together to update
                                config for clients of a specific user.
  --alter                       Alter the configuration for the entity.
  --bootstrap-server <String>   The Kafka server to connect to. This is required for describing and
                                altering broker configs.
  --command-config <String>     Property file containing configs to be passed to Admin Client. This is
                                used only with the --bootstrap-server option for describing and altering
                                broker configs.
  --delete-config <String>      Config keys to remove, e.g. 'k1,k2'.
  --describe                    List configs for the given entity.
  --entity-default              Default entity name for clients/users/brokers (applies to the
                                corresponding entity type on the command line).
  --entity-name <String>        Name of entity (topic name / client id / user principal name / broker id).
  --entity-type <String>        Type of entity (topics/clients/users/brokers).
  --force                       Suppress console prompts.
  --help                        Print usage information.
  --zookeeper <String: urls>    REQUIRED: The connection string for the ZooKeeper connection, in the form
                                host:port. Multiple URLs can be given to allow fail-over.
- Describing a topic's configuration (compare the output of the two tools):
  [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
  Topic:test-config  PartitionCount:3  ReplicationFactor:1  Configs:max.message.bytes=20000,cleanup.policy=compact
      Topic: test-config  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
      Topic: test-config  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
      Topic: test-config  Partition: 2  Leader: 2  Replicas: 2  Isr: 2
  [root@master kafka_2.11-2.2.0]# bin/kafka-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type topics --entity-name test-config
  Configs for topic 'test-config' are max.message.bytes=20000,cleanup.policy=compact
- Adding configuration: the --add-config parameter.
  [root@slave1 kafka_2.11-2.2.0]# bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name test-config --add-config max.message.bytes=20000,cleanup.policy=compact
  Completed Updating config for entity: topic 'test-config'.
  [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
  Topic:test-config  PartitionCount:3  ReplicationFactor:1  Configs:cleanup.policy=compact,max.message.bytes=20000
      Topic: test-config  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
      Topic: test-config  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
      Topic: test-config  Partition: 2  Leader: 2  Replicas: 2  Isr: 2
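The help text earlier notes that --add-config values may use square brackets to group values containing commas ('k1=v1,k2=[v1,v2,v2],k3=v3'). A hypothetical Python parser for that small syntax (not Kafka's actual implementation):

```python
def parse_add_config(spec: str) -> dict[str, str]:
    """Split 'k1=v1,k2=[v1,v2],k3=v3' into key/value pairs, honoring bracket grouping."""
    pairs, depth, current = [], 0, ""
    for ch in spec:
        if ch == "[":
            depth += 1
        elif ch == "]":
            depth -= 1
        if ch == "," and depth == 0:   # top-level comma separates pairs
            pairs.append(current)
            current = ""
        else:
            current += ch
    if current:
        pairs.append(current)
    out = {}
    for pair in pairs:
        key, _, value = pair.partition("=")
        out[key.strip()] = value.strip().strip("[]")  # drop the grouping brackets
    return out
```

For the command above, parse_add_config("max.message.bytes=20000,cleanup.policy=compact") yields the two overrides shown in the describe output; a bracketed value like k2=[v1,v2,v2] survives as the single value "v1,v2,v2".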
- Viewing the configuration from ZooKeeper:
  [zk: localhost:2181(CONNECTED) 6] get /kafka/config/topics/test-config
  {"version":1,"config":{"max.message.bytes":"20000","cleanup.policy":"compact"}}
  cZxid = 0x300000161
  ctime = Mon Apr 29 09:58:22 EDT 2019
  mZxid = 0x700000065
  mtime = Wed May 08 09:53:41 EDT 2019
  pZxid = 0x300000161
  cversion = 0
  dataVersion = 5
  aclVersion = 0
  ephemeralOwner = 0x0
  dataLength = 79
  numChildren = 0
  Each configuration change also creates a persistent sequential notification node under /config/changes, which can be inspected:
  [zk: localhost:2181(CONNECTED) 7] ls /kafka/config/changes
  [config_change_0000000003, config_change_0000000004]
  [zk: localhost:2181(CONNECTED) 8] get /kafka/config/changes/config_change_0000000003
  {"version":2,"entity_path":"topics/test-config"}
  cZxid = 0x700000060
  ctime = Wed May 08 09:45:14 EDT 2019
  mZxid = 0x700000060
  mtime = Wed May 08 09:45:14 EDT 2019
  pZxid = 0x700000060
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x0
  dataLength = 48
  numChildren = 0
  [zk: localhost:2181(CONNECTED) 9] get /kafka/config/changes/config_change_0000000004
  {"version":2,"entity_path":"topics/test-config"}
  cZxid = 0x700000067
  ctime = Wed May 08 09:53:41 EDT 2019
  mZxid = 0x700000067
  mtime = Wed May 08 09:53:41 EDT 2019
  pZxid = 0x700000067
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x0
  dataLength = 48
  numChildren = 0
- Deleting configuration: the --delete-config parameter.
  [root@slave1 kafka_2.11-2.2.0]# bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name test-config --delete-config max.message.bytes,cleanup.policy
  Completed Updating config for entity: topic 'test-config'.
  [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
  Topic:test-config  PartitionCount:3  ReplicationFactor:1  Configs:
      Topic: test-config  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
      Topic: test-config  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
      Topic: test-config  Partition: 2  Leader: 2  Replicas: 2  Isr: 2
7. Topic-Level Parameters
8. Deleting Topics
- 1. Basic usage:
  [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic test-config
  Topic test-config is marked for deletion.
  Note: This will have no impact if delete.topic.enable is not set to true.
  Note that the broker-side parameter delete.topic.enable must be set to true for the deletion to actually happen; its default value is already true.
- 2. Deleting an internal topic (such as __consumer_offsets) is not allowed: the command reports an error.
- 3. Deleting a topic that does not exist: the --if-exists parameter.
  [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic test-config
  Error while executing topic command : Topics in [] does not exist
  [2019-05-08 10:19:24,526] ERROR java.lang.IllegalArgumentException: Topics in [] does not exist
      at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
      at kafka.admin.TopicCommand$ZookeeperTopicService.deleteTopic(TopicCommand.scala:377)
      at kafka.admin.TopicCommand$.main(TopicCommand.scala:68)
      at kafka.admin.TopicCommand.main(TopicCommand.scala)
   (kafka.admin.TopicCommand$)
  [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic test-config --if-exists
  ## With --if-exists, no output is produced
- 4. Deleting a topic manually.
  The topic's metadata lives in ZooKeeper under the /brokers/topics and /config/topics paths, and its message data is stored under the broker's log.dir (or log.dirs) path. Deleting the topic manually amounts to removing these znodes and directories.
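The cleanup targets for a manual deletion can be enumerated up front. This sketch only builds the paths and deletes nothing; the znode chroot (/kafka) and log directory (/tmp/kafka-logs) follow the examples in this article and would differ on other deployments:

```python
import os

def manual_delete_targets(topic: str, partitions: int,
                          chroot: str = "/kafka",
                          log_dir: str = "/tmp/kafka-logs") -> dict:
    """List the ZooKeeper nodes and on-disk log directories to remove for a topic."""
    return {
        "znodes": [
            f"{chroot}/brokers/topics/{topic}",   # metadata: partition assignment
            f"{chroot}/config/topics/{topic}",    # metadata: topic config overrides
        ],
        # one "<topic>-<partition>" directory per replica, on each broker hosting it
        "log_dirs": [os.path.join(log_dir, f"{topic}-{p}") for p in range(partitions)],
    }
```

For the 4-partition test topic, this lists two znodes plus the directories test-0 through test-3, matching the log-directory listing shown in section 2.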
- 5. Deleting through a ZooKeeper client.
  Deleting a topic with the kafka-topics.sh script essentially creates a node named after the topic under the ZooKeeper path /admin/delete_topics, marking the topic for deletion.