flume 的source 、channel和sink 多種組合

flume 有三大組件source 、channel和sink,各個組件之間都可以相互組合使用,各組件間耦合度低。使用靈活,方便。

1.多sink

channel 的內(nèi)容只輸出一次,同一個event 如果sink1 輸出,sink2 不輸出;如果sink1 輸出,sink1 不輸出。 最終 sink1+sink2=channel 中的數(shù)據(jù)。

配置文件如下:

a1.sources=r1a1.sinks= k1 k2a1.channels= c1# Describe/configure the sourcea1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log# channela1.channels.c1.type= memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:9092a1.sinks.k1.kafka.flumeBatchSize=20a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy#sink2a1.sinks.k2.type= file_rolla1.sinks.k2.channel= c1#a1.sinks.k2.sink.rollInterval=0a1.sinks.k2.sink.directory= /opt/apps/tmp

2.多 channel 多sink ,每個sink 輸出內(nèi)容一致

(memory channel 用于kafka操作,實(shí)時性高,file channel 用于 sink file 數(shù)據(jù)安全性高)?

(多channel 單 sink 的情況沒有舉例,個人感覺用處不廣泛。)

配置文件如下:

a1.sources=r1a1.sinks= k1 k2a1.channels= c1 c2# Describe/configure the sourcea1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1 c2a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log#多個channel 的數(shù)據(jù)相同a1.sources.r1.selector.type=replicating# channel1a1.channels.c1.type= memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100#channel2a1.channels.c2.type= filea1.channels.c2.checkpointDir= /opt/apps/flume-1.7.0/checkpointa1.channels.c2.dataDirs= /opt/apps/flume-1.7.0/data#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:9092a1.sinks.k1.kafka.flumeBatchSize=20a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy#sink2a1.sinks.k2.type= file_rolla1.sinks.k2.channel= c2#a1.sinks.k2.sink.rollInterval=0a1.sinks.k2.sink.directory= /opt/apps/tmp

3. 多source 單 channel 單 sink

多個source 可以讀取多種信息放在一個channel 然后輸出到同一個地方?

配置文件如下:

a1.sources=r1r2a1.sinks= k1a1.channels= c1# source1a1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log# source2a1.sources.r2.type= execa1.sources.r2.shell= /bin/bash -ca1.sources.r2.channels= c1a1.sources.r2.command= tail -F /opt/apps/logs/tail2.log# channel1? in memorya1.channels.c1.type= memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:9092a1.sinks.k1.kafka.flumeBatchSize=20a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy

flume 像樂高積木一樣可以自己隨心所欲將不同的組件進(jìn)行搭配使用,耦合度低。



Source

rpc遠(yuǎn)程過程調(diào)用協(xié)議,客戶機(jī)與服務(wù)機(jī)的調(diào)用模式需要對數(shù)據(jù)進(jìn)行序列化。

???????? 1:客戶機(jī)將參數(shù)序列化并以二進(jìn)制形式通過網(wǎng)絡(luò)傳輸?shù)椒?wù)器。

???????? 2:服務(wù)器接收到后進(jìn)行反序列化再調(diào)用方法獲取返回值。

???????? 3:服務(wù)器將返回值序列化后再通過網(wǎng)絡(luò)傳輸給客戶機(jī)。

???????? 4:客戶機(jī)接收到結(jié)果后再進(jìn)行反序列化獲取結(jié)果。

Avro source:

???????? Avro就是一種序列化形式,avrosource監(jiān)聽一個端口只接收avro序列化后的數(shù)據(jù),其他類型的不接收。

???????? type:avrosource的類型,必須是avro。

bind:要監(jiān)聽的(本機(jī)的)主機(jī)名或者ip。此監(jiān)聽不是過濾發(fā)送方。一臺電腦不是說只有一個IP。有多網(wǎng)卡的電腦,對應(yīng)多個IP。

port:綁定的本地的端口。


Thrif source:

???????? 和avro一樣是一種數(shù)據(jù)序列化形式,Thrifsource只采集thrift數(shù)據(jù)序列化后的數(shù)據(jù)


Exec source:

???????? 采集linux命令的返回結(jié)果傳輸給channel

???????? type:source的類型:必須是exec。

command:要執(zhí)行命令。

tail? –f? 若文件被刪除即使重新創(chuàng)建同名文件也不會監(jiān)聽

? ? ? ? tail? -F? 只要文件同名就可以繼續(xù)監(jiān)聽

以上可以用在日志文件切割時的監(jiān)聽



JMS Source:

Java消息服務(wù)數(shù)據(jù)源,Java消息服務(wù)是一個與具體平臺無關(guān)的API,這是支持jms規(guī)范的數(shù)據(jù)源采集;


Spooling Directory Source:通過文件夾里的新增的文件作為數(shù)據(jù)源的采集;


