Kafka集群搭建

概念

Kafka:是一個(gè)分布式消息系統(tǒng),由linkedin使用scala編寫,用作LinkedIn的活動(dòng)流(Activity Stream)和運(yùn)營數(shù)據(jù)處理管道(Pipeline)的基礎(chǔ)。具有高水平擴(kuò)展和高吞吐量。

Kafka和其他主流分布式消息系統(tǒng)的對比

ActiveMQ RabbitMQ Kafka
開發(fā)語言 Java Erlang Java
支持協(xié)議 OpenWire、STOMP、REST、XMP、AMQP AMQP AMQP
事物 支持 支持 不支持
集群 支持 支持 支持
負(fù)載均衡 支持 支持 支持
動(dòng)態(tài)擴(kuò)容 不支持 不支持 支持(zk)
礎(chǔ)知識
  • 消費(fèi)者:(Consumer):從消息隊(duì)列中請求消息的客戶端應(yīng)用程序
  • 生產(chǎn)者:(Producer) :向broker發(fā)布消息的應(yīng)用程序
  • AMQP服務(wù)端(broker):用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列,便于fafka將生產(chǎn)者發(fā)送的消息,動(dòng)態(tài)的添加到磁盤并給每一條消息一個(gè)偏移量,所以對于kafka一個(gè)broker就是一個(gè)應(yīng)用程序的實(shí)例
  • 主題(Topic):一個(gè)主題類似新聞中的體育、娛樂、教育等分類概念,在實(shí)際工程中通常一個(gè)業(yè)務(wù)一個(gè)主題
    -分區(qū)(Partition):一個(gè)Topic中的消息數(shù)據(jù)按照多個(gè)分區(qū)組織,分區(qū)是kafka消息隊(duì)列組織的最小單位,一個(gè)分區(qū)可以看作是一個(gè)FIFO( First Input First Output的縮寫,先入先出隊(duì)列)的隊(duì)列
  • 每一個(gè)分區(qū)都可以有多個(gè)副本,以防止數(shù)據(jù)的丟失
  • 某一個(gè)分區(qū)中的數(shù)據(jù)如果需要更新,都必須通過該分區(qū)所有副本中的leader來更新
  • 消費(fèi)者可以分組,比如有兩個(gè)消費(fèi)者組A和B,共同消費(fèi)一個(gè)topic:order_info,A和B所消費(fèi)的消息不會重復(fù),比如 order_info 中有100個(gè)消息,每個(gè)消息有一個(gè)id,編號從0-99,那么,如果A組消費(fèi)0-49號,B組就消費(fèi)50-99號
  • 消費(fèi)者在具體消費(fèi)某個(gè)topic中的消息時(shí),可以指定起始偏移量
kafka分區(qū)是提高kafka性能的關(guān)鍵所在,當(dāng)你發(fā)現(xiàn)你的集群性能不高時(shí),常用手段就是增加Topic的分區(qū),分區(qū)里面的消息是按照從新到老的順序進(jìn)行組織,
消費(fèi)者從隊(duì)列頭訂閱消息,生產(chǎn)者從隊(duì)列尾添加消息
Kafka架構(gòu)

生產(chǎn)者生產(chǎn)消息、kafka集群、消費(fèi)者獲取消息這樣一種架構(gòu),如下圖:


image.png

kafka集群中的消息,是通過Topic(主題)來進(jìn)行組織的,如下圖:


image.png

工作圖
image.png
Kafka集群搭建

Kafka集群是把狀態(tài)保存在Zookeeper中的,首先要搭建Zookeeper集群
搭建Zookeeper集群
這里三臺服務(wù)器分別是
192.1682.158
192.1682.152
192.1682.150
在三臺服務(wù)器上分別安裝kafka
kafka官網(wǎng)下載地址 http://kafka.apache.org/downloads

 wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz
tar -zxvf kafka_2.12-2.0.0.tgz -C /usr/local/
mv /usr/local/kafka_2.12-2.0.0/ /usr/local/kafka

配置文件說明

