Flume采集日志到Kafka

image.png

1.日志采集Flume安裝

詳情見:http://www.itdecent.cn/p/a2590b997e8e?v=1698800791912

集群規(guī)劃:

服務(wù)器hadoop101 服務(wù)器hadoop102 服務(wù)器hadoop103
Flume(采集日志) Flume Flume

2.項目經(jīng)驗之Flume組件選型

1)Source

(1)Taildir Source相比Exec Source、Spooling Directory Source的優(yōu)勢
TailDir Source:斷點續(xù)傳、多目錄。Flume1.6以前需要自己自定義Source記錄每次讀取文件位置,實現(xiàn)斷點續(xù)傳。
Exec Source可以實時搜集數(shù)據(jù),但是在Flume不運行或者Shell命令出錯的情況下,數(shù)據(jù)將會丟失。
Spooling Directory Source監(jiān)控目錄,支持?jǐn)帱c續(xù)傳。

(2)batchSize大小如何設(shè)置?
答:Event 1K左右時,500-1000合適(默認(rèn)為100)

2)Channel
采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel數(shù)據(jù)存儲在Kafka里面,所以數(shù)據(jù)是存儲在磁盤中。
注意在Flume1.7以前,Kafka Channel很少有人使用,因為發(fā)現(xiàn)parseAsFlumeEvent這個配置起不了作用。也就是無論parseAsFlumeEvent配置為true還是false,都會轉(zhuǎn)為Flume Event。這樣的話,造成的結(jié)果是,會始終都把Flume的headers中的信息混合著內(nèi)容一起寫入Kafka的消息中,這顯然不是我所需要的,我只是需要把內(nèi)容寫入即可。

3.日志采集Flume配置

image.png

Flume直接讀log日志的數(shù)據(jù),log日志的格式是app.yyyy-MM-dd.log。

2)Flume的具體配置如下:
(1)在 /opt/module/flume 目錄下創(chuàng)建 job 目錄

[yobhel@hadoop101 flume]$ mkdir job

進入 job 目錄,創(chuàng)建 file-flume-kafka.conf 文件

[yobhel@hadoop101 job]$ vim file-flume-kafka.conf

在文件配置如下內(nèi)容

#為各組件命名
a1.sources = r1
a1.channels = c1

#描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data_mocker/log/app.*
a1.sources.r1.positionFile = /opt/module/data_mocker/log_position.json
a1.sources.r1.interceptors =  i1
a1.sources.r1.interceptors.i1.type = com.yobhel.flume.interceptors.ETLInterceptor$Builder

#描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

#綁定source和channel以及sink和channel的關(guān)系
a1.sources.r1.channels = c1

注意:com.yobhel.flume.interceptor.ETLInterceptor是自定義的攔截器的全類名。需要根據(jù)用戶自定義的攔截器做相應(yīng)修改。

4 Flume攔截器

https://github.com/Yobhel121/edu-flume-interceptor

1)需要先將打好的包放入到hadoop101的/opt/module/flume/lib文件夾下面。

[yobhel@hadoop101 lib]$ ls | grep edu-flume-interceptor
edu-flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

2)分發(fā)Flume到hadoop102、hadoop103

[yobhel@hadoop101 module]$ xsync flume/

3)在hadoop101上啟動Flume

[yobhel@hadoop101 flume]$ bin/flume-ng agent --name a1 --conf-file job/file-flume-kafka.conf &

5.日志采集Flume啟動停止腳本

1)在/home/yobhel/bin目錄下創(chuàng)建腳本f1.sh

[yobhel@hadoop101 bin]$ vim f1.sh

在腳本中填寫如下內(nèi)容

#! /bin/bash

case $1 in
"start"){
        for i in hadoop101 hadoop102
        do
                echo " --------啟動 $i 采集flume-------"
                ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/job/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1  &"
        done
};; 
"stop"){
        for i in hadoop101 hadoop102
        do
                echo " --------停止 $i 采集flume-------"
                ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk  '{print \$2}' | xargs -n1 kill -9 "
        done

};;
esac
  • 說明1:nohup,該命令可以在你退出帳戶/關(guān)閉終端之后繼續(xù)運行相應(yīng)的進程。nohup就是不掛起的意思,不掛斷地運行命令。
  • 說明2:awk 默認(rèn)分隔符為空格。
  • 說明3:xargs 表示取出前面命令運行的結(jié)果,作為后面命令的輸入?yún)?shù)。
    2)增加腳本執(zhí)行權(quán)限
[yobhel@hadoop101 bin]$ chmod +x f1.sh

3)f1集群啟動腳本

[yobhel@hadoop101 module]$ f1.sh start

4)f1集群停止腳本

[yobhel@hadoop101 module]$ f1.sh stop
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容