Flume Basic Examples

Core Concepts

  • Agent: a JVM process that runs Flume. Each machine runs one agent, but a single agent can contain multiple sources and sinks.
  • Client: produces the data; runs in its own thread.
  • Source: collects data from the Client and hands it to the Channel. It can handle log data of many types and formats: avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, legacy, or custom.
  • Sink: takes data from the Channel, running in its own thread, and delivers it to a destination such as hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, or a custom sink.
  • Channel: connects sources and sinks, rather like a queue. The source deposits collected data into the channel, which serves as the agent's temporary buffer for collected data; it can be backed by memory, jdbc, file, and so on.
  • Events: the unit of data flowing through the pipeline, e.g. a log record or an avro object.
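The relationship among these components can be sketched in a few lines of Python, modeling the channel as a bounded queue between a source and a sink. The names here are illustrative only, not Flume APIs:

```python
from queue import Queue

# Minimal sketch of the Source -> Channel -> Sink flow.
# An Event carries a header map plus a body, as in Flume.
class Event:
    def __init__(self, body, headers=None):
        self.headers = headers or {}
        self.body = body

channel = Queue(maxsize=1000)        # like a memory channel with capacity 1000

def source_put(line):                # source: wrap raw data into an Event
    channel.put(Event(line.encode()))

def sink_take():                     # sink: drain the channel toward a destination
    event = channel.get()
    return event.body

source_put("hello flume")
print(sink_take())                   # b'hello flume'
```

The queue in the middle is what decouples producer speed from consumer speed, which is exactly the role the channel plays inside an agent.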

The Agent Concept

The agent is Flume's smallest independent unit of execution. One agent is one JVM: a Java process running on a log-collection node, i.e. a server that gathers logs.

A single agent is built from three components, Source, Channel, and Sink, in an architecture resembling producer, warehouse, consumer, as shown below:


a single node flume

NetCat Source: listens on a specified network port; whatever an application writes to that port, this source component picks up.

Create a configuration file named a-single-node.conf under /home/hadoop/script/flume with the following contents:

#a-single-node.conf : A single node flume configuration


# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Save it, then start the agent:

flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/a-single-node.conf \
-Dflume.root.logger=INFO,console

Parameter notes:

-n / --name: the agent name (must match the agent name used in the configuration file)

-c / --conf: the Flume configuration directory

-f / --conf-file: the agent configuration file

-Dflume.root.logger=INFO,console: sets the root log level and output target (use DEBUG for more detail)

Connect to the listening port with telnet:

telnet localhost 44444

Type anything, and Flume prints the event:

18/08/02 15:25:29 INFO sink.LoggerSink: Event: { headers:{} body: 61 62 63 0D                                     abc. }
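If telnet is unavailable, the same exchange can be played out in Python: a tiny TCP listener that prints each payload roughly the way Flume's logger sink does, plus a client taking the telnet role. Port 44445 here is arbitrary and unrelated to any running Flume agent:

```python
import socket
import threading

# Tiny TCP listener standing in for a netcat-style source.
srv = socket.socket()
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind(("localhost", 44445))
srv.listen(1)

received = []

def handle_one():
    conn, _ = srv.accept()
    data = conn.recv(1024)
    received.append(data)
    # Print hex bytes plus text, loosely mimicking LoggerSink output.
    print("Event: { headers:{} body: %s  %s }"
          % (data.hex(" "), data.decode(errors="replace").strip()))
    conn.close()

t = threading.Thread(target=handle_one)
t.start()

# The client plays telnet's part: connect and send one line.
cli = socket.create_connection(("localhost", 44445))
cli.sendall(b"abc\r\n")   # what pressing Enter after "abc" in telnet sends
cli.close()
t.join()
srv.close()
```

The trailing `0D` in the Flume log above is the carriage return that telnet appends to each line, which is why it shows up in the event body.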

Collecting Data from a File into HDFS

source - channel - sink: exec - memory - hdfs

The configuration file:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
# file to tail
a1.sources.r1.command = tail -F /home/hadoop/data/flume/logs/access.log


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Describe the sink
a1.sinks.k1.type = hdfs
# HDFS target path
a1.sinks.k1.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/tail
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.batchSize=10
a1.sinks.k1.hdfs.useLocalTimeStamp=true

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Without this setting:

a1.sinks.k1.hdfs.useLocalTimeStamp=true

the agent fails with:

java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

The HDFS sink uses a timestamp to build its directory structure, but the event arrived with no timestamp in its headers. There are three ways to fix this:

  1. agent1.sources.source1.interceptors = t1
    agent1.sources.source1.interceptors.t1.type = timestamp
    adds a timestamp interceptor on the source, stamping every event header with a timestamp (slightly slower).
  2. agent1.sinks.sink1.hdfs.useLocalTimeStamp = true sets the flag on the sink (if the client's clock and the Flume cluster's clock differ, event times will be inaccurate).
  3. When sending events to the source, add the timestamp to the event header yourself; the header is a map, and the key is timestamp (recommended).
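The third fix simply means putting a millisecond epoch timestamp under the key `timestamp` in the event's header map before sending. A sketch of what the client-side stamping looks like (the helper name is illustrative):

```python
import time

# Sketch of fix 3: stamp each event's header map with a "timestamp" key,
# in milliseconds since the epoch, before handing the event to the source.
def with_timestamp(headers, body):
    headers = dict(headers)                            # don't mutate caller's map
    headers["timestamp"] = str(int(time.time() * 1000))
    return {"headers": headers, "body": body}

event = with_timestamp({}, b"log line")
print(event["headers"]["timestamp"])                   # e.g. "1541939331883"
```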

Collecting a Directory's Contents to the Console

source - channel - sink: spooldir - memory - logger
Once a file in the directory has been read completely it is renamed with a .COMPLETED suffix, and file names must not repeat.
The configuration file:


# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir=/home/hadoop/temp
a1.sources.r1.fileHeader=true

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Describe the sink
a1.sinks.k1.type = logger

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
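The spooling-directory contract described above, rename on completion, fail on a duplicate name, can be sketched like this (the function is illustrative, not Flume code):

```python
import os
import tempfile

# Sketch of the spooldir contract: once a file is fully consumed it is
# renamed with a .COMPLETED suffix, so names in the directory must be unique.
def mark_complete(path):
    done = path + ".COMPLETED"
    if os.path.exists(done):
        raise FileExistsError("duplicate file name in spool dir: " + done)
    os.rename(path, done)
    return done

spool = tempfile.mkdtemp()
f = os.path.join(spool, "events.log")
with open(f, "w") as fh:
    fh.write("one line\n")
print(mark_complete(f))            # .../events.log.COMPLETED
```

Dropping a second file with the same name after the first completes would collide with the existing .COMPLETED file, which is why the source requires unique names.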

The Small-Files Problem

Example: collect a directory's contents into HDFS with taildir - memory - hdfs.
The configuration file:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/temp/position/taildir_position.json
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/home/hadoop/temp/flume/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2=/home/hadoop/temp/flume/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
a1.sinks.k1.hdfs.batchSize=15
# Key parameters: the three roll conditions are OR'ed, so meeting any one
# of them triggers a roll; a value of 0 disables that condition.
# (Comments are on their own lines, since text trailing a value can end up
# inside the value.)
# roll by elapsed time (seconds); disabled here
a1.sinks.k1.hdfs.rollInterval = 0
# roll by file size (bytes)
a1.sinks.k1.hdfs.rollSize = 500
# roll by event count; disabled here
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.useLocalTimeStamp=true

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
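The OR relationship among the three roll conditions can be written out directly; with the settings above, only file size ever triggers a roll. A sketch (parameter names mirror the config keys, the function itself is illustrative):

```python
# Sketch of the HDFS sink's roll decision: the three conditions are OR'ed,
# and a value of 0 disables that condition. Defaults below mirror the
# configuration above (only rollSize=500 is active).
def should_roll(elapsed_s, bytes_written, event_count,
                roll_interval=0, roll_size=500, roll_count=0):
    return ((roll_interval > 0 and elapsed_s >= roll_interval) or
            (roll_size > 0 and bytes_written >= roll_size) or
            (roll_count > 0 and event_count >= roll_count))

# Time and count are disabled, and 120 bytes < 500: no roll yet.
print(should_roll(elapsed_s=3600, bytes_written=120, event_count=10**6))  # False
# 512 bytes >= 500: roll.
print(should_roll(elapsed_s=1, bytes_written=512, event_count=1))         # True
```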

Multiple Channels


One channel feeds a sink that logs to the console; the other feeds a sink that writes to HDFS.
The configuration file:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /home/hadoop/temp/position/taildir_position.json
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/home/hadoop/temp/flume/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2=/home/hadoop/temp/flume/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100


a1.channels.c2.type=memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Describe the sink
a1.sinks.k1.type = logger


a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=TEXT
a1.sinks.k2.hdfs.batchSize=15
a1.sinks.k2.hdfs.rollInterval= 0  
a1.sinks.k2.hdfs.rollSize= 0  
a1.sinks.k2.hdfs.rollCount = 100 

# Bind the source and sink to the channel
a1.sources.r1.channels =c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
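When a source lists multiple channels, Flume's default (replicating) selector copies each event into every channel, so both sinks see every event. A queue-based sketch of that fan-out (names are illustrative):

```python
from queue import Queue

# Sketch of the replicating channel selector: the source copies each
# event into every configured channel, so every sink sees every event.
c1, c2 = Queue(), Queue()

def fan_out(event, channels):
    for ch in channels:
        ch.put(event)

fan_out(b"line 1", [c1, c2])
print(c1.get(), c2.get())    # both channels received the same event
```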

sink processor

There are two main modes: failover and load_balance.

  • failover: the Failover Sink Processor keeps a prioritized list of sinks and provides fail-over between them. Each sink carries a priority value; the larger the value, the higher the priority, and the highest-priority sink is activated first. Priority values must be unique.
  • load_balance: selects a sink by a configurable algorithm. When output volume is high, load balancing is well worth having, relieving pressure by spreading output across several paths. Flume's built-in selectors are round_robin (the default) and random.
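The selection behaviors above can be sketched in a few lines; sink names and the `pick_failover` helper are illustrative, not Flume APIs:

```python
import itertools
import random

sinks = ["k1", "k2"]

# load_balance with round_robin: rotate through the sinks in order.
rr = itertools.cycle(sinks)
print([next(rr) for _ in range(4)])        # ['k1', 'k2', 'k1', 'k2']

# load_balance with random: pick any sink each time.
print(random.choice(sinks))

# failover: always pick the live sink with the highest priority,
# falling back to lower priorities as sinks die.
def pick_failover(priorities, alive):
    candidates = [s for s in priorities if s in alive]
    return max(candidates, key=lambda s: priorities[s])

print(pick_failover({"k1": 5, "k2": 10}, alive={"k1", "k2"}))  # k2
print(pick_failover({"k1": 5, "k2": 10}, alive={"k1"}))        # k1
```

This mirrors the failover configuration below: k2 has priority 10, so it handles traffic until it fails, at which point k1 takes over.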

The failover configuration:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10 
a1.sinkgroups.g1.processor.maxpenalty = 10000

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1


a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop002:9001/user/hadoop/flume/manager
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=TEXT
a1.sinks.k2.hdfs.batchSize=15
a1.sinks.k2.hdfs.rollInterval= 0  
a1.sinks.k2.hdfs.rollSize= 0  
a1.sinks.k2.hdfs.rollCount = 10
a1.sinks.k2.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1

The load_balance configuration (swap the selector to test each strategy):

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random


a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1


a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path=hdfs://hadoop002:9000/user/hadoop/flume/manager
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=TEXT
a1.sinks.k2.hdfs.batchSize=15
a1.sinks.k2.hdfs.rollInterval= 0
a1.sinks.k2.hdfs.rollSize= 0
a1.sinks.k2.hdfs.rollCount = 10
a1.sinks.k2.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1

Avro Source and Avro Sink

This example uses two agents: one produces the data, the other receives it.

The data-producing agent configuration:

# Name the components on this agent
avro-source-agent.sources = exec-source
avro-source-agent.sinks = avro-sink
avro-source-agent.channels = avro-memory-channel

# Describe/configure the source

avro-source-agent.sources.exec-source.type = exec
avro-source-agent.sources.exec-source.command = tail -F /home/hadoop/data/flume/logs/access.log


# Use a channel which buffers events in memory
avro-source-agent.channels.avro-memory-channel.type = memory
avro-source-agent.channels.avro-memory-channel.capacity = 1000
avro-source-agent.channels.avro-memory-channel.transactionCapacity = 100


avro-source-agent.sinks.avro-sink.type=avro
avro-source-agent.sinks.avro-sink.hostname=hadoop002
avro-source-agent.sinks.avro-sink.port=44444

# Bind the source and sink to the channel
avro-source-agent.sources.exec-source.channels = avro-memory-channel
avro-source-agent.sinks.avro-sink.channel = avro-memory-channel

The receiving agent configuration:

# Name the components on this agent
avro-sink-agent.sources = avro-source
avro-sink-agent.sinks = avro-logger
avro-sink-agent.channels = avro-memory-channel

# Describe/configure the source
avro-sink-agent.sources.avro-source.type = avro
avro-sink-agent.sources.avro-source.bind = hadoop002
avro-sink-agent.sources.avro-source.port = 44444


# Use a channel which buffers events in memory
avro-sink-agent.channels.avro-memory-channel.type = memory
avro-sink-agent.channels.avro-memory-channel.capacity = 1000
avro-sink-agent.channels.avro-memory-channel.transactionCapacity = 100


avro-sink-agent.sinks.avro-logger.type=logger

# Bind the source and sink to the channel
avro-sink-agent.sources.avro-source.channels = avro-memory-channel
avro-sink-agent.sinks.avro-logger.channel = avro-memory-channel

Start avro-sink-agent first, then avro-source-agent:

flume-ng agent \
--name avro-sink-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/avro-sink.conf \
-Dflume.root.logger=INFO,console 


flume-ng agent \
--name avro-source-agent \
--conf $FLUME_HOME/conf \
--conf-file /home/hadoop/script/flume/avro-source.conf \
-Dflume.root.logger=INFO,console