kafka(二)Kafka快速入門

集群部署

  1. 配置 server.properties
#broker的全局唯一編號,不能重復(fù)
broker.id=0
#刪除topic功能使能,當(dāng)前版本此配置默認(rèn)為true,已從配置文件移除
delete.topic.enable=true
#kafka運行日志存放的路徑
log.dirs=/opt/module/kafka/logs
#配置連接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

其他服務(wù)器一樣配置

  1. 啟動集群
bin/kafka-server-start.sh -daemon config/server.properties

其他服務(wù)器一樣。

Kafka 命令行操作

topic 操作

腳本
kafka]$ bin\kafka-topics.sh
命令選項

選項 描述
--alter 更改分區(qū)數(shù),副本分配,和/或主題的配置。
--at-min-isr-partitions 如果在描述主題時設(shè)置,則僅顯示 isr 計數(shù)為的分區(qū)等于配置的最小值。 不是支持 --zookeeper 選項。
--bootstrap-server <String: server to connect to> 必需:要連接的 Kafka 服務(wù)器。 如果提供此項,則不需要直接的 Zookeeper 連接。
--command-config <String: command config property file> 包含要傳遞給管理客戶端的配置的屬性文件。 這僅與 --bootstrap-server 選項一起用于描述和更改代理配置。
--config <String: name=value>
--create 創(chuàng)建一個新的topic
--delete 刪除一個topic
--delete-config <String: name> 要為現(xiàn)有主題刪除的主題配置覆蓋(請參閱 --config 選項下的配置列表)。 不支持 --bootstrap-server 選項。
--describe 列出給定主題的詳細(xì)信息。
--disable-rack-aware 禁用機架感知副本分配
--exclude-internal 運行 list 或 describe 命令時排除內(nèi)部主題。 默認(rèn)會列出內(nèi)部主題
--force 禁止控制臺提示
--help 打印幫助信息。
--if-exists 如果在更改或刪除或描述主題時設(shè)置,則該操作僅在主題存在時執(zhí)行。 不支持 --bootstrap-server 選項。
--if-not-exists 如果在創(chuàng)建主題時設(shè)置,則只有在主題不存在時才會執(zhí)行操作。 不支持 --bootstrap- 服務(wù)器選項。
--list 列出所有可用的topic。
--partitions <Integer: # of partitions> 設(shè)置topic 分區(qū)數(shù)
--replication-factor <Integer:replication factor> 指定topic的副本數(shù)
--topic <String: topic> 指定topic 名稱
--topics-with-overrides 如果在描述主題時設(shè)置,則僅顯示已覆蓋配置的主題
--unavailable-partitions 如果在描述主題時設(shè)置,則只顯示其領(lǐng)導(dǎo)者不可用的分區(qū)
--under-min-isr-partitions 如果在描述主題時設(shè)置,則僅顯示 isr 計數(shù)小于配置的最小值的分區(qū)。 不支持 --zookeeper 選項。
--under-replicated-partitions 如果在描述主題時設(shè)置,則僅顯示在復(fù)制分區(qū)下
--version 展示Kafka版本
--zookeeper <String: hosts> 已棄用,zookeeper 連接的連接字符串,格式為 host:port。 可以提供多個主機以允許故障轉(zhuǎn)移。

案例

  1. 創(chuàng)建一個 topic
    語法:kafka-topics.sh --create --zookeeper <host>:<port> --if-not-exists --replication-factor <副本數(shù)> --partitions <分區(qū)數(shù)> --topic <副本名稱>
bin]$ kafka-topics.sh --create --zookeeper hadoop102:2181 --if-not-exists --replication-factor 3 --partitions 3 --topic test
#輸出結(jié)果
Created topic test.
  1. 查看當(dāng)前服務(wù)器中的所有 topic
    語法: kafka-topics.sh --zookeeper <host>:<port> --list
bin]$ kafka-topics.sh --zookeeper hadoop102:2181 --list
# 輸出結(jié)果
__consumer_offsets
abc
test
  1. 刪除一個topic
    語法:kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test
    需要server.properties中設(shè)置delete.topic.enable=true否則只是標(biāo)記刪除。
bin]$ kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test
# 輸出結(jié)果
Topic test is marked for deletion. # 并不會馬上刪除,而是先對該topic做一個標(biāo)記,后面再進(jìn)行刪除
#需要在 配置中設(shè)置 delete.topic.enable=true ,否則不會進(jìn)行刪除
Note: This will have no impact if delete.topic.enable is not set to true.
  1. 查看 topic 詳情
    語法:--describe
[atguigu@hadoop102 bin]$ kafka-topics.sh  --describe --bootstrap-server hadoop102:9092 --topic abc
#  topic  abc 詳細(xì)信息
Topic: abc  PartitionCount: 1   ReplicationFactor: 3    Configs: segment.bytes=1073741824
    Topic: abc  Partition: 0    Leader: 1   Replicas: 2,0,1 Isr: 1,2,0
參數(shù) 描述
Topic topic名稱
PartitionCount 分區(qū)數(shù)
ReplicationFactor 定義的分區(qū)數(shù)
Configs 配置
Partition 當(dāng)前分區(qū)位置
Leader 當(dāng)前那個broker為Leader
Replicas 副本位置
Isr lsr同步隊列

producer 操作

腳本
kafka]$ bin\kafka-console-producer.sh
命令選項

