分布式發(fā)布訂閱消息系統(tǒng)Kafka

Kafka概述

Kafka是一個(gè)高吞吐量、分布式的發(fā)布——訂閱消息系統(tǒng)。據(jù)Kafka官網(wǎng)介紹,當(dāng)前的Kafka已經(jīng)定位為一個(gè)分布式流式處理平臺(tái)(a distributed streaming platform),它以可水平擴(kuò)展和具有高吞吐量等特性而著稱。越來(lái)越多的開源分布式處理系統(tǒng)(Flume、Apache Storm 、Spark、Flink等)支持與KafKa集成。
Kafka能夠很好的滿足以下三個(gè)特性:

  • 能夠允許發(fā)布和訂閱流數(shù)據(jù)。從這個(gè)角度來(lái)講,平臺(tái)更像是一個(gè)消息隊(duì)列或者企業(yè)級(jí)的消息系統(tǒng);
  • 存儲(chǔ)流數(shù)據(jù)的時(shí)候提供相應(yīng)的容錯(cuò)機(jī)制;
  • 當(dāng)流數(shù)據(jù)達(dá)到的時(shí)候能夠即時(shí)的被處理;

Kafka架構(gòu)和核心概念

image.png
  • Topic:特指kafka處理的消息源
  • Partition(分區(qū)):Topic物理上的分組。一個(gè)Topic可以有多個(gè)Patition,每個(gè)Partition是一個(gè)有序的隊(duì)列
  • Message:消息,通信的基本單位
  • Producer:生產(chǎn)者。向kafka的一個(gè)topic發(fā)布消息的過(guò)程叫做生產(chǎn)
  • Consumer:消費(fèi)者,訂閱Topic并處理其發(fā)布的消息的過(guò)程叫做消費(fèi)
  • Broker:緩存代理,kafka集群中的一臺(tái)或者多臺(tái)服務(wù)器

Kafka部署和使用

Kafka需要用到zookeeper。所以在安裝之前需要先安裝zookeeper。

//下載并解壓zookeeper的安裝包到自定義目錄
tar -zxvf zookeeper-3.4.5-cdh5.7.0.tar.gz -C ../apps/
// 配置環(huán)境變量并生效
vi ~/.bash_profile
              export ZK_HOME=/root/apps/zookeeper-3.4.5-cdh5.7.0
              export PATH=$ZK_HOME/bin:$PATH
source ~/.bash_profile
//進(jìn)入conf目錄,配置zoo.cfg文件
cp zoo_sample.cfg zoo.cfg
mkdir -p  ~/data/tmp/zk  // 創(chuàng)建數(shù)據(jù)目錄
vi zoo.cfg
       dataDir=/root/data/tmp/zk/   //默認(rèn)在tmp目錄下,在關(guān)機(jī)重啟后數(shù)據(jù)會(huì)丟失,這里做持久化保存配置
//進(jìn)入bin目錄,啟動(dòng)service
./zkServer.sh start
//驗(yàn)證
jps
      QuorumPeerMain //有zookeeper進(jìn)程表示啟動(dòng)成功

單節(jié)點(diǎn)單Broker部署和使用

  • 首先安裝Kafka,我版本選用的是2.2.0
1)解壓,配置系統(tǒng)環(huán)境變量 
tar -zxvf kafka_2.11-2.2.0.tgz -C ~/apps/
vi ~/.bash_profile
          export KAFKA_HOME=/root/apps/kafka_2.11-2.2.0
          export PATH=$KAFKA_HOME/bin:$PATH
source ~/. bash_profile
2)$KAFKA_HOME/config中配置文件server.properties中的參數(shù)說(shuō)明: 
①broker.id : 每個(gè)broker編號(hào),籃子編號(hào)不能沖突,只有一臺(tái)機(jī)器,默認(rèn)從0開始 
②監(jiān)聽端口默認(rèn)在9092上面 
③hostname:當(dāng)前機(jī)器
④log.dirs:用來(lái)存儲(chǔ)kafka日志的目錄   //默認(rèn)為tmp目錄,重啟后會(huì)丟失,需要改為自定義目錄
⑤num.partition:分區(qū)的數(shù)量,默認(rèn)為1 
⑥zookeeper.connection: zookeeper地址
  • 啟動(dòng)kafka(啟動(dòng)之前先要啟動(dòng)zookeeper)
kafka-server-start.sh $KAFKA_HOME/config/server.properties
注:如果只輸入kafka-server-start.sh 
會(huì)出現(xiàn)提示信息:USAGE: /home/hadoop/app/kafka_2.11-0.9.0.0/bin/kafka-server-start.sh [-daemon] server.properties [–override property=value]* 
-daemon:表示是否以后臺(tái)的形式運(yùn)行 
  • 啟動(dòng)后用jps可以看到kafka,用jps -m可以看到kafka對(duì)應(yīng)哪個(gè)server.properties文件
  • 創(chuàng)建topic:名稱test_hi,執(zhí)行命令需要到bin目錄。
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_hi

創(chuàng)建topic需要指定zookeeper地址,副本系數(shù)為1,一個(gè)分區(qū)一個(gè)副本系數(shù)

  • 查看目前有多少個(gè)topic,所有topic
./kafka-topics.sh --list --zookeeper localhost:2181
  • 發(fā)送消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic test_hi

