Flink 雙網(wǎng)分離環(huán)境使用問題解決記錄

Flink 使用介紹相關(guān)文檔目錄

Flink 使用介紹相關(guān)文檔目錄

前言

本文為大家分享博主在雙網(wǎng)分離的環(huán)境下使用Flink遇到的問題和解決方案。

所謂雙網(wǎng)分離環(huán)境指的是Flink集群運(yùn)行環(huán)境存在兩個(gè)網(wǎng)段,分別為管理網(wǎng)和業(yè)務(wù)網(wǎng)。這兩個(gè)網(wǎng)段互不相通。管理網(wǎng)使用了千兆交換機(jī),僅用于管理員操作集群和監(jiān)控集群狀態(tài)。業(yè)務(wù)網(wǎng)使用光纖和萬兆交換機(jī),F(xiàn)link TaskManager之間交換數(shù)據(jù)全部走業(yè)務(wù)網(wǎng),實(shí)現(xiàn)帶寬的最大化利用。

環(huán)境信息

軟件版本

  • Flink 1.15.4
  • Hadoop 3.1.1

網(wǎng)絡(luò)配置

集群內(nèi)配置有兩個(gè)網(wǎng)絡(luò),分別為:

管理網(wǎng):10.x.x.x。千兆網(wǎng)。
業(yè)務(wù)網(wǎng):192.168.x.x。萬兆網(wǎng)。
集群內(nèi)相互訪問使用hostname,hostname配置的均為業(yè)務(wù)網(wǎng)。

管理網(wǎng)和業(yè)務(wù)網(wǎng)不互通。

問題1:提交任務(wù)成功后出錯(cuò),無法連接到JobManager

另有一個(gè)集群外客戶端節(jié)點(diǎn),其所在網(wǎng)段和管理網(wǎng)相同。在該客戶端節(jié)點(diǎn)提交Flink任務(wù)到集群,出現(xiàn)無法連接到JobManager,報(bào)Connection Refused錯(cuò)誤。錯(cuò)誤中的IP為集群內(nèi)萬兆網(wǎng)的IP。但是作業(yè)能夠提交成功,只是無法獲取到作業(yè)的運(yùn)行狀態(tài)和結(jié)果。

通過這個(gè)錯(cuò)誤不難得知問題是Flink JobManager的bind host配置存在問題。通過查看Yarn界面,找到Flink作業(yè)JobManager所在節(jié)點(diǎn)。使用netstat查看JobManager進(jìn)程對(duì)應(yīng)的端口和綁定IP。我們發(fā)現(xiàn)只有一個(gè)端口綁定的是192.168.x.x,其余都是0.0.0.0。這個(gè)端口對(duì)應(yīng)的是Flink什么端口呢?查看Yarn中該Flink Application的啟動(dòng)日志可以發(fā)現(xiàn)這個(gè)端口是rest端口。

查閱官方文檔,可以使用rest.bind-address方式綁定地址。于是編輯Flink配置文件flink-conf.yaml,增加:

rest.bind-address: 0.0.0.0

保存之后重新提交任務(wù),問題依舊。綁定的地址并沒有發(fā)生變化。查看Flink Web UI中JobManager的Configuration,發(fā)現(xiàn)rest.bind-addressrest.address值相同,都為集群內(nèi)JobManager運(yùn)行所在機(jī)器的hostname。rest.bind-address配置項(xiàng)不生效。

經(jīng)過詳細(xì)問題調(diào)查,發(fā)現(xiàn)是Flink 1.15.x版本之后bug。無論如何配置rest.bind-address,都會(huì)將該屬性的值覆蓋為運(yùn)行時(shí)所在Yarn節(jié)點(diǎn)的地址。參見如下代碼,位于YarnEntrypointUtils.javaloadConfiguration方法。

// ...
configuration.set(JobManagerOptions.ADDRESS, hostname);
configuration.set(RestOptions.ADDRESS, hostname);
configuration.set(RestOptions.BIND_ADDRESS, hostname);
// ...

該特性對(duì)應(yīng)的提交為[FLINK-24474] Default rest.bind-address to localhost in flink-conf.yaml。Commit ID:6222532db0f0a1e75811fc215cd66bc26fb74afb

解決方法為修改上述片段的代碼為:

