Kafka 集群

Kafka 是一個(gè)分布式消息系統(tǒng),具有高水平擴(kuò)展和高吞吐量的特點(diǎn)。在Kafka 集群中,沒有 “中心主節(jié)點(diǎn)” 的概念,集群中所有的節(jié)點(diǎn)都是對等的。

kafka

Kafka 幾大概念

Topic(主題)

Kafka 中可將消息分類,每一類的消息稱為一個(gè) Topic,消費(fèi)者可以對不同的 Topic 進(jìn)行不同的處理。

Broker(代理)

每個(gè) Broker 即一個(gè) Kafka 服務(wù)實(shí)例,多個(gè) Broker 構(gòu)成一個(gè) Kafka 集群,生產(chǎn)者發(fā)布的消息將保存在 Broker 中,消費(fèi)者將從 Broker 中拉取消息進(jìn)行消費(fèi)。

Producer(生產(chǎn)者)

負(fù)責(zé)生產(chǎn)消息并發(fā)送給 Broker 。

Consumer(生產(chǎn)者)

負(fù)責(zé)消費(fèi) Broker 中 Topic 消息,每個(gè) Consumer 實(shí)例歸屬于一個(gè) Consumer Group 查看更多介紹

Partition(分區(qū))

Partition 是 Kafka 中比較特色的部分,一個(gè) Topic 可以分為多個(gè) Partition,每個(gè) Partition 是一個(gè)有序的隊(duì)列,Partition 中的每條消息都存在一個(gè)有序的偏移量(Offest) ,同一個(gè) Consumer Group 中,只有一個(gè) Consumer 實(shí)例可消費(fèi)某個(gè) Partition 的消息。

Kafka 持久化

每個(gè) Topic 將消息分成多 Partition,每個(gè) Partition 在存儲層面是 append log 文件。任何發(fā)布到此 Partition 的消息都會被直接追加到 log 文件的尾部,每條消息在文件中的位置稱為 Offest(偏移量),Partition 是以文件的形式存儲在文件系統(tǒng)中,log 文件根據(jù) Broker 中的配置保留一定時(shí)間后刪除來釋放磁盤空間。

append log

Kafka 集群架構(gòu)圖

kafka cluster architecture

從圖中可以看出 Kafka 強(qiáng)依賴于 ZooKeeper ,通過 ZooKeeper 管理自身集群,如:Broker 列表管理Partition 與 Broker 的關(guān)系、Partition 與 Consumer 的關(guān)系、Producer 與 Consumer 負(fù)載均衡消費(fèi)進(jìn)度 Offset 記錄、消費(fèi)者注冊 等,所以為了達(dá)到高可用,ZooKeeper 自身也必須是集群。

官方提供的 Kafka 的下載包(Binary downloads) 內(nèi)默認(rèn)包含了 ZooKeeper 模塊,所以可以選擇不單獨(dú) 下載 ZooKeeper,不過實(shí)際情況可能還是需要單獨(dú)下載 ZooKeeper 來部署,畢竟環(huán)境的資源要求都不一樣。

下面的例子將基于 Windows 環(huán)境采用單獨(dú)搭建 ZooKeeper 的方式,Linux 環(huán)境基本類似,推薦用 Docker ,更簡單一些。

Kafka 環(huán)境搭建依賴 Java 環(huán)境,所以需要預(yù)先安裝好 JDK

ZooKeeper 集群

