核心概念
- Agent:使用JVM 運行Flume。每臺機器運行一個agent,但是可以在一個agent中包含多個sources和sinks。
Client:生產(chǎn)數(shù)據(jù),運行在一個獨立的線程。 - Source:從Client專門用來收集數(shù)據(jù),傳遞給Channel,可以處理各種類型、各種格式的日志數(shù)據(jù),包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義。
- Sink:從Channel收集數(shù)據(jù),運行在一個獨立線程,sink組件是用于把數(shù)據(jù)發(fā)送到目的地的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定義。
- Channel:連接 sources 和 sinks ,這個有點像一個隊列,source組件把數(shù)據(jù)收集來以后,臨時存放在channel中,即channel組件在agent中是專門用來存放臨時數(shù)據(jù)的——對采集到的數(shù)據(jù)進行簡單的緩存,可以存放在memory、jdbc、file等等。
- Events:可以是日志記錄、 avro 對象等。
Agent的概念
Flume以agent為最小的獨立運行單位。一個agent就是一個JVM,agent本身是一個Java進程,運行在日志收集節(jié)點—所謂日志收集節(jié)點就是服務(wù)器節(jié)點。
單agent由Source、Sink和Channel三大組件構(gòu)成,類似生產(chǎn)者、倉庫、消費者的架構(gòu).如下圖:
[站外圖片上傳中...(image-64c038-1541939331883)]
a single node flume
NetCat Source:監(jiān)聽一個指定的網(wǎng)絡(luò)端口,即只要應用程序向這個端口里面寫數(shù)據(jù),這個source組件就可以獲取到信息。
在/home/hadoop/script/flume下新建配置文件a-single-node.conf,配置文件如下:
#a-single-node.conf : A single node flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
保存之后運行,執(zhí)行命令:
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/a-single-node.conf \
-Dflume.root.logger=INFO,console
參數(shù)說明:
-n 指定agent名稱(與配置文件中代理的名字相同)
-c 指定flume中配置文件的目錄
-f 指定配置文件
-Dflume.root.logger=DEBUG,console 設(shè)置日志等級
通過telnet監(jiān)聽端口:
telnet localhost 44444
輸入任意數(shù)據(jù),在flume中可以看到輸出:
18/08/02 15:25:29 INFO sink.LoggerSink: Event: { headers:{} body: 61 62 63 0D abc. }
采集指定文件數(shù)據(jù)存入到hdfs
source-channel-sink :exec - memory - hdfs
配置文件如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
# 監(jiān)聽文件路徑
a1.sources.r1.command = tail -F /home/hadoop/data/flume/logs/access.log
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = hdfs
# hdfs路徑
a1.sinks.k1.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/tail
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.batchSize=10
a1.sinks.k1.hdfs.useLocalTimeStamp=true
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
缺少這個配置的時候
a1.sinks.k1.hdfs.useLocalTimeStamp=true
會出現(xiàn)異常
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
原因是因為寫入到hfds時使用到了時間戳來區(qū)分目錄結(jié)構(gòu),flume的消息組件event在接受到之后在header中沒有發(fā)現(xiàn)時間戳參數(shù),導致該錯誤發(fā)生,有三種方法可以解決這個錯誤:
- agent1.sources.source1.interceptors = t1
agent1.sources.source1.interceptors.t1.type = timestamp 為source添加攔截,每條event頭中加入時間戳;(效率會慢一些) - agent1.sinks.sink1.hdfs.useLocalTimeStamp = true 為sink指定該參數(shù)為true (如果客戶端和flume集群時間不一致數(shù)據(jù)時間會不準確)
- 在向source發(fā)送event時,將時間戳參數(shù)添加到event的header中即可,header是一個map,添加時mapkey為timestamp(推薦使用)
采集指定文件夾的內(nèi)容到控制臺
source - channel - sink :spooling - memory - logger
目錄下的文件如果已經(jīng)讀取完畢會增加后綴.COMPELETE,且文件名不能相同
配置文件如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir=/home/hadoop/temp
a1.sources.r1.fileHeader=true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = logger
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
小文件問題
案例:采集指定文件夾內(nèi)容到hdfs taildir - memory - hdfs
配置文件如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/temp/position/taildir_position.json
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/home/hadoop/temp/flume/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2=/home/hadoop/temp/flume/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.hdfs.batchSize=15
#關(guān)鍵參數(shù) 三個是或的關(guān)系 滿足一個就會roll
a1.sinks.k1.hdfs.rollInterval= 0 #按時間 0為參數(shù)不生效
a1.sinks.k1.hdfs.rollSize= 500 #按大小 0為參數(shù)不生效
a1.sinks.k1.hdfs.rollCount = 0 #按記錄數(shù) 0為參數(shù)不生效
a1.sinks.k1.hdfs.useLocalTimeStamp=true
# Bind the source and sink to the channel
a1.sources.r1.channels =
多個channel