選項 描述
--batch-size <Integer: size> 如果消息不是同步發(fā)送的,則要在單個批次中發(fā)送的消息數(shù)。 (默認(rèn)值:200)
--broker-list <String: broker-list> 鏈接Kafka,必需:采用 HOST1:PORT1,HOST2:PORT2 形式的代理列表字符串。
--compression-codec [String: compression-codec] 支持的壓縮方式'none', 'gzip', 'snappy', 'lz4', or 'zstd'. 默認(rèn) 'gzip'
--help 打印幫助信息
--line-reader <String: reader_class> 用于從標(biāo)準(zhǔn)輸入讀取行的類的類名。默認(rèn)情況下,每行都作為單獨的消息讀取。 (默認(rèn):kafka.tools.ConsoleProducer$LineMessageReader)
--max-block-ms <Long: > 生產(chǎn)者發(fā)送的最大時間(默認(rèn):60000)
--max-memory-bytes <Long: > 緩沖大小,以字節(jié)為單位 (默認(rèn):33554432)
--max-partition-memory-bytes <Long: The buffer size allocated for a memory in bytes per partition> 合并數(shù)據(jù)的最小數(shù) (默認(rèn): 16384)
--message-send-max-retries <Integer> 退休數(shù),默認(rèn)為3
--metadata-expiry-ms <Long:> 強制刷新數(shù)據(jù)條數(shù)默認(rèn)為300000,元數(shù)據(jù)以毫秒為單位的過期間隔時間段
--producer-property <String> 傳遞用戶定義的Producer_Prop的機制
--producer.config <String: config file> 指定配置文件。 請注意, [producer-property] 優(yōu)先于此配置。
--property <String: prop> 一種將用戶定義的屬性以 key=value 的形式傳遞給消息閱讀器的機制。 這允許對用戶定義的消息閱讀器進(jìn)行自定義配置。
--request-required-acks <String:> 設(shè)置ack(確認(rèn)收到)的三種模式(0,1,-1),默認(rèn)為1
--request-timeout-ms <Integer:> 設(shè)置ack 的超時時間(單位毫秒)默認(rèn)為 1500
--retry-backoff-ms <Integer> 等待選舉時間,默認(rèn)為100)
--socket-buffer-size <Integer: size> 設(shè)置 tcp RECV 大小(默認(rèn): 102400)
--sync 設(shè)置為同步的
--timeout <Integer: timeout_ms> 如果設(shè)置和生產(chǎn)者運行異步模式,這給一條消息的最長時間是否有足夠的隊列等待批處理大小。該值以ms為單位。(默認(rèn):1000)
--topic <String: topic> 生產(chǎn)的消息發(fā)送給定的主題
--version 顯示Kafka版本
  1. 發(fā)送消息
    語法:kafka-console-producer.sh --broker-list <kafkaIP1>:<端口> <kafkaIP2>:<端口> --topic <topic名稱>
bin]$ kafka-console-producer.sh --broker-list hadoop102:9092 hadoop103:9092 --topic abc
#輸出
>hello

hadoop102 接收 topic abc 消息

[admin@hadoop102 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic abc
#接收生產(chǎn)者推送的消息
hello

hadoop103 接收 topic abc 消息

[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc
#接收生產(chǎn)者推送的消息
hello

consumer操作

腳本
kafka]$ bin/kafka-console-consumer.sh
命令選項

選項 描述
--bootstrap-server <String: server to connect to> 需:要連接的服務(wù)器。
--consumer-property <String: consumer_prop> 一種將用戶定義的屬性以 key=value 的形式傳遞給消費者的機制。
--consumer.config <String: config file> consumer配置屬性文件。 請注意, [consumer-property] 優(yōu)先于此配置。
--enable-systest-events 記錄消費者的消息及生命周期,用于系統(tǒng)測試
--formatter <String: class> 用于格式化 kafka 消息以供顯示的類的名稱。 (默認(rèn):kafka.tools.DefaultMessageFormatter)
--from-beginning 如果消費者還沒有一個既定的偏移量來消費,那么從日志中出現(xiàn)的最早的消息而不是最新的消息開始。
--group <String: consumer group id> 消費者的消費者組ID。
--help 打印幫助信息
--isolation-level <String> 設(shè)置為 read_committed 以過濾掉未提交的事務(wù)消息。 設(shè)置為 read_uncommitted 以讀取所有消息。 (默認(rèn)值:read_uncommitted)
--key-deserializer <String: deserializer for key> 設(shè)置 密鑰的解串器
--max-messages <Integer: num_messages> 退出前消費的最大消息數(shù)。 如果未設(shè)置,則消耗是連續(xù)的。
--offset <String: consume offset> 要消耗的偏移量 id(非負(fù)數(shù)),或 'earliest' 表示從開始,或 'latest' 表示從結(jié)束(默認(rèn)值:latest)
--partition <Integer: partition> 要消費的分區(qū)。 除非指定了“--offset”,否則消耗從分區(qū)的末尾開始。
--property <String: prop> 初始化消息格式化程序的屬性
--skip-message-on-error 如果在處理消息時出現(xiàn)錯誤,請?zhí)^而不是暫停。
--timeout-ms <Integer: timeout_ms> 如果指定,則在指定的時間間隔內(nèi)沒有可供消費的消息時退出。要消費的主題 ID。
--value-deserializer <String: deserializer for values> 值的解串器
--version 顯示Kafka版本
--whitelist <String: whitelist> 指定要包含以供使用的主題白名單的正則表達(dá)式。

案例

  1. 消費消息
    語法:kafka-console-consumer.sh --bootstrap-server <host>:<post> --topic <topic名稱>
[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc
#接收生產(chǎn)者推送的消息
hello
  1. 消費所有的消息
    語法:--from-beginning
[admin@hadoop103 bin]$ kafka-console-consumer.sh --bootstrap-server hadoop103:9092 --topic abc --from-beginning
#接收生產(chǎn)者推送的消息
sh
nihao
發(fā)哦那旮
ka
niha
hdalfajkl
你好
股東大法師
hello
python
hello
haoh
hello
hello
hflahfla
flajklfja
flajla
afadf
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容