前言
Flink配合Hadoop使用的時(shí)候獲取配置文件的方式非常之多,官網(wǎng)沒有統(tǒng)一的總結(jié)。本篇將這些獲取配置的方法梳理總結(jié)是為了:
- 掌握多種指定Flink Hadoop配置的方式。
- 對于一個(gè)較亂的環(huán)境,F(xiàn)link無法正確讀取Hadoop配置的時(shí)候,提供一個(gè)排查問題思路。
獲取Flink conf目錄
Flink獲取conf目錄(Flink配置)的順序:
- 查找
FLINK_CONF_DIR環(huán)境變量。 - 查找
../conf目錄。 - 查找
conf目錄。
代碼位于CliFrontend的getConfigurationDirectoryFromEnv方法:
public static String getConfigurationDirectoryFromEnv() {
// 從FLINK_CONF_DIR環(huán)境變量獲取conf路徑
String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
if (location != null) {
if (new File(location).exists()) {
return location;
} else {
throw new RuntimeException(
"The configuration directory '"
+ location
+ "', specified in the '"
+ ConfigConstants.ENV_FLINK_CONF_DIR
+ "' environment variable, does not exist.");
}
} else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
// 嘗試查找../conf目錄是否存在
location = CONFIG_DIRECTORY_FALLBACK_1;
} else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
// 嘗試查找conf目錄是否存在
location = CONFIG_DIRECTORY_FALLBACK_2;
} else {
throw new RuntimeException(
"The configuration directory was not specified. "
+ "Please specify the directory containing the configuration file through the '"
+ ConfigConstants.ENV_FLINK_CONF_DIR
+ "' environment variable.");
}
return location;
}
獲取log4j配置文件
YarnLogConfigUtil的discoverLogConfigFile方法從flink配置文件目錄中查找log4j.properties和logback.xml。如果兩者都存在,優(yōu)先使用log4j.properties。
private static Optional<File> discoverLogConfigFile(final String configurationDirectory) {
Optional<File> logConfigFile = Optional.empty();
// 從Flink配置文件目錄中查找log4j.properties文件
final File log4jFile =
new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
if (log4jFile.exists()) {
logConfigFile = Optional.of(log4jFile);
}
// 從Flink配置文件目錄中查找logback.xml文件
final File logbackFile =
new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
if (logbackFile.exists()) {
if (logConfigFile.isPresent()) {
// 如果兩個(gè)配置文件都存在,打印告警
LOG.warn(
"The configuration directory ('"
+ configurationDirectory
+ "') already contains a LOG4J config file."
+ "If you want to use logback, then please delete or rename the log configuration file.");
} else {
logConfigFile = Optional.of(logbackFile);
}
}
return logConfigFile;
}
Kerberos相關(guān)配置
KerberosUtils的static方法中??梢允褂?code>KRB5CCNAME環(huán)境變量,指定Flink Kerberos認(rèn)證使用的ticket cache路徑。
String ticketCache = System.getenv("KRB5CCNAME");
if (ticketCache != null) {
if (IBM_JAVA) {
System.setProperty("KRB5CCNAME", ticketCache);
} else {
kerberosCacheOptions.put("ticketCache", ticketCache);
}
}
KerberosLoginProvider的doLoginAndReturnUGI使用了KRB5PRINCIPAL環(huán)境變量指定principal。代碼如下所示:
public UserGroupInformation doLoginAndReturnUGI() throws IOException {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
if (principal != null) {
LOG.info(
"Attempting to login to KDC using principal: {} keytab: {}", principal, keytab);
UserGroupInformation ugi =
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
LOG.info("Successfully logged into KDC");
return ugi;
} else if (!HadoopUserUtils.isProxyUser(currentUser)) {
LOG.info("Attempting to load user's ticket cache");
final String ccache = System.getenv("KRB5CCNAME");
final String user =
Optional.ofNullable(System.getenv("KRB5PRINCIPAL"))
.orElse(currentUser.getUserName());
UserGroupInformation ugi = UserGroupInformation.getUGIFromTicketCache(ccache, user);
LOG.info("Loaded user's ticket cache successfully");
return ugi;
} else {
throwProxyUserNotSupported();
return currentUser;
}
}
Hadoop相關(guān)配置
讀取Hadoop配置文件的流程位于HadoopUtils的getHadoopConfiguration方法。流程如下:
- 從class path中讀取hdfs-default.xml和hdfs-site.xml文件。
- 從
$HADOOP_HOME/conf和$HADOOP_HOME/etc/hadoop中讀取。 - 從Flink配置文件中
fs.hdfs.hdfsdefault,fs.hdfs.hdfssite和fs.hdfs.hadoopconf配置讀取。此方法已廢棄不建議使用。 - 從
HADOOP_CONF_DIR讀取。 - 從Flink配置文件讀取
flink.hadoop.開頭的配置作為Hadoop的配置項(xiàng)。
@SuppressWarnings("deprecation")
public static Configuration getHadoopConfiguration(
org.apache.flink.configuration.Configuration flinkConfiguration) {
// Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
// from the classpath
// 從classpath中讀取hdfs-default.xml和hdfs-site.xml文件
Configuration result = new HdfsConfiguration();
boolean foundHadoopConfiguration = false;
// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
// the hdfs configuration.
// The properties of a newly added resource will override the ones in previous resources, so
// a configuration
// file with higher priority should be added later.
// Approach 1: HADOOP_HOME environment variables
String[] possibleHadoopConfPaths = new String[2];
// 從HADOOP_HOME環(huán)境變量獲取
// $HADOOP_HOME/conf和$HADOOP_HOME/etc/hadoop
final String hadoopHome = System.getenv("HADOOP_HOME");
if (hadoopHome != null) {
LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
possibleHadoopConfPaths[0] = hadoopHome + "/conf";
possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
}
for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
if (possibleHadoopConfPath != null) {
foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
}
}
// Approach 2: Flink configuration (deprecated)
// 從Flink配置文件中fs.hdfs.hdfsdefault配置項(xiàng)獲取
final String hdfsDefaultPath =
flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
if (hdfsDefaultPath != null) {
result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
LOG.debug(
"Using hdfs-default configuration-file path from Flink config: {}",
hdfsDefaultPath);
foundHadoopConfiguration = true;
}
// 從Flink配置文件中fs.hdfs.hdfssite配置項(xiàng)獲取
final String hdfsSitePath =
flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
if (hdfsSitePath != null) {
result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
LOG.debug(
"Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
foundHadoopConfiguration = true;
}
// 從Flink配置文件中fs.hdfs.hadoopconf配置項(xiàng)獲取
final String hadoopConfigPath =
flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
if (hadoopConfigPath != null) {
LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
foundHadoopConfiguration =
addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
}
// Approach 3: HADOOP_CONF_DIR environment variable
// 從HADOOP_CONF_DIR環(huán)境變量獲取
String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
if (hadoopConfDir != null) {
LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
foundHadoopConfiguration =
addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
}
// Approach 4: Flink configuration
// add all configuration key with prefix 'flink.hadoop.' in flink conf to hadoop conf
// 讀取Flink配置文件中flink.hadoop.開頭的配置項(xiàng),去掉該前綴的內(nèi)容作為key放入configuration
for (String key : flinkConfiguration.keySet()) {
for (String prefix : FLINK_CONFIG_PREFIXES) {
if (key.startsWith(prefix)) {
String newKey = key.substring(prefix.length());
String value = flinkConfiguration.getString(key, null);
result.set(newKey, value);
LOG.debug(
"Adding Flink config entry for {} as {}={} to Hadoop config",
key,
newKey,
value);
foundHadoopConfiguration = true;
}
}
}
// 如果以上途徑均未發(fā)現(xiàn)Hadoop配置,打印警告
if (!foundHadoopConfiguration) {
LOG.warn(
"Could not find Hadoop configuration via any of the supported methods "
+ "(Flink configuration, environment variables).");
}
return result;
}
Hadoop class path配置
構(gòu)建Flink Hadoop classpath的相關(guān)代碼位于config.sh:
INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
可使用export HADOOP_CLASSPATH=xxx方式為Flink指定Hadoop的class path。
通過這行代碼還可以得知Flink的class path包含HADOOP_CONF_DIR和YARN_CONF_DIR,分別對應(yīng)Hadoop和Yarn配置文件目錄??梢酝ㄟ^HADOOP_CONF_DIR和YARN_CONF_DIR分別指定Hadoop和Yarn的配置文件路徑。
Yarn相關(guān)配置
YarnClusterDescriptor::isReadyForDeployment方法中檢測HADOOP_CONF_DIR和YARN_CONF_DIR。如果HADOOP_CONF_DIR或者YARN_CONF_DIR都沒有配置,打印警告。
// check if required Hadoop environment variables are set. If not, warn user
if (System.getenv("HADOOP_CONF_DIR") == null && System.getenv("YARN_CONF_DIR") == null) {
LOG.warn(
"Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. "
+ "The Flink YARN Client needs one of these to be set to properly load the Hadoop "
+ "configuration for accessing YARN.");
}
一般來說hadoop classpath命令的返回結(jié)果中包含了Hadoop的conf目錄,所以沒有指定HADOOP_CONF_DIR和YARN_CONF_DIR也不影響Flink作業(yè)訪問HDFS和提交Yarn集群。
Yarn配置文件的讀取邏輯為:
- 從class path中讀取
yarn-site.xml和yarn-default.xml。因?yàn)?code>HADOOP_CLASSPATH,HADOOP_CONF_DIR和YARN_CONF_DIR環(huán)境變量都可以修改Flink Hadoop的class path,這三個(gè)環(huán)境變量的配置均會影響此步驟。 - 通過Flink配置文件中
flink.yarn開頭的配置指定Yarn配置項(xiàng)。
下面是源代碼分析。
提交Yarn作業(yè)之前通過Utils::getYarnAndHadoopConfiguration方法讀取Hadoop和Yarn的配置文件。
public static YarnConfiguration getYarnAndHadoopConfiguration(
org.apache.flink.configuration.Configuration flinkConfig) {
final YarnConfiguration yarnConfig = getYarnConfiguration(flinkConfig);
// 獲取Hadoop配置,前面已分析過
yarnConfig.addResource(HadoopUtils.getHadoopConfiguration(flinkConfig));
return yarnConfig;
}
Utils的getYarnConfiguration方法從Flink配置文件中讀取flink.yarn開頭的配置,去掉前綴之后作為key放入configuration。
public static YarnConfiguration getYarnConfiguration(
org.apache.flink.configuration.Configuration flinkConfig) {
// 從class path中獲取yarn-default.xml和yarn-site.xml文件
final YarnConfiguration yarnConfig = new YarnConfiguration();
for (String key : flinkConfig.keySet()) {
for (String prefix : FLINK_CONFIG_PREFIXES) {
if (key.startsWith(prefix)) {
String newKey = key.substring("flink.".length());
String value = flinkConfig.getString(key, null);
yarnConfig.set(newKey, value);
LOG.debug(
"Adding Flink config entry for {} as {}={} to Yarn config",
key,
newKey,
value);
}
}
}
return yarnConfig;
}
附錄:日志不打印問題排查
檢查的順序?yàn)椋?/p>
檢查$FLINK_HOME/conf/log4j*.properties配置文件是否正確。
檢查$FLINK_HOME/lib/中的日志相關(guān)jar包是否存在或是否沖突。
檢查打包進(jìn)作業(yè)內(nèi)的日志相關(guān)jar包,需要都排除使用Flink框架自帶的日志依賴庫。