一個channel對應輸出到日志的sink,另外一個對應寫入到Hdfs的sink
配置文件如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/temp/position/taildir_position.json
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/home/hadoop/temp/flume/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2=/home/hadoop/temp/flume/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
# Use a channel which buffers events in memory
a1.channels.c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type=memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Describe the sink
a1.sinks.k1.type = logger
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=TEXT
a1.sinks.k2.hdfs.hdfs.batchSize=15
a1.sinks.k2.hdfs.rollInterval= 0
a1.sinks.k2.hdfs.rollSize= 0
a1.sinks.k2.hdfs.rollCount = 100
# Bind the source and sink to the channel
a1.sources.r1.channels =c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
sink processor
主要包含兩種方式:failover和load_balance
- failover:Failover Sink Processor維護了一個sink的優(yōu)先級列表,具有故障轉(zhuǎn)移的功能,每個sink都有一個權(quán)值用于表示自己的優(yōu)先級,優(yōu)先級值高Sink會更早被激活。值越大,優(yōu)先級越高。表示優(yōu)先級的權(quán)值不能相同。
- load_balance:按照一定的算法選擇sink輸出到指定地方,如果在文件輸出量很大的情況下,負載均衡還是很有必要的,通過多個通道輸出緩解輸出壓力,flume內(nèi)置的負載均衡的算法默認是round robin(輪詢算法),還有一個random(隨機算法)。
failover 配置如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop002:9001/user/hadoop/flume/manager
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=TEXT
a1.sinks.k2.hdfs.hdfs.batchSize=15
a1.sinks.k2.hdfs.rollInterval= 0
a1.sinks.k2.hdfs.rollSize= 0
a1.sinks.k2.hdfs.rollCount = 10
a1.sinks.k2.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
load_balance配置如下(更改負載均衡策略進行測試):
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=TEXT
a1.sinks.k2.hdfs.hdfs.batchSize=15
a1.sinks.k2.hdfs.rollInterval= 0
a1.sinks.k2.hdfs.rollSize= 0
a1.sinks.k2.hdfs.rollCount = 10
a1.sinks.k2.channel = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
avro source和avro sink
該案例需要用到兩個agent,一個作為數(shù)據(jù)源:產(chǎn)生數(shù)據(jù),一個作為數(shù)據(jù)接收端:接收數(shù)據(jù)
數(shù)據(jù)源agent配置如下:
# Name the components on this agent
avro-source-agent.sources = exec-source
avro-source-agent.sinks = avro-sink
avro-source-agent.channels = avro-memory-channel
# Describe/configure the source
avro-source-agent.sources.exec-source.type = exec
avro-source-agent.sources.exec-source.command = tail -F /home/hadoop/data/flume/logs/access.log
# Use a channel which buffers events in memory
avro-source-agent.channels.avro-memory-channel.type = memory
avro-source-agent.channels.avro-memory-channel.capacity = 1000
avro-source-agent.channels.avro-memory-channel.transactionCapacity = 100
avro-source-agent.sinks.avro-sink.type=avro
avro-source-agent.sinks.avro-sink.hostname=hadoop002
avro-source-agent.sinks.avro-sink.port=44444
# Bind the source and sink to the channel
avro-source-agent.sources.exec-source.channels = avro-memory-channel
avro-source-agent.sinks.avro-sink.channel = avro-memory-channel
數(shù)據(jù)接收端配置如下:
# Name the components on this agent
avro-sink-agent.sources = avro-source
avro-sink-agent.sinks = avro-logger
avro-sink-agent.channels = avro-memory-channel
# Describe/configure the source
avro-sink-agent.sources.avro-source.type = avro
avro-sink-agent.sources.avro-source.bind = hadoop002
avro-sink-agent.sources.avro-source.port = 44444
# Use a channel which buffers events in memory
avro-sink-agent.channels.avro-memory-channel.type = memory
avro-sink-agent.channels.avro-memory-channel.capacity = 1000
avro-sink-agent.channels.avro-memory-channel.transactionCapacity = 100
avro-sink-agent.sinks.avro-logger.type=logger
# Bind the source and sink to the channel
avro-sink-agent.sources.avro-source.channels = avro-memory-channel
avro-sink-agent.sinks.avro-logger.channel = avro-memory-channel
依次啟動avro-sink-agent,和avro-source-agent
flume-ng agent \
--name avro-sink-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/avro-sink.conf \
-Dflume.root.logger=INFO,console
flume-ng agent \
--name avro-source-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/avro-source.conf \
-Dflume.root.logger=INFO,console