II. Flume Deployment and Usage

4.1 Configuration

Check JAVA_HOME: echo $JAVA_HOME

Output: /opt/module/jdk1.8.0_144

Install Flume

[itstar@bigdata113 software]$ tar -zxvf apache-flume-1.8.0-bin.tar.gz -C /opt/module/

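Rename (the rest of this article assumes the extracted directory has been renamed to /opt/module/flume-1.8.0); in its conf directory, rename the environment template: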

[itstar@bigdata113 conf]$ mv flume-env.sh.template flume-env.sh

Edit the following setting in flume-env.sh:

export JAVA_HOME=/opt/module/jdk1.8.0_144


4.2 Examples

4.2.1 Example 1: Monitoring Port Data

Goal: Flume listens on a port in one console; messages sent from a second console are displayed in real time in the console running Flume.

Step-by-step implementation:

1) Install the telnet tool

[Requires network access] yum -y install telnet

2) Create the Flume agent configuration file flume-telnet.conf

# Define the agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Define the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = bigdata113

a1.sources.r1.port = 44445

# Define the sink

a1.sinks.k1.type = logger

# Define the memory channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

3) Check whether port 44445 is already in use

$ netstat -tunlp | grep 44445
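If netstat is not available, the same check can be approximated in Java by trying to bind the port: a bind failure means something is already listening there. This is a minimal sketch. The class PortCheck is hypothetical. It binds on all interfaces, whereas the agent above binds only to bigdata113, so treat the result as an approximation.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical helper: reports whether a TCP port can currently be bound on this host.
final class PortCheck {
    private PortCheck() {}

    /** Returns true if binding the port on all interfaces succeeds, i.e. nothing is listening there. */
    static boolean isPortFree(int port) {
        try (ServerSocket socket = new ServerSocket()) {
            socket.setReuseAddress(false); // must be set before bind()
            socket.bind(new InetSocketAddress(port));
            return true;
        } catch (IOException e) {
            // A BindException (subclass of IOException) means the port is taken.
            return false;
        }
    }

    public static void main(String[] args) {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 44445;
        System.out.println("port " + port + (isPortFree(port) ? " is free" : " is in use"));
    }
}
```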

4) Start Flume with the configuration file

/opt/module/flume-1.8.0/bin/flume-ng agent \

--conf /opt/module/flume-1.8.0/conf/ \

--name a1 \

--conf-file /opt/module/flume-1.8.0/jobconf/flume-telnet.conf \

-Dflume.root.logger=INFO,console

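flume-ng: the launch command

--conf: the directory containing Flume's configuration files

--name: the agent name

--conf-file: the path of the agent configuration file

-Dflume.root.logger=INFO,console: print log output to the console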

5) Use telnet to send content to port 44445 on this machine (bigdata113)

$ telnet bigdata113 44445

4.2.2 Example 2: Reading a Local File into HDFS in Real Time

1) Create the flume-hdfs.conf file

# 1 agent

a2.sources = r2

a2.sinks = k2

a2.channels = c2

# 2 source

a2.sources.r2.type = exec

a2.sources.r2.command = tail -F /opt/Andy

a2.sources.r2.shell = /bin/bash -c

# 3 sink

a2.sinks.k2.type = hdfs

a2.sinks.k2.hdfs.path = hdfs://bigdata111:9000/flume/%Y%m%d/%H

# Prefix of the uploaded files

a2.sinks.k2.hdfs.filePrefix = logs-

# Whether to roll directories by time (rounds the timestamp down)

a2.sinks.k2.hdfs.round = true

# Create a new directory every this many time units

a2.sinks.k2.hdfs.roundValue = 1

# The time unit used for rounding

a2.sinks.k2.hdfs.roundUnit = hour

# Whether to use the local timestamp (instead of a timestamp header on the event)

a2.sinks.k2.hdfs.useLocalTimeStamp = true

# Number of events accumulated before each flush to HDFS

a2.sinks.k2.hdfs.batchSize = 1000

# File type; compression is also supported (CompressedStream)

a2.sinks.k2.hdfs.fileType = DataStream

# Roll to a new file every this many seconds

a2.sinks.k2.hdfs.rollInterval = 600

# Roll size of each file in bytes (roughly 128 MB)

a2.sinks.k2.hdfs.rollSize = 134217700

# File rolling does not depend on the number of events

a2.sinks.k2.hdfs.rollCount = 0

# Minimum block replicas

a2.sinks.k2.hdfs.minBlockReplicas = 1

# Define the memory channel

a2.channels.c2.type = memory

a2.channels.c2.capacity = 1000

a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a2.sources.r2.channels = c2

a2.sinks.k2.channel = c2
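You can model in plain Java which directory an event lands in. With hdfs.round = true, Flume rounds the timestamp down to the highest multiple of roundValue in roundUnit before substituting escapes such as %Y%m%d/%H. With useLocalTimeStamp = true, that timestamp is the agent's local time. The sketch below is hypothetical: the HdfsPathBucket class is not part of Flume. It handles only the escapes %Y, %m, %d, %H and %M, and only the hour and minute units.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Locale;

// Hypothetical model of HDFS sink path resolution with hdfs.round = true (not Flume's code).
final class HdfsPathBucket {
    private HdfsPathBucket() {}

    /** Rounds the time down to the highest multiple of roundValue in the given unit (HOURS or MINUTES). */
    static ZonedDateTime roundDown(ZonedDateTime time, int roundValue, ChronoUnit unit) {
        if (roundValue <= 0) {
            throw new IllegalArgumentException("roundValue must be positive");
        }
        ZonedDateTime truncated = time.truncatedTo(unit);
        int field;
        if (unit == ChronoUnit.HOURS) {
            field = truncated.getHour();
        } else if (unit == ChronoUnit.MINUTES) {
            field = truncated.getMinute();
        } else {
            throw new IllegalArgumentException("only HOURS and MINUTES are modelled");
        }
        return truncated.minus(field % roundValue, unit);
    }

    /** Substitutes the escapes used in this article: %Y, %m, %d, %H and %M. */
    static String resolve(String pattern, long epochMillis, ZoneId zone, int roundValue, ChronoUnit unit) {
        ZonedDateTime t = roundDown(Instant.ofEpochMilli(epochMillis).atZone(zone), roundValue, unit);
        return pattern
                .replace("%Y", String.format(Locale.ROOT, "%04d", t.getYear()))
                .replace("%m", String.format(Locale.ROOT, "%02d", t.getMonthValue()))
                .replace("%d", String.format(Locale.ROOT, "%02d", t.getDayOfMonth()))
                .replace("%H", String.format(Locale.ROOT, "%02d", t.getHour()))
                .replace("%M", String.format(Locale.ROOT, "%02d", t.getMinute()));
    }
}
```

For example, an event at 2019-03-05 14:37 (UTC in this sketch) resolves to /flume/20190305/14 with roundValue = 1 and roundUnit = hour. With roundValue = 10 and roundUnit = minute, a %H%M path would resolve to 1430.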

2) Run the monitoring configuration

/opt/module/flume-1.8.0/bin/flume-ng agent \

--conf /opt/module/flume-1.8.0/conf/ \

--name a2 \

--conf-file /opt/module/flume-1.8.0/jobconf/flume-hdfs.conf

4.2.3 Example 3: Reading Files from a Directory into HDFS in Real Time

Goal: use Flume to monitor all files in a directory

Step-by-step implementation:

1) Create the configuration file flume-dir.conf

#1 Agent

a3.sources = r3

a3.sinks = k3

a3.channels = c3

#2 source

a3.sources.r3.type = spooldir

a3.sources.r3.spoolDir = /opt/module/flume-1.8.0/upload

a3.sources.r3.fileSuffix = .COMPLETED

a3.sources.r3.fileHeader = true

# Ignore (do not upload) all files ending in .tmp

a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# 3 sink

a3.sinks.k3.type = hdfs

a3.sinks.k3.hdfs.path = hdfs://bigdata111:9000/flume/%H

# Prefix of the uploaded files

a3.sinks.k3.hdfs.filePrefix = upload-

# Whether to roll directories by time (rounds the timestamp down)

a3.sinks.k3.hdfs.round = true

# Create a new directory every this many time units

a3.sinks.k3.hdfs.roundValue = 1

# The time unit used for rounding

a3.sinks.k3.hdfs.roundUnit = hour

# Whether to use the local timestamp (instead of a timestamp header on the event)

a3.sinks.k3.hdfs.useLocalTimeStamp = true

# Number of events accumulated before each flush to HDFS

a3.sinks.k3.hdfs.batchSize = 100

# File type; compression is also supported (CompressedStream)

a3.sinks.k3.hdfs.fileType = DataStream

# Roll to a new file every this many seconds

a3.sinks.k3.hdfs.rollInterval = 600

# Roll size of each file in bytes (roughly 128 MB)

a3.sinks.k3.hdfs.rollSize = 134217700

# File rolling does not depend on the number of events

a3.sinks.k3.hdfs.rollCount = 0

# Minimum block replicas

a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory

a3.channels.c3.type = memory

a3.channels.c3.capacity = 1000

a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel

a3.sources.r3.channels = c3

a3.sinks.k3.channel = c3

2) Test: after running the command below, add files to the upload directory and watch them being collected

/opt/module/flume-1.8.0/bin/flume-ng agent \

--conf /opt/module/flume-1.8.0/conf/ \

--name a3 \

--conf-file /opt/module/flume-1.8.0/jobconf/flume-dir.conf

Important: when using the Spooling Directory Source:

1) Do not create a file in the monitored directory and then keep modifying it

2) Files that have been fully uploaded are renamed with the .COMPLETED suffix

3) The monitored directory is scanned for changes every 500 milliseconds
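Put together, these tips amount to a filter applied on each scan. Files already renamed with the .COMPLETED suffix are skipped, and so are files matching ignorePattern. Below is a simplified, hypothetical model; the class SpoolDirSelector is not Flume's code. It differs from Flume in two ways. It sorts by name to keep the example deterministic, whereas Flume's default consumeOrder is oldest file first. It also matches ignorePattern against the whole file name, which is my assumption about how Flume applies it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical model of which files one spooling-directory scan would pick up.
final class SpoolDirSelector {
    private SpoolDirSelector() {}

    static List<String> select(List<String> fileNames, String completedSuffix, Pattern ignorePattern) {
        List<String> picked = new ArrayList<>();
        for (String name : fileNames) {
            if (name.endsWith(completedSuffix)) {
                continue; // already ingested and renamed
            }
            if (ignorePattern.matcher(name).matches()) {
                continue; // e.g. *.tmp files that are still being written
            }
            picked.add(name);
        }
        Collections.sort(picked); // deterministic order for this example only
        return picked;
    }
}
```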

4.2.4 Example 4: Passing Data Between Flume Agents (One Agent, Multiple Channels and Sinks)


Goal: flume1 monitors a file for changes and passes the new content to flume-2, which stores it in HDFS. At the same time, flume1 passes the same content to flume-3, which writes it to a local directory.

Step-by-step implementation:

1) Create flume1.conf, which monitors the file for changes and has two channels and two sinks feeding flume2 and flume3 respectively:

# 1.agent

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2

# Replicate the data flow to every channel

a1.sources.r1.selector.type = replicating

# 2.source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /opt/Andy

a1.sources.r1.shell = /bin/bash -c

# 3.sink1

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = bigdata111

a1.sinks.k1.port = 4141

# sink2

a1.sinks.k2.type = avro

a1.sinks.k2.hostname = bigdata111

a1.sinks.k2.port = 4142

# 4.channel—1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# 4.channel—2

a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1 c2

a1.sinks.k1.channel = c1

a1.sinks.k2.channel = c2
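With selector.type = replicating, every event from r1 is copied into both c1 and c2, so each downstream agent receives the full stream. Below is a toy model of that fan-out. The class is hypothetical; real channels are transactional, and this sketch ignores failures.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of selector.type = replicating: every event goes to every bound channel.
final class ReplicatingFanOut {
    private ReplicatingFanOut() {}

    static void put(String event, List<Deque<String>> channels) {
        for (Deque<String> channel : channels) {
            channel.addLast(event);
        }
    }

    public static void main(String[] args) {
        Deque<String> c1 = new ArrayDeque<>(); // drained by sink k1 (avro to flume2, then HDFS)
        Deque<String> c2 = new ArrayDeque<>(); // drained by sink k2 (avro to flume3, then local files)
        put("line appended to /opt/Andy", Arrays.asList(c1, c2));
        System.out.println(c1 + " " + c2);
    }
}
```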

2) Create flume2.conf, which receives events from flume1 and has one channel and one sink that deliver the data to HDFS:

# 1 agent

a2.sources = r1

a2.sinks = k1

a2.channels = c1

# 2 source

a2.sources.r1.type = avro

a2.sources.r1.bind = bigdata111

a2.sources.r1.port = 4141

# 3 sink

a2.sinks.k1.type = hdfs

a2.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume2/%H

# Prefix of the uploaded files

a2.sinks.k1.hdfs.filePrefix = flume2-

# Whether to roll directories by time (rounds the timestamp down)

a2.sinks.k1.hdfs.round = true

# Create a new directory every this many time units

a2.sinks.k1.hdfs.roundValue = 1

# The time unit used for rounding

a2.sinks.k1.hdfs.roundUnit = hour

# Whether to use the local timestamp (instead of a timestamp header on the event)

a2.sinks.k1.hdfs.useLocalTimeStamp = true

# Number of events accumulated before each flush to HDFS

a2.sinks.k1.hdfs.batchSize = 100

# File type; compression is also supported (CompressedStream)

a2.sinks.k1.hdfs.fileType = DataStream

# Roll to a new file every this many seconds

a2.sinks.k1.hdfs.rollInterval = 600

# Roll size of each file in bytes (roughly 128 MB)

a2.sinks.k1.hdfs.rollSize = 134217700

# File rolling does not depend on the number of events

a2.sinks.k1.hdfs.rollCount = 0

# Minimum block replicas

a2.sinks.k1.hdfs.minBlockReplicas = 1

# 4 channel

a2.channels.c1.type = memory

a2.channels.c1.capacity = 1000

a2.channels.c1.transactionCapacity = 100

#5 Bind

a2.sources.r1.channels = c1

a2.sinks.k1.channel = c1

3) Create flume3.conf, which receives events from flume1 and has one channel and one sink that deliver the data to a local directory:

#1 agent

a3.sources = r1

a3.sinks = k1

a3.channels = c1

# 2 source

a3.sources.r1.type = avro

a3.sources.r1.bind = bigdata111

a3.sources.r1.port = 4142

#3 sink

a3.sinks.k1.type = file_roll

# Note: this directory must be created beforehand

a3.sinks.k1.sink.directory = /opt/flume3

# 4 channel

a3.channels.c1.type = memory

a3.channels.c1.capacity = 1000

a3.channels.c1.transactionCapacity = 100

# 5 Bind

a3.sources.r1.channels = c1

a3.sinks.k1.channel = c1

Important: the local output directory must already exist. If it does not, the sink will not create it.

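4) Test: start the corresponding Flume jobs (flume1, flume-2, flume-3, in that order), then change the monitored file and observe the results. Starting flume-2 and flume-3 before flume1 avoids connection errors from flume1's avro sinks while their targets are not yet listening: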

$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume1.conf

$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume2.conf

$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume3.conf

4.2.5 Example 5: Passing Data Between Flume Agents (Multiple Agents Aggregating into One)


Goal: flume11 monitors a file (/opt/Andy in the configuration below), and flume-22 monitors the data stream on a port. Both send their data to flume-33, which writes the final data to HDFS.

Step-by-step implementation:

1) Create flume11.conf, which monitors the file and sinks the data to flume-33:

# 1 agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# 2 source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /opt/Andy

a1.sources.r1.shell = /bin/bash -c

# 3 sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = bigdata111

a1.sinks.k1.port = 4141

# 4 channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# 5. Bind

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

2) Create flume22.conf, which monitors the data stream on port 44444 and sinks the data to flume-33:

# 1 agent

a2.sources = r1

a2.sinks = k1

a2.channels = c1

# 2 source

a2.sources.r1.type = netcat

a2.sources.r1.bind = bigdata111

a2.sources.r1.port = 44444

#3 sink

a2.sinks.k1.type = avro

a2.sinks.k1.hostname = bigdata111

a2.sinks.k1.port = 4141

# 4 channel

a2.channels.c1.type = memory

a2.channels.c1.capacity = 1000

a2.channels.c1.transactionCapacity = 100

# 5 Bind

a2.sources.r1.channels = c1

a2.sinks.k1.channel = c1

3) Create flume33.conf, which receives the data streams sent by flume11 and flume22, merges them and sinks the result to HDFS:

# 1 agent

a3.sources = r1

a3.sinks = k1

a3.channels = c1

# 2 source

a3.sources.r1.type = avro

a3.sources.r1.bind = bigdata111

a3.sources.r1.port = 4141

# 3 sink

a3.sinks.k1.type = hdfs

a3.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume3/%H

# Prefix of the uploaded files

a3.sinks.k1.hdfs.filePrefix = flume3-

# Whether to roll directories by time (rounds the timestamp down)

a3.sinks.k1.hdfs.round = true

# Create a new directory every this many time units

a3.sinks.k1.hdfs.roundValue = 1

# The time unit used for rounding

a3.sinks.k1.hdfs.roundUnit = hour

# Whether to use the local timestamp (instead of a timestamp header on the event)

a3.sinks.k1.hdfs.useLocalTimeStamp = true

# Number of events accumulated before each flush to HDFS

a3.sinks.k1.hdfs.batchSize = 100

# File type; compression is also supported (CompressedStream)

a3.sinks.k1.hdfs.fileType = DataStream

# Roll to a new file every this many seconds

a3.sinks.k1.hdfs.rollInterval = 600

# Roll size of each file in bytes (roughly 128 MB)

a3.sinks.k1.hdfs.rollSize = 134217700

# File rolling does not depend on the number of events

a3.sinks.k1.hdfs.rollCount = 0

# Minimum block replicas

a3.sinks.k1.hdfs.minBlockReplicas = 1

# 4 channel

a3.channels.c1.type = memory

a3.channels.c1.capacity = 1000

a3.channels.c1.transactionCapacity = 100

# 5 Bind

a3.sources.r1.channels = c1

a3.sinks.k1.channel = c1
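In this topology flume11 and flume22 both send to the same avro port (4141) on flume33, which is why flume33 is started first. The toy model below shows the resulting merge. The class is hypothetical: threads stand in for the two upstream agents, and a shared queue stands in for flume33's avro source. Nothing is lost, and in this model each agent's own order is kept. Events from the two agents interleave in no fixed order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical toy model of fan-in: two producers write into one shared "avro source" queue.
final class FanIn {
    private FanIn() {}

    static List<String> collect(int eventsPerAgent) throws InterruptedException {
        BlockingQueue<String> avroSource = new LinkedBlockingQueue<>();
        Thread flume11 = new Thread(() -> {
            for (int i = 0; i < eventsPerAgent; i++) {
                avroSource.add("file:" + i); // stands in for lines tailed from /opt/Andy
            }
        });
        Thread flume22 = new Thread(() -> {
            for (int i = 0; i < eventsPerAgent; i++) {
                avroSource.add("netcat:" + i); // stands in for lines received on port 44444
            }
        });
        flume11.start();
        flume22.start();
        flume11.join();
        flume22.join();
        List<String> merged = new ArrayList<>();
        avroSource.drainTo(merged); // what flume33's sink would see, in arrival order
        return merged;
    }
}
```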

4) Test: start the corresponding Flume jobs (flume-33, flume-22, flume11, in that order), then generate data and observe the results:

$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume33.conf

$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume22.conf

$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume11.conf

Sending data:

1) Run telnet bigdata111 44444 and, once connected, send 5555555

2) Append 666666 to /opt/Andy

4.2.6 Example 6: Flume Interceptors

Timestamp interceptor

Timestamp.conf

# 1. Define the agent name and the names of the source, channel and sink

a4.sources = r1

a4.channels = c1

a4.sinks = k1

# 2. Define the source

a4.sources.r1.type = spooldir

a4.sources.r1.spoolDir = /opt/module/flume-1.8.0/upload

# Define the interceptor: adds a timestamp header to each event

a4.sources.r1.interceptors = i1

a4.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Define the channel

a4.channels.c1.type = memory

a4.channels.c1.capacity = 10000

a4.channels.c1.transactionCapacity = 100

# Define the sink

a4.sinks.k1.type = hdfs

a4.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume-interceptors/%H

a4.sinks.k1.hdfs.filePrefix = events-

a4.sinks.k1.hdfs.fileType = DataStream

# Do not roll files based on the number of events

a4.sinks.k1.hdfs.rollCount = 0

# Roll to a new file when the file on HDFS reaches 128 MB

a4.sinks.k1.hdfs.rollSize = 134217728

# Roll to a new file every 60 seconds

a4.sinks.k1.hdfs.rollInterval = 60

# Wire the source, channel and sink together

a4.sources.r1.channels = c1

a4.sinks.k1.channel = c1

Launch command

/opt/module/flume-1.8.0/bin/flume-ng agent -n a4 \

-f /opt/module/flume-1.8.0/jobconf/Timestamp.conf \

-c /opt/module/flume-1.8.0/conf \

-Dflume.root.logger=INFO,console
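The timestamp interceptor does not modify the data itself. As far as I know, it puts the current time, in epoch milliseconds, into an event header named timestamp. The HDFS sink reads that header to resolve %H in hdfs.path. Timestamp.conf does not set hdfs.useLocalTimeStamp, so without the interceptor the sink would have no time to fill in. The class below is a hypothetical, dependency-free model of that behavior, with headers as a Map<String, String>. It also models the interceptor's preserveExisting switch. It is not Flume's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the timestamp interceptor: sets a "timestamp" header, leaves the body alone.
final class TimestampHeader {
    private TimestampHeader() {}

    static Map<String, String> intercept(Map<String, String> headers, long nowMillis, boolean preserveExisting) {
        Map<String, String> out = new HashMap<>(headers); // do not mutate the caller's map
        if (preserveExisting && out.containsKey("timestamp")) {
            return out; // keep the timestamp set upstream
        }
        out.put("timestamp", Long.toString(nowMillis));
        return out;
    }
}
```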


Host interceptor

Host.conf

# 1. Define the agent

a1.sources= r1

a1.sinks = k1

a1.channels = c1

# 2. Define the source

a1.sources.r1.type = exec

a1.sources.r1.channels = c1

a1.sources.r1.command = tail -F /opt/Andy

# Interceptor

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = host

# true: use the IP address (e.g. 192.168.1.111); false: use the hostname. Defaults to true

a1.sources.r1.interceptors.i1.useIP = false

a1.sources.r1.interceptors.i1.hostHeader = agentHost

# 3. Define the sink

a1.sinks.k1.type=hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flumehost/%H

a1.sinks.k1.hdfs.filePrefix = Andy_%{agentHost}

# Add the .log suffix to generated files

a1.sinks.k1.hdfs.fileSuffix = .log

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.writeFormat = Text

a1.sinks.k1.hdfs.rollInterval = 10

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

Launch command:

bin/flume-ng agent -c conf/ -f jobconf/Host.conf -n a1 -Dflume.root.logger=INFO,console

UUID interceptor

uuid.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.channels = c1

a1.sources.r1.command = tail -F /opt/Andy

a1.sources.r1.interceptors = i1

# type cannot be written as uuid; use the fully qualified class name, otherwise the class is not found

a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

# If a UUID header already exists, keep it

a1.sources.r1.interceptors.i1.preserveExisting = true

a1.sources.r1.interceptors.i1.prefix = UUID_

# If the sink type is changed to HDFS, the header data does not appear in the files written to HDFS

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

# bin/flume-ng agent -c conf/ -f jobconf/uuid.conf -n a1 -Dflume.root.logger=INFO,console
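Conceptually, the UUID interceptor stores prefix plus a random UUID in an event header. With preserveExisting = true, it leaves an existing value alone. The sketch assumes the header name id, which I believe is the interceptor's default headerName. UuidHeader is a hypothetical, dependency-free model, not the class referenced in uuid.conf.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical model of the UUID interceptor's header logic (not Flume's UUIDInterceptor).
final class UuidHeader {
    private UuidHeader() {}

    static Map<String, String> intercept(Map<String, String> headers, String headerName,
                                         String prefix, boolean preserveExisting) {
        Map<String, String> out = new HashMap<>(headers);
        if (preserveExisting && out.containsKey(headerName)) {
            return out; // an ID assigned upstream is kept
        }
        out.put(headerName, prefix + UUID.randomUUID()); // e.g. UUID_ followed by 36 characters
        return out;
    }
}
```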

Search-and-replace interceptor

search.conf

#1 agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#2 source

a1.sources.r1.type = exec

a1.sources.r1.channels = c1

a1.sources.r1.command = tail -F /opt/Andy

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = search_replace

# Replace every run of digits with itstar, e.g. A123 becomes Aitstar

a1.sources.r1.interceptors.i1.searchPattern = [0-9]+

a1.sources.r1.interceptors.i1.replaceString = itstar

a1.sources.r1.interceptors.i1.charset = UTF-8

#3 sink

a1.sinks.k1.type = logger

#4 Channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

#5 bind

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

# bin/flume-ng agent -c conf/ -f jobconf/search.conf -n a1 -Dflume.root.logger=INFO,console
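The search_replace interceptor applies a regular-expression replacement to the event body. You can check its effect on a body with plain java.util.regex before deploying. The class below is a hypothetical helper that reproduces searchPattern = [0-9]+ and replaceString = itstar.

```java
import java.util.regex.Pattern;

// Hypothetical helper reproducing searchPattern = [0-9]+ / replaceString = itstar.
final class SearchReplace {
    private static final Pattern DIGITS = Pattern.compile("[0-9]+");

    private SearchReplace() {}

    static String apply(String body) {
        // Each maximal run of digits is replaced once, so "A123" becomes "Aitstar", not "Aitstaritstaritstar".
        return DIGITS.matcher(body).replaceAll("itstar");
    }
}
```

Matcher.replaceAll treats $ and \ in the replacement string specially, so a replaceString containing them would need escaping in this sketch.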

Regex filtering interceptor

filter.conf

#1 agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#2 source

a1.sources.r1.type = exec

a1.sources.r1.channels = c1

a1.sources.r1.command = tail -F /opt/Andy

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = regex_filter

a1.sources.r1.interceptors.i1.regex = ^A.*

# If excludeEvents is false, events that do NOT start with A are dropped (only matching events are kept). If excludeEvents is true, events that start with A are dropped.

a1.sources.r1.interceptors.i1.excludeEvents = true

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

# bin/flume-ng agent -c conf/ -f jobconf/filter.conf -n a1 -Dflume.root.logger=INFO,console
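The excludeEvents switch simply inverts the match decision. Below is a hypothetical helper that mirrors the two settings. It uses Matcher.find(), which for an anchored pattern like ^A.* gives the same result as a full match. Whether Flume uses find() or matches() internally is an assumption here.

```java
import java.util.regex.Pattern;

// Hypothetical model of regex_filter: returns true if the event survives the filter.
final class RegexFilter {
    private RegexFilter() {}

    static boolean keep(String body, Pattern regex, boolean excludeEvents) {
        boolean matched = regex.matcher(body).find();
        // excludeEvents = true drops matching events; false keeps only matching events.
        return excludeEvents ? !matched : matched;
    }
}
```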

Regex extractor interceptor

extractor.conf

#1 agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#2 source

a1.sources.r1.type = exec

a1.sources.r1.channels = c1

a1.sources.r1.command = tail -F /opt/Andy

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = regex_extractor

a1.sources.r1.interceptors.i1.regex = hostname is (.*?) ip is (.*)

a1.sources.r1.interceptors.i1.serializers = s1 s2

a1.sources.r1.interceptors.i1.serializers.s1.name = hostname

a1.sources.r1.interceptors.i1.serializers.s2.name = ip

a1.sinks.k1.type = logger

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

# bin/flume-ng agent -c conf/ -f jobconf/extractor.conf -n a1 -Dflume.root.logger=INFO,console

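Note: the headers added by the regex extractor interceptor do not appear in the file contents. They appear in file names only if referenced explicitly, as Host.conf does with %{agentHost}.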
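The extractor copies regex capture groups into headers named by the serializers (s1 becomes hostname, s2 becomes ip). Below is a hypothetical model using java.util.regex with the pattern from extractor.conf. RegexExtractor is not Flume's class.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical model of regex_extractor: capture group i is stored under headerNames[i - 1].
final class RegexExtractor {
    private RegexExtractor() {}

    static Map<String, String> extract(String body, Pattern regex, String... headerNames) {
        Map<String, String> headers = new LinkedHashMap<>();
        Matcher m = regex.matcher(body);
        if (m.find()) {
            for (int g = 1; g <= m.groupCount() && g <= headerNames.length; g++) {
                headers.put(headerNames[g - 1], m.group(g));
            }
        }
        return headers; // empty when the body does not match
    }
}
```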


4.2.7 Example 7: Custom Flume Interceptor

Converting lowercase letters to uppercase

1. pom.xml

    <dependencies>
        <!-- Flume core dependency -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.8.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Packaging plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

2. Implement the custom interceptor

// The package must match the type configured in ToUpCase.conf (ToUpCase.MyInterceptor$Builder)
package ToUpCase;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class MyInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public void close() {
    }

    /**
     * Intercepts an event on its way from the source to the channel.
     *
     * @param event the incoming event
     * @return the event after processing (body converted to upper case)
     */
    @Override
    public Event intercept(Event event) {
        // Read the event body bytes
        byte[] arr = event.getBody();
        // Convert to upper case; use UTF-8 explicitly instead of the platform default charset
        String upper = new String(arr, StandardCharsets.UTF_8).toUpperCase(Locale.ROOT);
        event.setBody(upper.getBytes(StandardCharsets.UTF_8));
        // Hand the modified event back to Flume
        return event;
    }

    // Intercepts a batch of events
    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> list = new ArrayList<>(events.size());
        for (Event event : events) {
            Event processed = intercept(event);
            if (processed != null) {
                list.add(processed);
            }
        }
        return list;
    }

    public static class Builder implements Interceptor.Builder {

        // Called by Flume to create the interceptor instance
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        // Reads properties from the agent configuration file (none are used here)
        @Override
        public void configure(Context context) {
        }
    }
}

Package the project as a jar with Maven, create a jar directory under the Flume home directory (mkdir jar), and upload the jar into it.

3. Flume configuration file

ToUpCase.conf

#1.agent

a1.sources = r1

a1.sinks =k1

a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /opt/Andy

a1.sources.r1.interceptors = i1

# Fully qualified class name followed by $Builder

a1.sources.r1.interceptors.i1.type = ToUpCase.MyInterceptor$Builder

# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = /ToUpCase1

a1.sinks.k1.hdfs.filePrefix = events-

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

a1.sinks.k1.hdfs.rollInterval = 3

a1.sinks.k1.hdfs.rollSize = 20

a1.sinks.k1.hdfs.rollCount = 5

a1.sinks.k1.hdfs.batchSize = 1

a1.sinks.k1.hdfs.useLocalTimeStamp = true

# Type of the generated files: the default is SequenceFile; DataStream writes plain text

a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

Run command:

bin/flume-ng agent -c conf/ -n a1 -f jar/ToUpCase.conf -C jar/Flume-1.0-SNAPSHOT.jar -Dflume.root.logger=DEBUG,console

4.2.8 Example 8: Custom Flume Source

1. Code: a custom source that records the read offset so that reading resumes where it left off after a restart

// The package must match the type in the agent configuration (customSource.TailFileSource)
package customSource;

import org.apache.commons.io.FileUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Custom source that records the read offset.
 * Flume lifecycle: the constructor runs first, then configure() -> start() -> the worker thread processes data.
 * Settings read from the job file: which file to read, its encoding, which file the offset is written to,
 * and how often to check the file for new content.
 */
public class TailFileSource extends AbstractSource implements EventDrivenSource, Configurable {

    private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class);

    private String filePath;
    private String charset;
    private String positionFile;
    private long interval;
    private ExecutorService executor;
    private FileRunnable fileRunnable;

    /**
     * Reads the properties from this Flume job's configuration file.
     * Properties not set in the job file fall back to the defaults below.
     */
    @Override
    public void configure(Context context) {
        // File to read
        filePath = context.getString("filePath");
        // Encoding, UTF-8 by default
        charset = context.getString("charset", "UTF-8");
        // File the offset is written to
        positionFile = context.getString("positionFile");
        // By default, check for new content once per second
        interval = context.getLong("interval", 1000L);
    }

    /** Starts one thread that monitors the file. */
    @Override
    public synchronized void start() {
        // Single-thread pool
        executor = Executors.newSingleThreadExecutor();
        // Channel processor used to hand events to the channel(s)
        final ChannelProcessor channelProcessor = getChannelProcessor();
        fileRunnable = new FileRunnable(filePath, charset, positionFile, interval, channelProcessor);
        executor.submit(fileRunnable);
        super.start();
    }

    @Override
    public synchronized void stop() {
        // Ask the reading loop to stop, then shut down the pool
        fileRunnable.setFlag(false);
        executor.shutdown();
        while (!executor.isTerminated()) {
            logger.debug("Waiting for file executor service to stop");
            try {
                // Wait up to 500 ms before checking again
                executor.awaitTermination(500, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                logger.debug("Interrupted while waiting for the executor service to stop. Just exiting");
                Thread.currentThread().interrupt();
                break;
            }
        }
        super.stop();
    }

    private static class FileRunnable implements Runnable {

        private final String charset;
        private final long interval;
        private final ChannelProcessor channelProcessor;
        private final File posFile;
        private long offset = 0L;
        private RandomAccessFile raf;
        // volatile so that stop(), called from another thread, is seen by run()
        private volatile boolean flag = true;

        /*
         * Runs once, before run().
         * If an offset was saved earlier, continue from it; otherwise read from the beginning.
         */
        FileRunnable(String filePath, String charset, String positionFile, long interval, ChannelProcessor channelProcessor) {
            this.charset = charset;
            this.interval = interval;
            this.channelProcessor = channelProcessor;
            // The offset is kept in positionFile; create it if it does not exist yet
            posFile = new File(positionFile);
            if (!posFile.exists()) {
                try {
                    posFile.createNewFile();
                } catch (IOException e) {
                    logger.error("Failed to create the offset file: ", e);
                }
            }
            try {
                // Read the saved offset (an empty file means the data file has never been read)
                String offsetString = FileUtils.readFileToString(posFile, "UTF-8").trim();
                if (!offsetString.isEmpty()) {
                    offset = Long.parseLong(offsetString);
                }
                // Open the data file and jump to the saved offset
                raf = new RandomAccessFile(filePath, "r");
                raf.seek(offset);
            } catch (IOException e) {
                logger.error("Error while reading the offset file", e);
            }
        }

        @Override
        public void run() {
            if (raf == null) {
                logger.error("The data file could not be opened; nothing will be read");
                return;
            }
            while (flag) {
                try {
                    String line = raf.readLine();
                    if (line != null) {
                        // readLine() maps each byte to one char (ISO-8859-1); re-encode to decode with the real charset
                        line = new String(line.getBytes(StandardCharsets.ISO_8859_1), charset);
                        // Send to the channel
                        channelProcessor.processEvent(EventBuilder.withBody(line.getBytes(charset)));
                        // Update the offset and persist it to the position file
                        offset = raf.getFilePointer();
                        FileUtils.writeStringToFile(posFile, Long.toString(offset), "UTF-8");
                    } else {
                        // No new data: sleep for one interval
                        Thread.sleep(interval);
                    }
                } catch (InterruptedException e) {
                    logger.error("read file thread interrupted", e);
                    Thread.currentThread().interrupt();
                    break;
                } catch (IOException e) {
                    logger.error("read log file error", e);
                }
            }
            try {
                raf.close();
            } catch (IOException e) {
                logger.warn("Failed to close the data file", e);
            }
        }

        public void setFlag(boolean flag) {
            this.flag = flag;
        }
    }
}

2. Configuration file

# Define the agent name and the names of the source, channel and sink

a1.sources = r1

a1.channels = c1

a1.sinks = k1

# Define the source; type is the fully qualified class name of the custom source

a1.sources.r1.type = customSource.TailFileSource

# These parameter names must match the ones read in the custom class

# File to read

a1.sources.r1.filePath = /opt/Andy

# File the offset is saved in

a1.sources.r1.positionFile = /opt/Cndy

# Interval between checks for new content, in milliseconds

a1.sources.r1.interval = 2000

# Encoding

a1.sources.r1.charset = UTF-8

# Define the channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# Define the sink

a1.sinks.k1.type = file_roll

a1.sinks.k1.sink.directory = /opt/Bndy

# Wire the source, channel and sink together

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

Launch command:

bin/flume-ng agent -n a1 -f jar/ConsumSource.conf -c conf/ -C jar/ConsumSource.jar -Dflume.root.logger=INFO,console


4.2.9 Example 9: Flume to Kafka

Configure Flume (flume2kafka.conf)

# define

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F -c +0 /opt/jars/calllog.csv

a1.sources.r1.shell = /bin/bash -c

# sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.brokerList = bigdata111:9092,bigdata112:9092,bigdata113:9092

a1.sinks.k1.topic = calllog

a1.sinks.k1.batchSize = 20

a1.sinks.k1.requiredAcks = 1

# channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# bind

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

From the Flume home directory, start Flume:

/opt/module/flume-1.8.0/bin/flume-ng agent --conf /opt/module/flume-1.8.0/conf/ --name a1 --conf-file /opt/jars/flume2kafka.conf

4.2.10 Example 10: Kafka to Flume

kafka2flume.conf

agent.sources = kafkaSource

agent.channels = memoryChannel

agent.sinks = hdfsSink

# The channel can be defined as follows.

agent.sources.kafkaSource.channels = memoryChannel

agent.sources.kafkaSource.type=org.apache.flume.source.kafka.KafkaSource

agent.sources.kafkaSource.zookeeperConnect=bigdata111:2181,bigdata112:2181,bigdata113:2181

agent.sources.kafkaSource.topic=calllog

#agent.sources.kafkaSource.groupId=flume

agent.sources.kafkaSource.kafka.consumer.timeout.ms=100

agent.channels.memoryChannel.type=memory

agent.channels.memoryChannel.capacity=10000

agent.channels.memoryChannel.transactionCapacity=1000

# the sink of hdfs

agent.sinks.hdfsSink.type=hdfs

agent.sinks.hdfsSink.channel = memoryChannel

agent.sinks.hdfsSink.hdfs.path=hdfs://bigdata111:9000/kafka2flume

agent.sinks.hdfsSink.hdfs.writeFormat=Text

agent.sinks.hdfsSink.hdfs.fileType=DataStream

# Without these two settings, the sink produces a large number of small files

agent.sinks.hdfsSink.hdfs.rollSize=0

agent.sinks.hdfsSink.hdfs.rollCount=0
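The small-files warning comes from the HDFS sink's defaults. If I recall the Flume user guide correctly, rollInterval defaults to 30 seconds, rollSize to 1024 bytes and rollCount to 10 events. A file is closed as soon as any enabled trigger fires, and 0 disables a trigger. The hypothetical class below models that decision. In Flume the interval trigger is driven by a timer rather than checked on each write.

```java
// Hypothetical model of the HDFS sink's three roll triggers (not Flume's code). 0 disables a trigger.
final class RollPolicy {
    private final long rollIntervalSeconds;
    private final long rollSizeBytes;
    private final long rollCount;

    RollPolicy(long rollIntervalSeconds, long rollSizeBytes, long rollCount) {
        this.rollIntervalSeconds = rollIntervalSeconds;
        this.rollSizeBytes = rollSizeBytes;
        this.rollCount = rollCount;
    }

    /** True if the currently open file should be closed and a new one started. */
    boolean shouldRoll(long secondsOpen, long bytesWritten, long eventsWritten) {
        if (rollIntervalSeconds > 0 && secondsOpen >= rollIntervalSeconds) {
            return true;
        }
        if (rollSizeBytes > 0 && bytesWritten >= rollSizeBytes) {
            return true;
        }
        return rollCount > 0 && eventsWritten >= rollCount;
    }
}
```

With the assumed defaults, a file is closed after only 10 events. With rollSize=0 and rollCount=0 as in kafka2flume.conf, only the time trigger remains.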

Launch command

bin/flume-ng agent --conf conf --conf-file jobconf/kafka2flume.conf --name agent -Dflume.root.logger=INFO,console

Note: this configuration reads data from Kafka, but you need to write new data into the Kafka topic before anything is passed on to HDFS.

4.3 Flume's Transaction Mechanism

1. Flume's transaction mechanism

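For example, the spooling directory source creates one event for each line of a file. Once all events in a transaction have been delivered to the channel and the transaction has committed successfully, the source marks the file as completed.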

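Transactions govern delivery from the channel to the sink in the same way. If for some reason the events cannot be recorded, the transaction is rolled back, and all of its events remain in the channel, waiting to be delivered again.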
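Here is a toy, in-memory model that makes these put and take transactions concrete. The class ToyChannel and its methods are hypothetical, not Flume's MemoryChannel API. A put batch is committed only if the whole batch fits. A take batch is handed to a sink callback. If the sink reports failure, the events are pushed back to the head of the queue so they are delivered again. That redelivery is also where duplicates come from: if a sink had already written part of a batch before failing, those events are written again on retry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical toy channel illustrating transactional put/take with rollback (not Flume's API).
final class ToyChannel {
    private final Deque<String> queue = new ArrayDeque<>();
    private final int capacity;

    ToyChannel(int capacity) {
        this.capacity = capacity;
    }

    /** Put transaction: commits the whole batch, or rolls back (adds nothing) if it does not fit. */
    boolean putBatch(List<String> batch) {
        if (queue.size() + batch.size() > capacity) {
            return false; // rollback
        }
        queue.addAll(batch); // commit
        return true;
    }

    /**
     * Take transaction: removes up to max events and hands them to the sink.
     * On sink failure the events are restored at the head of the queue in their original order.
     */
    List<String> takeBatch(int max, Predicate<List<String>> sink) {
        List<String> taken = new ArrayList<>();
        while (taken.size() < max && !queue.isEmpty()) {
            taken.add(queue.pollFirst());
        }
        if (taken.isEmpty() || sink.test(taken)) {
            return taken; // commit: events leave the channel for good
        }
        for (int i = taken.size() - 1; i >= 0; i--) {
            queue.addFirst(taken.get(i)); // rollback: events stay in the channel for redelivery
        }
        return new ArrayList<>();
    }

    int size() {
        return queue.size();
    }
}
```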

2. Flume's at-least-once delivery

Overall, Flume's transaction mechanism guarantees that every event produced by a source is delivered to a sink. Note, however, that Flume, as a high-volume parallel collection system, uses at-least-once delivery (traditional enterprise systems use exactly-once). Every event a source produces reaches the sink at least once, which means the same event may arrive more than once. This looks like a flaw, but it is an acceptable trade-off for being able to move events reliably from the source through the channel to the sink. As in the spooldir example above, Flume marks the data it has finished processing.

3. Flume's batching mechanism

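To improve efficiency, Flume processes events in transactions, in batches wherever possible, rather than one event at a time. For example, the spooling directory source reads 100 lines of text as one batch (configured with the batchSize property, similar to batch mode in databases). Batching especially helps the file channel: a whole transaction needs only one write to local disk, or one fsync call, which is much faster.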
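The benefit of batching is easy to count. With batchSize = 100, 250 events need three transactions (100 + 100 + 50) instead of 250. Per the description above, a file channel then does roughly three disk writes or fsync calls instead of 250. Below is a hypothetical helper that splits events into batches.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits events into transaction-sized batches (the last one may be smaller).
final class Batcher {
    private Batcher() {}

    static <T> List<List<T>> batches(List<T> events, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int from = 0; from < events.size(); from += batchSize) {
            int to = Math.min(from + batchSize, events.size());
            out.add(new ArrayList<>(events.subList(from, to))); // one commit per batch
        }
        return out;
    }
}
```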

Stream-processing semantics

- At most once: each record is processed at most once, which implies that data may be lost (never processed).

- At least once: each record is processed at least once. This is stronger than the previous guarantee because no data is lost and every record is processed; the only drawback is that records may be processed more than once.

- Exactly once: each record is processed exactly once, with no data loss and no duplicate processing. This is the most demanding of the three semantics.

Copyright belongs to the author. For reprints or content collaboration, please contact the author.
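Platform statement: the article content (including any images or videos) was uploaded and published by the author and represents only the author's views. Jianshu is an information publishing platform that only provides storage services.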