因?yàn)槭峭慌_機(jī)器上完成搭建,所以采用復(fù)制多份來完成

  1. 下載 解壓后復(fù)制出3份,zookeeper-1、zookeeper-2、zookeeper-3,創(chuàng)建 data 目錄存放數(shù)據(jù),創(chuàng)建 log 目錄存放日志;

  2. 在 data 的 zk1、zk2、zk2 目錄中分別創(chuàng)建文件 myid,文件內(nèi)容分別寫入 1、2、3;

    zookeeper dir
  3. 重命名 zookeeper-1/conf 文件下 zoo_sample.cfg => zoo.cfg,修改如下:

    # 數(shù)據(jù)存放目錄
    dataDir=E:\\cluster_zookeeper\\data\\zk1
    # 日志存放目錄
    dataLogDir=E:\\cluster_zookeeper\\log\\zk1
    # 監(jiān)聽端口  
    clientPort=2181
    
    # 集群配置
    # server.x 分別對應(yīng)myid文件的內(nèi)容(每個(gè) zoo.cfg 文件都需要添加)
    # 2287(通訊端口):3387(選舉端口)
    server.1=localhost:2287:3387
    server.2=localhost:2288:3388
    server.3=localhost:2289:3389
    
  4. zookeeper-2、zookeeper-3 類似 zookeeper-1 進(jìn)行調(diào)整,dataDir、dataLogDir、clientPort 都需要調(diào)整:

    dataDir=E:\\cluster_zookeeper\\data\\zk2
    dataLogDir=E:\\cluster_zookeeper\\log\\zk2
    clientPort=2182
    
    server.1=localhost:2287:3387
    server.2=localhost:2288:3388
    server.3=localhost:2289:3389
    
    dataDir=E:\\cluster_zookeeper\\data\\zk3
    dataLogDir=E:\\cluster_zookeeper\\log\\zk3
    clientPort=2183
    
    server.1=localhost:2287:3387
    server.2=localhost:2288:3388
    server.3=localhost:2289:3389
    
  5. 啟動 ZooKeeper

    分別通過 cmd 進(jìn)入每個(gè) ZooKeeper 實(shí)例的 bin 文件夾,輸入命令:

    zkServer
    

    從日志中發(fā)現(xiàn),目前 1、3 為 Follower,2 為 Leader:

    start zookeeper cluster

Kafka 集群

  1. 下載 解壓后復(fù)制出3份,kafka-1、kafka-2、kafka-3,創(chuàng)建 log 目錄存放消息日志;

    kafka dir
  2. 修改 kafka-1/config 文件夾下的 server.properties :

    # broker 編號,集群內(nèi)必須唯一
    broker.id=1
    # host 地址
    host.name=127.0.0.1
    # 端口
    port=9092
    # 消息日志存放地址
    log.dirs=E:\\cluster_kafka\\log\\k1
    # ZooKeeper 地址,多個(gè)用,分隔
    zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
    
  3. kafka-2、kafka-3 類似 kafka-1 進(jìn)行調(diào)整

    broker.id=2
    host.name=127.0.0.1
    port=9093
    log.dirs=E:\\cluster_kafka\\log\\k2
    zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
    
    broker.id=3
    host.name=127.0.0.1
    port=9094
    log.dirs=E:\\cluster_kafka\\log\\k3
    zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
    
  4. 啟動 Kafka

    分別通過 cmd 進(jìn)入每個(gè) Kafka 實(shí)例,輸入命令啟動:

    bin\windows\kafka-server-start.bat .\config\server.properties
    
    start kafka cluster

注意:如果出現(xiàn)以下錯(cuò)誤:

找不到或無法加載主類 Files\Java\jdk1.8.0...;

解決方案:在 bin\windows\kafka-run-class.bat 中找到 "set COMMAND=%JAVA%" 行,給 %CLASSPATH% 加上雙引號

set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*

通過以上的步驟,Kafka 集群搭建完成,接下來可以通過命令行或 Kafka 的 Java、C# 等語言的客戶端進(jìn)行消息的生產(chǎn)和消費(fèi)測試。

測試

  1. 創(chuàng)建 testTopic

    .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 1 --partitions 1 --topic testTopic
    
  2. 生產(chǎn)消息

    .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092,localhost:9093,localhost:9094 --topic testTopic
    
  3. 消費(fèi)消息

    .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic testTopic
    
    test result

參考鏈接

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容