概念
Kafka:是一個(gè)分布式消息系統(tǒng),由linkedin使用scala編寫,用作LinkedIn的活動(dòng)流(Activity Stream)和運(yùn)營數(shù)據(jù)處理管道(Pipeline)的基礎(chǔ)。具有高水平擴(kuò)展和高吞吐量。
Kafka和其他主流分布式消息系統(tǒng)的對比
| ActiveMQ | RabbitMQ | Kafka | |
|---|---|---|---|
| 開發(fā)語言 | Java | Erlang | Java |
| 支持協(xié)議 | OpenWire、STOMP、REST、XMP、AMQP | AMQP | AMQP |
| 事物 | 支持 | 支持 | 不支持 |
| 集群 | 支持 | 支持 | 支持 |
| 負(fù)載均衡 | 支持 | 支持 | 支持 |
| 動(dòng)態(tài)擴(kuò)容 | 不支持 | 不支持 | 支持(zk) |
礎(chǔ)知識
- 消費(fèi)者:(Consumer):從消息隊(duì)列中請求消息的客戶端應(yīng)用程序
- 生產(chǎn)者:(Producer) :向broker發(fā)布消息的應(yīng)用程序
- AMQP服務(wù)端(broker):用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列,便于fafka將生產(chǎn)者發(fā)送的消息,動(dòng)態(tài)的添加到磁盤并給每一條消息一個(gè)偏移量,所以對于kafka一個(gè)broker就是一個(gè)應(yīng)用程序的實(shí)例
- 主題(Topic):一個(gè)主題類似新聞中的體育、娛樂、教育等分類概念,在實(shí)際工程中通常一個(gè)業(yè)務(wù)一個(gè)主題
-分區(qū)(Partition):一個(gè)Topic中的消息數(shù)據(jù)按照多個(gè)分區(qū)組織,分區(qū)是kafka消息隊(duì)列組織的最小單位,一個(gè)分區(qū)可以看作是一個(gè)FIFO( First Input First Output的縮寫,先入先出隊(duì)列)的隊(duì)列 - 每一個(gè)分區(qū)都可以有多個(gè)副本,以防止數(shù)據(jù)的丟失
- 某一個(gè)分區(qū)中的數(shù)據(jù)如果需要更新,都必須通過該分區(qū)所有副本中的leader來更新
- 消費(fèi)者可以分組,比如有兩個(gè)消費(fèi)者組A和B,共同消費(fèi)一個(gè)topic:order_info,A和B所消費(fèi)的消息不會重復(fù),比如 order_info 中有100個(gè)消息,每個(gè)消息有一個(gè)id,編號從0-99,那么,如果A組消費(fèi)0-49號,B組就消費(fèi)50-99號
- 消費(fèi)者在具體消費(fèi)某個(gè)topic中的消息時(shí),可以指定起始偏移量
kafka分區(qū)是提高kafka性能的關(guān)鍵所在,當(dāng)你發(fā)現(xiàn)你的集群性能不高時(shí),常用手段就是增加Topic的分區(qū),分區(qū)里面的消息是按照從新到老的順序進(jìn)行組織,
消費(fèi)者從隊(duì)列頭訂閱消息,生產(chǎn)者從隊(duì)列尾添加消息
Kafka架構(gòu)
生產(chǎn)者生產(chǎn)消息、kafka集群、消費(fèi)者獲取消息這樣一種架構(gòu),如下圖:

kafka集群中的消息,是通過Topic(主題)來進(jìn)行組織的,如下圖:

工作圖