broker.id=0  #當(dāng)前機(jī)器在集群中的唯一標(biāo)識,和zookeeper的myid性質(zhì)一樣
#listeners=PLAINTEXT://192.168.2.152:9092 #當(dāng)前kafka對外提供服務(wù)的端口默認(rèn)是9092
num.network.threads=3 #這個(gè)是borker進(jìn)行網(wǎng)絡(luò)處理的線程數(shù)
num.io.threads=8 #這個(gè)是borker進(jìn)行I/O處理的線程數(shù)
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個(gè)目錄可以配置為“,”逗號分割的表達(dá)式,上面的num.io.threads要大于這個(gè)目錄的個(gè)數(shù)這個(gè)目錄,如果配置多個(gè)目錄,新創(chuàng)建的topic他把消息持久化的地方是,當(dāng)前以逗號分割的目錄中,那個(gè)分區(qū)數(shù)最少就放那一個(gè)
socket.send.buffer.bytes=102400 #發(fā)送緩沖區(qū)buffer大小,數(shù)據(jù)不是一下子就發(fā)送的,先回存儲到緩沖區(qū)了到達(dá)一定的大小后在發(fā)送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收緩沖區(qū)大小,當(dāng)數(shù)據(jù)到達(dá)一定大小后在序列化到磁盤
socket.request.max.bytes=104857600 #這個(gè)參數(shù)是向kafka請求消息或者向kafka發(fā)送消息的請請求的最大數(shù),這個(gè)值不能超過java的堆棧大小
num.partitions=1 #默認(rèn)的分區(qū)數(shù),一個(gè)topic默認(rèn)1個(gè)分區(qū)數(shù)
log.retention.hours=168 #默認(rèn)消息的最大持久化時(shí)間,168小時(shí),7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本數(shù),如果一個(gè)副本失效了,另一個(gè)還可以繼續(xù)提供服務(wù)
replica.fetch.max.bytes=5242880  #取消息的最大直接數(shù)
log.segment.bytes=1073741824 #這個(gè)參數(shù)是:因?yàn)閗afka的消息是以追加的形式落地到文件,當(dāng)超過這個(gè)值的時(shí)候,kafka會新起一個(gè)文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時(shí)間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除
zookeeper.connect=192.168.2.152:2181,192.168.2.150:2181,192.168.2.158:2181#設(shè)置zookeeper的連接端口

主要修改配置這幾個(gè)地方

#每臺服務(wù)器的broker.id都不能相同
broker.id=0
listeners=PLAINTEXT://:9092
zookeeper.connect=192.168.2.152:2181,192.168.2.150:2181,192.168.2.158:2181

三臺服務(wù)器分別啟動(dòng)Kafka

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

檢查服務(wù)是否啟動(dòng)

jps
image.png

在kafka集群中創(chuàng)建一個(gè)topic:

/usr/local/kafka/bin/kafka-topics.sh  --create --zookeeper 192.168.2.152:2181 --replication-factor 3 --partitions 1 --topic order

解釋:
--replication-factor 3 #復(fù)制兩份
--partitions 1 #創(chuàng)建1個(gè)分區(qū)
--topic #主題為order
查看一下自己創(chuàng)建的topic:

/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.152:2181
image.png

在192.168.2.152機(jī)器上創(chuàng)建一個(gè)producer,發(fā)布者

/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.152:9092 --topic order

在192.168.2.150與192.168.2.158機(jī)器上分別創(chuàng)建一個(gè)consumer,消費(fèi)者者

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.150:9092 --topic order --from-beginning
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.158:9092 --topic order --from-beginning
測試

在發(fā)布者機(jī)器上輸入內(nèi)容


image.png

在兩臺消費(fèi)者機(jī)器上可以看到


image.png

并且在zookeeper可以到kafka的一些情況
zkCli.sh -server 192.168.2.152:2181
image.png

上面的顯示結(jié)果中:只有zookeeper是,zookeeper原生的,其他都是Kafka創(chuàng)建的

標(biāo)注一個(gè)重要的

get /brokers/ids/0
image.png

刪除topic命令

bin/kafka-topics.sh --delete --zookeeper ip:port --topic order

查看某個(gè)Topic的詳情

/usr/local/kafka/bin/kafka-topics.sh --topic order --describe --zookeeper 192.168.2.152:2181 
image.png
  • PartitionCount 分區(qū)數(shù)量
  • ReplicationFactor 復(fù)制因子數(shù)量
  • leader 是在給出的所有partitons中負(fù)責(zé)讀寫的節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)都有可能成為leader
  • replicas 顯示給定partiton所有副本所存儲節(jié)點(diǎn)的節(jié)點(diǎn)列表,不管該節(jié)點(diǎn)是否是leader或者是否存活。
  • isr 副本都已同步的的節(jié)點(diǎn)集合,這個(gè)集合中的所有節(jié)點(diǎn)都是存活狀態(tài),并且跟leader同步
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容