介紹
概述
Apache Flume是為有效收集聚合和移動(dòng)大量來(lái)自不同源到中心數(shù)據(jù)存儲(chǔ)而設(shè)計(jì)的可分布,可靠的,可用的系統(tǒng)。
Apache Flume的用途不僅限于日志數(shù)據(jù)聚合。由于數(shù)據(jù)源是可定制的,F(xiàn)lume可用于傳輸大量事物數(shù)據(jù)包括但不限于網(wǎng)絡(luò)流量數(shù)據(jù),社交媒體產(chǎn)生的數(shù)據(jù),Email消息和很多其它類型的數(shù)據(jù)源。
Apache Flume是Apache軟件基金會(huì)的頂級(jí)項(xiàng)目之一。
現(xiàn)在有兩個(gè)版本可用(版本0.9.x和 1.x)
0.9.x版本的文檔在the Flume 0.9.x User Guide。
此文檔適用于1.4.x以后的版本。
新用戶和現(xiàn)有用戶鼓勵(lì)使用1.x版本,這樣可以利用最新的架構(gòu)提高性能和靈活配置。
系統(tǒng)需要
- Java運(yùn)行環(huán)境 - Java 1.7 or later
- 內(nèi)存 - 足夠配置文件中設(shè)置的內(nèi)存
- 磁盤大小 - 足夠配置文件中設(shè)置的磁盤大小
- 文件夾權(quán)限 - 可讀寫agent文件夾
架構(gòu)
數(shù)據(jù)流模型
一個(gè)Flume事件定義為,一個(gè)Flume agent是一個(gè)JVM包含這些從源到目標(biāo)的所有組建的進(jìn)程。