Kafka集群搭建
Kafka集群是把狀態(tài)保存在Zookeeper中的,首先要搭建Zookeeper集群
搭建Zookeeper集群
這里三臺服務(wù)器分別是
192.1682.158
192.1682.152
192.1682.150
在三臺服務(wù)器上分別安裝kafka
kafka官網(wǎng)下載地址 http://kafka.apache.org/downloads
wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz
tar -zxvf kafka_2.12-2.0.0.tgz -C /usr/local/
mv /usr/local/kafka_2.12-2.0.0/ /usr/local/kafka
配置文件說明
broker.id=0 #當(dāng)前機(jī)器在集群中的唯一標(biāo)識,和zookeeper的myid性質(zhì)一樣
#listeners=PLAINTEXT://192.168.2.152:9092 #當(dāng)前kafka對外提供服務(wù)的端口默認(rèn)是9092
num.network.threads=3 #這個(gè)是borker進(jìn)行網(wǎng)絡(luò)處理的線程數(shù)
num.io.threads=8 #這個(gè)是borker進(jìn)行I/O處理的線程數(shù)
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個(gè)目錄可以配置為“,”逗號分割的表達(dá)式,上面的num.io.threads要大于這個(gè)目錄的個(gè)數(shù)這個(gè)目錄,如果配置多個(gè)目錄,新創(chuàng)建的topic他把消息持久化的地方是,當(dāng)前以逗號分割的目錄中,那個(gè)分區(qū)數(shù)最少就放那一個(gè)
socket.send.buffer.bytes=102400 #發(fā)送緩沖區(qū)buffer大小,數(shù)據(jù)不是一下子就發(fā)送的,先回存儲到緩沖區(qū)了到達(dá)一定的大小后在發(fā)送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收緩沖區(qū)大小,當(dāng)數(shù)據(jù)到達(dá)一定大小后在序列化到磁盤
socket.request.max.bytes=104857600 #這個(gè)參數(shù)是向kafka請求消息或者向kafka發(fā)送消息的請請求的最大數(shù),這個(gè)值不能超過java的堆棧大小
num.partitions=1 #默認(rèn)的分區(qū)數(shù),一個(gè)topic默認(rèn)1個(gè)分區(qū)數(shù)
log.retention.hours=168 #默認(rèn)消息的最大持久化時(shí)間,168小時(shí),7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本數(shù),如果一個(gè)副本失效了,另一個(gè)還可以繼續(xù)提供服務(wù)
replica.fetch.max.bytes=5242880 #取消息的最大直接數(shù)
log.segment.bytes=1073741824 #這個(gè)參數(shù)是:因?yàn)閗afka的消息是以追加的形式落地到文件,當(dāng)超過這個(gè)值的時(shí)候,kafka會新起一個(gè)文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時(shí)間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除
zookeeper.connect=192.168.2.152:2181,192.168.2.150:2181,192.168.2.158:2181#設(shè)置zookeeper的連接端口
主要修改配置這幾個(gè)地方
#每臺服務(wù)器的broker.id都不能相同
broker.id=0
listeners=PLAINTEXT://:9092
zookeeper.connect=192.168.2.152:2181,192.168.2.150:2181,192.168.2.158:2181
三臺服務(wù)器分別啟動(dòng)Kafka
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
檢查服務(wù)是否啟動(dòng)
jps

在kafka集群中創(chuàng)建一個(gè)topic:
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.152:2181 --replication-factor 3 --partitions 1 --topic order
解釋:
--replication-factor 3 #復(fù)制兩份
--partitions 1 #創(chuàng)建1個(gè)分區(qū)
--topic #主題為order
查看一下自己創(chuàng)建的topic:
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.152:2181

在192.168.2.152機(jī)器上創(chuàng)建一個(gè)producer,發(fā)布者
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.152:9092 --topic order
在192.168.2.150與192.168.2.158機(jī)器上分別創(chuàng)建一個(gè)consumer,消費(fèi)者者
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.150:9092 --topic order --from-beginning
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.158:9092 --topic order --from-beginning
測試
在發(fā)布者機(jī)器上輸入內(nèi)容

在兩臺消費(fèi)者機(jī)器上可以看到

并且在zookeeper可以到kafka的一些情況
zkCli.sh -server 192.168.2.152:2181

上面的顯示結(jié)果中:只有zookeeper是,zookeeper原生的,其他都是Kafka創(chuàng)建的
標(biāo)注一個(gè)重要的
get /brokers/ids/0

刪除topic命令
bin/kafka-topics.sh --delete --zookeeper ip:port --topic order
查看某個(gè)Topic的詳情
/usr/local/kafka/bin/kafka-topics.sh --topic order --describe --zookeeper 192.168.2.152:2181

- PartitionCount 分區(qū)數(shù)量
- ReplicationFactor 復(fù)制因子數(shù)量
- leader 是在給出的所有partitons中負(fù)責(zé)讀寫的節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)都有可能成為leader
- replicas 顯示給定partiton所有副本所存儲節(jié)點(diǎn)的節(jié)點(diǎn)列表,不管該節(jié)點(diǎn)是否是leader或者是否存活。
- isr 副本都已同步的的節(jié)點(diǎn)集合,這個(gè)集合中的所有節(jié)點(diǎn)都是存活狀態(tài),并且跟leader同步