// ...
configuration.set(JobManagerOptions.ADDRESS, hostname);
configuration.set(RestOptions.ADDRESS, hostname);
if (!configuration.contains(RestOptions.BIND_ADDRESS)) {
    configuration.set(RestOptions.BIND_ADDRESS, hostname);
}
// ...

按照修改之后的邏輯,只有配置文件中沒有配置rest.bind-address,才會(huì)使用Yarn節(jié)點(diǎn)的地址。如果配置了,則以實(shí)際配置的值為準(zhǔn)。

修改后重新編譯,替換掉Flink客戶端中flink-dist-xxx.jar文件。重新提交作業(yè)可以解決。

博主已通過patch FLINK-35332解決了這個(gè)問題。

問題2:集群外客戶端flink提交時(shí)host無法解析為IP

問題現(xiàn)象:使用flink run方式在集群外提交可以ResourceManager可以解析host為IP,但是yarn-session模式啟動(dòng)的時(shí)候無法解析host。

經(jīng)調(diào)查發(fā)現(xiàn),控制解析的配置項(xiàng)為hadoop.security.token.service.use_ip。

配置集群外客戶端的hdfs-site.xml(這里有伏筆),修改hadoop.security.token.service.use_iptrue

<property>
    <name>hadoop.security.token.service.use_ip</name>
    <value>true</value>
</property>

集群中Hadoop的配置項(xiàng)保持不變,為false。然后嘗試啟動(dòng)yarn-session,問題解決。

查找了Hadoop官網(wǎng),發(fā)現(xiàn)hadoop.security.token.service.use_ip是屬于core-site.xml的配置。集群中和集群外客戶端將其錯(cuò)誤的寫到hdfs-site.xml中。

這里先說結(jié)論,這個(gè)詭異的問題原因在于該配置不能位于hdfs-site.xml中,必須在core-site.xml中。如果該屬性錯(cuò)誤配置到了hdfs-site.xml,會(huì)導(dǎo)致flink run -m yarn-cluster方式提交可以解析host但是yarn-session模式啟動(dòng)yarn-session的時(shí)候無法解析host。至于為什么會(huì)這樣,我們接下來分析。

默認(rèn)的日志級(jí)別不夠詳細(xì),看不出來什么內(nèi)容。將日志開啟DEBUG級(jí)別之后提交任務(wù)。查看日志發(fā)現(xiàn)flink run -m yarn-cluster方式的hadoop.security.token.service.use_ip的值為true,啟動(dòng)yarn-session的時(shí)候hadoop.security.token.service.use_ip的值為false。懷疑yarn-session模式提交的時(shí)候不加載hdfs-site.xml。

仔細(xì)觀察flink yarn-session的啟動(dòng)日志,發(fā)現(xiàn)開頭有這一行:

... org.apache.flink.runtime.util.HadoopUtils [] - Searching Hadoop configuration files in HADOOP_HOME

Flink run命令啟動(dòng)的時(shí)候沒有這一行。懷疑是這里加載了HDFS配置文件。

想要揭開謎底,我們接下來分析Flink yarn-session的啟動(dòng)代碼。

Flink Yarn Session啟動(dòng)代碼

查看yarn-session.sh的最后一行啟動(dòng)Java進(jìn)程代碼,可得知應(yīng)用的入口為FlinkYarnSessionCli類。

"$JAVA_RUN" $JVM_ARGS $FLINK_ENV_JAVA_OPTS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"

跟蹤main方法,然后找到FlinkYarnSessionCli::run方法,我們找到這一行代碼:

// ...
final YarnClusterDescriptor yarnClusterDescriptor =
        (YarnClusterDescriptor)
                yarnClusterClientFactory.createClusterDescriptor(effectiveConfiguration);
// ...

繼續(xù)跟蹤,查看YarnClusterClientFactory::createClusterDescriptor的內(nèi)容:

@Override
public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
    checkNotNull(configuration);

    final String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR);
    YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);

    return getClusterDescriptor(configuration);
}

繼續(xù)分析YarnClusterClientFactory::getClusterDescriptor。

private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
    final YarnClient yarnClient = YarnClient.createYarnClient();
    // 獲取Yarn和Hadoop配置
    final YarnConfiguration yarnConfiguration =
            Utils.getYarnAndHadoopConfiguration(configuration);

    yarnClient.init(yarnConfiguration);
    yarnClient.start();

    return new YarnClusterDescriptor(
            configuration,
            yarnConfiguration,
            yarnClient,
            YarnClientYarnClusterInformationRetriever.create(yarnClient),
            false);
}