Kafka Source:從kafka服務(wù)中采集數(shù)據(jù)。


NetCat Source:綁定的端口(tcp、udp),將流經(jīng)端口的每一個文本行數(shù)據(jù)作為Event輸入

? ? ? ? type:source的類型,必須是netcat。

bind:要監(jiān)聽的(本機(jī)的)主機(jī)名或者ip。此監(jiān)聽不是過濾發(fā)送方。一臺電腦不是說只有一個IP。有多網(wǎng)卡的電腦,對應(yīng)多個IP。

port:綁定的本地的端口。


HTTP Source:監(jiān)聽HTTP POST和 GET產(chǎn)生的數(shù)據(jù)的采集


Chanel

???????? 是一個數(shù)據(jù)存儲池,中間通道,從source中接收數(shù)據(jù)再向sink目的地傳輸,如果sink寫入失敗會自動重寫因此不會造成數(shù)據(jù)丟失。

???????? Memory:用內(nèi)存存儲,但服務(wù)器宕機(jī)會丟失數(shù)據(jù)。

? ? ? ? ? ? ? ? ?Typechannel的類型:必須為memory

capacity:channel中的最大event數(shù)目

transactionCapacity:channel中允許事務(wù)的最大event數(shù)目


???????? File:使用文件存儲數(shù)據(jù)不會丟失數(shù)據(jù)但會耗費(fèi)io。

? ? ? ? ? ? ? ? ?Typechannel的類型:必須為 file

checkpointDir :檢查點(diǎn)的數(shù)據(jù)存儲目錄

dataDirs :數(shù)據(jù)的存儲目錄

transactionCapacity:channel中允許事務(wù)的最大event數(shù)目


???????? SpillableMemory Channel:內(nèi)存文件綜合使用,先存入內(nèi)存達(dá)到閥值后flush到文件中。

? ? ? ? ? ? ? ? Typechannel的類型:必須為SPILLABLEMEMORY

memoryCapacity:內(nèi)存的容量event數(shù)

overflowCapacity:數(shù)據(jù)存到文件的event閥值數(shù)

checkpointDir:檢查點(diǎn)的數(shù)據(jù)存儲目錄

dataDirs:數(shù)據(jù)的存儲目錄


?????????Jdbc:使用jdbc數(shù)據(jù)源來存儲數(shù)據(jù)。

???????? Kafka:使用kafka服務(wù)來存儲數(shù)據(jù)。


Sink

???????? 各種類型的目的地,接收channel寫入的數(shù)據(jù)并以指定的形式表現(xiàn)出來。Sink有很多種類型。

type:sink的類型 必須是hdfs。

hdfs.path:hdfs的上傳路徑。

hdfs.filePrefix:hdfs文件的前綴。默認(rèn)是:FlumeData

hdfs.rollInterval:間隔多久產(chǎn)生新文件,默認(rèn)是:30(秒) 0表示不以時間間隔為準(zhǔn)。

hdfs.rollSize:文件到達(dá)多大再產(chǎn)生一個新文件,默認(rèn)是:1024(bytes)0表示不以文件大小為準(zhǔn)。

hdfs.rollCount:event達(dá)到多大再產(chǎn)生一個新文件,默認(rèn)是:10(個)0表示不以event數(shù)目為準(zhǔn)。

hdfs.batchSize:每次往hdfs里提交多少個event,默認(rèn)為100

hdfs.fileType:hdfs文件的格式主要包括:SequenceFile,DataStream ,CompressedStream,如果使用了CompressedStream就要設(shè)置壓縮方式。

hdfs.codeC:壓縮方式:gzip,bzip2, lzo, lzop, snappy

注:%{host}可以使用header的key。以及%Y%m%d來表示時間,但關(guān)于時間的表示需要在header里有timestamp這個key。


Logger Sink將數(shù)據(jù)作為日志處理(根據(jù)flume中的設(shè)置的日志方式來顯示)

要在控制臺顯示在運(yùn)行agent的時候加入:-Dflume.root.logger=INFO,console。

type:sink的類型:必須是logger。

maxBytesToLog:打印body的最長的字節(jié)數(shù) 默認(rèn)為16



Avro Sink:數(shù)據(jù)被轉(zhuǎn)換成Avro Event,然后發(fā)送到指定的服務(wù)端口上。

? ? ? ? ? ? ? ? ?type:sink的類型:必須是 avro。

hostname:指定發(fā)送數(shù)據(jù)的主機(jī)名或者ip

port:指定發(fā)送數(shù)據(jù)的端口

實(shí)例

1:監(jiān)聽一個文件的增加變化,采集數(shù)據(jù)并在控制臺打印。

在這個例子中我使用exec source,memory chanel,logger sink??梢钥次业腶gent結(jié)構(gòu)圖