bin目錄下自帶了一個(gè)控制臺(tái)的生產(chǎn)者,要用broker-list,9092是前面配置的監(jiān)聽端口

  • 消費(fèi)消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_hi --from-beginning
  • 測(cè)試
    可以開兩個(gè)命令行終端,分別執(zhí)行生產(chǎn)和消費(fèi)消息命令,輸入即可實(shí)時(shí)顯示,在生產(chǎn)者終端中鍵入消息并看到它們出現(xiàn)在消費(fèi)者終端中。默認(rèn)情況下,每行作為單獨(dú)一個(gè)消息發(fā)送。可以發(fā)現(xiàn),消費(fèi)終端關(guān)閉再開啟,消息不丟失,這是因?yàn)橛昧?-from-beginning,說(shuō)明從頭開始消費(fèi)。如果不加這個(gè)參數(shù),只從當(dāng)前開始消費(fèi)。
  • 補(bǔ)充:
    1)./kafka-topics.sh --describe --zookeeper localhost:2181
    可以看到所有topic的詳細(xì)信息
    2)./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_hi
    可以查看指定topic的信息

單節(jié)點(diǎn)多Broker部署和使用

  • 先為每個(gè)broker配置一個(gè)server.properties,主要修改其中的broker.id、監(jiān)聽端口號(hào)和日志輸出目錄,因?yàn)槲覀兪窃谝慌_(tái)機(jī)器上運(yùn)行。
  • 啟動(dòng)zk
  • 啟動(dòng)kafka,現(xiàn)在以后臺(tái)的方式啟動(dòng),這樣終端關(guān)了也沒有關(guān)系
./kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &

./kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &

./kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &

4)啟動(dòng)后,用jps可以看到三個(gè)kafka,jps -m可以看到分別對(duì)應(yīng)的配置文件
5)創(chuàng)建topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-topic

3個(gè)broker對(duì)應(yīng)3個(gè)副本

  • 查看topic及詳細(xì)信息同上
    其中,Leader指的是主broker的id是3,replicas表示3個(gè)broker列表,Isr表示當(dāng)前存活的brokerId
  • 發(fā)送消息
kafka-console-producer.sh --broker-list localhost:9093,localhost:9094,localhost:9095 --topic my-topic

3個(gè)broker端口號(hào)與配置文件中一致

  • 消費(fèi)信息
./kafka-console-consumer.sh --bootstrap-server localhost:9093,localhost:9094,localhost:9095 --topic my-topic

現(xiàn)在可以啟動(dòng)三個(gè)終端輸入命令,實(shí)時(shí)接受信息

多節(jié)點(diǎn)多Broker部署和使用

和單節(jié)點(diǎn)多Broker部署類似。這里由于機(jī)器原因不做演示。

Kafka容錯(cuò)性測(cè)試

在多Broker案例中,我們隨便殺掉一個(gè)broker進(jìn)程,在生成者中發(fā)送消息,發(fā)現(xiàn)消費(fèi)者還是可以正常接收,查看topic信息,發(fā)現(xiàn)活得節(jié)點(diǎn)還有兩個(gè),所以說(shuō)在多Broker的環(huán)境中具有容錯(cuò)性。


image.png

Kafka和Flume整合完成實(shí)時(shí)數(shù)據(jù)采集

http://www.itdecent.cn/p/cc85af7c6cec
中已經(jīng)簡(jiǎn)述了Flume的安裝和使用?,F(xiàn)在整合Flume完成實(shí)時(shí)的數(shù)據(jù)采集,采集到的數(shù)據(jù)通過(guò)Kafka的消費(fèi)者消費(fèi),后續(xù)會(huì)學(xué)習(xí)spark streaming對(duì)收集到的數(shù)據(jù)進(jìn)行實(shí)時(shí)處理。

image.png

  • 配置agent
    agent1的配置
agentA.sources = r1
agentA.sinks = k1
agentA.channels = c1

# Describe/configure the source
agentA.sources.r1.type = exec
agentA.sources.r1.command= tail -F /root/data/data.log
agentA.sources.r1.shell = /bin/sh -c

# Describe the sink
agentA.sinks.k1.type = avro
agentA.sinks.k1.hostname  = localhost
agentA.sinks.k1.port = 44444

# Use a channel which buffers events in memory
agentA.channels.c1.type = memory

# Bind the source and sink to the channel
agentA.sources.r1.channels = c1
agentA.sinks.k1.channel = c1

agent2的配置

# Name the components on this agent
kafka-agent.sources = avro-source
kafka-agent.sinks = kafka-sink
kafka-agent.channels = memory-channel

# Describe/configure the source
kafka-agent.sources.avro-source.type = avro
kafka-agent.sources.avro-source.bind = localhost
kafka-agent.sources.avro-source.port = 44444

# Describe the sink
kafka-agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
kafka-agent.sinks.kafka-sink.brokerList = 192.168.30.131:9092
kafka-agent.sinks.kafka-sink.topic = test_hi
kafka-agent.sinks.kafka-sink.batchSize = 3
kafka-agent.sinks.kafka-sink.requiredAcks = 1

# Use a channel which buffers events in memory
kafka-agent.channels.memory-channel.type = memory

# Bind the source and sink to the channel
kafka-agent.sources.avro-source.channels = memory-channel
kafka-agent.sinks.kafka-sink.channel = memory-channel
  • 啟動(dòng)兩個(gè)agent
flume-ng agent --name agentA --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/clientA.conf -Dflume.root.logger=DEBUG,console

flume-ng agent --name kafka-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/kafka-agent.conf -Dflume.root.logger=DEBUG,console

  • 啟動(dòng)Kafka的消費(fèi)者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_hi
  • 在日志文件中輸入字符,模擬日志的生成。
echo kafka-test >> data.log
  • 在Kafka的消費(fèi)者界面查看是否消費(fèi)到了消息


    image.png

    如圖可見消費(fèi)者可以消費(fèi)到,接下來(lái)會(huì)學(xué)習(xí)spark streaming來(lái)替換消費(fèi)者。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容