How Flink Locates Its Configuration

Introduction

When Flink is used together with Hadoop, there are many ways for it to pick up configuration files, and the official documentation does not summarize them in one place. This article organizes these mechanisms in order to:

  1. Cover the various ways to point Flink at its Hadoop configuration.
  2. Offer a troubleshooting path for messy environments in which Flink fails to read the Hadoop configuration correctly.

Locating the Flink conf directory

Flink resolves its conf directory (the Flink configuration) in the following order:

  1. Check the FLINK_CONF_DIR environment variable.
  2. Look for a ../conf directory.
  3. Look for a conf directory.

The code is in CliFrontend's getConfigurationDirectoryFromEnv method:

public static String getConfigurationDirectoryFromEnv() {
    // Get the conf path from the FLINK_CONF_DIR environment variable
    String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);

    if (location != null) {
        if (new File(location).exists()) {
            return location;
        } else {
            throw new RuntimeException(
                    "The configuration directory '"
                            + location
                            + "', specified in the '"
                            + ConfigConstants.ENV_FLINK_CONF_DIR
                            + "' environment variable, does not exist.");
        }
    } else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
        // Check whether a ../conf directory exists
        location = CONFIG_DIRECTORY_FALLBACK_1;
    } else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
        // Check whether a conf directory exists
        location = CONFIG_DIRECTORY_FALLBACK_2;
    } else {
        throw new RuntimeException(
                "The configuration directory was not specified. "
                        + "Please specify the directory containing the configuration file through the '"
                        + ConfigConstants.ENV_FLINK_CONF_DIR
                        + "' environment variable.");
    }
    return location;
}

Locating the log4j configuration file

YarnLogConfigUtil's discoverLogConfigFile method searches the Flink configuration directory for log4j.properties and logback.xml. If both exist, log4j.properties takes precedence.

private static Optional<File> discoverLogConfigFile(final String configurationDirectory) {
    Optional<File> logConfigFile = Optional.empty();

    // Look for a log4j.properties file in the Flink configuration directory
    final File log4jFile =
            new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
    if (log4jFile.exists()) {
        logConfigFile = Optional.of(log4jFile);
    }

    // Look for a logback.xml file in the Flink configuration directory
    final File logbackFile =
            new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
    if (logbackFile.exists()) {
        if (logConfigFile.isPresent()) {
            // If both configuration files exist, log a warning
            LOG.warn(
                    "The configuration directory ('"
                            + configurationDirectory
                            + "') already contains a LOG4J config file."
                            + "If you want to use logback, then please delete or rename the log configuration file.");
        } else {
            logConfigFile = Optional.of(logbackFile);
        }
    }
    return logConfigFile;
}

Kerberos-related configuration

In KerberosUtils' static initializer block, the KRB5CCNAME environment variable can be used to specify the ticket cache path for Flink's Kerberos authentication:

// Read the ticket cache path from the KRB5CCNAME environment variable
String ticketCache = System.getenv("KRB5CCNAME");
if (ticketCache != null) {
    if (IBM_JAVA) {
        System.setProperty("KRB5CCNAME", ticketCache);
    } else {
        kerberosCacheOptions.put("ticketCache", ticketCache);
    }
}

KerberosLoginProvider's doLoginAndReturnUGI method uses the KRB5PRINCIPAL environment variable to specify the principal. The code is shown below:

public UserGroupInformation doLoginAndReturnUGI() throws IOException {
    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();

    if (principal != null) {
        LOG.info(
                "Attempting to login to KDC using principal: {} keytab: {}", principal, keytab);
        UserGroupInformation ugi =
                UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
        LOG.info("Successfully logged into KDC");
        return ugi;
    } else if (!HadoopUserUtils.isProxyUser(currentUser)) {
        LOG.info("Attempting to load user's ticket cache");
        final String ccache = System.getenv("KRB5CCNAME");
        final String user =
                Optional.ofNullable(System.getenv("KRB5PRINCIPAL"))
                        .orElse(currentUser.getUserName());
        UserGroupInformation ugi = UserGroupInformation.getUGIFromTicketCache(ccache, user);
        LOG.info("Loaded user's ticket cache successfully");
        return ugi;
    } else {
        throwProxyUserNotSupported();
        return currentUser;
    }
}

Hadoop-related configuration

The logic for reading Hadoop configuration files is in HadoopUtils' getHadoopConfiguration method. The flow is:

  1. Read hdfs-default.xml and hdfs-site.xml from the classpath.
  2. Read from $HADOOP_HOME/conf and $HADOOP_HOME/etc/hadoop.
  3. Read the files given by the fs.hdfs.hdfsdefault, fs.hdfs.hdfssite and fs.hdfs.hadoopconf options in the Flink configuration. This approach is deprecated and not recommended.
  4. Read from HADOOP_CONF_DIR.
  5. Read options prefixed with flink.hadoop. from the Flink configuration and apply them as Hadoop options.

@SuppressWarnings("deprecation")
public static Configuration getHadoopConfiguration(
        org.apache.flink.configuration.Configuration flinkConfiguration) {

    // Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
    // from the classpath
    // Read hdfs-default.xml and hdfs-site.xml from the classpath
    Configuration result = new HdfsConfiguration();
    boolean foundHadoopConfiguration = false;

    // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
    // the hdfs configuration.
    // The properties of a newly added resource will override the ones in previous resources, so
    // a configuration
    // file with higher priority should be added later.

    // Approach 1: HADOOP_HOME environment variables
    String[] possibleHadoopConfPaths = new String[2];

    // From the HADOOP_HOME environment variable:
    // $HADOOP_HOME/conf and $HADOOP_HOME/etc/hadoop
    final String hadoopHome = System.getenv("HADOOP_HOME");
    if (hadoopHome != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
        possibleHadoopConfPaths[0] = hadoopHome + "/conf";
        possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
    }

    for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
        if (possibleHadoopConfPath != null) {
            foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
        }
    }

    // Approach 2: Flink configuration (deprecated)
    // From the fs.hdfs.hdfsdefault option in the Flink configuration
    final String hdfsDefaultPath =
            flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
    if (hdfsDefaultPath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
        LOG.debug(
                "Using hdfs-default configuration-file path from Flink config: {}",
                hdfsDefaultPath);
        foundHadoopConfiguration = true;
    }

    // From the fs.hdfs.hdfssite option in the Flink configuration
    final String hdfsSitePath =
            flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
    if (hdfsSitePath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
        LOG.debug(
                "Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
        foundHadoopConfiguration = true;
    }

    // From the fs.hdfs.hadoopconf option in the Flink configuration
    final String hadoopConfigPath =
            flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
    if (hadoopConfigPath != null) {
        LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
        foundHadoopConfiguration =
                addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
    }

    // Approach 3: HADOOP_CONF_DIR environment variable
    // From the HADOOP_CONF_DIR environment variable
    String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
    if (hadoopConfDir != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
        foundHadoopConfiguration =
                addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
    }

    // Approach 4: Flink configuration
    // add all configuration key with prefix 'flink.hadoop.' in flink conf to hadoop conf
    // Read options starting with flink.hadoop. from the Flink configuration
    // and put them into the Hadoop configuration with the prefix removed
    for (String key : flinkConfiguration.keySet()) {
        for (String prefix : FLINK_CONFIG_PREFIXES) {
            if (key.startsWith(prefix)) {
                String newKey = key.substring(prefix.length());
                String value = flinkConfiguration.getString(key, null);
                result.set(newKey, value);
                LOG.debug(
                        "Adding Flink config entry for {} as {}={} to Hadoop config",
                        key,
                        newKey,
                        value);
                foundHadoopConfiguration = true;
            }
        }
    }

    // If none of the above yielded a Hadoop configuration, log a warning
    if (!foundHadoopConfiguration) {
        LOG.warn(
                "Could not find Hadoop configuration via any of the supported methods "
                        + "(Flink configuration, environment variables).");
    }

    return result;
}
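The prefix-stripping rule in Approach 4 can be sketched with plain Java collections. This is a hypothetical stand-in (class and method names are mine, with a Map replacing the Flink and Hadoop configuration classes) so it runs without either library on the classpath:

```java
import java.util.HashMap;
import java.util.Map;

public class HadoopPrefixDemo {
    // Mirrors the handling of the "flink.hadoop." prefix shown above.
    static final String PREFIX = "flink.hadoop.";

    // Copy every "flink.hadoop.*" entry into the Hadoop config, prefix removed.
    static Map<String, String> toHadoopConf(Map<String, String> flinkConf) {
        Map<String, String> hadoopConf = new HashMap<>();
        for (Map.Entry<String, String> e : flinkConf.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                hadoopConf.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return hadoopConf;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConf = new HashMap<>();
        flinkConf.put("flink.hadoop.dfs.replication", "2");
        flinkConf.put("parallelism.default", "4"); // no prefix: not forwarded
        System.out.println(toHadoopConf(flinkConf)); // {dfs.replication=2}
    }
}
```

With this rule, setting flink.hadoop.dfs.replication in the Flink configuration has the same effect as setting dfs.replication in hdfs-site.xml.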

Hadoop classpath configuration

The code that builds Flink's Hadoop classpath is in config.sh:

INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"

You can use export HADOOP_CLASSPATH=xxx to specify the Hadoop classpath for Flink.

This line also shows that Flink's classpath includes HADOOP_CONF_DIR and YARN_CONF_DIR, which correspond to the Hadoop and Yarn configuration directories respectively. You can therefore use HADOOP_CONF_DIR and YARN_CONF_DIR to point Flink at the Hadoop and Yarn configuration file paths.
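A short sketch of how the three variables combine; the paths are illustrative, not from the source (in practice HADOOP_CLASSPATH is usually set from the output of the hadoop classpath command, e.g. export HADOOP_CLASSPATH=$(hadoop classpath)):

```shell
# Illustrative values only
HADOOP_CLASSPATH="/opt/hadoop/share/hadoop/common/*"
HADOOP_CONF_DIR="/etc/hadoop/conf"
YARN_CONF_DIR="/etc/hadoop/conf"

# The same composition config.sh performs
INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
echo "$INTERNAL_HADOOP_CLASSPATHS"
```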

Yarn-related configuration

The YarnClusterDescriptor::isReadyForDeployment method checks HADOOP_CONF_DIR and YARN_CONF_DIR. If neither HADOOP_CONF_DIR nor YARN_CONF_DIR is set, a warning is logged:

// check if required Hadoop environment variables are set. If not, warn user
if (System.getenv("HADOOP_CONF_DIR") == null && System.getenv("YARN_CONF_DIR") == null) {
    LOG.warn(
            "Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. "
                    + "The Flink YARN Client needs one of these to be set to properly load the Hadoop "
                    + "configuration for accessing YARN.");
}

In general, the output of the hadoop classpath command already includes Hadoop's conf directory, so even without HADOOP_CONF_DIR or YARN_CONF_DIR set, Flink jobs can still access HDFS and be submitted to a Yarn cluster.

Yarn configuration files are read as follows:

  1. Read yarn-site.xml and yarn-default.xml from the classpath. Since the HADOOP_CLASSPATH, HADOOP_CONF_DIR and YARN_CONF_DIR environment variables can all modify Flink's Hadoop classpath, all three affect this step.
  2. Specify Yarn options via entries prefixed with flink.yarn. in the Flink configuration.

Below is the source code analysis.

Before a Yarn job is submitted, the Utils::getYarnAndHadoopConfiguration method reads the Hadoop and Yarn configuration files:

public static YarnConfiguration getYarnAndHadoopConfiguration(
        org.apache.flink.configuration.Configuration flinkConfig) {
    final YarnConfiguration yarnConfig = getYarnConfiguration(flinkConfig);
    // Load the Hadoop configuration (analyzed above)
    yarnConfig.addResource(HadoopUtils.getHadoopConfiguration(flinkConfig));

    return yarnConfig;
}

Utils' getYarnConfiguration method reads options prefixed with flink.yarn. from the Flink configuration, strips the leading flink., and puts the remainder (still starting with yarn.) into the configuration:

public static YarnConfiguration getYarnConfiguration(
        org.apache.flink.configuration.Configuration flinkConfig) {
    // Load yarn-default.xml and yarn-site.xml from the classpath
    final YarnConfiguration yarnConfig = new YarnConfiguration();

    for (String key : flinkConfig.keySet()) {
        for (String prefix : FLINK_CONFIG_PREFIXES) {
            if (key.startsWith(prefix)) {
                String newKey = key.substring("flink.".length());
                String value = flinkConfig.getString(key, null);
                yarnConfig.set(newKey, value);
                LOG.debug(
                        "Adding Flink config entry for {} as {}={} to Yarn config",
                        key,
                        newKey,
                        value);
            }
        }
    }

    return yarnConfig;
}
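Note the subtlety in the code above: keys are matched on the full flink.yarn. prefix, but substring removes only "flink.", so the resulting key keeps its yarn. prefix, matching Hadoop's YarnConfiguration key names. A minimal sketch (class and method names are hypothetical):

```java
public class YarnPrefixDemo {
    // Mirrors the key rewrite in getYarnConfiguration above:
    // matched on "flink.yarn.", but only "flink." is stripped.
    static String toYarnKey(String flinkKey) {
        if (!flinkKey.startsWith("flink.yarn.")) {
            throw new IllegalArgumentException("not a flink.yarn.* key: " + flinkKey);
        }
        return flinkKey.substring("flink.".length());
    }

    public static void main(String[] args) {
        // "flink.yarn.application.classpath" -> "yarn.application.classpath"
        System.out.println(toYarnKey("flink.yarn.application.classpath"));
    }
}
```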

Appendix: troubleshooting missing log output

Check the following, in order:

  1. Check that the $FLINK_HOME/conf/log4j*.properties configuration files are correct.
  2. Check that the logging-related jars under $FLINK_HOME/lib/ are present and do not conflict.
  3. Check for logging-related jars bundled into the job jar; they should all be excluded so that the logging libraries shipped with Flink are used instead.
