Flink 使用介紹相關(guān)文檔目錄
前言
本文為大家分享博主在雙網(wǎng)分離的環(huán)境下使用Flink遇到的問題和解決方案。
所謂雙網(wǎng)分離環(huán)境指的是Flink集群運(yùn)行環(huán)境存在兩個(gè)網(wǎng)段,分別為管理網(wǎng)和業(yè)務(wù)網(wǎng)。這兩個(gè)網(wǎng)段互不相通。管理網(wǎng)使用了千兆交換機(jī),僅用于管理員操作集群和監(jiān)控集群狀態(tài)。業(yè)務(wù)網(wǎng)使用光纖和萬兆交換機(jī),F(xiàn)link TaskManager之間交換數(shù)據(jù)全部走業(yè)務(wù)網(wǎng),實(shí)現(xiàn)帶寬的最大化利用。
環(huán)境信息
軟件版本
- Flink 1.15.4
- Hadoop 3.1.1
網(wǎng)絡(luò)配置
集群內(nèi)配置有兩個(gè)網(wǎng)絡(luò),分別為:
管理網(wǎng):10.x.x.x。千兆網(wǎng)。
業(yè)務(wù)網(wǎng):192.168.x.x。萬兆網(wǎng)。
集群內(nèi)相互訪問使用hostname,hostname配置的均為業(yè)務(wù)網(wǎng)。
管理網(wǎng)和業(yè)務(wù)網(wǎng)不互通。
問題1:提交任務(wù)成功后出錯(cuò),無法連接到JobManager
另有一個(gè)集群外客戶端節(jié)點(diǎn),其所在網(wǎng)段和管理網(wǎng)相同。在該客戶端節(jié)點(diǎn)提交Flink任務(wù)到集群,出現(xiàn)無法連接到JobManager,報(bào)Connection Refused錯(cuò)誤。錯(cuò)誤中的IP為集群內(nèi)萬兆網(wǎng)的IP。但是作業(yè)能夠提交成功,只是無法獲取到作業(yè)的運(yùn)行狀態(tài)和結(jié)果。
通過這個(gè)錯(cuò)誤不難得知問題是Flink JobManager的bind host配置存在問題。通過查看Yarn界面,找到Flink作業(yè)JobManager所在節(jié)點(diǎn)。使用netstat查看JobManager進(jìn)程對(duì)應(yīng)的端口和綁定IP。我們發(fā)現(xiàn)只有一個(gè)端口綁定的是192.168.x.x,其余都是0.0.0.0。這個(gè)端口對(duì)應(yīng)的是Flink什么端口呢?查看Yarn中該Flink Application的啟動(dòng)日志可以發(fā)現(xiàn)這個(gè)端口是rest端口。
查閱官方文檔,可以使用rest.bind-address方式綁定地址。于是編輯Flink配置文件flink-conf.yaml,增加:
rest.bind-address: 0.0.0.0
保存之后重新提交任務(wù),問題依舊。綁定的地址并沒有發(fā)生變化。查看Flink Web UI中JobManager的Configuration,發(fā)現(xiàn)rest.bind-address和rest.address值相同,都為集群內(nèi)JobManager運(yùn)行所在機(jī)器的hostname。rest.bind-address配置項(xiàng)不生效。
經(jīng)過詳細(xì)問題調(diào)查,發(fā)現(xiàn)是Flink 1.15.x版本之后bug。無論如何配置rest.bind-address,都會(huì)將該屬性的值覆蓋為運(yùn)行時(shí)所在Yarn節(jié)點(diǎn)的地址。參見如下代碼,位于YarnEntrypointUtils.java的loadConfiguration方法。
// ...
configuration.set(JobManagerOptions.ADDRESS, hostname);
configuration.set(RestOptions.ADDRESS, hostname);
configuration.set(RestOptions.BIND_ADDRESS, hostname);
// ...
該特性對(duì)應(yīng)的提交為[FLINK-24474] Default rest.bind-address to localhost in flink-conf.yaml。Commit ID:6222532db0f0a1e75811fc215cd66bc26fb74afb
解決方法為修改上述片段的代碼為:
// ...
configuration.set(JobManagerOptions.ADDRESS, hostname);
configuration.set(RestOptions.ADDRESS, hostname);
if (!configuration.contains(RestOptions.BIND_ADDRESS)) {
configuration.set(RestOptions.BIND_ADDRESS, hostname);
}
// ...
按照修改之后的邏輯,只有配置文件中沒有配置rest.bind-address,才會(huì)使用Yarn節(jié)點(diǎn)的地址。如果配置了,則以實(shí)際配置的值為準(zhǔn)。
修改后重新編譯,替換掉Flink客戶端中flink-dist-xxx.jar文件。重新提交作業(yè)可以解決。
博主已通過patch FLINK-35332解決了這個(gè)問題。
問題2:集群外客戶端flink提交時(shí)host無法解析為IP
問題現(xiàn)象:使用flink run方式在集群外提交可以ResourceManager可以解析host為IP,但是yarn-session模式啟動(dòng)的時(shí)候無法解析host。
經(jīng)調(diào)查發(fā)現(xiàn),控制解析的配置項(xiàng)為hadoop.security.token.service.use_ip。
配置集群外客戶端的hdfs-site.xml(這里有伏筆),修改hadoop.security.token.service.use_ip為true:
<property>
<name>hadoop.security.token.service.use_ip</name>
<value>true</value>
</property>
集群中Hadoop的配置項(xiàng)保持不變,為false。然后嘗試啟動(dòng)yarn-session,問題解決。
查找了Hadoop官網(wǎng),發(fā)現(xiàn)hadoop.security.token.service.use_ip是屬于core-site.xml的配置。集群中和集群外客戶端將其錯(cuò)誤的寫到hdfs-site.xml中。
這里先說結(jié)論,這個(gè)詭異的問題原因在于該配置不能位于hdfs-site.xml中,必須在core-site.xml中。如果該屬性錯(cuò)誤配置到了hdfs-site.xml,會(huì)導(dǎo)致flink run -m yarn-cluster方式提交可以解析host但是yarn-session模式啟動(dòng)yarn-session的時(shí)候無法解析host。至于為什么會(huì)這樣,我們接下來分析。
默認(rèn)的日志級(jí)別不夠詳細(xì),看不出來什么內(nèi)容。將日志開啟DEBUG級(jí)別之后提交任務(wù)。查看日志發(fā)現(xiàn)flink run -m yarn-cluster方式的hadoop.security.token.service.use_ip的值為true,啟動(dòng)yarn-session的時(shí)候hadoop.security.token.service.use_ip的值為false。懷疑yarn-session模式提交的時(shí)候不加載hdfs-site.xml。
仔細(xì)觀察flink yarn-session的啟動(dòng)日志,發(fā)現(xiàn)開頭有這一行:
... org.apache.flink.runtime.util.HadoopUtils [] - Searching Hadoop configuration files in HADOOP_HOME
Flink run命令啟動(dòng)的時(shí)候沒有這一行。懷疑是這里加載了HDFS配置文件。
想要揭開謎底,我們接下來分析Flink yarn-session的啟動(dòng)代碼。
Flink Yarn Session啟動(dòng)代碼
查看yarn-session.sh的最后一行啟動(dòng)Java進(jìn)程代碼,可得知應(yīng)用的入口為FlinkYarnSessionCli類。
"$JAVA_RUN" $JVM_ARGS $FLINK_ENV_JAVA_OPTS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"
跟蹤main方法,然后找到FlinkYarnSessionCli::run方法,我們找到這一行代碼:
// ...
final YarnClusterDescriptor yarnClusterDescriptor =
(YarnClusterDescriptor)
yarnClusterClientFactory.createClusterDescriptor(effectiveConfiguration);
// ...
繼續(xù)跟蹤,查看YarnClusterClientFactory::createClusterDescriptor的內(nèi)容:
@Override
public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
checkNotNull(configuration);
final String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR);
YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);
return getClusterDescriptor(configuration);
}
繼續(xù)分析YarnClusterClientFactory::getClusterDescriptor。
private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
final YarnClient yarnClient = YarnClient.createYarnClient();
// 獲取Yarn和Hadoop配置
final YarnConfiguration yarnConfiguration =
Utils.getYarnAndHadoopConfiguration(configuration);
yarnClient.init(yarnConfiguration);
yarnClient.start();
return new YarnClusterDescriptor(
configuration,
yarnConfiguration,
yarnClient,
YarnClientYarnClusterInformationRetriever.create(yarnClient),
false);
}
這里有一個(gè)關(guān)鍵調(diào)用Utils.getYarnAndHadoopConfiguration(configuration)。作用為獲取Yarn和Hadoop配置。
public static YarnConfiguration getYarnAndHadoopConfiguration(
org.apache.flink.configuration.Configuration flinkConfig) {
final YarnConfiguration yarnConfig = getYarnConfiguration(flinkConfig);
yarnConfig.addResource(HadoopUtils.getHadoopConfiguration(flinkConfig));
return yarnConfig;
}
其中getYarnConfiguration調(diào)用中執(zhí)行了new YarnConfiguration該調(diào)用從classpath中加載yarn-default.xml、yarn-site.xml和resource-types.xml文件。
同樣,HadoopUtils.getHadoopConfiguration(flinkConfig)調(diào)用的時(shí)候執(zhí)行了new HdfsConfiguration,加載了hdfs-site.xml、hdfs-default.xml和core-site.xml、core-default.xml配置。
到這里可知,F(xiàn)link yarn-session啟動(dòng)的時(shí)候的確加載了hdfs-site.xml配置文件。
Flink run命令的入口類為CliFrontEnd,并沒有使用FlinkYarnSessionCli。因此我們從Hadoop源代碼中打印hadoop.security.token.service.use_ip值這一行DEBUG日志的為止展開分析。
這一行日志位于Hadoop源代碼中SecurityUtil類的setTokenServiceUseIp方法中。我們將相關(guān)的方法貼出來,如下所示:
static {
setConfigurationInternal(new Configuration());
}
private static void setConfigurationInternal(Configuration conf) {
boolean useIp = conf.getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP,
CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT);
setTokenServiceUseIp(useIp);
// ...
}
@InterfaceAudience.Private
@VisibleForTesting
public static void setTokenServiceUseIp(boolean flag) {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting "
+ CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP
+ " to " + flag);
}
useIpForTokenService = flag;
hostResolver = !useIpForTokenService
? new QualifiedHostResolver()
: new StandardHostResolver();
}
不難發(fā)現(xiàn),在SecurityUtil類加載的時(shí)候執(zhí)行了setConfigrationInternal方法,然后間接調(diào)用了setTokenServiceUseIp方法。
setConfigrationInternal傳入的參數(shù)是new Configuration。我們查看它的static代碼塊:
static {
deprecationContext = new AtomicReference(new DeprecationContext((DeprecationContext)null, defaultDeprecations));
addDefaultResource("core-default.xml");
addDefaultResource("core-site.xml");
ClassLoader cL = Thread.currentThread().getContextClassLoader();
if (cL == null) {
cL = Configuration.class.getClassLoader();
}
if (cL.getResource("hadoop-site.xml") != null) {
LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively");
addDefaultResource("hadoop-site.xml");
}
}
發(fā)現(xiàn)它只從classpath中加載core-default.xml和core-site.xml文件,不會(huì)去加載hdfs-site.xml。
到這里得出結(jié)論:Flink yarn-session提交的時(shí)候加載了hdfs-site.xml文件,但是flink run -m yarn-cluster的時(shí)候沒有加載。分析完畢。
這次問題分析得出的啟示是對(duì)于Hadoop相關(guān)配置,務(wù)必將配置項(xiàng)寫到正確的配置文件中,否則會(huì)出現(xiàn)奇怪的問題,較難定位原因。
Hadoop解析host相關(guān)代碼
這里我們返回思考前面的問題:僅僅將hadoop.security.token.service.use_ip修改true的確可行。但為什么設(shè)置它為false的時(shí)候就無法解析hostname為IP了呢?
為了搞清楚這個(gè)問題,接下來需要分析hadoop.security.token.service.use_ip究竟做了些什么。
跟蹤YarnClient::start調(diào)用找到YarnImpl::serviceStart
protected void serviceStart() throws Exception {
try {
this.rmClient = (ApplicationClientProtocol)ClientRMProxy.createRMProxy(this.getConfig(), ApplicationClientProtocol.class);
if (this.historyServiceEnabled) {
this.historyClient.start();
}
} catch (IOException var2) {
IOException e = var2;
throw new YarnRuntimeException(e);
}
super.serviceStart();
}
繼續(xù)跟蹤RMProxy::createRMProxy:
private static <T> T createRMProxy(YarnConfiguration conf, Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy) throws IOException {
if (HAUtil.isHAEnabled(conf)) {
RMFailoverProxyProvider<T> provider = instance.createRMFailoverProxyProvider(conf, protocol);
return RetryProxy.create(protocol, provider, retryPolicy);
} else {
InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
LOG.info("Connecting to ResourceManager at " + rmAddress);
T proxy = getProxy(conf, protocol, rmAddress);
return RetryProxy.create(protocol, proxy, retryPolicy);
}
}
找到獲取InetSocketAddress的方法ClientRMProxy::getRMAddress:
@Private
protected InetSocketAddress getRMAddress(YarnConfiguration conf, Class<?> protocol) throws IOException {
if (protocol == ApplicationClientProtocol.class) {
return conf.getSocketAddr("yarn.resourcemanager.address", "0.0.0.0:8032", 8032);
} else if (protocol == ResourceManagerAdministrationProtocol.class) {
return conf.getSocketAddr("yarn.resourcemanager.admin.address", "0.0.0.0:8033", 8033);
} else if (protocol == ApplicationMasterProtocol.class) {
setAMRMTokenService(conf);
return conf.getSocketAddr("yarn.resourcemanager.scheduler.address", "0.0.0.0:8030", 8030);
} else {
String message = "Unsupported protocol found when creating the proxy connection to ResourceManager: " + (protocol != null ? protocol.getClass().getName() : "null");
LOG.error(message);
throw new IllegalStateException(message);
}
}
繼續(xù)展開YarnConfiguration.getSocketAddr:
public InetSocketAddress getSocketAddr(String name, String defaultAddress, int defaultPort) {
String address;
if (HAUtil.isHAEnabled(this) && getServiceAddressConfKeys(this).contains(name)) {
address = HAUtil.getConfValueForRMInstance(name, defaultAddress, this);
} else {
address = this.get(name, defaultAddress);
}
return NetUtils.createSocketAddr(address, defaultPort, name);
}
跟蹤NetUtils::createSocketAddr,一路到NetUtils::createSocketAddrForHost:
public static InetSocketAddress createSocketAddrForHost(String host, int port) {
String staticHost = getStaticResolution(host);
String resolveHost = staticHost != null ? staticHost : host;
InetSocketAddress addr;
try {
InetAddress iaddr = SecurityUtil.getByName(resolveHost);
if (staticHost != null) {
iaddr = InetAddress.getByAddress(host, iaddr.getAddress());
}
addr = new InetSocketAddress(iaddr, port);
} catch (UnknownHostException var6) {
addr = InetSocketAddress.createUnresolved(host, port);
}
return addr;
}
最終跟蹤到SecurityUtil::getByName方法:
@Private
public static InetAddress getByName(String hostname) throws UnknownHostException {
return hostResolver.getByName(hostname);
}
它使用hostResolver的getByName方法解析host。
hostResolver是HostResolver類型,具有兩個(gè)實(shí)現(xiàn)類,為StandardHostResolver和QulifiedHostResolver兩種。
至于hostResolver具體是哪個(gè)類,答案就在它初始化賦值的地方。我們找到setTokenServiceUseIp方法。
@Private
@VisibleForTesting
public static void setTokenServiceUseIp(boolean flag) {
useIpForTokenService = flag;
hostResolver = (HostResolver)(!useIpForTokenService ? new QualifiedHostResolver() : new StandardHostResolver());
}
到這里我們發(fā)現(xiàn)hadoop.security.token.service.use_ip為true對(duì)應(yīng)的是StandardHostResolver,false對(duì)應(yīng)的是QualifiedHostResolver。
StandardHostResolver解析host的方式較為簡(jiǎn)單,直接使用JDK的InetAddress.getByName完成解析。不再過多分析。QualifiedHostResolver解析較為復(fù)雜。前面的問題是hadoop.security.token.service.use_ip配置為false的時(shí)候無法解析host,那接下來的重點(diǎn)則是搞清楚QualifiedHostResolver如何解析host。
我們查看QualifiedHostResolver的getByName方法:
@Override
public InetAddress getByName(String host) throws UnknownHostException {
InetAddress addr = null;
byte[] ip;
if (IPAddressUtil.isIPv4LiteralAddress(host)) {
ip = IPAddressUtil.textToNumericFormatV4(host);
addr = InetAddress.getByAddress(host, ip);
} else if (IPAddressUtil.isIPv6LiteralAddress(host)) {
ip = IPAddressUtil.textToNumericFormatV6(host);
addr = InetAddress.getByAddress(host, ip);
} else if (host.endsWith(".")) {
addr = this.getByExactName(host);
} else if (host.contains(".")) {
addr = this.getByExactName(host);
if (addr == null) {
addr = this.getByNameWithSearch(host);
}
} else {
InetAddress loopback = InetAddress.getByName((String)null);
if (host.equalsIgnoreCase(loopback.getHostName())) {
addr = InetAddress.getByAddress(host, loopback.getAddress());
} else {
addr = this.getByNameWithSearch(host);
if (addr == null) {
addr = this.getByExactName(host);
}
}
}
if (addr == null) {
throw new UnknownHostException(host);
} else {
return addr;
}
}
該方法的邏輯:
- 如果host字符串是合法的IPv4和IPv6字面值,如果是,將且構(gòu)造為InetAddress返回,不用解析hostname。
- 如果host字符串以
.結(jié)尾,調(diào)用getByExactName解析。 - 如果host字符串包含
.,先使用getByExactName解析。如果無法解析,使用searchDomain逐個(gè)嘗試解析。 - 其余情況,先嘗試按照loopback解析。如果無法解析,按照前面即第三種情況解析。
接下來分析按照host解析IP的方法getByExactName。
InetAddress getByExactName(String host) {
InetAddress addr = null;
// 這里將host視為FQDN
String fqHost = host;
// 如果FQDN結(jié)尾不是.,在結(jié)尾增加一個(gè)點(diǎn)
if (!fqHost.endsWith(".")) {
fqHost = fqHost + ".";
}
try {
// 調(diào)用InetAddress.getByName解析FQDN
addr = this.getInetAddressByName(fqHost);
// 構(gòu)建InetAddress
addr = InetAddress.getByAddress(host, addr.getAddress());
} catch (UnknownHostException var5) {
}
return addr;
}
到這里問題已經(jīng)比較清晰了。集群外客戶端節(jié)點(diǎn)沒有配置DNS,檢查該節(jié)點(diǎn)配置的/etc/hosts,發(fā)現(xiàn)指向集群的hostname都不是.結(jié)尾的。前面方法解析host的時(shí)候在host結(jié)尾拼接了一個(gè).再解析。我們按照這個(gè)邏輯,嘗試將host后面加一個(gè).,然后ping一下這個(gè)host,發(fā)現(xiàn)域名解析錯(cuò)誤,問題鎖定。
PS: 以"."結(jié)尾的FQDN稱之為rooted/absolute FQDN。
getByExactName將FQDN轉(zhuǎn)化為absolute FQDN,然后再嘗試解析。
修改/etc/hosts文件,將這些域名結(jié)尾都加上.,然后保持hadoop.security.token.service.use_ip為false不變,重新提交任務(wù),任務(wù)運(yùn)行一切正常。
這里建議使用DNS方式解析域名。
最后我們?cè)陬~外看下getByNameWithSearch方法做了些什么。
InetAddress getByNameWithSearch(String host) {
InetAddress addr = null;
if (host.endsWith(".")) {
addr = this.getByExactName(host);
} else {
Iterator i$ = this.searchDomains.iterator();
while(i$.hasNext()) {
String domain = (String)i$.next();
String dot = !domain.startsWith(".") ? "." : "";
addr = this.getByExactName(host + dot + domain);
if (addr != null) {
break;
}
}
}
return addr;
}
該方法遍歷每個(gè)search domain(Linux/etc/resolv.conf中的search配置),分別在原始host后面拼接上search domain,逐個(gè)嘗試解析。一旦解析成功立刻返回。
本博客為作者原創(chuàng),歡迎大家參與討論和批評(píng)指正。如需轉(zhuǎn)載請(qǐng)注明出處。