這里有一個(gè)關(guān)鍵調(diào)用Utils.getYarnAndHadoopConfiguration(configuration)。作用為獲取Yarn和Hadoop配置。

public static YarnConfiguration getYarnAndHadoopConfiguration(
        org.apache.flink.configuration.Configuration flinkConfig) {
    final YarnConfiguration yarnConfig = getYarnConfiguration(flinkConfig);
    yarnConfig.addResource(HadoopUtils.getHadoopConfiguration(flinkConfig));

    return yarnConfig;
}

其中getYarnConfiguration調(diào)用中執(zhí)行了new YarnConfiguration該調(diào)用從classpath中加載yarn-default.xml、yarn-site.xmlresource-types.xml文件。
同樣,HadoopUtils.getHadoopConfiguration(flinkConfig)調(diào)用的時(shí)候執(zhí)行了new HdfsConfiguration,加載了hdfs-site.xmlhdfs-default.xmlcore-site.xmlcore-default.xml配置。

到這里可知,F(xiàn)link yarn-session啟動(dòng)的時(shí)候的確加載了hdfs-site.xml配置文件。

Flink run命令的入口類為CliFrontEnd,并沒有使用FlinkYarnSessionCli。因此我們從Hadoop源代碼中打印hadoop.security.token.service.use_ip值這一行DEBUG日志的為止展開分析。

這一行日志位于Hadoop源代碼中SecurityUtil類的setTokenServiceUseIp方法中。我們將相關(guān)的方法貼出來,如下所示:

  static {
    setConfigurationInternal(new Configuration());
  }
  
  private static void setConfigurationInternal(Configuration conf) {
    boolean useIp = conf.getBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP,
      CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT);
    setTokenServiceUseIp(useIp);
    // ...
  }

  @InterfaceAudience.Private
  @VisibleForTesting
  public static void setTokenServiceUseIp(boolean flag) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Setting "
          + CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP
          + " to " + flag);
    }
    useIpForTokenService = flag;
    hostResolver = !useIpForTokenService
        ? new QualifiedHostResolver()
        : new StandardHostResolver();
  }

不難發(fā)現(xiàn),在SecurityUtil類加載的時(shí)候執(zhí)行了setConfigrationInternal方法,然后間接調(diào)用了setTokenServiceUseIp方法。

setConfigrationInternal傳入的參數(shù)是new Configuration。我們查看它的static代碼塊:

static {
    deprecationContext = new AtomicReference(new DeprecationContext((DeprecationContext)null, defaultDeprecations));
    addDefaultResource("core-default.xml");
    addDefaultResource("core-site.xml");
    ClassLoader cL = Thread.currentThread().getContextClassLoader();
    if (cL == null) {
        cL = Configuration.class.getClassLoader();
    }

    if (cL.getResource("hadoop-site.xml") != null) {
        LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively");
        addDefaultResource("hadoop-site.xml");
    }

}

發(fā)現(xiàn)它只從classpath中加載core-default.xmlcore-site.xml文件,不會(huì)去加載hdfs-site.xml。

到這里得出結(jié)論:Flink yarn-session提交的時(shí)候加載了hdfs-site.xml文件,但是flink run -m yarn-cluster的時(shí)候沒有加載。分析完畢。

這次問題分析得出的啟示是對(duì)于Hadoop相關(guān)配置,務(wù)必將配置項(xiàng)寫到正確的配置文件中,否則會(huì)出現(xiàn)奇怪的問題,較難定位原因。

Hadoop解析host相關(guān)代碼

這里我們返回思考前面的問題:僅僅將hadoop.security.token.service.use_ip修改true的確可行。但為什么設(shè)置它為false的時(shí)候就無法解析hostname為IP了呢?

為了搞清楚這個(gè)問題,接下來需要分析hadoop.security.token.service.use_ip究竟做了些什么。

跟蹤YarnClient::start調(diào)用找到YarnImpl::serviceStart

protected void serviceStart() throws Exception {
    try {
        this.rmClient = (ApplicationClientProtocol)ClientRMProxy.createRMProxy(this.getConfig(), ApplicationClientProtocol.class);
        if (this.historyServiceEnabled) {
            this.historyClient.start();
        }
    } catch (IOException var2) {
        IOException e = var2;
        throw new YarnRuntimeException(e);
    }

    super.serviceStart();
}

