Flume 1.9 Source Code Analysis (Part 1): From Compilation to Startup

1. Compiling the source code

  1.1 Download the source
    On GitHub, select the 1.9 branch (https://github.com/apache/flume/tree/flume-1.9):

git clone -b flume-1.9 git@github.com:apache/flume.git

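  1.2 Configure the Maven repositories
    After downloading the source, import it into IDEA as a Maven project, then configure Maven's dependency repositories.
    If you have a proxy that can reach servers abroad, no extra configuration is needed. Otherwise, point Maven at a mirror inside China, such as Aliyun's Maven repository (this is a <mirror> entry under <mirrors> in Maven's settings.xml):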
<mirror>  
    <id>nexus-aliyun</id>
    <mirrorOf>*</mirrorOf>
    <name>Nexus aliyun</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public</url>
</mirror>
<!-- The Aliyun mirror configuration above was found online and has not been verified by the author. If you run into problems, please look up a solution yourself. -->

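I used my company's internal Maven repository, which has fairly complete dependencies. Compilation went smoothly, and I did not hit the build failures other people report online. I therefore leave the configuration in this section to the reader; many solutions are available online.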

  1.3 Compile the source
    Compile the source with the following command:
    mvn clean install -DskipTests
    When the build finishes, each module has its own target folder containing the compiled jar:
[INFO] Reactor Summary:
[INFO] 
[INFO] Build Support ...................................... SUCCESS [  1.854 s]
[INFO] Apache Flume ....................................... SUCCESS [ 11.992 s]
[INFO] Flume NG SDK ....................................... SUCCESS [ 16.187 s]
[INFO] Flume NG Hadoop Credential Store Config Filter ..... SUCCESS [  0.135 s]
[INFO] Flume NG Config Filters API ........................ SUCCESS [  2.549 s]
[INFO] Flume NG Configuration ............................. SUCCESS [  5.963 s]
[INFO] Flume Auth ......................................... SUCCESS [  6.558 s]
[INFO] Flume NG Core ...................................... SUCCESS [ 18.504 s]
[INFO] Flume NG Sinks ..................................... SUCCESS [  0.145 s]
[INFO] Flume NG HDFS Sink ................................. SUCCESS [  8.566 s]
[INFO] Flume NG IRC Sink .................................. SUCCESS [  3.347 s]
[INFO] Flume NG Channels .................................. SUCCESS [  0.115 s]
[INFO] Flume NG JDBC channel .............................. SUCCESS [  5.174 s]
[INFO] Flume NG file-based channel ........................ SUCCESS [ 11.072 s]
[INFO] Flume NG Spillable Memory channel .................. SUCCESS [  3.804 s]
[INFO] Flume NG Node ...................................... SUCCESS [ 10.520 s]
[INFO] Flume NG Embedded Agent ............................ SUCCESS [  3.632 s]
[INFO] Flume NG HBase Sink ................................ SUCCESS [  7.410 s]
[INFO] Flume NG HBase2 Sink ............................... SUCCESS [  8.654 s]
[INFO] Flume NG ElasticSearch Sink ........................ SUCCESS [  4.994 s]
[INFO] Flume NG Morphline Solr Sink ....................... SUCCESS [  6.844 s]
[INFO] Flume Shared Utils ................................. SUCCESS [  0.067 s]
[INFO] Flume Shared Kafka ................................. SUCCESS [  2.755 s]
[INFO] Flume Shared Kafka Test Utils ...................... SUCCESS [  3.389 s]
[INFO] Flume Kafka Sink ................................... SUCCESS [  3.689 s]
[INFO] Flume HTTP/S Sink .................................. SUCCESS [  3.416 s]
[INFO] Flume NG Kite Dataset Sink ......................... SUCCESS [  5.341 s]
[INFO] Flume NG Hive Sink ................................. SUCCESS [  4.872 s]
[INFO] Flume Sources ...................................... SUCCESS [  0.091 s]
[INFO] Flume Scribe Source ................................ SUCCESS [  4.130 s]
[INFO] Flume JMS Source ................................... SUCCESS [  3.664 s]
[INFO] Flume Twitter Source ............................... SUCCESS [  3.274 s]
[INFO] Flume Kafka Source ................................. SUCCESS [  4.323 s]
[INFO] Flume Taildir Source ............................... SUCCESS [  4.337 s]
[INFO] flume-kafka-channel ................................ SUCCESS [  3.949 s]
[INFO] Flume legacy Sources ............................... SUCCESS [  0.050 s]
[INFO] Flume legacy Avro source ........................... SUCCESS [  3.007 s]
[INFO] Flume legacy Thrift Source ......................... SUCCESS [  4.780 s]
[INFO] Flume NG Environment Variable Config Filter ........ SUCCESS [  2.054 s]
[INFO] flume-ng-hadoop-credential-store-config-filter ..... SUCCESS [  2.773 s]
[INFO] Flume NG External Process Config Filter ............ SUCCESS [  2.402 s]
[INFO] Flume NG Clients ................................... SUCCESS [  0.059 s]
[INFO] Flume NG Log4j Appender ............................ SUCCESS [  6.521 s]
[INFO] Flume NG Tools ..................................... SUCCESS [  2.914 s]
[INFO] Flume NG distribution .............................. SUCCESS [ 11.804 s]
[INFO] Flume NG Integration Tests ......................... SUCCESS [  3.099 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 03:45 min
[INFO] Finished at: 2019-01-21T11:54:33+08:00
[INFO] Final Memory: 252M/1584M

2. Debugging Flume

2.1 Using Flume

Let's first briefly review how Flume is used, with the simplest possible example.
The flume-ng command must be given the name of the module to run. Available modules include help, agent, avro-client, tool, version, etc. To start an agent, use the following format:
bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

First, create a configuration file for the agent:

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Then run the following command:
bin/flume-ng agent --conf conf --conf-file ./conf/example.conf --name a1 -Dflume.root.logger=DEBUG,console
Here --conf can be abbreviated as -c, --conf-file as -f, and --name as -n.

The script's main function run_flume() executes set -x by default. So after you run the start command, the output shows the command that the script finally executes:
exec /usr/lib/jvm/java-8-oracle/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/conf:/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file ./conf/example.conf --name a1
Use telnet to connect to Flume's source and send a string. After Flume receives the data, the logger sink prints it to the console.

$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK

2.2 Program entry point

The flume-ng script:

################################
# constants
################################

FLUME_AGENT_CLASS="org.apache.flume.node.Application"
FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"

...

run_flume() {
  local FLUME_APPLICATION_CLASS

  if [ "$#" -gt 0 ]; then
    FLUME_APPLICATION_CLASS=$1
    shift
  else
    error "Must specify flume application class" 1
  fi

  if [ ${CLEAN_FLAG} -ne 0 ]; then
    set -x
  fi
  $EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" \
      -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
}

...

# finally, invoke the appropriate command
if [ -n "$opt_agent" ] ; then
  run_flume $FLUME_AGENT_CLASS $args
elif [ -n "$opt_avro_client" ] ; then
  run_flume $FLUME_AVRO_CLIENT_CLASS $args
elif [ -n "${opt_version}" ] ; then
  run_flume $FLUME_VERSION_CLASS $args
elif [ -n "${opt_tool}" ] ; then
  run_flume $FLUME_TOOLS_CLASS $args
else
  error "This message should never appear" 1
fi

The script shows that org.apache.flume.node.Application is the program's main entry point for the agent command.

In IDEA, press Alt+7 to view the structure of the class:


(Figure: structure of the Application class)

2.3 Remote debugging

To make the code easier to debug and analyze, here is how to configure remote debugging for Flume. There are two steps:
1. Modify the Flume start script.
2. Add a Remote configuration to IDEA's debug configurations.

(1) Modify the Flume start script
Open the flume-ng start script, find "JAVA_OPTS=", and change it to the following:

(Figure: the edited flume-ng script)

JAVA_OPTS="-Xmx20m -Xdebug -Xrunjdwp:transport=dt_socket,address=5005,server=y,suspend=y"

(2) Add a Remote configuration in IDEA
In IDEA, click "Run" -> "Debug..." -> "Edit Configurations", then click the plus sign in the top-left corner and add a Remote configuration. IDEA's default port is 5005. This port must match the one configured for Flume.

(Figure: adding a Remote debug configuration in IDEA)

Once configured, start Flume. Because of suspend=y, the JVM waits before running main and reports that it is listening on port 5005. Now start the Remote debug session in IDEA:

+ exec /usr/lib/jvm/java-8-oracle/bin/java -Xmx20m -Xdebug -Xrunjdwp:transport=dt_socket,address=5005,server=y,suspend=y -Dflume.root.logger=DEBUG,console -cp '/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/conf:/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file ./conf/example.conf --name a1
Listening for transport dt_socket at address: 5005
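
To confirm that the modified JAVA_OPTS actually reached the JVM, a small helper can inspect the JVM's startup arguments. This is a sketch and not part of Flume. The class name JdwpCheck is hypothetical. It recognizes only two spellings of the debug agent: the legacy -Xrunjdwp form used above and the equivalent -agentlib:jdwp form.

```java
import java.lang.management.ManagementFactory;
import java.util.List;

// Hypothetical helper (not part of Flume): reports whether a JDWP debug agent
// option appears among a JVM's startup arguments.
class JdwpCheck {

    // Pure check over an argument list, so it can be tested without a debugger
    static boolean hasJdwp(List<String> jvmArgs) {
        for (String arg : jvmArgs) {
            if (arg.startsWith("-Xrunjdwp:") || arg.startsWith("-agentlib:jdwp=")) {
                return true;
            }
        }
        return false;
    }

    // Applies the same check to the currently running JVM
    static boolean currentJvmHasJdwp() {
        return hasJdwp(ManagementFactory.getRuntimeMXBean().getInputArguments());
    }
}
```

For example, calling JdwpCheck.currentJvmHasJdwp() early in a debugging session tells you whether the script change took effect.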

2.4 Code analysis

We start the analysis from the main function.
Its first statement initializes the global SSL parameters:

public static void main(String[] args) {

    try {
      /* Initialize the global SSL parameters via System.getenv() and System.setProperty(); Flume can use SSL for encrypted transport */
      SSLUtil.initGlobalSSLParameters();

Inside this function, the main work is done by initSysPropFromEnvVar. It copies SSL-related environment variables into system properties, but only when the corresponding system property is not already set. For a comparison of System.getenv() and System.getProperty(), see http://www.itdecent.cn/p/dbe4795b61ac .

 private static void initSysPropFromEnvVar(String sysPropName, String envVarName,
                                            String description) {
    if (System.getProperty(sysPropName) != null) {
      LOGGER.debug("Global SSL " + description + " has been initialized from system property.");
    } else {
      String envVarValue = System.getenv(envVarName);
      if (envVarValue != null) {
        System.setProperty(sysPropName, envVarValue);
        LOGGER.debug("Global SSL " + description +
            " has been initialized from environment variable.");
      } else {
        LOGGER.debug("No global SSL " + description + " specified.");
      }
    }
  }
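
The precedence rule above can be exercised without touching the real process environment. An existing system property wins; otherwise the environment variable is copied in.

This is a minimal sketch. A Properties object stands in for the system properties, and a Map stands in for System.getenv(). The class name SslPropSketch is hypothetical. The names used in the test (javax.net.ssl.keyStore, FLUME_SSL_KEYSTORE_PATH and similar) are assumed to match Flume's documented global SSL settings.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch of the precedence rule in SSLUtil.initSysPropFromEnvVar.
// "props" stands in for System properties and "env" for System.getenv().
class SslPropSketch {

    // Returns where the value came from, mirroring the three debug messages
    static String init(Properties props, Map<String, String> env,
                       String sysPropName, String envVarName) {
        if (props.getProperty(sysPropName) != null) {
            return "system property";                     // already set: left untouched
        }
        String envVarValue = env.get(envVarName);
        if (envVarValue != null) {
            props.setProperty(sysPropName, envVarValue);  // copy env var into properties
            return "environment variable";
        }
        return "not specified";
    }
}
```

Passing the maps in explicitly keeps the logic testable. The real SSLUtil calls System.getProperty, System.getenv and System.setProperty directly, as the code above shows.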

The SSL initialization can also be seen in the Flume startup log:

2019-02-24 12:05:55,036 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL keystore path specified.
2019-02-24 12:05:55,040 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL keystore password specified.
2019-02-24 12:05:55,041 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL keystore type specified.
2019-02-24 12:05:55,041 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL truststore path specified.
2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL truststore password specified.
2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL truststore type specified.
2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL include protocols specified.
2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL exclude protocols specified.
2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL include cipher suites specified.
2019-02-24 12:05:55,047 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL exclude cipher suites specified.

Continuing with main, the next part parses the command-line arguments. Here is the code:

      /* Argument parsing */
      Options options = new Options();

      Option option = new Option("n", "name", true, "the name of this agent");
      option.setRequired(true);
      options.addOption(option);

      option = new Option("f", "conf-file", true,
          "specify a config file (required if -z missing)");
      option.setRequired(false);
      options.addOption(option);

      option = new Option(null, "no-reload-conf", false,
          "do not reload config file if changed");
      options.addOption(option);

      // Options for Zookeeper
      option = new Option("z", "zkConnString", true,
          "specify the ZooKeeper connection to use (required if -f missing)");
      option.setRequired(false);
      options.addOption(option);

      option = new Option("p", "zkBasePath", true,
          "specify the base path in ZooKeeper for agent configs");
      option.setRequired(false);
      options.addOption(option);

      option = new Option("h", "help", false, "display help text");
      options.addOption(option);

      CommandLineParser parser = new GnuParser();
      CommandLine commandLine = parser.parse(options, args);

      if (commandLine.hasOption('h')) {
        new HelpFormatter().printHelp("flume-ng agent", options, true);
        return;
      }

      String agentName = commandLine.getOptionValue('n');
      boolean reload = !commandLine.hasOption("no-reload-conf");

      boolean isZkConfigured = false;
      if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
        isZkConfigured = true;
      }

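While debugging, you can easily see that for the command in section 2.1, main receives only --conf-file and --name. The --conf and -Dflume.root.logger options are consumed by the flume-ng script, which turns them into the classpath entry and JVM option seen in the exec line earlier: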
args: --conf-file ./conf/example.conf --name a1

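Flume can obtain its configuration in two ways: from ZooKeeper or from a file. Each way supports two modes. In one, the configuration is reloaded automatically and all components are restarted; in the other, it is loaded only once.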

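If zkConnString is set, the configuration is read from ZooKeeper; otherwise it is read from the file. With the --no-reload-conf option, Flume does not reload the configuration automatically. By default (without this option), Flume watches the configuration for changes and uses an EventBus to trigger re-reading it and restarting all components.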
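
The two independent decisions (ZooKeeper or file, reload or not) can be summarized in a few lines.

This is a simplified sketch, not Flume's code. ConfigModeSketch and its output strings are hypothetical. It scans the raw argument array instead of using commons-cli, so it would misfire if an option value happened to equal "-z".

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical summary of the branch selection in Application.main():
// ZooKeeper is used when -z/--zkConnString is present, otherwise the file;
// reloading is on unless --no-reload-conf is given.
class ConfigModeSketch {

    static String describe(String... args) {
        List<String> a = Arrays.asList(args);
        boolean isZkConfigured = a.contains("-z") || a.contains("--zkConnString");
        boolean reload = !a.contains("--no-reload-conf");
        String source = isZkConfigured ? "zookeeper" : "file";
        return source + (reload ? ", polling for changes" : ", load once");
    }
}
```

For the command from section 2.1, describe("--conf-file", "./conf/example.conf", "--name", "a1") yields "file, polling for changes". That is the case traced through the rest of this article.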

Take the case where the configuration is read from a file, and the file is watched so that all components restart when it changes. Flume's call sequence in this case is as follows:

// ... (lines omitted)
boolean reload = !commandLine.hasOption("no-reload-conf");
// ... (lines omitted)
List<LifecycleAware> components = Lists.newArrayList();

if (reload) {
  // Bus through which the provider publishes configuration objects
  EventBus eventBus = new EventBus(agentName + "-event-bus");
  // Polls the configuration file every 30 seconds
  PollingPropertiesFileConfigurationProvider configurationProvider =
      new PollingPropertiesFileConfigurationProvider(
          agentName, configurationFile, eventBus, 30);
  components.add(configurationProvider);
  application = new Application(components);
  // Application's @Subscribe method receives the posted configurations
  eventBus.register(application);
}
// ... (lines omitted)
application.start();

Note that reload is true by default. It becomes false only when --no-reload-conf is given.

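The code above uses Guava's EventBus, an elegant implementation of the observer pattern. Using an EventBus to publish and subscribe to events saves a lot of work. For how Guava's EventBus works and how to use it, see http://www.itdecent.cn/p/f8ba312904f4 .

An EventBus observer (event subscriber) handles events from the publisher in methods annotated with @Subscribe. The parameter type of such a method determines which posted events it receives. EventBus.register() registers the observer.
In the Application class we find the event handler handleConfigurationEvent(MaterializedConfiguration conf):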

/*guava EventBus中用@Subscribe標(biāo)記,定義監(jiān)聽(tīng)處理方法*/
  @Subscribe
  public void handleConfigurationEvent(MaterializedConfiguration conf) {
    try {
      lifecycleLock.lockInterruptibly();
      stopAllComponents();
      startAllComponents(conf);
    } catch (InterruptedException e) {
      logger.info("Interrupted while trying to handle configuration event");
      return;
    } finally {
      // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
      if (lifecycleLock.isHeldByCurrentThread()) {
        lifecycleLock.unlock();
      }
    }
  }

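This method restarts all components by calling stopAllComponents() and then startAllComponents(conf). It does so while holding lifecycleLock. In the finally block it unlocks only if the current thread actually holds the lock, because lockInterruptibly() may be interrupted before the lock is acquired.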
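
To make the register / @Subscribe / post mechanics concrete, here is a minimal JDK-only imitation. It is not Guava: the annotation MiniSubscribe and the classes MiniEventBus and MiniApplication are hypothetical. Dispatch is synchronous and by parameter type, and a String stands in for MaterializedConfiguration. The subscriber copies the locking pattern of handleConfigurationEvent.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for Guava's @Subscribe (runtime-visible for reflection)
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface MiniSubscribe {}

// Minimal synchronous bus: post() invokes every @MiniSubscribe method whose
// single parameter type accepts the posted event.
class MiniEventBus {
    private final List<Object> subscribers = new ArrayList<>();

    void register(Object subscriber) {
        subscribers.add(subscriber);
    }

    void post(Object event) {
        for (Object s : subscribers) {
            for (Method m : s.getClass().getDeclaredMethods()) {
                if (m.isAnnotationPresent(MiniSubscribe.class)
                        && m.getParameterCount() == 1
                        && m.getParameterTypes()[0].isInstance(event)) {
                    try {
                        m.setAccessible(true);
                        m.invoke(s, event);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException(e);
                    }
                }
            }
        }
    }
}

// Subscriber shaped like Application.handleConfigurationEvent
class MiniApplication {
    private final ReentrantLock lifecycleLock = new ReentrantLock();
    final List<String> log = new ArrayList<>();

    @MiniSubscribe
    public void handleConfigurationEvent(String conf) {
        try {
            lifecycleLock.lockInterruptibly();
            log.add("stopAll");              // stands in for stopAllComponents()
            log.add("startAll:" + conf);     // stands in for startAllComponents(conf)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // lockInterruptibly() may throw before acquiring, so unlock only if held
            if (lifecycleLock.isHeldByCurrentThread()) {
                lifecycleLock.unlock();
            }
        }
    }
}
```

Posting a String reaches handleConfigurationEvent. Posting an Integer reaches nothing, because no subscriber method accepts that type.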

We have found the event-handling logic, so where is the publisher that posts events to the EventBus?
With that question in mind, go back to the earlier code. Right after the EventBus object is created, a PollingPropertiesFileConfigurationProvider is instantiated. This class implements the LifecycleAware interface, which all Flume components implement.

(Figure: PollingPropertiesFileConfigurationProvider class hierarchy)

Finally, the PollingPropertiesFileConfigurationProvider object is added to List<LifecycleAware> components. The Application constructor stores this list in its components field:

 public Application(List<LifecycleAware> components) {
    this.components = components;
    supervisor = new LifecycleSupervisor();
  }

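Application.start() is then called to start the components. It hands each component to a LifecycleSupervisor together with an AlwaysRestartPolicy and the desired state LifecycleState.START: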

  public void start() {
    lifecycleLock.lock();
    try {
      for (LifecycleAware component : components) {
        supervisor.supervise(component,
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
      }
    } finally {
      lifecycleLock.unlock();
    }
  }

To see how each component is actually started, look inside LifecycleSupervisor.supervise():

    MonitorRunnable monitorRunnable = new MonitorRunnable();
    monitorRunnable.lifecycleAware = lifecycleAware;
    monitorRunnable.supervisoree = process;
    monitorRunnable.monitorService = monitorService;

    supervisedProcesses.put(lifecycleAware, process);

    ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
        monitorRunnable, 0, 3, TimeUnit.SECONDS);
    monitorFutures.put(lifecycleAware, future);

scheduleWithFixedDelay runs the monitorRunnable task immediately (initial delay 0). After each run completes, it waits 3 s before running the task again.
The call lifecycleAware.start() in MonitorRunnable's run method shows that it invokes the start() method of the supervised component:

          switch (supervisoree.status.desiredState) {
              case START:
                try {
                  lifecycleAware.start();
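
The "wait 3 s after each run" behaviour comes from scheduleWithFixedDelay. Each delay is measured from the end of the previous run, so runs of the same task never overlap, even on a multi-thread pool.

This sketch demonstrates that with millisecond values instead of seconds. The class name FixedDelayDemo and the timings are arbitrary choices for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: schedules one task with scheduleWithFixedDelay (initial
// delay 0) on a 4-thread pool and records how many runs ever overlap.
class FixedDelayDemo {

    // Returns {number of completed runs, max runs observed in flight at once}
    static int[] run(long delayMs, long totalMs) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
        AtomicInteger runs = new AtomicInteger();
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        ses.scheduleWithFixedDelay(() -> {
            maxActive.accumulateAndGet(active.incrementAndGet(), Math::max);
            try {
                Thread.sleep(5); // simulate a little work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            active.decrementAndGet();
            runs.incrementAndGet();
        }, 0, delayMs, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(totalMs);
            ses.shutdownNow();
            ses.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {runs.get(), maxActive.get()};
    }
}
```

With FixedDelayDemo.run(20, 300), the task runs many times, but the second value stays 1: the next run is scheduled only after the previous one finishes.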

Back in the PollingPropertiesFileConfigurationProvider class, its start() method creates a single-thread scheduled executor with Executors.newSingleThreadScheduledExecutor(). It uses this executor to run a FileWatcherRunnable task every 30 s. The interval of 30 is passed in by Application when it constructs the provider, and the unit is TimeUnit.SECONDS.

 public PollingPropertiesFileConfigurationProvider(String agentName,
      File file, EventBus eventBus, int interval) {
    super(agentName, file);
    this.eventBus = eventBus;
    this.file = file;
    this.interval = interval;
    counterGroup = new CounterGroup();
    lifecycleState = LifecycleState.IDLE;
  }

  @Override
  public void start() {
    LOGGER.info("Configuration provider starting");

    Preconditions.checkState(file != null,
        "The parameter file must not be null");

    executorService = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
                .build());

    /* Run on a separate thread: when a change to the file is detected, post getConfiguration() to the eventBus.
     * When the eventBus receives the event, it calls the @Subscribe-annotated method of Application,
     * i.e. public void handleConfigurationEvent(MaterializedConfiguration conf).
     * eventBus.post(getConfiguration()) hands the conf object over the bus to handleConfigurationEvent. */
    FileWatcherRunnable fileWatcherRunnable =
        new FileWatcherRunnable(file, counterGroup);

    executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
        TimeUnit.SECONDS);

    lifecycleState = LifecycleState.START;

    LOGGER.debug("Configuration provider started");
  }
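
Guava's ThreadFactoryBuilder is used here only to give the poller thread a recognizable name, which helps in logs and thread dumps. A JDK-only equivalent might look like this.

NamedThreadFactory is a hypothetical name. Starting the counter at 0 is an assumption about matching Guava's numbering.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only stand-in for
// new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d").build()
class NamedThreadFactory implements ThreadFactory {
    private final ThreadFactory delegate = Executors.defaultThreadFactory();
    private final String nameFormat;
    private final AtomicInteger count = new AtomicInteger();

    NamedThreadFactory(String nameFormat) {
        this.nameFormat = nameFormat;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        // First thread gets index 0, e.g. "conf-file-poller-0"
        t.setName(String.format(nameFormat, count.getAndIncrement()));
        return t;
    }
}
```

It would plug in the same way: Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("conf-file-poller-%d")).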

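The FileWatcherRunnable task watches the configuration file for changes. It compares the file's last-modified time with the last value it has seen: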

 long lastModified = file.lastModified();
      if (lastModified > lastChange) {
// ... (lines omitted)

If the configuration file has changed, it calls eventBus.post(getConfiguration()) to post the event to the EventBus. The EventBus then invokes the observer's (Application's) handler, handleConfigurationEvent(MaterializedConfiguration conf), to process the event.


 public class FileWatcherRunnable implements Runnable {

    private final File file;
    private final CounterGroup counterGroup;

    private long lastChange;

    public FileWatcherRunnable(File file, CounterGroup counterGroup) {
      super();
      this.file = file;
      this.counterGroup = counterGroup;
      this.lastChange = 0L;
    }

    @Override
    public void run() {
      LOGGER.debug("Checking file:{} for changes", file);

      counterGroup.incrementAndGet("file.checks");

      long lastModified = file.lastModified();

      if (lastModified > lastChange) {
        LOGGER.info("Reloading configuration file:{}", file);

        counterGroup.incrementAndGet("file.loads");

        lastChange = lastModified;

        try {
          eventBus.post(getConfiguration());
        } catch (Exception e) {
          LOGGER.error("Failed to load configuration data. Exception follows.",
              e);
        } catch (NoClassDefFoundError e) {
          LOGGER.error("Failed to start agent because dependencies were not " +
              "found in classpath. Error follows.", e);
        } catch (Throwable t) {
          // caught because the caller does not handle or log Throwables
          LOGGER.error("Unhandled error", t);
        }
      }
    }
  }

That completes the Flume startup process. You might ask: only PollingPropertiesFileConfigurationProvider was started here, not Flume's channels, sources, and sinks. In fact, on the very first check lastModified and lastChange differ. lastChange starts at 0, so lastModified > lastChange holds:

// ... (lines omitted)
  this.lastChange = 0L;
// ... (lines omitted)
  if (lastModified > lastChange) {
// ... (lines omitted)

This triggers the EventBus, which calls handleConfigurationEvent. That method contains startAllComponents(conf), which starts the channels, sources, and sinks. This is covered in the next article.
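
The first-run behaviour can be checked with a stripped-down version of the polling check. Because lastChange starts at 0, the first check always "reloads". Later checks reload only when the file's timestamp moves forward.

This is a sketch. FileWatcherSketch is a hypothetical name, and it returns a boolean at the point where Flume would call eventBus.post(getConfiguration()).

```java
import java.io.File;

// Hypothetical simplification of FileWatcherRunnable.run()
class FileWatcherSketch {
    private final File file;
    private long lastChange = 0L; // same initial value as FileWatcherRunnable
    private int loads = 0;

    FileWatcherSketch(File file) {
        this.file = file;
    }

    // One polling step; true means a reload would be triggered
    boolean check() {
        long lastModified = file.lastModified();
        if (lastModified > lastChange) {
            lastChange = lastModified;
            loads++;
            return true; // Flume posts getConfiguration() to the EventBus here
        }
        return false;
    }

    int loads() {
        return loads;
    }
}
```

A freshly created file has a positive timestamp, so the first check() returns true. A second check on the unchanged file returns false.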

2.5 Recap

Let us summarize Flume's overall call sequence:
Application -> LifecycleSupervisor -(scheduled every 3 s)-> MonitorRunnable -> PollingPropertiesFileConfigurationProvider -(scheduled every 30 s)-> FileWatcherRunnable -> EventBus -> Application
Along the way, one scheduler schedules another, and each fires every few seconds. Why are no duplicate task instances started?

 supervisor.supervise(component,
            new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);

When scheduling through LifecycleSupervisor, LifecycleState.START is passed in. This value becomes desiredState in the following code (MonitorRunnable's run method):

// ... (lines omitted)
  if (!lifecycleAware.getLifecycleState().equals(
              supervisoree.status.desiredState)) {

            logger.debug("Want to transition {} from {} to {} (failures:{})",
                new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
                    supervisoree.status.desiredState,
                    supervisoree.status.failures });

            switch (supervisoree.status.desiredState) {
              case START:
                try {
                  lifecycleAware.start();
// ... (lines omitted)

PollingPropertiesFileConfigurationProvider, which implements the LifecycleAware interface, sets lifecycleState to START the first time its start() method is called: lifecycleState = LifecycleState.START;
In later runs of the scheduler, the condition if (!lifecycleAware.getLifecycleState().equals(supervisoree.status.desiredState)) is therefore false, so start() is not called again. As a result, only one PollingPropertiesFileConfigurationProvider instance is started. And because FileWatcherRunnable is scheduled on a single-thread executor, only one FileWatcherRunnable task runs as well. By the same reasoning, no channel, source, or sink is started more than once.
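
The guard that prevents repeated starts can be reduced to a few lines. It compares the current state with the desired state and acts only on a mismatch.

The types below are hypothetical simplifications of LifecycleState, LifecycleAware and MonitorRunnable, and only the START transition is modelled. Calling tick() repeatedly plays the role of the 3-second scheduler.

```java
// Hypothetical, simplified stand-ins for Flume's lifecycle types
enum SketchState { IDLE, START, STOP }

interface SketchComponent {
    void start();
    SketchState getLifecycleState();
}

class SketchMonitor {
    // One scheduler tick: act only when the current state differs from desiredState
    static void tick(SketchComponent component, SketchState desiredState) {
        if (!component.getLifecycleState().equals(desiredState)) {
            switch (desiredState) {
                case START:
                    component.start();
                    break;
                default:
                    // STOP and other transitions are omitted in this sketch
                    break;
            }
        }
    }
}

// Component whose start() counts calls and flips its state to START,
// as PollingPropertiesFileConfigurationProvider.start() does
class CountingComponent implements SketchComponent {
    int starts = 0;
    private SketchState state = SketchState.IDLE;

    @Override
    public void start() {
        starts++;
        state = SketchState.START;
    }

    @Override
    public SketchState getLifecycleState() {
        return state;
    }
}
```

However many times tick(c, SketchState.START) runs, c.starts ends up at 1. The first call flips the state, and every later call finds current and desired states equal.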
