flume 有三大組件source 、channel和sink,各個組件之間都可以相互組合使用,各組件間耦合度低。使用靈活,方便。
channel 的內(nèi)容只輸出一次,同一個event 如果sink1 輸出,sink2 不輸出;如果sink1 輸出,sink1 不輸出。 最終 sink1+sink2=channel 中的數(shù)據(jù)。
配置文件如下:
a1.sources=r1a1.sinks= k1 k2a1.channels= c1# Describe/configure the sourcea1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log# channela1.channels.c1.type= memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:9092a1.sinks.k1.kafka.flumeBatchSize=20a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy#sink2a1.sinks.k2.type= file_rolla1.sinks.k2.channel= c1#a1.sinks.k2.sink.rollInterval=0a1.sinks.k2.sink.directory= /opt/apps/tmp
2.多 channel 多sink ,每個sink 輸出內(nèi)容一致
(memory channel 用于kafka操作,實(shí)時性高,file channel 用于 sink file 數(shù)據(jù)安全性高)?
(多channel 單 sink 的情況沒有舉例,個人感覺用處不廣泛。)
配置文件如下:
a1.sources=r1a1.sinks= k1 k2a1.channels= c1 c2# Describe/configure the sourcea1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1 c2a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log#多個channel 的數(shù)據(jù)相同a1.sources.r1.selector.type=replicating# channel1a1.channels.c1.type= memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100#channel2a1.channels.c2.type= filea1.channels.c2.checkpointDir= /opt/apps/flume-1.7.0/checkpointa1.channels.c2.dataDirs= /opt/apps/flume-1.7.0/data#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:9092a1.sinks.k1.kafka.flumeBatchSize=20a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy#sink2a1.sinks.k2.type= file_rolla1.sinks.k2.channel= c2#a1.sinks.k2.sink.rollInterval=0a1.sinks.k2.sink.directory= /opt/apps/tmp
多個source 可以讀取多種信息放在一個channel 然后輸出到同一個地方?
配置文件如下:
a1.sources=r1r2a1.sinks= k1a1.channels= c1# source1a1.sources.r1.type= execa1.sources.r1.shell= /bin/bash -ca1.sources.r1.channels= c1a1.sources.r1.command= tail -F /opt/apps/logs/tail4.log# source2a1.sources.r2.type= execa1.sources.r2.shell= /bin/bash -ca1.sources.r2.channels= c1a1.sources.r2.command= tail -F /opt/apps/logs/tail2.log# channel1? in memorya1.channels.c1.type= memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100#sink1a1.sinks.k1.channel= c1a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic= mytopica1.sinks.k1.kafka.bootstrap.servers= localhost:9092a1.sinks.k1.kafka.flumeBatchSize=20a1.sinks.k1.kafka.producer.acks=1a1.sinks.k1.kafka.producer.linger.ms=1a1.sinks.ki.kafka.producer.compression.type= snappy
flume 像樂高積木一樣可以自己隨心所欲將不同的組件進(jìn)行搭配使用,耦合度低。
Source
rpc遠(yuǎn)程過程調(diào)用協(xié)議,客戶機(jī)與服務(wù)機(jī)的調(diào)用模式需要對數(shù)據(jù)進(jìn)行序列化。
???????? 1:客戶機(jī)將參數(shù)序列化并以二進(jìn)制形式通過網(wǎng)絡(luò)傳輸?shù)椒?wù)器。
???????? 2:服務(wù)器接收到后進(jìn)行反序列化再調(diào)用方法獲取返回值。
???????? 3:服務(wù)器將返回值序列化后再通過網(wǎng)絡(luò)傳輸給客戶機(jī)。
???????? 4:客戶機(jī)接收到結(jié)果后再進(jìn)行反序列化獲取結(jié)果。
Avro source:
???????? Avro就是一種序列化形式,avrosource監(jiān)聽一個端口只接收avro序列化后的數(shù)據(jù),其他類型的不接收。
???????? type:avrosource的類型,必須是avro。
bind:要監(jiān)聽的(本機(jī)的)主機(jī)名或者ip。此監(jiān)聽不是過濾發(fā)送方。一臺電腦不是說只有一個IP。有多網(wǎng)卡的電腦,對應(yīng)多個IP。
port:綁定的本地的端口。
Thrif source:
???????? 和avro一樣是一種數(shù)據(jù)序列化形式,Thrifsource只采集thrift數(shù)據(jù)序列化后的數(shù)據(jù)
Exec source:
???????? 采集linux命令的返回結(jié)果傳輸給channel
???????? type:source的類型:必須是exec。
command:要執(zhí)行命令。
tail? –f? 若文件被刪除即使重新創(chuàng)建同名文件也不會監(jiān)聽
? ? ? ? tail? -F? 只要文件同名就可以繼續(xù)監(jiān)聽
以上可以用在日志文件切割時的監(jiān)聽
JMS Source:
Java消息服務(wù)數(shù)據(jù)源,Java消息服務(wù)是一個與具體平臺無關(guān)的API,這是支持jms規(guī)范的數(shù)據(jù)源采集;
Spooling Directory Source:通過文件夾里的新增的文件作為數(shù)據(jù)源的采集;
Kafka Source:從kafka服務(wù)中采集數(shù)據(jù)。
NetCat Source:綁定的端口(tcp、udp),將流經(jīng)端口的每一個文本行數(shù)據(jù)作為Event輸入
? ? ? ? type:source的類型,必須是netcat。
bind:要監(jiān)聽的(本機(jī)的)主機(jī)名或者ip。此監(jiān)聽不是過濾發(fā)送方。一臺電腦不是說只有一個IP。有多網(wǎng)卡的電腦,對應(yīng)多個IP。
port:綁定的本地的端口。
HTTP Source:監(jiān)聽HTTP POST和 GET產(chǎn)生的數(shù)據(jù)的采集
???????? 是一個數(shù)據(jù)存儲池,中間通道,從source中接收數(shù)據(jù)再向sink目的地傳輸,如果sink寫入失敗會自動重寫因此不會造成數(shù)據(jù)丟失。
???????? Memory:用內(nèi)存存儲,但服務(wù)器宕機(jī)會丟失數(shù)據(jù)。
? ? ? ? ? ? ? ? ?Typechannel的類型:必須為memory
capacity:channel中的最大event數(shù)目
transactionCapacity:channel中允許事務(wù)的最大event數(shù)目
???????? File:使用文件存儲數(shù)據(jù)不會丟失數(shù)據(jù)但會耗費(fèi)io。
? ? ? ? ? ? ? ? ?Typechannel的類型:必須為 file
checkpointDir :檢查點(diǎn)的數(shù)據(jù)存儲目錄
dataDirs :數(shù)據(jù)的存儲目錄
transactionCapacity:channel中允許事務(wù)的最大event數(shù)目
???????? SpillableMemory Channel:內(nèi)存文件綜合使用,先存入內(nèi)存達(dá)到閥值后flush到文件中。
? ? ? ? ? ? ? ? Typechannel的類型:必須為SPILLABLEMEMORY
memoryCapacity:內(nèi)存的容量event數(shù)
overflowCapacity:數(shù)據(jù)存到文件的event閥值數(shù)
checkpointDir:檢查點(diǎn)的數(shù)據(jù)存儲目錄
dataDirs:數(shù)據(jù)的存儲目錄
?????????Jdbc:使用jdbc數(shù)據(jù)源來存儲數(shù)據(jù)。
???????? Kafka:使用kafka服務(wù)來存儲數(shù)據(jù)。
???????? 各種類型的目的地,接收channel寫入的數(shù)據(jù)并以指定的形式表現(xiàn)出來。Sink有很多種類型。
type:sink的類型 必須是hdfs。
hdfs.path:hdfs的上傳路徑。
hdfs.filePrefix:hdfs文件的前綴。默認(rèn)是:FlumeData
hdfs.rollInterval:間隔多久產(chǎn)生新文件,默認(rèn)是:30(秒) 0表示不以時間間隔為準(zhǔn)。
hdfs.rollSize:文件到達(dá)多大再產(chǎn)生一個新文件,默認(rèn)是:1024(bytes)0表示不以文件大小為準(zhǔn)。
hdfs.rollCount:event達(dá)到多大再產(chǎn)生一個新文件,默認(rèn)是:10(個)0表示不以event數(shù)目為準(zhǔn)。
hdfs.batchSize:每次往hdfs里提交多少個event,默認(rèn)為100
hdfs.fileType:hdfs文件的格式主要包括:SequenceFile,DataStream ,CompressedStream,如果使用了CompressedStream就要設(shè)置壓縮方式。
hdfs.codeC:壓縮方式:gzip,bzip2, lzo, lzop, snappy
注:%{host}可以使用header的key。以及%Y%m%d來表示時間,但關(guān)于時間的表示需要在header里有timestamp這個key。
Logger Sink將數(shù)據(jù)作為日志處理(根據(jù)flume中的設(shè)置的日志方式來顯示)
要在控制臺顯示在運(yùn)行agent的時候加入:-Dflume.root.logger=INFO,console。
type:sink的類型:必須是logger。
maxBytesToLog:打印body的最長的字節(jié)數(shù) 默認(rèn)為16
Avro Sink:數(shù)據(jù)被轉(zhuǎn)換成Avro Event,然后發(fā)送到指定的服務(wù)端口上。
? ? ? ? ? ? ? ? ?type:sink的類型:必須是 avro。
hostname:指定發(fā)送數(shù)據(jù)的主機(jī)名或者ip
port:指定發(fā)送數(shù)據(jù)的端口
1:監(jiān)聽一個文件的增加變化,采集數(shù)據(jù)并在控制臺打印。
在這個例子中我使用exec source,memory chanel,logger sink??梢钥次业腶gent結(jié)構(gòu)圖
以下是我創(chuàng)建的exec_source.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F/usr/local/success.log
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactioncapacity=100
a1.sinks.k1.type=logger
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
執(zhí)行命令:
bin/flume-ngagent --conf conf/ --conf-file conf/exec_source.conf --name a1-Dflume.root.logger=INFO,console &
然后更改/usr/local/success.log文件中的內(nèi)容后可以看到flume采集到了文件的變化并在控制臺上打印出來。文件初始內(nèi)容hello和how are you,剩下的i am fine和ok為新增加內(nèi)容。
2:監(jiān)控一個文件變化并將其發(fā)送到另一個服務(wù)器上然后打印
這個例子可以建立在上一個例子之上,但是需要對flume的結(jié)構(gòu)做一些修改,我使用avro序列化數(shù)據(jù)再發(fā)送到指定的服務(wù)器上。詳情看結(jié)構(gòu)圖。
實(shí)際上flume可以進(jìn)行多個節(jié)點(diǎn)關(guān)聯(lián),本例中我只使用131向139發(fā)送數(shù)據(jù)
131,139上都必須啟動agent
服務(wù)器131配置
以下是我創(chuàng)建的exec_source_avro_sink.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=exec
a1.sources.r1.command=tail -F/usr/local/success.log
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactioncapacity=100
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.79.139
a1.sinks.k1.port=42424
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
執(zhí)行命令啟動agent
bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&
139服務(wù)器配置
執(zhí)行命令拷貝flume到139
scp -r apache-flume-1.7.0-bin/root@192.168.79.139:/usr/local/
修改exec_source_avro_sink.conf
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=42424
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactioncapacity=100
a1.sinks.k1.type=logger
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
執(zhí)行命令啟動agent
bin/flume-ng agent --conf conf/ --conf-fileconf/exec_source_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console&
結(jié)果可以在139控制臺上看到131中修改success.log的變化信息
執(zhí)行bin/flume-ng會提示有命令如下
help?????????????????????display this help text
agent???????????????????? run aFlume agent
avro-client?????????????? run anavro Flume client
version?????????????????? show Flume version info
avro-clinet是avro客戶端,可以把本地文件以avro序列化方式序列化后發(fā)送到指定的服務(wù)器端口。本例就是將131的一個文件一次性的發(fā)送到139中并打印。
Agent結(jié)構(gòu)圖如下
131啟動的是一個avro-client,它會建立連接,發(fā)送數(shù)據(jù),斷開連接,它只是一個客戶端。
啟動一個avro客戶端
bin/flume-ngavro-client --conf conf/ --host 192.168.79.139 --port 42424 --filename/usr/local/success.log --headerFile /usr/local/kv.log
--headerFile是用來區(qū)分是哪個服務(wù)器發(fā)送的數(shù)據(jù),kv.log中的內(nèi)容會被發(fā)送到139,可以作為標(biāo)識來使用。
139的avro_client.conf如下
a1.sources=r1
a1.channels=c1
a1.sinks=k1
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=42424
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactioncapacity=100
a1.sinks.k1.type=logger
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
啟動agent
bin/flume-ngagent --conf conf/ --conf-file conf/avro_client.conf --name a1-Dflume.root.logger=INFO,console &
139控制臺顯示如下
可以看到headers的內(nèi)容headers:{hostname=192.168.79.131}
1:Flume服務(wù)沒有stop命令需要通過kill來殺掉進(jìn)行,可以使用jps? -m來確認(rèn)是那個agent的number
[root@shb01 conf]# jps -m
3610 Jps -m
3512 Application --conf-fileconf/exec_source.conf --name a1
2:修改flume的配置文件后如avro_client.conf,flume會自動重啟
3:logger sink默認(rèn)只顯示16個字節(jié)
4:flume是以event為單位進(jìn)行數(shù)據(jù)傳輸?shù)?,其中headers是一個map容器map
Event: { headers:{hostname=192.168.79.131}body: 31 61?????????????????????????????????????????? 1a }
5:flume支持多節(jié)點(diǎn)關(guān)聯(lián)但是sink和source的類型要一致,比如avro-client發(fā)送數(shù)據(jù)那么接收方的source也必須是avro否則會警告。