繼續(xù)跟蹤RMProxy::createRMProxy

private static <T> T createRMProxy(YarnConfiguration conf, Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy) throws IOException {
    if (HAUtil.isHAEnabled(conf)) {
        RMFailoverProxyProvider<T> provider = instance.createRMFailoverProxyProvider(conf, protocol);
        return RetryProxy.create(protocol, provider, retryPolicy);
    } else {
        InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
        LOG.info("Connecting to ResourceManager at " + rmAddress);
        T proxy = getProxy(conf, protocol, rmAddress);
        return RetryProxy.create(protocol, proxy, retryPolicy);
    }
}

找到獲取InetSocketAddress的方法ClientRMProxy::getRMAddress

@Private
protected InetSocketAddress getRMAddress(YarnConfiguration conf, Class<?> protocol) throws IOException {
    if (protocol == ApplicationClientProtocol.class) {
        return conf.getSocketAddr("yarn.resourcemanager.address", "0.0.0.0:8032", 8032);
    } else if (protocol == ResourceManagerAdministrationProtocol.class) {
        return conf.getSocketAddr("yarn.resourcemanager.admin.address", "0.0.0.0:8033", 8033);
    } else if (protocol == ApplicationMasterProtocol.class) {
        setAMRMTokenService(conf);
        return conf.getSocketAddr("yarn.resourcemanager.scheduler.address", "0.0.0.0:8030", 8030);
    } else {
        String message = "Unsupported protocol found when creating the proxy connection to ResourceManager: " + (protocol != null ? protocol.getClass().getName() : "null");
        LOG.error(message);
        throw new IllegalStateException(message);
    }
}

繼續(xù)展開YarnConfiguration.getSocketAddr

public InetSocketAddress getSocketAddr(String name, String defaultAddress, int defaultPort) {
    String address;
    if (HAUtil.isHAEnabled(this) && getServiceAddressConfKeys(this).contains(name)) {
        address = HAUtil.getConfValueForRMInstance(name, defaultAddress, this);
    } else {
        address = this.get(name, defaultAddress);
    }

    return NetUtils.createSocketAddr(address, defaultPort, name);
}

跟蹤NetUtils::createSocketAddr,一路到NetUtils::createSocketAddrForHost

public static InetSocketAddress createSocketAddrForHost(String host, int port) {
    String staticHost = getStaticResolution(host);
    String resolveHost = staticHost != null ? staticHost : host;

    InetSocketAddress addr;
    try {
        InetAddress iaddr = SecurityUtil.getByName(resolveHost);
        if (staticHost != null) {
            iaddr = InetAddress.getByAddress(host, iaddr.getAddress());
        }

        addr = new InetSocketAddress(iaddr, port);
    } catch (UnknownHostException var6) {
        addr = InetSocketAddress.createUnresolved(host, port);
    }

    return addr;
}

最終跟蹤到SecurityUtil::getByName方法:

@Private
public static InetAddress getByName(String hostname) throws UnknownHostException {
    return hostResolver.getByName(hostname);
}

它使用hostResolvergetByName方法解析host。
hostResolverHostResolver類型,具有兩個(gè)實(shí)現(xiàn)類,為StandardHostResolverQulifiedHostResolver兩種。

至于hostResolver具體是哪個(gè)類,答案就在它初始化賦值的地方。我們找到setTokenServiceUseIp方法。

@Private
@VisibleForTesting
public static void setTokenServiceUseIp(boolean flag) {
    useIpForTokenService = flag;
    hostResolver = (HostResolver)(!useIpForTokenService ? new QualifiedHostResolver() : new StandardHostResolver());
}

到這里我們發(fā)現(xiàn)hadoop.security.token.service.use_iptrue對(duì)應(yīng)的是StandardHostResolverfalse對(duì)應(yīng)的是QualifiedHostResolver

StandardHostResolver解析host的方式較為簡(jiǎn)單,直接使用JDK的InetAddress.getByName完成解析。不再過多分析。QualifiedHostResolver解析較為復(fù)雜。前面的問題是hadoop.security.token.service.use_ip配置為false的時(shí)候無法解析host,那接下來的重點(diǎn)則是搞清楚QualifiedHostResolver如何解析host。

我們查看QualifiedHostResolvergetByName方法:

