
1.日志采集Flume安裝
詳情見:http://www.itdecent.cn/p/a2590b997e8e?v=1698800791912
集群規(guī)劃:
| 服務(wù)器hadoop101 | 服務(wù)器hadoop102 | 服務(wù)器hadoop103 | |
|---|---|---|---|
| Flume(采集日志) | Flume | Flume |
2.項目經(jīng)驗之Flume組件選型
1)Source
(1)Taildir Source相比Exec Source、Spooling Directory Source的優(yōu)勢
TailDir Source:斷點續(xù)傳、多目錄。Flume1.6以前需要自己自定義Source記錄每次讀取文件位置,實現(xiàn)斷點續(xù)傳。
Exec Source可以實時搜集數(shù)據(jù),但是在Flume不運行或者Shell命令出錯的情況下,數(shù)據(jù)將會丟失。
Spooling Directory Source監(jiān)控目錄,支持?jǐn)帱c續(xù)傳。
(2)batchSize大小如何設(shè)置?
答:Event 1K左右時,500-1000合適(默認(rèn)為100)
2)Channel
采用Kafka Channel,省去了Sink,提高了效率。KafkaChannel數(shù)據(jù)存儲在Kafka里面,所以數(shù)據(jù)是存儲在磁盤中。
注意在Flume1.7以前,Kafka Channel很少有人使用,因為發(fā)現(xiàn)parseAsFlumeEvent這個配置起不了作用。也就是無論parseAsFlumeEvent配置為true還是false,都會轉(zhuǎn)為Flume Event。這樣的話,造成的結(jié)果是,會始終都把Flume的headers中的信息混合著內(nèi)容一起寫入Kafka的消息中,這顯然不是我所需要的,我只是需要把內(nèi)容寫入即可。
3.日志采集Flume配置

Flume直接讀log日志的數(shù)據(jù),log日志的格式是app.yyyy-MM-dd.log。
2)Flume的具體配置如下:
(1)在 /opt/module/flume 目錄下創(chuàng)建 job 目錄
[yobhel@hadoop101 flume]$ mkdir job
進入 job 目錄,創(chuàng)建 file-flume-kafka.conf 文件
[yobhel@hadoop101 job]$ vim file-flume-kafka.conf
在文件配置如下內(nèi)容
#為各組件命名
a1.sources = r1
a1.channels = c1
#描述source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data_mocker/log/app.*
a1.sources.r1.positionFile = /opt/module/data_mocker/log_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.yobhel.flume.interceptors.ETLInterceptor$Builder
#描述channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false
#綁定source和channel以及sink和channel的關(guān)系
a1.sources.r1.channels = c1
注意:com.yobhel.flume.interceptor.ETLInterceptor是自定義的攔截器的全類名。需要根據(jù)用戶自定義的攔截器做相應(yīng)修改。
4 Flume攔截器
https://github.com/Yobhel121/edu-flume-interceptor
1)需要先將打好的包放入到hadoop101的/opt/module/flume/lib文件夾下面。
[yobhel@hadoop101 lib]$ ls | grep edu-flume-interceptor
edu-flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
2)分發(fā)Flume到hadoop102、hadoop103
[yobhel@hadoop101 module]$ xsync flume/
3)在hadoop101上啟動Flume
[yobhel@hadoop101 flume]$ bin/flume-ng agent --name a1 --conf-file job/file-flume-kafka.conf &
5.日志采集Flume啟動停止腳本
1)在/home/yobhel/bin目錄下創(chuàng)建腳本f1.sh
[yobhel@hadoop101 bin]$ vim f1.sh
在腳本中填寫如下內(nèi)容
#! /bin/bash
case $1 in
"start"){
for i in hadoop101 hadoop102
do
echo " --------啟動 $i 采集flume-------"
ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/job/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &"
done
};;
"stop"){
for i in hadoop101 hadoop102
do
echo " --------停止 $i 采集flume-------"
ssh $i "ps -ef | grep file-flume-kafka | grep -v grep |awk '{print \$2}' | xargs -n1 kill -9 "
done
};;
esac
- 說明1:nohup,該命令可以在你退出帳戶/關(guān)閉終端之后繼續(xù)運行相應(yīng)的進程。nohup就是不掛起的意思,不掛斷地運行命令。
- 說明2:awk 默認(rèn)分隔符為空格。
- 說明3:xargs 表示取出前面命令運行的結(jié)果,作為后面命令的輸入?yún)?shù)。
2)增加腳本執(zhí)行權(quán)限
[yobhel@hadoop101 bin]$ chmod +x f1.sh
3)f1集群啟動腳本
[yobhel@hadoop101 module]$ f1.sh start
4)f1集群停止腳本
[yobhel@hadoop101 module]$ f1.sh stop