4.1 Configuration Files
Check JAVA_HOME: echo $JAVA_HOME
It should print /opt/module/jdk1.8.0_144
Install Flume:
[itstar@bigdata113 software]$ tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /opt/module/
Rename the environment file:
[itstar@bigdata113 conf]$ mv flume-env.sh.template flume-env.sh
Change required in flume-env.sh:
export JAVA_HOME=/opt/module/jdk1.8.0_144
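To verify the installation, print the Flume version (a quick sanity check; the commands that follow assume the extracted directory was renamed to /opt/module/flume-1.8.0):
/opt/module/flume-1.8.0/bin/flume-ng version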
4.2 Cases
4.2.1 Case 1: Monitoring Port Data
Goal: Flume listens on a port at one console; a second console sends messages to that port, and the monitoring end displays them in real time.
Steps:
1) Install the telnet tool
[requires network access] yum -y install telnet
[installation complete]

2) Create the Flume agent configuration file flume-telnet.conf
# Define the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Define the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata113
a1.sources.r1.port = 44445
# Define the sink
a1.sinks.k1.type = logger
# Define the memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3) Check whether port 44445 is already in use
$ netstat -tunlp | grep 44445
4) Start the agent with the configuration file
/opt/module/flume-1.8.0/bin/flume-ng agent \
--conf /opt/module/flume-1.8.0/conf/ \
--name a1 \
--conf-file /opt/module/flume-1.8.0/jobconf/flume-telnet.conf \
-Dflume.root.logger=INFO,console
flume-ng: the launcher command
--conf: directory containing Flume's own configuration
--name: the agent name
--conf-file: path to the job configuration file
-Dflume.root.logger=INFO,console: print the log to the console
5) Use the telnet tool to send content to port 44445 on this host
$ telnet bigdata113 44445
4.2.2 Case 2: Real-Time Reading of a Local File into HDFS
1) Create the flume-hdfs.conf file
# 1 agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# 2 source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/Andy
a2.sources.r2.shell = /bin/bash -c
# 3 sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://bigdata111:9000/flume/%Y%m%d/%H
# Prefix for the uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll folders based on time
a2.sinks.k2.hdfs.round = true
# How many time units before a new folder is created
a2.sinks.k2.hdfs.roundValue = 1
# The time unit itself
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 1000
# File type; compressed formats are also supported
a2.sinks.k2.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a2.sinks.k2.hdfs.rollInterval = 600
# Roll size per file (just under 128 MB, so a rolled file stays within one HDFS block)
a2.sinks.k2.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a2.sinks.k2.hdfs.rollCount = 0
# Minimum block replication
a2.sinks.k2.hdfs.minBlockReplicas = 1
# Define the memory channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
2) Run the agent with the monitoring configuration
/opt/module/flume-1.8.0/bin/flume-ng agent \
--conf /opt/module/flume-1.8.0/conf/ \
--name a2 \
--conf-file /opt/module/flume-1.8.0/jobconf/flume-hdfs.conf
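To exercise the pipeline, append a line to the tailed file and then list the HDFS output directory (a quick sketch; it assumes the agent is running, /opt/Andy exists, and HDFS is reachable at bigdata111:9000):
echo "hello flume" >> /opt/Andy
hdfs dfs -ls /flume/$(date +%Y%m%d)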
4.2.3 Case 3: Real-Time Reading of Directory Files into HDFS
Goal: use Flume to monitor all files in a directory
Steps:
1) Create the configuration file flume-dir.conf
#1 Agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3
#2 source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume1.8.0/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore (do not upload) any file ending in .tmp
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# 3 sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://bigdata111:9000/flume/%H
# Prefix for the uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll folders based on time
a3.sinks.k3.hdfs.round = true
# How many time units before a new folder is created
a3.sinks.k3.hdfs.roundValue = 1
# The time unit itself
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; compressed formats are also supported
a3.sinks.k3.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a3.sinks.k3.hdfs.rollInterval = 600
# Roll size per file (just under 128 MB)
a3.sinks.k3.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a3.sinks.k3.hdfs.rollCount = 0
# Minimum block replication
a3.sinks.k3.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
2) Run the test: after starting the agent with the command below, try adding files to the upload folder
/opt/module/flume1.8.0/bin/flume-ng agent \
--conf /opt/module/flume1.8.0/conf/ \
--name a3 \
--conf-file /opt/module/flume1.8.0/jobconf/flume-dir.conf
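When adding files to a spooling directory, write them somewhere else first and move them in atomically, so the source never sees a half-written file (a sketch; the file name is arbitrary):
echo "spooldir test" > /tmp/test.txt
mv /tmp/test.txt /opt/module/flume1.8.0/upload/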
Heads-up: when using the Spooling Directory Source
1) Do not create and then keep modifying files inside the monitored directory
2) Files that have been uploaded are renamed with the .COMPLETED suffix
3) The monitored folder is scanned for changes every 500 milliseconds
4.2.4 Case 4: Flume-to-Flume Data Transfer: One Flume, Multiple Channels and Sinks

Goal: flume1 monitors a file for changes and passes every change to flume2, which stores it in HDFS; at the same time, flume1 passes each change to flume3, which writes it to the local filesystem.
Steps:
1) Create flume1.conf, which monitors the file and fans its events out over two channels and two sinks, one feeding flume2 and one feeding flume3:
# 1.agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to multiple channels
a1.sources.r1.selector.type = replicating
# 2.source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.shell = /bin/bash -c
# 3.sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata111
a1.sinks.k1.port = 4141
# sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigdata111
a1.sinks.k2.port = 4142
# 4. channel 1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4. channel 2
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
2) Create flume2.conf, which receives flume1's events and, through one channel and one sink, delivers the data to HDFS:
# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# 2 source
a2.sources.r1.type = avro
a2.sources.r1.bind = bigdata111
a2.sources.r1.port = 4141
# 3 sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume2/%H
# Prefix for the uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to roll folders based on time
a2.sinks.k1.hdfs.round = true
# How many time units before a new folder is created
a2.sinks.k1.hdfs.roundValue = 1
# The time unit itself
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type; compressed formats are also supported
a2.sinks.k1.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a2.sinks.k1.hdfs.rollInterval = 600
# Roll size per file (just under 128 MB)
a2.sinks.k1.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a2.sinks.k1.hdfs.rollCount = 0
# Minimum block replication
a2.sinks.k1.hdfs.minBlockReplicas = 1
# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
#5 Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
3) Create flume3.conf, which receives flume1's events and, through one channel and one sink, delivers the data to a local directory:
#1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata111
a3.sources.r1.port = 4142
#3 sink
a3.sinks.k1.type = file_roll
# Note: this directory must be created in advance
a3.sinks.k1.sink.directory = /opt/flume3
# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
Heads-up: the local output directory must already exist; file_roll does not create it for you.
4) Run the test: start the corresponding Flume jobs (flume1, flume2, flume3), modify the watched file, and observe the results (see the verification sketch after the commands):
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume1.conf
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume2.conf
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume3.conf
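Because flume1's avro sinks connect to the avro sources of flume2 and flume3, starting the two downstream agents first avoids initial connection retries. To verify the fan-out (a sketch; assumes all three agents are running):
echo "replicated line" >> /opt/Andy
ls /opt/flume3
hdfs dfs -ls /flume2/$(date +%H)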
4.2.5 Case 5: Flume-to-Flume Data Transfer: Multiple Flumes Aggregating into One Flume

Goal: flume11 monitors the file hive.log and flume22 monitors a port's data stream; both send their data to flume33, which writes the merged stream to HDFS.
Steps:
1) Create flume11.conf, which monitors the log file and sinks its data to flume33:
# 1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.shell = /bin/bash -c
# 3 sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigdata111
a1.sinks.k1.port = 4141
# 4 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 5. Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2) Create flume22.conf, which monitors the data stream on port 44444 and sinks its data to flume33:
# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# 2 source
a2.sources.r1.type = netcat
a2.sources.r1.bind = bigdata111
a2.sources.r1.port = 44444
#3 sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = bigdata111
a2.sinks.k1.port = 4141
# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# 5 Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
3) Create flume33.conf, which receives the data streams sent by flume11 and flume22 and sinks the merged stream to HDFS:
# 1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1
# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = bigdata111
a3.sources.r1.port = 4141
# 3 sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume3/%H
# Prefix for the uploaded files
a3.sinks.k1.hdfs.filePrefix = flume3-
# Whether to roll folders based on time
a3.sinks.k1.hdfs.round = true
# How many time units before a new folder is created
a3.sinks.k1.hdfs.roundValue = 1
# The time unit itself
a3.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k1.hdfs.useLocalTimeStamp = true
# How many events to accumulate before flushing to HDFS
a3.sinks.k1.hdfs.batchSize = 100
# File type; compressed formats are also supported
a3.sinks.k1.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a3.sinks.k1.hdfs.rollInterval = 600
# Roll size per file (just under 128 MB)
a3.sinks.k1.hdfs.rollSize = 134217700
# File rolling is independent of the number of events
a3.sinks.k1.hdfs.rollCount = 0
# Minimum block replication
a3.sinks.k1.hdfs.minBlockReplicas = 1
# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
# 5 Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
4) Run the test: start the corresponding Flume jobs in order (flume33, flume22, flume11), generate data, and observe the results:
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume33.conf
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume22.conf
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume11.conf
Send data:
1) telnet bigdata111 44444, then send 5555555
2) Append 666666 to /opt/Andy
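Both streams land in the same HDFS directory, so the merge can be confirmed with (a sketch; assumes the three agents are running):
hdfs dfs -ls /flume3/$(date +%H)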
4.2.6 Case 6: Flume Interceptors
Timestamp interceptor
Timestamp.conf
# 1. Define the agent and the source, channel, and sink names
a4.sources = r1
a4.channels = c1
a4.sinks = k1
# 2. Define the source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /opt/module/flume-1.8.0/upload
# Define an interceptor that stamps each event with a timestamp header
# (this is what lets the %H escape in the HDFS path below resolve)
a4.sources.r1.interceptors = i1
a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
# Define the channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100
# Define the sink
a4.sinks.k1.type = hdfs
a4.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume-interceptors/%H
a4.sinks.k1.hdfs.filePrefix = events-
a4.sinks.k1.hdfs.fileType = DataStream
# Do not roll files based on event count
a4.sinks.k1.hdfs.rollCount = 0
# Roll a new file once it reaches 128 MB on HDFS
a4.sinks.k1.hdfs.rollSize = 134217728
# Roll a new file every 60 seconds
a4.sinks.k1.hdfs.rollInterval = 60
# Bind the source, channel, and sink
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
Startup command:
/opt/module/flume-1.8.0/bin/flume-ng agent -n a4 \
-f /opt/module/flume-1.8.0/jobconf/Timestamp.conf \
-c /opt/module/flume-1.8.0/conf \
-Dflume.root.logger=INFO,console
Host interceptor
Host.conf
# 1. Define the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2. Define the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
# Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host
# true uses the IP (e.g. 192.168.1.111); false uses the hostname; the default is true
a1.sources.r1.interceptors.i1.useIP = false
a1.sources.r1.interceptors.i1.hostHeader = agentHost
# 3. Define the sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flumehost/%H
a1.sinks.k1.hdfs.filePrefix = Andy_%{agentHost}
# Append the .log suffix to the generated files
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Startup command:
bin/flume-ng agent -c conf/ -f jobconf/Host.conf -n a1 -Dflume.root.logger=INFO,console
UUID interceptor
uuid.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
# The type cannot be abbreviated to uuid; the fully qualified class name is required, or the class will not be found
a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
# If a UUID header already exists, preserve it
a1.sources.r1.interceptors.i1.preserveExisting = true
a1.sources.r1.interceptors.i1.prefix = UUID_
# If the sink type is changed to HDFS, the header information does not appear in the text written to HDFS
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# bin/flume-ng agent -c conf/ -f jobconf/uuid.conf -n a1 -Dflume.root.logger=INFO,console
Search-and-replace interceptor
search.conf
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = search_replace
# Replace every run of digits with itstar; e.g. A123 becomes Aitstar
a1.sources.r1.interceptors.i1.searchPattern = [0-9]+
a1.sources.r1.interceptors.i1.replaceString = itstar
a1.sources.r1.interceptors.i1.charset = UTF-8
#3 sink
a1.sinks.k1.type = logger
#4 Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#5 bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# bin/flume-ng agent -c conf/ -f jobconf/search.conf -n a1 -Dflume.root.logger=INFO,console
Regex filter interceptor
filter.conf
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^A.*
# With excludeEvents = false, events NOT starting with A are dropped (only matching events pass); with excludeEvents = true, events starting with A are dropped.
a1.sources.r1.interceptors.i1.excludeEvents = true
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# bin/flume-ng agent -c conf/ -f jobconf/filter.conf -n a1 -Dflume.root.logger=INFO,console
Regex extractor interceptor
extractor.conf
#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = hostname is (.*) ip is (.*)
a1.sources.r1.interceptors.i1.serializers = s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name = hostname
a1.sources.r1.interceptors.i1.serializers.s2.name = ip
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# bin/flume-ng agent -c conf/ -f jobconf/extractor.conf -n a1 -Dflume.root.logger=INFO,console
Note: the headers produced by the regex extractor interceptor do not appear in the file name or the file contents.
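For illustration (an assumed input, not taken from the source material): the line "hostname is bigdata111 ip is 192.168.1.111" would produce the headers hostname=bigdata111 and ip=192.168.1.111, which an HDFS sink could then reference as %{hostname} and %{ip} in its path.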
4.2.7 Case 7: Custom Flume Interceptor
Converts lowercase letters to uppercase
1. pom.xml
    <dependencies>
        <!-- Flume core dependency -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.8.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Packaging plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
2. Implement the custom interceptor
// Package must match the type configured below: ToUpCase.MyInterceptor$Builder
package ToUpCase;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;

public class MyInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public void close() {
    }

    /**
     * Intercepts a message sent from the source to the channel.
     *
     * @param event the event to process
     * @return the event after business-specific processing
     */
    @Override
    public Event intercept(Event event) {
        // Get the byte payload of the event
        byte[] arr = event.getBody();
        // Convert the payload to uppercase
        event.setBody(new String(arr).toUpperCase().getBytes());
        // Return the modified event
        return event;
    }

    // Processes a batch of intercepted events
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> list = new ArrayList<>();
        for (Event event : events) {
            list.add(intercept(event));
        }
        return list;
    }

    public static class Builder implements Interceptor.Builder {
        // Builds the interceptor; configuration properties could be read in configure()
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
Build the jar with Maven, create a jar directory under the Flume home (mkdir jar), and upload the jar into it.
3. Flume job configuration
ToUpCase.conf
#1.agent
a1.sources = r1
a1.sinks =k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.interceptors = i1
# Fully qualified class name, plus $Builder
a1.sources.r1.interceptors.i1.type = ToUpCase.MyInterceptor$Builder
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /ToUpCase1
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Generated file type; the default is SequenceFile, DataStream produces plain text
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Run command:
bin/flume-ng agent -c conf/ -n a1 -f jar/ToUpCase.conf -C jar/Flume-1.0-SNAPSHOT.jar -Dflume.root.logger=DEBUG,console
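To test the interceptor (a sketch; assumes the agent started cleanly and HDFS is reachable): append lowercase text to the tailed file, then inspect the output directory configured in the sink:
echo "abc def" >> /opt/Andy
hdfs dfs -ls /ToUpCase1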
4.2.8 Case 8: Custom Flume Source
1. Code: a custom source that records its read offset so that reading can resume where it left off
import org.apache.commons.io.FileUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.ExecSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * A custom source that records its read offset.
 * Flume lifecycle: constructor first, then configure() -> start() -> processor.process().
 * Configuration read here: which file to tail, the charset, which file to write the
 * offset to, and how often to check the file for new content.
 */
public class TailFileSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger logger = LoggerFactory.getLogger(ExecSource.class);
    private String filePath;
    private String charset;
    private String positionFile;
    private long interval;
    private ExecutorService executor;
    private FileRunnable fileRunnable;

    /**
     * Reads the configuration (the job configuration file Flume was started with).
     * (If the job configuration does not override them, the defaults below are used.)
     *
     * @param context
     */
    @Override
    public void configure(Context context) {
        // Which file to read
        filePath = context.getString("filePath");
        // Defaults to UTF-8
        charset = context.getString("charset", "UTF-8");
        // Where to write the offset
        positionFile = context.getString("positionFile");
        // By default, check for new content once per second
        interval = context.getLong("interval", 1000L);
    }
    /**
     * Creates a thread to watch the file.
     */
    @Override
    public synchronized void start() {
        // Create a single-threaded executor
        executor = Executors.newSingleThreadExecutor();
        // Get the ChannelProcessor
        final ChannelProcessor channelProcessor = getChannelProcessor();
        fileRunnable = new FileRunnable(filePath, charset, positionFile, interval, channelProcessor);
        // Submit the task to the executor
        executor.submit(fileRunnable);
        // Call the parent implementation
        super.start();
    }

    @Override
    public synchronized void stop() {
        // Signal the task to stop
        fileRunnable.setFlag(false);
        // Shut down the executor
        executor.shutdown();
        while (!executor.isTerminated()) {
            logger.debug("Waiting for filer exec executor service to stop");
            try {
                // Wait up to 500 ms before checking again
                executor.awaitTermination(500, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.debug("InterruptedException while waiting for exec executor service" +
                        " to stop. Just exiting");
                e.printStackTrace();
            }
        }
        super.stop();
    }
    private static class FileRunnable implements Runnable {
        private String charset;
        private long interval;
        private long offset = 0L;
        private ChannelProcessor channelProcessor;
        private RandomAccessFile raf;
        private boolean flag = true;
        private File posFile;

        /*
        Runs before run(); the constructor executes only once.
        If an offset was saved previously, continue from it; otherwise read from the beginning.
         */
        public FileRunnable(String filePath, String charset, String positionFile, long interval, ChannelProcessor channelProcessor) {
            this.charset = charset;
            this.interval = interval;
            this.channelProcessor = channelProcessor;
            // The offset is kept in the position file
            posFile = new File(positionFile);
            if (!posFile.exists()) {
                // Create the file if it does not exist
                try {
                    posFile.createNewFile();
                } catch (IOException e) {
                    e.printStackTrace();
                    logger.error("failed to create the offset file:", e);
                }
            }
            try {
                // Read the saved offset
                String offsetString = FileUtils.readFileToString(posFile);
                // If an offset was saved before (null check must come first)
                if (null != offsetString && !offsetString.isEmpty()) {
                    // Parse the offset into a long
                    offset = Long.parseLong(offsetString);
                }
                // Open the file for reading
                raf = new RandomAccessFile(filePath, "r");
                // Seek to the saved offset
                raf.seek(offset);
            } catch (IOException e) {
                logger.error("error reading the offset file", e);
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            while (flag) {
                // Read any new data from the file
                try {
                    String line = raf.readLine();
                    if (line != null) {
                        // Re-encode to avoid mojibake: readLine decodes bytes as ISO-8859-1
                        line = new String(line.getBytes("iso8859-1"), charset);
                        // Send the line to the channel
                        channelProcessor.processEvent(EventBuilder.withBody(line.getBytes()));
                        // Get and update the offset
                        offset = raf.getFilePointer();
                        // Persist the offset to the position file
                        FileUtils.writeStringToFile(posFile, offset + "");
                    } else {
                        // Nothing new; sleep for the configured interval
                        Thread.sleep(interval);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    logger.error("read file thread interrupted", e);
                } catch (IOException e) {
                    logger.error("read log file error", e);
                }
            }
        }

        public void setFlag(boolean flag) {
            this.flag = flag;
        }
    }
}
2. Job configuration file
# Define the agent and the source, channel, and sink names
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Define the source; type is the fully qualified name of the custom source class
a1.sources.r1.type = customSource.TailFileSource
# The parameter names here must match those read in the custom class's configure()
# Which file to read
a1.sources.r1.filePath = /opt/Andy
# File in which the offset is saved
a1.sources.r1.positionFile = /opt/Cndy
# Interval: how often to check for new content, in milliseconds
a1.sources.r1.interval = 2000
# Charset
a1.sources.r1.charset = UTF-8
# Define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Define the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /opt/Bndy
# Bind the source, channel, and sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Startup command:
bin/flume-ng agent -n a1 -f jar/ConsumSource.conf -c conf/ -C jar/ConsumSource.jar -Dflume.root.logger=INFO,console
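To see the offset bookkeeping at work (a sketch; assumes the jar was built and the /opt/Bndy directory exists): append a few lines to /opt/Andy, stop and restart the agent, and append again; previously read lines should not be delivered a second time. The saved offset can be inspected at any point:
echo "line one" >> /opt/Andy
cat /opt/Cndy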
4.2.9 Case 9: Flume to Kafka
Configure Flume (flume2kafka.conf)
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/jars/calllog.csv
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = bigdata111:9092,bigdata112:9092,bigdata113:9092
a1.sinks.k1.topic = calllog
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
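brokerList, topic, batchSize, and requiredAcks above are the legacy KafkaSink property names; Flume 1.8 should still translate them, but its documented equivalents are the kafka.-prefixed ones. An equivalent sink definition (a sketch):
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata111:9092,bigdata112:9092,bigdata113:9092
a1.sinks.k1.kafka.topic = calllog
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1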
Go to the Flume home directory and start Flume:
/opt/module/flume-1.8.0/bin/flume-ng agent --conf /opt/module/flume-1.8.0/conf/ --name a1 --conf-file /opt/jars/flume2kafka.conf
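The events can be watched arriving with a console consumer (a sketch; assumes a Kafka 0.10+ installation with its bin directory on the PATH):
kafka-console-consumer.sh --bootstrap-server bigdata111:9092 --topic calllog --from-beginning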
4.2.10 Case 10: Kafka to Flume
kafka2flume.conf
agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink
# The channel can be defined as follows.
agent.sources.kafkaSource.channels = memoryChannel
agent.sources.kafkaSource.type=org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.zookeeperConnect=bigdata111:2181,bigdata112:2181,bigdata113:2181
agent.sources.kafkaSource.topic=calllog
#agent.sources.kafkaSource.groupId=flume
agent.sources.kafkaSource.kafka.consumer.timeout.ms=100
agent.channels.memoryChannel.type=memory
agent.channels.memoryChannel.capacity=10000
agent.channels.memoryChannel.transactionCapacity=1000
# the sink of hdfs
agent.sinks.hdfsSink.type=hdfs
agent.sinks.hdfsSink.channel = memoryChannel
agent.sinks.hdfsSink.hdfs.path=hdfs://bigdata111:9000/kafka2flume
agent.sinks.hdfsSink.hdfs.writeFormat=Text
agent.sinks.hdfsSink.hdfs.fileType=DataStream
# Without these two settings, many small files are produced
agent.sinks.hdfsSink.hdfs.rollSize=0
agent.sinks.hdfsSink.hdfs.rollCount=0
Startup command
bin/flume-ng agent --conf conf --conf-file jobconf/kafka2flume.conf --name agent -Dflume.root.logger=INFO,console
Note: this configuration pulls data from Kafka, but new data must be written into the Kafka topic before anything flows on to HDFS.
4.3 Flume Transaction Mechanism
1. Flume's transaction mechanism
For example, the spooling directory source creates one event per line of a file; only when every event in a transaction has been delivered to the channel and the commit has succeeded does the source mark the file as complete.
Transactions cover the channel-to-sink leg in the same way: if an event cannot be recorded for any reason, the transaction rolls back and all of its events stay in the channel, waiting to be redelivered, as sketched below.
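What a sink does inside a channel transaction can be sketched with Flume's public Transaction API (an illustration of the protocol described above, not the actual code of any built-in sink; the class name SketchSink is hypothetical):

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.sink.AbstractSink;

public class SketchSink extends AbstractSink {
    @Override
    public Status process() throws EventDeliveryException {
        Channel ch = getChannel();
        Transaction tx = ch.getTransaction();
        tx.begin();
        try {
            // Take an event from the channel inside the transaction
            Event e = ch.take();
            if (e != null) {
                // ... deliver the event to the destination here ...
            }
            // Commit: the event now permanently leaves the channel
            tx.commit();
            return e != null ? Status.READY : Status.BACKOFF;
        } catch (Throwable t) {
            // Roll back: the event stays in the channel for redelivery
            tx.rollback();
            throw new EventDeliveryException("delivery failed", t);
        } finally {
            tx.close();
        }
    }
}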
2. Flume's at-least-once delivery
Overall, Flume's transaction mechanism guarantees that every event produced by a source is delivered to the sink. It is worth noting, though, that as a high-throughput parallel collection system, Flume uses at-least-once delivery (traditional enterprise systems aim for exactly-once), so every event produced by a source reaches the sink at least once; in other words, the same event may arrive more than once. That looks like a defect, but it is an acceptable trade-off for moving events reliably from source through channel to sink. As with the spooldir usage above, Flume marks data it has finished processing.
3. Flume's batching mechanism
For efficiency, Flume processes events in transaction-sized batches wherever possible, rather than one at a time. For example, the spooling directory source reads 100 lines of text as one batch (configurable via the batchSize property, similar to a database's batch mode). Batching especially benefits the file channel, since a whole transaction then needs only one write to local disk, or one fsync call, which is much faster.
Stream-processing semantics
- At most once: each record is processed at most once, which implies data can be lost (never processed at all).
- At least once: each record is processed at least once. Stronger than the above in that no data is lost; the downside is that a record may be processed repeatedly.
- Exactly once: each record is processed exactly once. No data is lost and nothing is processed twice; this is the strongest of the three semantics.