@Override
public InetAddress getByName(String host) throws UnknownHostException {
    InetAddress addr = null;
    byte[] ip;
    if (IPAddressUtil.isIPv4LiteralAddress(host)) {
        ip = IPAddressUtil.textToNumericFormatV4(host);
        addr = InetAddress.getByAddress(host, ip);
    } else if (IPAddressUtil.isIPv6LiteralAddress(host)) {
        ip = IPAddressUtil.textToNumericFormatV6(host);
        addr = InetAddress.getByAddress(host, ip);
    } else if (host.endsWith(".")) {
        addr = this.getByExactName(host);
    } else if (host.contains(".")) {
        addr = this.getByExactName(host);
        if (addr == null) {
            addr = this.getByNameWithSearch(host);
        }
    } else {
        InetAddress loopback = InetAddress.getByName((String)null);
        if (host.equalsIgnoreCase(loopback.getHostName())) {
            addr = InetAddress.getByAddress(host, loopback.getAddress());
        } else {
            addr = this.getByNameWithSearch(host);
            if (addr == null) {
                addr = this.getByExactName(host);
            }
        }
    }

    if (addr == null) {
        throw new UnknownHostException(host);
    } else {
        return addr;
    }
}

該方法的邏輯:

  1. 如果host字符串是合法的IPv4和IPv6字面值,如果是,將且構(gòu)造為InetAddress返回,不用解析hostname。
  2. 如果host字符串以.結(jié)尾,調(diào)用getByExactName解析。
  3. 如果host字符串包含.,先使用getByExactName解析。如果無法解析,使用searchDomain逐個(gè)嘗試解析。
  4. 其余情況,先嘗試按照loopback解析。如果無法解析,按照前面即第三種情況解析。

接下來分析按照host解析IP的方法getByExactName。

InetAddress getByExactName(String host) {
    InetAddress addr = null;
    // 這里將host視為FQDN
    String fqHost = host;
    // 如果FQDN結(jié)尾不是.,在結(jié)尾增加一個(gè)點(diǎn)
    if (!fqHost.endsWith(".")) {
        fqHost = fqHost + ".";
    }

    try {
        // 調(diào)用InetAddress.getByName解析FQDN
        addr = this.getInetAddressByName(fqHost);
        // 構(gòu)建InetAddress
        addr = InetAddress.getByAddress(host, addr.getAddress());
    } catch (UnknownHostException var5) {
    }

    return addr;
}

到這里問題已經(jīng)比較清晰了。集群外客戶端節(jié)點(diǎn)沒有配置DNS,檢查該節(jié)點(diǎn)配置的/etc/hosts,發(fā)現(xiàn)指向集群的hostname都不是.結(jié)尾的。前面方法解析host的時(shí)候在host結(jié)尾拼接了一個(gè).再解析。我們按照這個(gè)邏輯,嘗試將host后面加一個(gè).,然后ping一下這個(gè)host,發(fā)現(xiàn)域名解析錯(cuò)誤,問題鎖定。

PS: 以"."結(jié)尾的FQDN稱之為rooted/absolute FQDN。getByExactName將FQDN轉(zhuǎn)化為absolute FQDN,然后再嘗試解析。

修改/etc/hosts文件,將這些域名結(jié)尾都加上.,然后保持hadoop.security.token.service.use_ipfalse不變,重新提交任務(wù),任務(wù)運(yùn)行一切正常。

這里建議使用DNS方式解析域名。

最后我們?cè)陬~外看下getByNameWithSearch方法做了些什么。

InetAddress getByNameWithSearch(String host) {
    InetAddress addr = null;
    if (host.endsWith(".")) {
        addr = this.getByExactName(host);
    } else {
        Iterator i$ = this.searchDomains.iterator();

        while(i$.hasNext()) {
            String domain = (String)i$.next();
            String dot = !domain.startsWith(".") ? "." : "";
            addr = this.getByExactName(host + dot + domain);
            if (addr != null) {
                break;
            }
        }
    }

    return addr;
}

該方法遍歷每個(gè)search domain(Linux/etc/resolv.conf中的search配置),分別在原始host后面拼接上search domain,逐個(gè)嘗試解析。一旦解析成功立刻返回。

本博客為作者原創(chuàng),歡迎大家參與討論和批評(píng)指正。如需轉(zhuǎn)載請(qǐng)注明出處。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容