Flume source消耗外部數(shù)據(jù)源(如web服務(wù)器)發(fā)送給它的事件。外部數(shù)據(jù)源以Flume source理解的格式發(fā)送給Flume事件。例如,一個(gè)avro flume源可以 接受從avro客戶端或者其他avro sink的 flume agent發(fā)送的avro事件。類似的流程也可以使用Thrift Flume Source接受來(lái)自Thrift Sink或者Flume Thrift Rpc客戶端或者以任何語(yǔ)言編寫的符合Flume thrift協(xié)議的Thrift客戶端。當(dāng)Flume source接收到一個(gè)時(shí)間,會(huì)把事件存儲(chǔ)到一個(gè)或多個(gè)channels中。channel是一個(gè)被動(dòng)存儲(chǔ)(保存事件直到有Flume sink消耗)。舉個(gè)例子文件類型的channel——受本地系統(tǒng)支持。sink從channel中移除事件并把它推送到外部存儲(chǔ)如HDFS(通過(guò)Flume HDFS sink)或者把他傳到流的下一個(gè)Flume agent source中。agent的source和sink異步操作存儲(chǔ)在channel中的事件。
復(fù)雜流
Flume允許用戶建立多節(jié)點(diǎn)流,這種流的事件通過(guò)多個(gè)agent到達(dá)目的地。它也支持扇入和扇出流,上下文路由和為失敗的節(jié)點(diǎn)備份路由。
可靠性
事件存儲(chǔ)在agent的channel中,然后被傳到流的下一個(gè)agent或終端存儲(chǔ)(如HDFS)。只有在他們被存儲(chǔ)在下一個(gè)agent的channel中或這終端存儲(chǔ)時(shí)才會(huì)被移除。這是Flume單個(gè)流的消息傳遞機(jī)制提供流端到端的可靠性。
Flume使用事務(wù)保證事件的可靠傳輸。sources和sinks分別封裝在storage/retrieval,channel提供事件。這保證事件集合在流中點(diǎn)到點(diǎn)之間可靠傳輸。在多節(jié)點(diǎn)流的情況下,sink有著上一點(diǎn)的契約source有下一節(jié)點(diǎn)的契約,保證數(shù)據(jù)安全存儲(chǔ)到下一個(gè)節(jié)點(diǎn)的channel中。
可恢復(fù)性
事件存儲(chǔ)在channel中,channel負(fù)責(zé)災(zāi)難回復(fù)。Flume支持由本地文件系統(tǒng)提供的持久文件channel。內(nèi)存channel只是簡(jiǎn)單的把事件存儲(chǔ)在內(nèi)存隊(duì)列中,內(nèi)存channel更快,但在agent進(jìn)程掛掉時(shí)仍在內(nèi)存的事件不能恢復(fù)。
安裝
安裝一個(gè)agent
Flume agent的配置存在一個(gè)本地的配置文件中。這是一個(gè)遵循Java properties文件格式的文本文件。一個(gè)或多個(gè)agent配置可放在同一個(gè)配置文件里。配置文件包含agent的source,sink和channel的各個(gè)屬性以及他們的數(shù)據(jù)流連接。
配置獨(dú)立組件
每個(gè)流組件(source,sink或者channel)都有一個(gè)name,type和一系列的基于其type或?qū)嵗膶傩?。例如,一個(gè)avro source需要有個(gè)hostname(或者ip地址)一個(gè)端口號(hào)來(lái)接受數(shù)據(jù)。一個(gè)內(nèi)存channel有最大隊(duì)列長(zhǎng)度的屬性(capacity),一個(gè)HDFS sink需要知曉文件系統(tǒng)的URI地址創(chuàng)建文件,文件訪問(wèn)頻率(“hdfs.rollInterval”)等等。所有的這些組件屬性都需要在Flume配置文件中設(shè)置。
連接各個(gè)組件
agent需要知道加載什么組件,以及這些組件在流中的連接順序。通過(guò)列出在agent中的source,sink和channel名稱,定義每個(gè)sink和source的channel來(lái)完成。例如,一個(gè)從avroWeb的avro source來(lái)的agent流事件通過(guò)文件channel到HDFS sink集群。這個(gè)配置文件會(huì)包含三個(gè)組建,文件channel作為avroweb source 和hdfs sink的共享channel。
啟動(dòng)agent
使用bin目錄中的flume-ng腳本啟動(dòng)agent,你需要在命令行指定agent名稱,配置文件目錄,和配置文件。
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
現(xiàn)在agent將運(yùn)行配置腳本中的source和sink。
一個(gè)簡(jiǎn)單的例子
再次,我們給一個(gè)簡(jiǎn)單的配置文件,描述一個(gè)單一節(jié)點(diǎn)的Flume開(kāi)發(fā)。此配置文件使用戶生成事件并把日志輸出到控制臺(tái)。
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
此配置定義了一個(gè)名為a1的agent。a1有一個(gè)source監(jiān)聽(tīng)44444端口,一個(gè)channel在內(nèi)存中緩存事件數(shù)據(jù),一個(gè)sink把日志輸出到控制臺(tái)。配置文件給各個(gè)組建命名描述他們的類型和配置參數(shù)。一個(gè)配置文件可能定義多個(gè)命名agent,當(dāng)啟動(dòng)Flume進(jìn)程是傳遞標(biāo)志告訴它運(yùn)行哪些agent。
給出這個(gè)配置文件,我們可以如下啟動(dòng)Flume:
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
注意在完整的部署中,我們通常包含多個(gè)--conf=<conf-dir>選項(xiàng)。這個(gè)<conf-dir>目錄將包含多個(gè)一個(gè)flume-env.sh shell腳本和一個(gè)潛在的log4j屬性文件。在此例中,我們傳遞一個(gè)java選項(xiàng)把日志輸出到控制臺(tái)而不需要一個(gè)自定義的文件腳本。
從另一個(gè)接口,我們可以telnet 44444端口發(fā)送一個(gè)Flume事件:
$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK
原始的Flume終端將輸出事件日志消息。
12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }
祝賀你——你已經(jīng)成功配置并部署了一個(gè)Flume agent!隨后的章節(jié)將涉及agent 配置的更多細(xì)節(jié)。
記錄原始數(shù)據(jù)
從管道里記錄原始的流數(shù)據(jù)不是許多生產(chǎn)環(huán)境所關(guān)心的行為,因?yàn)榭赡軐?dǎo)致敏感信息泄露或安全相關(guān)的配置,如密鑰輸出到Flume日志。默認(rèn)情況下,F(xiàn)lume不記錄這么多信息。另一方面,如果數(shù)據(jù)管道損壞,F(xiàn)Lume會(huì)嘗試提供調(diào)試錯(cuò)誤的線索。
一個(gè)調(diào)試事件管道錯(cuò)誤的方法是設(shè)置額外的內(nèi)存管道連接到日志sink,它會(huì)輸出所有的事件數(shù)據(jù)到Flume日志。有些情況,這種方法還不足夠。
為了記錄事件和配置相關(guān)數(shù)據(jù),必須設(shè)置一些java系統(tǒng)屬性在log4j屬性文件中。
為了記錄配置相關(guān)日志,設(shè)置-Dorg.apache.flume.log.printconfig=trueJava系統(tǒng)屬性。此屬性可放在命令行中或者設(shè)置在flume-env.sh的JAVA_OPTS變量中。
為了記錄數(shù)據(jù),如上設(shè)置-Dorg.apache.flume.log.rawdata=trueJava系統(tǒng)屬性。對(duì)于大多數(shù)組件,log4j日志級(jí)別必須設(shè)置為DEBUG或TRACE似的event-specific日志出現(xiàn)在Flume記錄中。
這是一個(gè)設(shè)置配置日志和原始數(shù)據(jù)日志的例子。同時(shí)設(shè)置了Log4j的記錄級(jí)別為DEBUG:
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=DEBUG,console -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
基于Zookeeper的配置
Flume支持使用Zookeeper配置agent。這個(gè)是一個(gè)實(shí)驗(yàn)特性。配置文件需要上傳到zookeeper中,在一個(gè)可配置前綴下。配置文件存儲(chǔ)在Zookeeper節(jié)點(diǎn)數(shù)據(jù)里。下面是a1 和 a2 agent在Zookeeper節(jié)點(diǎn)樹(shù)的配置情況。
- /flume
|- /a1 [Agent config file]
|- /a2 [Agent config file]
一旦上傳完配置文件,使用下面參數(shù)啟動(dòng)agent。
$ bin/flume-ng agent –conf conf -z zkhost:2181,zkhost1:2181 -p /flume –name a1 -Dflume.root.logger=INFO,console
| Argument Name | Default | Description |
|---|---|---|
| z | - | Zookeeper連接字符串.以逗號(hào)分割的hostname:port |
| p | /flume | Zookeeper中存儲(chǔ)agent配置的根目錄 |
安裝第三方插件
Flume有完整的插件架構(gòu)。當(dāng)Flume通過(guò)source,channel,sink,serializer等組件,存在許多實(shí)現(xiàn)分割Flume。
始終可以使用flume-env.sh文件中的FLUME_CLASSPATH變量路徑添加自定義的Flume組件,F(xiàn)lume現(xiàn)在支持一個(gè)特殊的文件夾pluguins.d自動(dòng)獲得組件。這允許更簡(jiǎn)單的插件包管理問(wèn)題,更簡(jiǎn)單的調(diào)試和錯(cuò)誤定位,特別是依賴包的沖突。
plugins.d文件夾
plugins.d文件夾在$FLUME_HOME/plugins.d。在啟動(dòng)時(shí),flume-ng啟動(dòng)腳本查看plugins.d目錄文件,檢查符合一下格式的插件把它們導(dǎo)入到j(luò)ava路徑中。
插件目錄布局
plugins.d中的每個(gè)插件都可以有三個(gè)子目錄:
- lib - 插件的jar包
- libext - 插件的依賴包
- native - 任何需要的本地庫(kù)文件,如
.so文件。
下面是plugins.d目錄中包含兩個(gè)插件的例子
plugins.d/
plugins.d/custom-source-1/
plugins.d/custom-source-1/lib/my-source.jar
plugins.d/custom-source-1/libext/spring-core-2.5.6.jar
plugins.d/custom-source-2/
plugins.d/custom-source-2/lib/custom.jar
plugins.d/custom-source-2/native/gettext.so
數(shù)據(jù)提取
Flume支持很多從外部數(shù)據(jù)源提取數(shù)據(jù)的機(jī)制。
RPC
包含在Flume分布式系統(tǒng)的一個(gè)Avro客戶端可以使用avro遠(yuǎn)程方法調(diào)用發(fā)送一個(gè)給定的文件給Flume Avro source:
$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
上面的命令將把/usr/logs/log.10文件內(nèi)容傳遞到Flume source的監(jiān)聽(tīng)端口。
執(zhí)行命令
有一個(gè)exec類型的 source執(zhí)行一個(gè)給定的命令并消耗輸出(例如一個(gè)以\r、\n或\r\n分隔的單獨(dú)line)。
注意:Flume不支持tail作數(shù)據(jù)源,可以把tail封裝在exec source中傳輸文件
網(wǎng)絡(luò)流
Flume支持一下流行的日志流類型讀取數(shù)據(jù)機(jī)制,如:
- Avro
- Thrift
- Syslog
- Netcat
設(shè)置多agent流

