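1. Building from source
- Download
On GitHub, select the 1.9 branch (https://github.com/apache/flume/tree/flume-1.9) and clone the repository:
git clone git@github.com:apache/flume.git
- Configure the Maven repository
After downloading the source, import it into IDEA as a Maven project, then configure the Maven repository.
If you have access to a proxy outside China, no extra configuration is needed. Otherwise, point Maven at a mirror inside China, such as the Aliyun Maven repository: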
<mirror>
<id>nexus-aliyun</id>
<mirrorOf>*</mirrorOf>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
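<!-- The Aliyun mirror configuration above comes from the internet and has not been verified by the author. If you run into problems, please look up a solution yourself. -->
I used my company's internal Maven repository, which has a fairly complete set of dependencies, so the build went smoothly and I did not hit the build failures reported online. Readers therefore need to sort out this configuration on their own; many solutions are available online.
- Build the source
Build the source with the following command: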
mvn clean install -DskipTests
When the build finishes, each module has its own target folder containing the compiled jar. The build output ends like this:
[INFO] Reactor Summary:
[INFO]
[INFO] Build Support ...................................... SUCCESS [ 1.854 s]
[INFO] Apache Flume ....................................... SUCCESS [ 11.992 s]
[INFO] Flume NG SDK ....................................... SUCCESS [ 16.187 s]
[INFO] Flume NG Hadoop Credential Store Config Filter ..... SUCCESS [ 0.135 s]
[INFO] Flume NG Config Filters API ........................ SUCCESS [ 2.549 s]
[INFO] Flume NG Configuration ............................. SUCCESS [ 5.963 s]
[INFO] Flume Auth ......................................... SUCCESS [ 6.558 s]
[INFO] Flume NG Core ...................................... SUCCESS [ 18.504 s]
[INFO] Flume NG Sinks ..................................... SUCCESS [ 0.145 s]
[INFO] Flume NG HDFS Sink ................................. SUCCESS [ 8.566 s]
[INFO] Flume NG IRC Sink .................................. SUCCESS [ 3.347 s]
[INFO] Flume NG Channels .................................. SUCCESS [ 0.115 s]
[INFO] Flume NG JDBC channel .............................. SUCCESS [ 5.174 s]
[INFO] Flume NG file-based channel ........................ SUCCESS [ 11.072 s]
[INFO] Flume NG Spillable Memory channel .................. SUCCESS [ 3.804 s]
[INFO] Flume NG Node ...................................... SUCCESS [ 10.520 s]
[INFO] Flume NG Embedded Agent ............................ SUCCESS [ 3.632 s]
[INFO] Flume NG HBase Sink ................................ SUCCESS [ 7.410 s]
[INFO] Flume NG HBase2 Sink ............................... SUCCESS [ 8.654 s]
[INFO] Flume NG ElasticSearch Sink ........................ SUCCESS [ 4.994 s]
[INFO] Flume NG Morphline Solr Sink ....................... SUCCESS [ 6.844 s]
[INFO] Flume Shared Utils ................................. SUCCESS [ 0.067 s]
[INFO] Flume Shared Kafka ................................. SUCCESS [ 2.755 s]
[INFO] Flume Shared Kafka Test Utils ...................... SUCCESS [ 3.389 s]
[INFO] Flume Kafka Sink ................................... SUCCESS [ 3.689 s]
[INFO] Flume HTTP/S Sink .................................. SUCCESS [ 3.416 s]
[INFO] Flume NG Kite Dataset Sink ......................... SUCCESS [ 5.341 s]
[INFO] Flume NG Hive Sink ................................. SUCCESS [ 4.872 s]
[INFO] Flume Sources ...................................... SUCCESS [ 0.091 s]
[INFO] Flume Scribe Source ................................ SUCCESS [ 4.130 s]
[INFO] Flume JMS Source ................................... SUCCESS [ 3.664 s]
[INFO] Flume Twitter Source ............................... SUCCESS [ 3.274 s]
[INFO] Flume Kafka Source ................................. SUCCESS [ 4.323 s]
[INFO] Flume Taildir Source ............................... SUCCESS [ 4.337 s]
[INFO] flume-kafka-channel ................................ SUCCESS [ 3.949 s]
[INFO] Flume legacy Sources ............................... SUCCESS [ 0.050 s]
[INFO] Flume legacy Avro source ........................... SUCCESS [ 3.007 s]
[INFO] Flume legacy Thrift Source ......................... SUCCESS [ 4.780 s]
[INFO] Flume NG Environment Variable Config Filter ........ SUCCESS [ 2.054 s]
[INFO] flume-ng-hadoop-credential-store-config-filter ..... SUCCESS [ 2.773 s]
[INFO] Flume NG External Process Config Filter ............ SUCCESS [ 2.402 s]
[INFO] Flume NG Clients ................................... SUCCESS [ 0.059 s]
[INFO] Flume NG Log4j Appender ............................ SUCCESS [ 6.521 s]
[INFO] Flume NG Tools ..................................... SUCCESS [ 2.914 s]
[INFO] Flume NG distribution .............................. SUCCESS [ 11.804 s]
[INFO] Flume NG Integration Tests ......................... SUCCESS [ 3.099 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 03:45 min
[INFO] Finished at: 2019-01-21T11:54:33+08:00
[INFO] Final Memory: 252M/1584M
2. Debugging Flume
2.1 Using Flume
Let's first briefly review how Flume is used, with the simplest possible example.
The flume-ng command takes the name of the module to invoke. The available modules include help, agent, avro-client, tool, and version. The command format for starting an agent is:
bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
First, create a configuration file for the agent:
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Then run the following command:
bin/flume-ng agent --conf conf --conf-file ./conf/example.conf --name a1 -Dflume.root.logger=DEBUG,console
Here -c is short for --conf, -f is short for --conf-file, and -n is short for --name.
run_flume(), the function in the flume-ng script that launches the JVM, executes set -x by default, so after running the start command you can see in the output that the script ultimately executes the following command:
exec /usr/lib/jvm/java-8-oracle/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/conf:/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file ./conf/example.conf --name a1
Use telnet to connect to Flume's source and send a string. Once Flume receives the data, the logger sink prints it to the console.
$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK
2.2 Program entry point
The flume-ng script:
################################
# constants
################################
FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"
...
run_flume() {
local FLUME_APPLICATION_CLASS
if [ "$#" -gt 0 ]; then
FLUME_APPLICATION_CLASS=$1
shift
else
error "Must specify flume application class" 1
fi
if [ ${CLEAN_FLAG} -ne 0 ]; then
set -x
fi
$EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" \
-Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}
...
# finally, invoke the appropriate command
if [ -n "$opt_agent" ] ; then
run_flume $FLUME_AGENT_CLASS $args
elif [ -n "$opt_avro_client" ] ; then
run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
run_flume $FLUME_TOOLS_CLASS $args
else
error "This message should never appear" 1
fi
The script shows that org.apache.flume.node.Application is the program's main entry point.
In IDEA, press Alt+7 to open the Structure view of this class.

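2.3 Remote debugging
To make it easier to step through the code, here is how to set up remote debugging for Flume. There are two steps: first, modify the Flume startup script; second, add a Remote entry to IDEA's debug configurations.
(1) Modify the Flume startup script
Open the flume-ng startup script, find the line that sets JAVA_OPTS=, and change it to: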

JAVA_OPTS="-Xmx20m -Xdebug -Xrunjdwp:transport=dt_socket,address=5005,server=y,suspend=y"
(2) Add a Remote configuration in IDEA
In IDEA, click "Run"->"debug..."->"Edit Configurations", click the plus sign in the upper-left corner, and add a Remote configuration. IDEA's default port is 5005; the port here must match the one configured for Flume.

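With the configuration in place, start Flume. The output shows that it is listening on port 5005 and, because of suspend=y, the JVM waits for a debugger to attach before running. Now start the debug session in IDEA.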
+ exec /usr/lib/jvm/java-8-oracle/bin/java -Xmx20m -Xdebug -Xrunjdwp:transport=dt_socket,address=5005,server=y,suspend=y -Dflume.root.logger=DEBUG,console -cp '/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/conf:/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file ./conf/example.conf --name a1
Listening for transport dt_socket at address: 5005

2.4 Code analysis
We start from the main function.
Its first line initializes the global SSL parameters:
public static void main(String[] args) {
try {
/* Initialize the global SSL parameters using System.getenv() and System.setProperty(); Flume can use SSL to encrypt data in transit */
SSLUtil.initGlobalSSLParameters();
Stepping into this function, we find that it mainly calls initSysPropFromEnvVar to copy SSL-related environment variables into system properties; a system property that is already set takes precedence. For a comparison of System.getenv() and System.getProperty(), see http://www.itdecent.cn/p/dbe4795b61ac .
private static void initSysPropFromEnvVar(String sysPropName, String envVarName,
String description) {
if (System.getProperty(sysPropName) != null) {
LOGGER.debug("Global SSL " + description + " has been initialized from system property.");
} else {
String envVarValue = System.getenv(envVarName);
if (envVarValue != null) {
System.setProperty(sysPropName, envVarValue);
LOGGER.debug("Global SSL " + description +
" has been initialized from environment variable.");
} else {
LOGGER.debug("No global SSL " + description + " specified.");
}
}
}
The Flume startup log also shows the SSL parameters being initialized:
2019-02-24 12:05:55,036 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL keystore path specified.
2019-02-24 12:05:55,040 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL keystore password specified.
2019-02-24 12:05:55,041 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL keystore type specified.
2019-02-24 12:05:55,041 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL truststore path specified.
2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL truststore password specified.
2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL truststore type specified.
2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL include protocols specified.
2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL exclude protocols specified.
2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL include cipher suites specified.
2019-02-24 12:05:55,047 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL exclude cipher suites specified.
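The precedence rule in initSysPropFromEnvVar can be tried out in isolation. The following is a minimal sketch, not Flume code: the class EnvFallbackSketch, the variable name EXAMPLE_KEYSTORE_PATH, and the path are hypothetical, and the properties and environment are passed in as plain objects so the example does not depend on the real JVM state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Sketch of the rule: an existing system property wins, an environment variable is the fallback. */
class EnvFallbackSketch {

  /**
   * Applies the rule to one setting and reports where the value came from.
   * In a real program props would be System.getProperties() and env would be System.getenv().
   */
  static String initFromEnv(Properties props, Map<String, String> env,
                            String propName, String envName) {
    if (props.getProperty(propName) != null) {
      return "system property";              // already set, e.g. via -Dname=value
    }
    String envValue = env.get(envName);
    if (envValue != null) {
      props.setProperty(propName, envValue); // copy the environment value over
      return "environment variable";
    }
    return "unset";
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    Map<String, String> env = new HashMap<>();
    env.put("EXAMPLE_KEYSTORE_PATH", "/tmp/example.jks"); // hypothetical name and path

    // First call copies the value from the environment, second call finds the property already set.
    System.out.println(initFromEnv(props, env, "javax.net.ssl.keyStore", "EXAMPLE_KEYSTORE_PATH"));
    System.out.println(initFromEnv(props, env, "javax.net.ssl.keyStore", "EXAMPLE_KEYSTORE_PATH"));
  }
}
```

When neither source supplies a value the method returns "unset", which corresponds to the "No global SSL ... specified." lines in the log above.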
Continuing with main, the next part is argument parsing. Here is the code:
/* Argument parsing */
Options options = new Options();
Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);
option = new Option("f", "conf-file", true,
"specify a config file (required if -z missing)");
option.setRequired(false);
options.addOption(option);
option = new Option(null, "no-reload-conf", false,
"do not reload config file if changed");
options.addOption(option);
// Options for Zookeeper
option = new Option("z", "zkConnString", true,
"specify the ZooKeeper connection to use (required if -f missing)");
option.setRequired(false);
options.addOption(option);
option = new Option("p", "zkBasePath", true,
"specify the base path in ZooKeeper for agent configs");
option.setRequired(false);
options.addOption(option);
option = new Option("h", "help", false, "display help text");
options.addOption(option);
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);
if (commandLine.hasOption('h')) {
new HelpFormatter().printHelp("flume-ng agent", options, true);
return;
}
String agentName = commandLine.getOptionValue('n');
boolean reload = !commandLine.hasOption("no-reload-conf");
boolean isZkConfigured = false;
if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
isZkConfigured = true;
}
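While debugging, it is easy to see that for the Flume command from section 2.1, main receives only --conf-file and --name; the script itself consumes --conf and the -D option, as the exec line in section 2.1 shows: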
args: --conf-file ./conf/example.conf --name a1
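Flume can obtain its configuration in two ways: from ZooKeeper or from a file. Each way supports two modes: reloading the configuration automatically (which restarts all components) or not reloading it.
When the zkConnString option is given, the configuration is read from ZooKeeper; otherwise it is read from a file. With the no-reload-conf option, Flume does not reload the configuration. Without it, which is the default, Flume watches for configuration changes and uses an EventBus to trigger a re-read of the configuration and a restart of all components.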
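The four combinations can be laid out as a small decision function. This is only a sketch of the branching described above: the class ProviderChoiceSketch is hypothetical, and the provider names are returned as plain strings from my reading of the Flume 1.9 source (only PollingPropertiesFileConfigurationProvider appears in this article), so check them against Application.main before relying on them.

```java
/** Sketch of how the two command-line switches select a configuration provider. */
class ProviderChoiceSketch {

  /**
   * @param zkConfigured true when -z / --zkConnString was given
   * @param noReloadConf true when --no-reload-conf was given
   * @return the simple name of the provider class that would be used (assumed names)
   */
  static String choose(boolean zkConfigured, boolean noReloadConf) {
    boolean reload = !noReloadConf; // same derivation as in main
    if (zkConfigured) {
      return reload ? "PollingZooKeeperConfigurationProvider"
                    : "StaticZooKeeperConfigurationProvider";
    }
    return reload ? "PollingPropertiesFileConfigurationProvider"
                  : "PropertiesFileConfigurationProvider";
  }

  public static void main(String[] args) {
    // The command from section 2.1: file-based configuration, no --no-reload-conf.
    System.out.println(choose(false, false));
  }
}
```

For the command in section 2.1 this yields the polling file provider, which is the case the rest of this section follows.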
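The following walks through Flume's call sequence for the case where the configuration comes from a file and all components restart automatically when the file changes. The code is:
//... several lines omitted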
boolean reload = !commandLine.hasOption("no-reload-conf");
//... several lines omitted
List<LifecycleAware> components = Lists.newArrayList();
if (reload) {
EventBus eventBus = new EventBus(agentName + "-event-bus");
PollingPropertiesFileConfigurationProvider configurationProvider =
new PollingPropertiesFileConfigurationProvider(
agentName, configurationFile, eventBus, 30);
components.add(configurationProvider);
application = new Application(components);
eventBus.register(application);
}
//... several lines omitted
application.start();
If the no-reload-conf option is present, reload is false and the branch above is skipped; by default reload is true.
The code above uses Guava's EventBus, an elegant take on the observer pattern. Using it to publish and subscribe to events saves a lot of work. For how Guava EventBus works and how to use it, see http://www.itdecent.cn/p/f8ba312904f4 . An observer (event subscriber) handles the events sent by publishers in a method annotated with @Subscribe, and EventBus.register() registers the observer.
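To make the register/post relationship concrete without pulling in Guava, here is a hand-written stand-in. It is a sketch of the publish/subscribe idea only: TinyEventBus is a hypothetical class, it dispatches by a single event type instead of scanning for @Subscribe annotations, and it is not how Guava implements EventBus.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** A hand-written stand-in for the publish/subscribe idea; this is not Guava's EventBus. */
class TinyEventBus<E> {
  private final List<Consumer<E>> subscribers = new ArrayList<>();

  /** Plays the role of EventBus.register(): remember who wants to be notified. */
  void register(Consumer<E> subscriber) {
    subscribers.add(subscriber);
  }

  /** Plays the role of EventBus.post(): hand the event to every subscriber, in registration order. */
  void post(E event) {
    for (Consumer<E> subscriber : subscribers) {
      subscriber.accept(event);
    }
  }

  public static void main(String[] args) {
    TinyEventBus<String> bus = new TinyEventBus<>();
    // The subscriber stands in for the Application object and its event handler.
    bus.register(conf -> System.out.println("restart components with " + conf));
    // The publisher stands in for the configuration provider.
    bus.post("configuration v1");
  }
}
```

The publisher never references the subscriber directly; both only know the bus. That decoupling is what lets the configuration provider trigger a restart without holding a reference to Application.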
In class Application we can find the event handler handleConfigurationEvent(MaterializedConfiguration conf).
/* Marked with Guava EventBus's @Subscribe to define the event handler */
@Subscribe
public void handleConfigurationEvent(MaterializedConfiguration conf) {
try {
lifecycleLock.lockInterruptibly();
stopAllComponents();
startAllComponents(conf);
} catch (InterruptedException e) {
logger.info("Interrupted while trying to handle configuration event");
return;
} finally {
// If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
if (lifecycleLock.isHeldByCurrentThread()) {
lifecycleLock.unlock();
}
}
}
This method restarts all components by calling stopAllComponents() and startAllComponents(conf).
We have found the event-handling logic, so where is the publisher that posts events to the EventBus?
With that question in mind, go back to the earlier code. Right after the EventBus object is created, an instance of PollingPropertiesFileConfigurationProvider is created. This class implements the LifecycleAware interface, which all Flume components implement.

Finally, the PollingPropertiesFileConfigurationProvider object is added to the list List<LifecycleAware> components, which the Application constructor stores in its components field:
public Application(List<LifecycleAware> components) {
this.components = components;
supervisor = new LifecycleSupervisor();
}
Application.start() is then called to start the components.
public void start() {
lifecycleLock.lock();
try {
for (LifecycleAware component : components) {
supervisor.supervise(component,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
}
} finally {
lifecycleLock.unlock();
}
}
To see how each component is actually started, look inside LifecycleSupervisor.supervise():
MonitorRunnable monitorRunnable = new MonitorRunnable();
monitorRunnable.lifecycleAware = lifecycleAware;
monitorRunnable.supervisoree = process;
monitorRunnable.monitorService = monitorService;
supervisedProcesses.put(lifecycleAware, process);
ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
monitorRunnable, 0, 3, TimeUnit.SECONDS);
monitorFutures.put(lifecycleAware, future);
scheduleWithFixedDelay runs the monitorRunnable task immediately (initial delay 0) and then, each time a run finishes, waits 3 s before running it again.
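The scheduling behavior can be observed with the JDK alone. The sketch below assumes nothing from Flume: FixedDelaySketch and runRepeatedly are hypothetical names, and the delay is shortened to milliseconds so the example finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Sketch of scheduleWithFixedDelay: first run at once, later runs a fixed delay after the previous one ends. */
class FixedDelaySketch {

  /** Returns true if the task ran at least `runs` times within five seconds. */
  static boolean runRepeatedly(int runs, long delayMillis) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch remaining = new CountDownLatch(runs);
    try {
      // Initial delay 0, then delayMillis between the end of one run and the start of the next.
      scheduler.scheduleWithFixedDelay(remaining::countDown, 0, delayMillis, TimeUnit.MILLISECONDS);
      return remaining.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    } finally {
      scheduler.shutdownNow();            // stop the periodic task
    }
  }

  public static void main(String[] args) {
    System.out.println(runRepeatedly(3, 100));
  }
}
```

Because the delay is measured from the end of a run, a slow run pushes the next one back rather than letting runs overlap.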
In MonitorRunnable's run() method, the call lifecycleAware.start() shows that the start() method of the supervised component is invoked.
switch (supervisoree.status.desiredState) {
case START:
try {
lifecycleAware.start();
Back in PollingPropertiesFileConfigurationProvider, the start() method creates a single-threaded executor with Executors.newSingleThreadScheduledExecutor() and then schedules the FileWatcherRunnable task with a 30 s delay between runs (interval = 30, passed in by the Application class).
public PollingPropertiesFileConfigurationProvider(String agentName,
File file, EventBus eventBus, int interval) {
super(agentName, file);
this.eventBus = eventBus;
this.file = file;
this.interval = interval;
counterGroup = new CounterGroup();
lifecycleState = LifecycleState.IDLE;
}
@Override
public void start() {
LOGGER.info("Configuration provider starting");
Preconditions.checkState(file != null,
"The parameter file must not be null");
executorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
.build());
/* Start a new thread that watches the file. When a change is detected, the result of getConfiguration() is posted to the eventBus,
* which invokes the @Subscribe method in Application, i.e. public void handleConfigurationEvent(MaterializedConfiguration conf).
* eventBus.post(getConfiguration()) hands the conf object to handleConfigurationEvent over the bus. */
FileWatcherRunnable fileWatcherRunnable =
new FileWatcherRunnable(file, counterGroup);
executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
TimeUnit.SECONDS);
lifecycleState = LifecycleState.START;
LOGGER.debug("Configuration provider started");
}
The FileWatcherRunnable task monitors the configuration file for changes:
long lastModified = file.lastModified();
if (lastModified > lastChange) {
// several lines omitted
If the configuration file has changed, eventBus.post(getConfiguration()) posts the event to the event bus, and the bus invokes the event handler handleConfigurationEvent(MaterializedConfiguration conf) on the observer (Application).
public class FileWatcherRunnable implements Runnable {
private final File file;
private final CounterGroup counterGroup;
private long lastChange;
public FileWatcherRunnable(File file, CounterGroup counterGroup) {
super();
this.file = file;
this.counterGroup = counterGroup;
this.lastChange = 0L;
}
@Override
public void run() {
LOGGER.debug("Checking file:{} for changes", file);
counterGroup.incrementAndGet("file.checks");
long lastModified = file.lastModified();
if (lastModified > lastChange) {
LOGGER.info("Reloading configuration file:{}", file);
counterGroup.incrementAndGet("file.loads");
lastChange = lastModified;
try {
eventBus.post(getConfiguration());
} catch (Exception e) {
LOGGER.error("Failed to load configuration data. Exception follows.",
e);
} catch (NoClassDefFoundError e) {
LOGGER.error("Failed to start agent because dependencies were not " +
"found in classpath. Error follows.", e);
} catch (Throwable t) {
// caught because the caller does not handle or log Throwables
LOGGER.error("Unhandled error", t);
}
}
}
}
That completes Flume's startup sequence. You may object that only PollingPropertiesFileConfigurationProvider has been started, not Flume's channels, sources, and sinks. In fact, on the first run lastChange is still 0 while lastModified is the file's actual modification time, so the condition lastModified > lastChange holds:
// several lines omitted
this.lastChange = 0L;
// several lines omitted
if (lastModified > lastChange) {
// several lines omitted
This triggers the EventBus, which calls handleConfigurationEvent. That method calls startAllComponents(conf), which contains the code that starts the channels, sources, and sinks; the details are covered in the next article.
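The "first check always fires" behavior follows directly from initializing lastChange to 0. The sketch below isolates that comparison. ChangeDetector is a hypothetical class, and the timestamp source is injected as a LongSupplier so the logic can be exercised without touching the file system; forFile shows how a real file would be plugged in.

```java
import java.io.File;
import java.util.function.LongSupplier;

/** Sketch of the lastModified > lastChange check used to detect configuration changes. */
class ChangeDetector {
  private final LongSupplier lastModifiedSource;
  private long lastChange = 0L; // 0 means "nothing loaded yet", so the first check fires

  ChangeDetector(LongSupplier lastModifiedSource) {
    this.lastModifiedSource = lastModifiedSource;
  }

  /** Convenience factory for a real file; File.lastModified() returns 0 if the file does not exist. */
  static ChangeDetector forFile(File file) {
    return new ChangeDetector(file::lastModified);
  }

  /** Returns true once per observed modification time that is newer than the last one seen. */
  boolean changed() {
    long lastModified = lastModifiedSource.getAsLong();
    if (lastModified > lastChange) {
      lastChange = lastModified; // remember it so the same timestamp does not fire again
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    long[] fakeTimestamp = {1000L};            // stands in for the file's modification time
    ChangeDetector detector = new ChangeDetector(() -> fakeTimestamp[0]);
    System.out.println(detector.changed());    // first check: 1000 > 0
    System.out.println(detector.changed());    // unchanged timestamp
    fakeTimestamp[0] = 2000L;                  // simulate an edit to the file
    System.out.println(detector.changed());
  }
}
```

One consequence of this design is that a missing file never triggers a load, since its reported modification time of 0 is not greater than the initial lastChange.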
2.5 Review
Let's summarize Flume's overall call sequence.
Application -> LifecycleSupervisor -(every 3 s)-> MonitorRunnable -> PollingPropertiesFileConfigurationProvider -(every 30 s)-> FileWatcherRunnable -> EventBus -> Application
Along the way one scheduler drives another, each firing every few seconds. Why don't multiple duplicate task instances get scheduled?
supervisor.supervise(component,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
When LifecycleSupervisor schedules a component, it is passed the value LifecycleState.START. This value is the desiredState in the following code (MonitorRunnable's run() method):
// several lines omitted
if (!lifecycleAware.getLifecycleState().equals(
supervisoree.status.desiredState)) {
logger.debug("Want to transition {} from {} to {} (failures:{})",
new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
supervisoree.status.desiredState,
supervisoree.status.failures });
switch (supervisoree.status.desiredState) {
case START:
try {
lifecycleAware.start();
// several lines omitted
PollingPropertiesFileConfigurationProvider, which implements the LifecycleAware interface, sets lifecycleState to START on its first start() call: lifecycleState = LifecycleState.START;
On every later run of the scheduler, the condition if (!lifecycleAware.getLifecycleState().equals(supervisoree.status.desiredState)) is therefore false, so start() is not called again. Only one PollingPropertiesFileConfigurationProvider is ever started, and because FileWatcherRunnable is scheduled on a single-threaded executor, only one FileWatcherRunnable task runs as well. For the same reason, no duplicate channel, source, or sink instances are started.
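The guard can be reduced to a few lines. The following sketch is a simplified model rather than Flume's classes: DesiredStateSketch, Component, and monitorOnce are hypothetical names, and the failure counting and restart policy of the real MonitorRunnable are left out.

```java
/** Sketch of the desired-state check that keeps a periodic monitor from starting a component twice. */
class DesiredStateSketch {

  enum State { IDLE, START, STOP }

  /** A stand-in component that records how often start() was called. */
  static class Component {
    State state = State.IDLE;
    int startCalls = 0;

    void start() {
      startCalls++;
      state = State.START; // same idea as lifecycleState = LifecycleState.START
    }
  }

  /** One monitor pass: act only when the current state differs from the desired one. */
  static void monitorOnce(Component component, State desired) {
    if (component.state != desired) {
      if (desired == State.START) {
        component.start();
      }
    }
  }

  public static void main(String[] args) {
    Component component = new Component();
    for (int pass = 0; pass < 5; pass++) { // five monitor passes, like five 3 s ticks
      monitorOnce(component, State.START);
    }
    System.out.println(component.startCalls);
  }
}
```

The periodic monitor is therefore a reconciliation loop: it keeps running, but it only acts when the observed state and the desired state disagree.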