kafka是什么
Kafka本質(zhì)上是一個(gè)MQ,它是分布式的,基于發(fā)布/訂閱模式,主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。
應(yīng)用場景
異步處理(MQ的傳統(tǒng)應(yīng)用場景)
使用消息隊(duì)列的好處:
- 解耦
允許你獨(dú)立的擴(kuò)展或者修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。 - 可恢復(fù)性
系統(tǒng)的一部分組件失效時(shí),不會影響到整個(gè)系統(tǒng)。即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。 - 緩沖
可以解決兩邊速度不一致的問題,更多的是處理生產(chǎn)大于消費(fèi)的問題。 - 峰值處理能力
在訪問量劇增的情況下,使用消息隊(duì)列能夠使關(guān)鍵組件頂住突發(fā)的訪問壓力。
消息隊(duì)列的兩種模式
- 點(diǎn)對點(diǎn)模式(一對一,消費(fèi)者主動拉取數(shù)據(jù),消息收到后消息清除)
- 發(fā)布/訂閱模式(一對多,消費(fèi)者消費(fèi)數(shù)據(jù)之后不會清除消息)
生產(chǎn)者發(fā)布消息
消費(fèi)者訂閱消息,通過長輪詢拉取消息,kafka采用這種形式 (還有另一種訂閱方式,就是隊(duì)列推送消息給消費(fèi)者,但是這種方式容易造成消費(fèi)者資源浪費(fèi)或者處理跟不上)
基礎(chǔ)架構(gòu)
生產(chǎn)者
Kafka集群 (多個(gè)broker)
topic主題:消息的分類
partition分區(qū):提高某個(gè)topic的負(fù)載均衡,提高并發(fā)度
分區(qū)的leader:leader和follower一定不在同一個(gè)broker上
分區(qū)的follower
replication副本消費(fèi)者
消費(fèi)者組:多個(gè)消費(fèi)者可以放在一個(gè)消費(fèi)者組里。同一個(gè)分區(qū)只能被同消費(fèi)者組里的一個(gè)消費(fèi)者消費(fèi)。所以消費(fèi)者個(gè)數(shù)跟topic的分區(qū)數(shù)個(gè)數(shù)相等時(shí),消費(fèi)速度最高。
消費(fèi)者組可以提高消費(fèi)能力。Zookeeper
Kafka集群正常工作依賴于zk。
4.1. zk幫助kafka集群存儲一些信息。
要想把多臺broker設(shè)置為一個(gè)kafka集群,只要讓它們連同一個(gè)zk就可以了。
4.2. 消費(fèi)者也會存儲一些數(shù)據(jù)在zk。
消費(fèi)的位移offset保存在zk,為了防止消費(fèi)者掛掉,起來后繼續(xù)消費(fèi)。不過在新版本kafka(0.9版本及之后)中,這個(gè)信息由kafka集群自己存儲了。
為什么改成存到kafka呢?因?yàn)橄M(fèi)者頻繁拉取kafka消息的同時(shí),如果還需要頻繁的和zk通信,會影響到消費(fèi)者和zk的效率。
常用命令行操作
本機(jī)MacOS已安裝好Kafka。
- 查看Kafka的配置文件,/usr/local/etc/kafka/, 這些配置主要是給命令行用的:
kafka $ ls -l
total 136
-rw-r--r-- 1 a123 admin 906 2 19 16:12 connect-console-sink.properties
-rw-r--r-- 1 a123 admin 909 2 19 16:12 connect-console-source.properties
-rw-r--r-- 1 a123 admin 5321 2 19 16:12 connect-distributed.properties
-rw-r--r-- 1 a123 admin 883 2 19 16:12 connect-file-sink.properties
-rw-r--r-- 1 a123 admin 881 2 19 16:12 connect-file-source.properties
-rw-r--r-- 1 a123 admin 1111 2 19 16:12 connect-log4j.properties
-rw-r--r-- 1 a123 admin 2262 2 19 16:12 connect-standalone.properties
-rw-r--r-- 1 a123 admin 1221 2 19 16:12 consumer.properties
-rw-r--r-- 1 a123 admin 4727 2 19 16:12 log4j.properties
-rw-r--r-- 1 a123 admin 1925 2 19 16:12 producer.properties
-rw-r--r-- 1 a123 admin 6865 2 19 16:12 server.properties
-rw-r--r-- 1 a123 admin 1032 2 19 16:12 tools-log4j.properties
-rw-r--r-- 1 a123 admin 1169 2 19 16:12 trogdor.conf
-rw-r--r-- 1 a123 admin 1037 2 19 16:12 zookeeper.properties
這里的server.properties里面配置了server的一些屬性,目前主要看下面幾個(gè):
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# A comma separated list of directories under which to store log files
# 這個(gè)目錄存的是kafka的暫存數(shù)據(jù),并不是log日志
log.dirs=/usr/local/var/lib/kafka-logs
# The minimum age of a log file to be eligible for deletion due to age
# kafka數(shù)據(jù)的暫存時(shí)間
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#kafka數(shù)據(jù)文件的大小
log.segment.bytes=1073741824
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
- 啟動Kafka集群
#啟動kafka server
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties
- 創(chuàng)建/查看/刪除topic
3.1 創(chuàng)建主題
#創(chuàng)建主題
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#副本數(shù)不能大于broker的數(shù)目
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic article
Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2021-03-20 13:57:51,129] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
(kafka.admin.TopicCommand$)
3.2 查看主題
#查看所有主題
kafka-topics --describe --zookeeper localhost:2181
#查看某個(gè)主題 -topic
$ kafka-topics --describe --zookeeper localhost:2181 -topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
3.3 刪除主題
#刪除主題
$ kafka-topics --delete --zookeeper localhost:2181 -topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
#由于kafka中delete.topic.enable 默認(rèn)為false,所以這個(gè)topic并沒有真的被刪除,只是在zk上被標(biāo)成刪除。
$ kafka-topics --describe --zookeeper localhost:2181 -topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs: MarkedForDeletion:true
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
[zk: localhost:2181(CONNECTED) 14] ls /admin/delete_topics
[test]
#更改server.properties,設(shè)置delete.topic.enable=true,重啟kafka server.
#從kafka server啟動日志里可以看到,啟動時(shí)在處理刪除test topic的相關(guān)工作
[2021-03-20 12:49:20,174] INFO [GroupCoordinator 0]: Removed 0 offsets associated with deleted partitions: test-0. (kafka.coordinator.group.GroupCoordinator)
......
[2021-03-20 12:49:20,180] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(test-0) (kafka.server.ReplicaFetcherManager)
[2021-03-20 12:49:20,180] INFO [ReplicaAlterLogDirsManager on broker 0] Removed fetcher for partitions Set(test-0) (kafka.server.ReplicaAlterLogDirsManager)
......
[2021-03-20 12:49:20,247] INFO Log for partition test-0 is renamed to /usr/local/var/lib/kafka-logs/test-0.a70d1b38100b44d385f17d23b3b673d0-delete and is scheduled for deletion (kafka.log.LogManager)
#再次用命令行,及在zk上 查看test topic,已經(jīng)看不到了
$ kafka-topics --describe --zookeeper localhost:2181 -topic test
$
[zk: localhost:2181(CONNECTED) 3] ls /admin/delete_topics
[]
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[__consumer_offsets]
- 啟動生產(chǎn)者,消費(fèi)者,互相通信
#啟動生產(chǎn)者
$ kafka-console-producer --broker-list localhost:9092 --topic test
>hello world
#啟動消費(fèi)者
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
hello world
- 消費(fèi)者組
# 查看所有消費(fèi)者組
$ kafka-consumer-groups --bootstrap-server localhost:9092 --list
console-consumer-33462
#查看某個(gè)消費(fèi)者組的詳細(xì)信息
$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group console-consumer-33462
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test 0 - 3 - consumer-1-b79c3b8d-8efe-4e0f-a95a-e7482007295a /127.0.0.1 consumer-1