為了在多個(gè)agent之間流動(dòng)數(shù)據(jù),前一個(gè)agent的sink和當(dāng)前的source需要都是avro類型設(shè)置相同的hostname和port。
結(jié)合
日志收集的常見(jiàn)場(chǎng)景是大量的日志生成客戶端發(fā)送數(shù)據(jù)到少量的消費(fèi)者agent存儲(chǔ)子系統(tǒng)。例如,日志從幾百個(gè)web服務(wù)器發(fā)送數(shù)據(jù)到十幾個(gè)agents然后存儲(chǔ)到HDFS集群。

這可以通過(guò)Flume配置許多avro sink第一層agent,
多路復(fù)用流
Flume支持多路復(fù)用事件流到一個(gè)或多個(gè)目的地。這是通過(guò)定義一個(gè)flow multiplexer來(lái)實(shí)現(xiàn)的,它可以復(fù)制或選擇事件流到一個(gè)或多個(gè)channels。

上面的示例展示了從一個(gè)foo agent扇出流到多個(gè)channels中。這種扇出可以復(fù)制或選擇。在復(fù)制流的情況下,每個(gè)事件都被發(fā)送到所有的channels中。在選擇的情況下,如果一個(gè)事件被發(fā)送到一些channels中,當(dāng)一個(gè)事件的屬性匹配一個(gè)預(yù)先配置的值。例如,如果一個(gè)事件的屬性
txnType設(shè)置為customer,name他會(huì)去channel1和channel3,如果設(shè)置為vendor,會(huì)去channel2,否則去channel3。這種映射可以在agent的配置文件中設(shè)置。
配置
之前章節(jié)提到,F(xiàn)lume agent的配置是從一個(gè)類似Java多層次屬性文件格式的文件中提取的。
定義流
使用單一的agent定義流,你需要使用channel連接source和sink。需要列出給定agent的source,sink和channel,然后指定source和sink到一個(gè)channel。一個(gè)source可以指定多個(gè)channels而一個(gè)sink只能指定一個(gè)channel。格式如下:
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>
# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>
例如,一個(gè)名為agent_foo的agent從外部avro客戶端讀取數(shù)據(jù)然后通過(guò)內(nèi)存channel發(fā)送到HDFS。配置文件weblog.config可能如下所示:
# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1
# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
這可以使事件流通過(guò)mem-channel-1從avro-AppSrv-source到hdfs-Cluster1-sink。當(dāng)agent從weblog.config配置文件開(kāi)始,將會(huì)實(shí)例化這個(gè)流。
配置單獨(dú)的組件
定義流后,你需要每個(gè)source,sink和channel的屬性。你設(shè)置組件類型和其他的指定組件的屬性,是使用相同的層次命名格式完成的。
# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>
# properties for channels
<Agent>.channel.<Channel>.<someProperty> = <someValue>
# properties for sinks
<Agent>.sources.<Sink>.<someProperty> = <someValue>
每個(gè)組件的·type·屬性都要設(shè)置為Flume能理解的類型。每個(gè)source,sink和channel類型都有它們自己需要的屬性和功能集合。在需要的情況這些都需要設(shè)置,前一個(gè)例子中,我們有一個(gè)使用內(nèi)存channel mem-channel-1從avro-AppSrv-source到hdfs-Cluster1-sink的流。這是展示每個(gè)組件的配置的例子:
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1
# set channel for sources, sinks
# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000
# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100
# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
#...
添加在一個(gè)agent里添加多個(gè)流
一個(gè)Flume agent可以包含多個(gè)獨(dú)立流。你可以在一個(gè)配置文件列出多個(gè)sources,sinks和channels。這些組件可以同多個(gè)流連接。
# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
然后你可以使用相應(yīng)的channels連接sources和links設(shè)置兩個(gè)不同的流。例如,你需要在一個(gè)agent中設(shè)置兩個(gè)流,一個(gè)從外部的avro客戶端到外部的HDFS系統(tǒng)另外一個(gè)從外部的tail命令輸出到avro sink,有一個(gè)配置文件可以做到:
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
配置一個(gè)多agent流
設(shè)置一個(gè)多層流,你需要有一個(gè)avro/thrift sink到一個(gè)avro/thrift source。這會(huì)把第一個(gè)flume agent結(jié)果轉(zhuǎn)發(fā)下一個(gè)flume agent中。例如,如果你定期的使用avro客戶端傳遞文件到本地的flume agent中,那么這個(gè)本地的agent可以轉(zhuǎn)發(fā)到另一個(gè)agent中存儲(chǔ)。
Weblog agent 配置:
# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel
# define the flow
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel
# avro sink properties
agent_foo.sources.avro-forward-sink.type = avro
agent_foo.sources.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sources.avro-forward-sink.port = 10000
# configure other pieces
#...
HDFS agent配置:
# list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel
# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel
# avro sink properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000
# configure other pieces
#...
此處我們連接weblogagent的the avro-forward-sink到hdfs agent的avro-collection-source。這將會(huì)使來(lái)自外部appserver的事件最終存儲(chǔ)在HDFS系統(tǒng)。
扇出流
前一節(jié)討論過(guò),F(xiàn)lume支持從一個(gè)source到多個(gè)channels的扇出流。有兩種模式的扇出方式:重復(fù)和多路選擇。在重復(fù)流中,事件被發(fā)送到配置的channels中。在多路選擇中,事件被發(fā)送到匹配的channels。為了實(shí)現(xiàn)扇出流,需要指定一個(gè)source的channels列表和定義扇出規(guī)則。這是通過(guò)添加一個(gè)selector的channel,其支持重復(fù)和多路選擇。如果是多路選擇還要指定選擇的規(guī)則。如果你不定義一個(gè)selector,默認(rèn)是重復(fù)。
# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
<Agent>.sources.<Source1>.selector.type = replicating
多路選擇有更多的分叉流屬性。需要為channel設(shè)置映射事件屬性。選擇器檢查每個(gè)事件頭的配置屬性。如果陪陪指定的值,就會(huì)把事件發(fā)送給所有匹配的channel。如果沒(méi)有匹配的,事件會(huì)被發(fā)送到默認(rèn)配置的channel。
# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
<Agent>.sources.<Source1>.selector.default = <Channel2>
映射允許每個(gè)channel值的覆蓋。
下面的例子有一個(gè)流選擇到兩條路徑。名為agent_too的agent有一個(gè)avro source和兩個(gè)連接到兩個(gè)sinks的channels。
# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2
# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
選擇器檢查每個(gè)State的頭。如果值是CA就發(fā)送到mem-channel-1,如果是AZ就到file-channel-2如果是NY就兩個(gè)都發(fā)。如果State頭沒(méi)有設(shè)置或者都不匹配,將會(huì)發(fā)送到默認(rèn)的mem-channel-1。
選擇器也支持可選的channel。為了指定可選channel,配置參數(shù)optional如下使用:
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
選擇器首相嘗試發(fā)送到必須的channel中,如果有channel消費(fèi)事件失敗,事務(wù)失敗。食物將再次嘗試發(fā)送到所有的channels中。如果所有的必須channels都消費(fèi)了事物。那么選擇器試圖發(fā)送給可選的channel??蛇xchannel消費(fèi)事件的失敗簡(jiǎn)單忽略不會(huì)導(dǎo)致重發(fā)。