Flume學(xué)習(xí)一(基本source、channel、sink)

默認(rèn)命名如下:

a1.sources = r1

a1.sinks = k1

a1.channels = c1 c2 c3 c4

agent_name:a1 ? ? source_name:r1 ? ? channel_name:c1 ? ? sink_name:k1

1、Interceptors(sources)

攔截器的作用范圍是數(shù)據(jù)源到source之間,主要是為了給數(shù)據(jù)添加headers,最常用的是timestamp、host、static。timestamp類型可以配合hdfs sink的文件輸出的日期格式(hdfs sink也可以用hdfs.useLocalTimeStamp來當(dāng)時間戳,但是不能代表數(shù)據(jù)的產(chǎn)生時間)。可以用host和static來做數(shù)據(jù)來源界定,既可以用來標(biāo)記數(shù)據(jù),也可以用來做數(shù)據(jù)分發(fā)的時候使用(配合Channel Selectors)。但是默認(rèn)的Interceptor只能針對web打標(biāo)簽,不能針對event打標(biāo)簽。如果需要對數(shù)據(jù)進(jìn)行標(biāo)志,需要修改代碼才能實(shí)現(xiàn),比如avro? source可以修改org.apache.flume.clients.log4jappender.Log4jAppender的實(shí)現(xiàn)來增加一些數(shù)據(jù)上的處理。

2、Channel Selectors

選擇器的作用范圍是source到channel之間,主要是為了確定每個Event到哪個channel。最常用的類型是replicating和multiplexing,replicating是默認(rèn)的,能夠把Event分別復(fù)制到每個channel。multiplexing可以根據(jù)Event的headers里的key的不同的值把數(shù)據(jù)分發(fā)到不同的channel。

3、Sink Processors

處理器的作用范圍是channel到sink之間,主要是為了確定Events到哪個sink。默認(rèn)的是default,最常用的是failover(故障轉(zhuǎn)移)和load_balance(負(fù)載均衡)。

4、Flume sources

avro(監(jiān)聽端口):

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 44444

a1.sources.r1.ipFilter = true #開啟ip黑白名單認(rèn)證,多個驗(yàn)證順序執(zhí)行,或關(guān)系,只要滿足一個即驗(yàn)證結(jié)束

a1.sources.r1.ipFilterRules = allow:ip:127.*,allow:name:localhost,deny:ip:*#這個意思是阻止任何ip訪問,僅允許ip127開頭和localhost通過

#攔截器部分,其他類型source組件一樣

a1.sources.r1.interceptors.i1.type = timestamp

a1.sources.r1.interceptors.i2.type = host

a1.sources.r1.interceptors.i3.type = static

a1.sources.r1.interceptors.i3.key = test_key

a1.sources.r1.interceptors.i3.value = test_value

#選擇器部分,其他類型source組件一樣

a1.sources.r1.selector.type=multiplexing

a1.sources.r1.selector.header=state

a1.sources.r1.selector.mapping.CZ=c1

a1.sources.r1.selector.mapping.US=c2 c3

a1.sources.r1.selector.default=c4

exec(常用量監(jiān)控文件):

a1.sources.r1.type=exec

a1.sources.r1.command=tail -F /var/log/secure

a1.sources.r1.batchSize=20 #批處理?xiàng)l數(shù),每次上傳數(shù)據(jù)的條數(shù),不到不傳

a1.sources.r1.batchTimeout=3000 #等待數(shù)據(jù)時間(單位毫秒),如果在時間內(nèi)沒有收集到20條數(shù)據(jù)也需要傳輸數(shù)據(jù)

spooldir(監(jiān)控目錄):

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/tmp/

a1.sources.r1.fileSuffix = .org

a1.sources.r1.deletePolicy = never #讀完刪除與否,默認(rèn)不刪除(never),immediate立即刪除

a1.sources.r1.fileHeader = true

