大數據系列之Flume+kafka 整合

關于Flume 的 一些核心概念:

組件名稱 ? ?功能介紹

Agent代理 ? 使用JVM 運行Flume。每臺機器運行一個agent,但是可以在一個agent中包含多個sources和sinks。

Client客戶端 ?生產數據,運行在一個獨立的線程。

Source源 ? 從Client收集數據,傳遞給Channel。

Sink接收器 ? 從Channel收集數據,進行相關操作,運行在一個獨立線程。

Channel通道 ? 連接 sources 和 sinks ,這個有點像一個隊列。

Events事件 ? 傳輸的基本數據負載。

文章和大數據系列之Flume+HDFS非常相似,不同的在于flume安裝目錄conf下新建了kafka.properties文件,啟動時也應當用此配置文件作為參數啟動。下面看具體內容:

1. kafka.properties:

agent.sources = s1 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?

agent.channels = c1?????????????????????????????????????????????????????????????????????????????????????????????????????????????????

agent.sinks = k1????????????????????????????????????????????????????????????????????????????????????????????????????????????????????


agent.sources.s1.type=exec??????????????????????????????????????????????????????????????????????????????????????????????????????????

agent.sources.s1.command=tail -F /tmp/logs/kafka.log????????????????????????????????????????????????????????????????????????????????

agent.sources.s1.channels=c1????????????????????????????????????????????????????????????????????????????????????????????????????????

agent.channels.c1.type=memory???????????????????????????????????????????????????????????????????????????????????????????????????????

agent.channels.c1.capacity=10000????????????????????????????????????????????????????????????????????????????????????????????????????

agent.channels.c1.transactionCapacity=100???????????????????????????????????????????????????????????????????????????????????????????


#設置Kafka接收器????????????????????????????????????????????????????????????????????????????????????????????????????????????????????

agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink??????????????????????????????????????????????????????????????????????????

#設置Kafka的broker地址和端口號??????????????????????????????????????????????????????????????????????????????????????????????????????

agent.sinks.k1.brokerList=master:9092???????????????????????????????????????????????????????????????????????????????????????????????

#設置Kafka的Topic???????????????????????????????????????????????????????????????????????????????????????????????????????????????????

agent.sinks.k1.topic=kafkatest??????????????????????????????????????????????????????????????????????????????????????????????????????

#設置序列化方式?????????????????????????????????????????????????????????????????????????????????????????????????????????????????????

agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?


agent.sinks.k1.channel=c1????


?關于配置文件中注意3點:

  a. ?agent.sources.s1.command=tail -F /tmp/logs/kafka.log ??

  b. ?agent.sinks.k1.brokerList=master:9092

  c .?agent.sinks.k1.topic=kafkatest

2.很明顯,由配置文件可以了解到:

  a.我們需要在/tmp/logs下建一個kafka.log的文件,且向文件中輸出內容(下面會說到);

  b.flume連接到kafka的地址是 master:9092,注意不要配置出錯了;

  c.flume會將采集后的內容輸出到Kafka topic 為kafkatest上,所以我們啟動zk,kafka后需要打開一個終端消費topic kafkatest的內容。這樣就可以看到flume與kafka之間玩起來了~~


3.具體操作:

  a.在/tmp/logs下建立空文件kafka.log。在mfz 用戶目錄下新建腳本kafkaoutput.sh(一定要給予可執(zhí)行權限),用來向kafka.log輸入內容: kafka_test***

for((i=0;i<=1000;i++));

do?echo?"kafka_test-"+$i>>/tmp/logs/kafka.log;

done

b. 在kafka安裝目錄下執(zhí)行如下命令,啟動zk,kafka 。(不明白此處可參照大數據系列之Flume+HDFS)

1bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &

1bin/kafka-server-start.sh -daemon config/server.properties &

? ? c.新增Topic kafkatest

1bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkatest?

?  d.打開新終端,在kafka安裝目錄下執(zhí)行如下命令,生成對topic kafkatest 的消費

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafkatest --from-beginning --zookeeper master

?  e.啟動flume

1bin/flume-ng agent --conf-file? conf/kafka.properties -c conf/ --name agent -Dflume.root.logger=DEBUG,console

  ?d.執(zhí)行kafkaoutput.sh腳本(注意觀察kafka.log內容及消費終端接收到的內容)

  e.查看新終端消費信息

整體流程如圖:

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容