以下是我創(chuàng)建的exec_source.conf

a1.sources=r1

a1.channels=c1

a1.sinks=k1


a1.sources.r1.type=exec

a1.sources.r1.command=tail -F/usr/local/success.log


a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100


a1.sinks.k1.type=logger


a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1


執(zhí)行命令:

bin/flume-ngagent --conf conf/ --conf-file conf/exec_source.conf --name a1-Dflume.root.logger=INFO,console &


然后更改/usr/local/success.log文件中的內(nèi)容后可以看到flume采集到了文件的變化并在控制臺上打印出來。文件初始內(nèi)容hello和how are you,剩下的i am fine和ok為新增加內(nèi)容。

2:監(jiān)控一個文件變化并將其發(fā)送到另一個服務(wù)器上然后打印

這個例子可以建立在上一個例子之上,但是需要對flume的結(jié)構(gòu)做一些修改,我使用avro序列化數(shù)據(jù)再發(fā)送到指定的服務(wù)器上。詳情看結(jié)構(gòu)圖。

實(shí)際上flume可以進(jìn)行多個節(jié)點(diǎn)關(guān)聯(lián),本例中我只使用131向139發(fā)送數(shù)據(jù)

131,139上都必須啟動agent

服務(wù)器131配置

以下是我創(chuàng)建的exec_source_avro_sink.conf

a1.sources=r1

a1.channels=c1

a1.sinks=k1


a1.sources.r1.type=exec

a1.sources.r1.command=tail -F/usr/local/success.log


a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100


a1.sinks.k1.type=avro

a1.sinks.k1.hostname=192.168.79.139

a1.sinks.k1.port=42424


a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1


執(zhí)行命令啟動agent

bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&


139服務(wù)器配置

執(zhí)行命令拷貝flume到139

scp -r apache-flume-1.7.0-bin/root@192.168.79.139:/usr/local/

修改exec_source_avro_sink.conf

a1.sources=r1

a1.channels=c1

a1.sinks=k1


a1.sources.r1.type=avro

a1.sources.r1.bind=0.0.0.0

a1.sources.r1.port=42424


a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100


a1.sinks.k1.type=logger

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1


執(zhí)行命令啟動agent

bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&


結(jié)果可以在139控制臺上看到131中修改success.log的變化信息

3:avro-client實(shí)例

執(zhí)行bin/flume-ng會提示有命令如下

help?????????????????????display this help text

agent???????????????????? run aFlume agent

avro-client?????????????? run anavro Flume client

version?????????????????? show Flume version info


avro-clinet是avro客戶端,可以把本地文件以avro序列化方式序列化后發(fā)送到指定的服務(wù)器端口。本例就是將131的一個文件一次性的發(fā)送到139中并打印。

Agent結(jié)構(gòu)圖如下

131啟動的是一個avro-client,它會建立連接,發(fā)送數(shù)據(jù),斷開連接,它只是一個客戶端。

啟動一個avro客戶端

bin/flume-ngavro-client --conf conf/ --host 192.168.79.139 --port 42424 --filename/usr/local/success.log --headerFile /usr/local/kv.log


--headerFile是用來區(qū)分是哪個服務(wù)器發(fā)送的數(shù)據(jù),kv.log中的內(nèi)容會被發(fā)送到139,可以作為標(biāo)識來使用。


139的avro_client.conf如下

a1.sources=r1

a1.channels=c1

a1.sinks=k1


a1.sources.r1.type=avro

a1.sources.r1.bind=0.0.0.0

a1.sources.r1.port=42424



a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactioncapacity=100


a1.sinks.k1.type=logger


a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1


啟動agent

bin/flume-ngagent --conf conf/ --conf-file conf/avro_client.conf --name a1-Dflume.root.logger=INFO,console &


139控制臺顯示如下

可以看到headers的內(nèi)容headers:{hostname=192.168.79.131}

注意:

1:Flume服務(wù)沒有stop命令需要通過kill來殺掉進(jìn)行,可以使用jps? -m來確認(rèn)是那個agent的number

[root@shb01 conf]# jps -m

3610 Jps -m

3512 Application --conf-fileconf/exec_source.conf --name a1


2:修改flume的配置文件后如avro_client.conf,flume會自動重啟


3:logger sink默認(rèn)只顯示16個字節(jié)


4:flume是以event為單位進(jìn)行數(shù)據(jù)傳輸?shù)?,其中headers是一個map容器map

Event: { headers:{hostname=192.168.79.131}body: 31 61?????????????????????????????????????????? 1a }


5:flume支持多節(jié)點(diǎn)關(guān)聯(lián)但是sink和source的類型要一致,比如avro-client發(fā)送數(shù)據(jù)那么接收方的source也必須是avro否則會警告。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容