a1.sources.r1.fileHeaderKey = ruoze_file #headers里添加ruoze_file=文件全路徑

a1.sources.r1.basenameHeader = true

a1.sources.r1.basenameHeaderKey = ruoze_base #headers里添加ruoze_base=文件名

a1.sources.r1.trackerDir = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/flumespool

TAILDIR(多目錄的文件監(jiān)控):

a1.sources.r1.type = TAILDIR

a1.sources.r1.filegroups = f1 f2

a1.sources.r1.positionFile = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/positions/taildir_position.json

a1.sources.r1.filegroups.f1 = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/tmp

a1.sources.r1.headers.f1.ruoze = TAILDIR_f1

a1.sources.r1.byteOffsetHeader = true #字節(jié)偏移量放到headers

a1.sources.r1.writePosInterval = 3000 #偏移量記錄時間間隔

a1.sources.r1.filegroups.f2 = /home/hadoop/app/flume-1.6.0-cdh5.14.0-bin/flumespool

a1.sources.r1.headers.f2.ruoze2 = TAILDIR_f2

netcat(監(jiān)聽端口):

a1.sources.r1.type = netcat

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 44444

a1.sources.r1.max-line-length = 1024 #一行的最大長度,超出長度會報錯并斷掉連接

a1.sources.r1.ack-every-event = true #應(yīng)答OK給每個Event


5、Flume channels

memory(內(nèi)存管道):

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000#緩存最大Event條數(shù)

a1.channels.c1.transactionCapacity = 100#事物包括的Event最大條數(shù),source和sink的batchSize需要小于該值

file(文件管道):

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = #監(jiān)測點(diǎn)文件路徑

a1.channels.c1.useDualCheckpoints = true #監(jiān)測點(diǎn)文件是否備份

a1.channels.c1.backupCheckpointDir = #監(jiān)測點(diǎn)文件備份路徑

a1.channels.c1.dataDirs = #數(shù)據(jù)存儲目錄

6、Flume? sinks

logger(控制臺打印信息):

a1.sinks.k1.type = logger

flume-ng啟動時需要加上:-Dflume.root.logger=INFO,console

avro(往avro? source推送數(shù)據(jù)):

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = localhost #目標(biāo)ip

a1.sinks.k1.port = 44444 #目標(biāo)端口

HDFS(寫數(shù)據(jù)到HDFS):

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://192.168.205.131:9000/data/%Y%m%d%H%M? #這個時間的格式需要時間戳來進(jìn)行轉(zhuǎn)換,需要headers里有timestamp

a1.sinks.k1.hdfs.filePrefix = app_name #文件前綴

a1.sinks.k1.hdfs.fileSuffix = .log #文件后綴

a1.sinks.k1.hdfs.inUseSuffix = .tmp #正在寫入文件后綴

a1.sinks.k1.hdfs.rollInterval = 30 #臨時文件轉(zhuǎn)正式文件的時間,單位秒,如果為0,則不限制,和rollSize、rollCount為或關(guān)系

a1.sinks.k1.hdfs.rollSize = 10485760 #臨時文件轉(zhuǎn)正式文件的大小,單位byte,如果為0,則不限制

a1.sinks.k1.hdfs.rollCount = 100000 #臨時文件轉(zhuǎn)正式文件記錄條數(shù),如果為0,則不限制

a1.sinks.k1.hdfs.round = true #是否啟用時間上的”舍棄”。如果啟用,則會影響除了%t的其他所有時間表達(dá)式

a1.sinks.k1.hdfs.roundValue = 10#時間上進(jìn)行“舍棄”的值,這里是舍棄10分鐘以內(nèi)的值,意味著每10分鐘新建一個目錄

a1.sinks.k1.hdfs.roundUnit = minute #時間上進(jìn)行”舍棄”的單位,包含:second,minute,hour

a1.sinks.k1.hdfs.useLocalTimeStamp = true #如果source里沒有timestamp時間戳,則需要該參數(shù)為true

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容