集群部署
- 配置 server.properties
#broker的全局唯一編號,不能重復(fù)
broker.id=0
#刪除topic功能使能,當(dāng)前版本此配置默認(rèn)為true,已從配置文件移除
delete.topic.enable=true
#kafka運行日志存放的路徑
log.dirs=/opt/module/kafka/logs
#配置連接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
其他服務(wù)器一樣配置
- 啟動集群
bin/kafka-server-start.sh -daemon config/server.properties
其他服務(wù)器一樣。
Kafka 命令行操作
topic 操作
腳本
kafka]$ bin\kafka-topics.sh
命令選項
| 選項 | 描述 |
|---|---|
| --alter | 更改分區(qū)數(shù),副本分配,和/或主題的配置。 |
| --at-min-isr-partitions | 如果在描述主題時設(shè)置,則僅顯示 isr 計數(shù)為的分區(qū)等于配置的最小值。 不是支持 --zookeeper 選項。 |
| --bootstrap-server <String: server to connect to> | 必需:要連接的 Kafka 服務(wù)器。 如果提供此項,則不需要直接的 Zookeeper 連接。 |
| --command-config <String: command config property file> | 包含要傳遞給管理客戶端的配置的屬性文件。 這僅與 --bootstrap-server 選項一起用于描述和更改代理配置。 |
| --config <String: name=value> | |
| --create | 創(chuàng)建一個新的topic |
| --delete | 刪除一個topic |
| --delete-config <String: name> | 要為現(xiàn)有主題刪除的主題配置覆蓋(請參閱 --config 選項下的配置列表)。 不支持 --bootstrap-server 選項。 |
| --describe | 列出給定主題的詳細(xì)信息。 |
| --disable-rack-aware | 禁用機架感知副本分配 |
| --exclude-internal | 運行 list 或 describe 命令時排除內(nèi)部主題。 默認(rèn)會列出內(nèi)部主題 |
| --force | 禁止控制臺提示 |
| --help | 打印幫助信息。 |
| --if-exists | 如果在更改或刪除或描述主題時設(shè)置,則該操作僅在主題存在時執(zhí)行。 不支持 --bootstrap-server 選項。 |
| --if-not-exists | 如果在創(chuàng)建主題時設(shè)置,則只有在主題不存在時才會執(zhí)行操作。 不支持 --bootstrap- 服務(wù)器選項。 |
| --list | 列出所有可用的topic。 |
| --partitions <Integer: # of partitions> | 設(shè)置topic 分區(qū)數(shù) |
| --replication-factor <Integer:replication factor> | 指定topic的副本數(shù) |
| --topic <String: topic> | 指定topic 名稱 |
| --topics-with-overrides | 如果在描述主題時設(shè)置,則僅顯示已覆蓋配置的主題 |
| --unavailable-partitions | 如果在描述主題時設(shè)置,則只顯示其領(lǐng)導(dǎo)者不可用的分區(qū) |
| --under-min-isr-partitions | 如果在描述主題時設(shè)置,則僅顯示 isr 計數(shù)小于配置的最小值的分區(qū)。 不支持 --zookeeper 選項。 |
| --under-replicated-partitions | 如果在描述主題時設(shè)置,則僅顯示在復(fù)制分區(qū)下 |
| --version | 展示Kafka版本 |
| --zookeeper <String: hosts> | 已棄用,zookeeper 連接的連接字符串,格式為 host:port。 可以提供多個主機以允許故障轉(zhuǎn)移。 |
案例
- 創(chuàng)建一個 topic
語法:kafka-topics.sh --create --zookeeper <host>:<port> --if-not-exists --replication-factor <副本數(shù)> --partitions <分區(qū)數(shù)> --topic <副本名稱>
bin]$ kafka-topics.sh --create --zookeeper hadoop102:2181 --if-not-exists --replication-factor 3 --partitions 3 --topic test
#輸出結(jié)果
Created topic test.
- 查看當(dāng)前服務(wù)器中的所有 topic
語法:kafka-topics.sh --zookeeper <host>:<port> --list
bin]$ kafka-topics.sh --zookeeper hadoop102:2181 --list
# 輸出結(jié)果
__consumer_offsets
abc
test
- 刪除一個topic
語法:kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test
需要server.properties中設(shè)置delete.topic.enable=true否則只是標(biāo)記刪除。
bin]$ kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test
# 輸出結(jié)果
Topic test is marked for deletion. # 并不會馬上刪除,而是先對該topic做一個標(biāo)記,后面再進(jìn)行刪除
#需要在 配置中設(shè)置 delete.topic.enable=true ,否則不會進(jìn)行刪除
Note: This will have no impact if delete.topic.enable is not set to true.
- 查看 topic 詳情
語法:--describe
[atguigu@hadoop102 bin]$ kafka-topics.sh --describe --bootstrap-server hadoop102:9092 --topic abc
# topic abc 詳細(xì)信息
Topic: abc PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: abc Partition: 0 Leader: 1 Replicas: 2,0,1 Isr: 1,2,0
| 參數(shù) | 描述 |
|---|---|
| Topic | topic名稱 |
| PartitionCount | 分區(qū)數(shù) |
| ReplicationFactor | 定義的分區(qū)數(shù) |
| Configs | 配置 |
| Partition | 當(dāng)前分區(qū)位置 |
| Leader | 當(dāng)前那個broker為Leader |
| Replicas | 副本位置 |
| Isr | lsr同步隊列 |
producer 操作
腳本
kafka]$ bin\kafka-console-producer.sh
命令選項
| 選項 | 描述 |
|---|---|
| --batch-size <Integer: size> | 如果消息不是同步發(fā)送的,則要在單個批次中發(fā)送的消息數(shù)。 (默認(rèn)值:200) |
| --broker-list <String: broker-list> | 鏈接Kafka,必需:采用 HOST1:PORT1,HOST2:PORT2 形式的代理列表字符串。 |
| --compression-codec [String: compression-codec] | 支持的壓縮方式'none', 'gzip', 'snappy', 'lz4', or 'zstd'. 默認(rèn) 'gzip' |
| --help | 打印幫助信息 |
| --line-reader <String: reader_class> | 用于從標(biāo)準(zhǔn)輸入讀取行的類的類名。默認(rèn)情況下,每行都作為單獨的消息讀取。 (默認(rèn):kafka.tools.ConsoleProducer$LineMessageReader) |
| --max-block-ms <Long: > | 生產(chǎn)者發(fā)送的最大時間(默認(rèn):60000) |
| --max-memory-bytes <Long: > | 緩沖大小,以字節(jié)為單位 (默認(rèn):33554432) |
| --max-partition-memory-bytes <Long: The buffer size allocated for a memory in bytes per partition> | 合并數(shù)據(jù)的最小數(shù) (默認(rèn): 16384) |
| --message-send-max-retries <Integer> | 退休數(shù),默認(rèn)為3 |
| --metadata-expiry-ms <Long:> | 強制刷新數(shù)據(jù)條數(shù)默認(rèn)為300000,元數(shù)據(jù)以毫秒為單位的過期間隔時間段 |
| --producer-property <String> | 傳遞用戶定義的Producer_Prop的機制 |
| --producer.config <String: config file> | 指定配置文件。 請注意, [producer-property] 優(yōu)先于此配置。 |
| --property <String: prop> | 一種將用戶定義的屬性以 key=value 的形式傳遞給消息閱讀器的機制。 這允許對用戶定義的消息閱讀器進(jìn)行自定義配置。 |
| --request-required-acks <String:> | 設(shè)置ack(確認(rèn)收到)的三種模式(0,1,-1),默認(rèn)為1 |
| --request-timeout-ms <Integer:> | 設(shè)置ack 的超時時間(單位毫秒)默認(rèn)為 1500 |
| --retry-backoff-ms <Integer> | 等待選舉時間,默認(rèn)為100) |
| --socket-buffer-size <Integer: size> | 設(shè)置 tcp RECV 大小(默認(rèn): 102400) |
| --sync | 設(shè)置為同步的 |
| --timeout <Integer: timeout_ms> | 如果設(shè)置和生產(chǎn)者運行異步模式,這給一條消息的最長時間是否有足夠的隊列等待批處理大小。該值以ms為單位。(默認(rèn):1000) |
| --topic <String: topic> | 生產(chǎn)的消息發(fā)送給定的主題 |
| --version | 顯示Kafka版本 |
- 發(fā)送消息
語法:kafka-console-producer.sh --broker-list <kafkaIP1>:<端口> <kafkaIP2>:<端口> --topic <topic名稱>
bin]$ kafka-console-producer.sh --broker-list hadoop102:9092 hadoop103:9092 --topic abc
#輸出
>hello
hadoop102 接收 topic abc 消息
[admin@hadoop102 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic abc
#接收生產(chǎn)者推送的消息
hello
hadoop103 接收 topic abc 消息
[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc
#接收生產(chǎn)者推送的消息
hello
consumer操作
腳本
kafka]$ bin/kafka-console-consumer.sh
命令選項
| 選項 | 描述 |
|---|---|
| --bootstrap-server <String: server to connect to> | 需:要連接的服務(wù)器。 |
| --consumer-property <String: consumer_prop> | 一種將用戶定義的屬性以 key=value 的形式傳遞給消費者的機制。 |
| --consumer.config <String: config file> | consumer配置屬性文件。 請注意, [consumer-property] 優(yōu)先于此配置。 |
| --enable-systest-events | 記錄消費者的消息及生命周期,用于系統(tǒng)測試 |
| --formatter <String: class> | 用于格式化 kafka 消息以供顯示的類的名稱。 (默認(rèn):kafka.tools.DefaultMessageFormatter) |
| --from-beginning | 如果消費者還沒有一個既定的偏移量來消費,那么從日志中出現(xiàn)的最早的消息而不是最新的消息開始。 |
| --group <String: consumer group id> | 消費者的消費者組ID。 |
| --help | 打印幫助信息 |
| --isolation-level <String> | 設(shè)置為 read_committed 以過濾掉未提交的事務(wù)消息。 設(shè)置為 read_uncommitted 以讀取所有消息。 (默認(rèn)值:read_uncommitted) |
| --key-deserializer <String: deserializer for key> | 設(shè)置 密鑰的解串器 |
| --max-messages <Integer: num_messages> | 退出前消費的最大消息數(shù)。 如果未設(shè)置,則消耗是連續(xù)的。 |
| --offset <String: consume offset> | 要消耗的偏移量 id(非負(fù)數(shù)),或 'earliest' 表示從開始,或 'latest' 表示從結(jié)束(默認(rèn)值:latest) |
| --partition <Integer: partition> | 要消費的分區(qū)。 除非指定了“--offset”,否則消耗從分區(qū)的末尾開始。 |
| --property <String: prop> | 初始化消息格式化程序的屬性 |
| --skip-message-on-error | 如果在處理消息時出現(xiàn)錯誤,請?zhí)^而不是暫停。 |
| --timeout-ms <Integer: timeout_ms> | 如果指定,則在指定的時間間隔內(nèi)沒有可供消費的消息時退出。要消費的主題 ID。 |
| --value-deserializer <String: deserializer for values> | 值的解串器 |
| --version | 顯示Kafka版本 |
| --whitelist <String: whitelist> | 指定要包含以供使用的主題白名單的正則表達(dá)式。 |
案例
- 消費消息
語法:kafka-console-consumer.sh --bootstrap-server <host>:<post> --topic <topic名稱>
[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc
#接收生產(chǎn)者推送的消息
hello
- 消費所有的消息
語法:--from-beginning
[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc --from-beginning
#接收生產(chǎn)者推送的消息
sh
nihao
發(fā)哦那旮
ka
niha
hdalfajkl
你好
股東大法師
hello
python
hello
haoh
hello
hello
hflahfla
flajklfja
flajla
afadf