Flink Application Mode Explained


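Purpose of this article:

To analyze, from the source code, how the user program's main() method comes to be executed inside the ApplicationClusterEntryPoint entry class.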

First, a comparison with the other cluster modes

Take StandaloneSessionClusterEntrypoint (session mode) as an example: there, main() is executed on the client. Suppose we submit a job with the following command:

$ ./bin/flink run examples/streaming/WordCount.jar

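Running the flink command with the run argument ultimately invokes the main() method of CliFrontend.java, which dispatches to run(). Its core logic is shown below.
The user program is parsed into a PackagedProgram object, which mainly encapsulates the following information: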

[Figure: the information encapsulated by PackagedProgram]

 /**
     * Executions the run action.
     *
     * @param args Command line arguments for the run action.
     */
    protected void run(String[] args) throws Exception {
        LOG.info("Running 'run' command.");

        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        // evaluate help flag
        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRun(customCommandLines);
            return;
        }

        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));

        final ProgramOptions programOptions = ProgramOptions.create(commandLine);

        final List<URL> jobJars = getJobJarAndDependencies(programOptions);

        final Configuration effectiveConfiguration =
                getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

        LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

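        try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
            /**
             * han_pf
             * Run the user program: call the submitted job's main() method via reflection, which
             * turns the user program into a StreamGraph, generates the JobGraph and submits it to the cluster.
             */
            executeProgram(effectiveConfiguration, program);
        }
    }

    // CliFrontend.executeProgram() delegates to ClientUtils.executeProgram() (excerpt):
    public static void executeProgram(
            PipelineExecutorServiceLoader executorServiceLoader,
            Configuration configuration,
            PackagedProgram program,
            boolean enforceSingleJobExecution,
            boolean suppressSysout)
            throws ProgramInvocationException {
        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            // ... (omitted) switch to the program's user-code ClassLoader and install
            // ContextEnvironment / StreamContextEnvironment as the execution environment
            try {
                /**
                 * han_pf
                 * Call the submitted job's main() method via reflection.
                 */
                program.invokeInteractiveModeForExecution();
            } finally {
                ContextEnvironment.unsetAsContext();
                StreamContextEnvironment.unsetAsContext();
            }
        } finally {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }
    }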
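The core step in this flow is a reflective call, and it can be shown with a small, self-contained sketch. Nothing in it comes from Flink: the nested UserJob class and the invokeMain() helper are hypothetical stand-ins. They show only the general technique of loading an entry class by name and invoking its static main(String[]), which is roughly what PackagedProgram.invokeInteractiveModeForExecution() does for the real user JAR.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the idea behind PackagedProgram.invokeInteractiveModeForExecution():
// load the entry class by name and call its static main(String[]) through reflection.
// Flink's real implementation also handles user-code class loaders and error wrapping.
class ReflectiveMainSketch {

    /** Stand-in for a user job such as WordCount; it just records its arguments. */
    static class UserJob {
        static final List<String> RECEIVED = new ArrayList<>();

        public static void main(String[] args) {
            for (String arg : args) {
                RECEIVED.add(arg);
            }
        }
    }

    /** Loads entryClassName with the given loader and invokes its public static main(String[]). */
    static void invokeMain(String entryClassName, String[] args, ClassLoader loader) {
        try {
            Class<?> entryClass = Class.forName(entryClassName, true, loader);
            Method main = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(main.getModifiers())) {
                throw new IllegalStateException("main() of " + entryClassName + " is not static");
            }
            // Cast to Object so the array is passed as the single String[] parameter
            // instead of being spread into the varargs of Method.invoke.
            main.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke main() of " + entryClassName, e);
        }
    }

    public static void main(String[] args) {
        invokeMain(
                UserJob.class.getName(),
                new String[] {"--input", "words.txt"},
                ReflectiveMainSketch.class.getClassLoader());
        System.out.println(UserJob.RECEIVED); // [--input, words.txt]
    }
}
```

The (Object) cast matters. Without it, Method.invoke would treat the String[] as its own varargs array and pass each string as a separate argument.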

Application mode on YARN

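In Application mode on YARN, by contrast, the client only uploads the JAR. The executeProgram() call shown above runs on the cluster side instead (more precisely, while the Dispatcher is starting up). The analysis follows.
First, submit the job, which also starts the cluster. Only YARN and Kubernetes support the following command; on a Standalone cluster, the job has to be submitted directly to the JobManager.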

$ ./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar

The command is handled by CliFrontend.runApplication():

    protected void runApplication(String[] args) throws Exception {
        LOG.info("Running 'run-application' command.");

        final Options commandOptions = CliFrontendParser.getRunCommandOptions();
        final CommandLine commandLine = getCommandLine(commandOptions, args, true);

        if (commandLine.hasOption(HELP_OPTION.getOpt())) {
            CliFrontendParser.printHelpForRunApplication(customCommandLines);
            return;
        }

        final CustomCommandLine activeCommandLine =
                validateAndGetActiveCommandLine(checkNotNull(commandLine));

        final ApplicationDeployer deployer =
                new ApplicationClusterDeployer(clusterClientServiceLoader);

        final ProgramOptions programOptions;
        final Configuration effectiveConfiguration;

        // ... (omitted: programOptions and effectiveConfiguration are initialized here)

        final ApplicationConfiguration applicationConfiguration =
                new ApplicationConfiguration(
                        programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
        // Call ApplicationClusterDeployer.run() to deploy the program to the cluster
        deployer.run(effectiveConfiguration, applicationConfiguration);
    }

ApplicationClusterDeployer:

 public <ClusterID> void run(
            final Configuration configuration,
            final ApplicationConfiguration applicationConfiguration)
            throws Exception {
        checkNotNull(configuration);
        checkNotNull(applicationConfiguration);

        LOG.info("Submitting application in 'Application Mode'.");

        final ClusterClientFactory<ClusterID> clientFactory =
                clientServiceLoader.getClusterClientFactory(configuration);
        try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                clientFactory.createClusterDescriptor(configuration)) {
            final ClusterSpecification clusterSpecification =
                    clientFactory.getClusterSpecification(configuration);

            clusterDescriptor.deployApplicationCluster(
                    clusterSpecification, applicationConfiguration);
        }
    }

At this point, the client has not converted anything into a StreamGraph or JobGraph. It has only uploaded the user program's JAR.
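The following rough sketch makes the contrast concrete by showing the kind of information that crosses over in Application mode. The ApplicationSpec class and the example.application.* keys are hypothetical. They only mirror the two fields that ApplicationConfiguration is built from above: the program arguments and the entry-point class name. The entry class is recorded by name only, so it never has to be loaded on the client.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of what the Application-mode client hands over, modeled loosely on
// ApplicationConfiguration: only the entry class name and the program arguments. The config
// keys below are invented for illustration and are not Flink's real option keys.
class ApplicationSubmissionSketch {

    static final class ApplicationSpec {
        private final String entryClassName;
        private final List<String> programArgs;

        ApplicationSpec(String entryClassName, List<String> programArgs) {
            this.entryClassName = entryClassName;
            this.programArgs = new ArrayList<>(programArgs);
        }

        /** Flattens the spec into key/value pairs that could travel to the cluster with the JAR. */
        Map<String, String> toShippableConfig() {
            Map<String, String> conf = new HashMap<>();
            conf.put("example.application.main", entryClassName);
            // Simplified: arguments containing spaces would need proper escaping.
            conf.put("example.application.args", String.join(" ", programArgs));
            return conf;
        }
    }

    public static void main(String[] args) {
        // The entry class is only recorded by name; it is never loaded or run on the client.
        ApplicationSpec spec =
                new ApplicationSpec("com.example.NotOnClientClasspath", List.of("--window", "10"));
        System.out.println(spec.toShippableConfig());
    }
}
```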

Application mode on a Standalone cluster

  1. Step 1: prepare the JAR and submit the job to the JobManager
$ ./bin/standalone-job.sh start --job-classname org.apache.flink.streaming.examples.wordcount.WordCount

  2. Step 2: start a TaskManager
$ ./bin/taskmanager.sh start

standalone-job.sh:

USAGE="Usage: standalone-job.sh ((start|start-foreground))|stop [args]"

STARTSTOP=$1
ENTRY_POINT_NAME="standalonejob"

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

# Startup parameters
ARGS=("${@:2}")

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    # Add cluster entry point specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
    parseJmArgsAndExportLogs "${ARGS[@]}"

    if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
        ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
    fi
fi

ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}")

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}"
else
    "${FLINK_BIN_DIR}"/flink-daemon.sh ${STARTSTOP} ${ENTRY_POINT_NAME} "${ARGS[@]}"
fi

standalone-job.sh ultimately calls flink-daemon.sh with the entry-point name standalonejob, and flink-daemon.sh maps that name to the class to run:
flink-daemon.sh:

(standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
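The two scripts reduce to three steps: validate the action, prepend --configDir to the arguments, and map the entry-point name to a Java class. The sketch below restates that logic in Java in simplified form. Dynamic parameters and JVM options are left out, the mapping contains only the standalonejob entry shown above, and the configuration path and job class in main() are made-up examples.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Simplified Java restatement of standalone-job.sh + flink-daemon.sh (sketch only):
// validate the action, prepend --configDir, and resolve the entry-point name to a class.
class StandaloneJobLauncherSketch {

    // Only the entry shown in this article; flink-daemon.sh knows more daemon names.
    static final Map<String, String> CLASS_TO_RUN = Map.of(
            "standalonejob",
            "org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint");

    /** Mirrors the USAGE check: only start, start-foreground and stop are accepted. */
    static void checkAction(String action) {
        if (!List.of("start", "start-foreground", "stop").contains(action)) {
            throw new IllegalArgumentException(
                    "Usage: standalone-job.sh ((start|start-foreground))|stop [args]");
        }
    }

    /** Mirrors ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}"). */
    static List<String> buildArgs(String confDir, List<String> userArgs) {
        List<String> result = new ArrayList<>();
        result.add("--configDir");
        result.add(confDir);
        result.addAll(userArgs);
        return result;
    }

    /** Mirrors the case statement in flink-daemon.sh. */
    static String resolveClass(String entryPointName) {
        String cls = CLASS_TO_RUN.get(entryPointName);
        if (cls == null) {
            throw new IllegalArgumentException("Unknown entry point: " + entryPointName);
        }
        return cls;
    }

    public static void main(String[] args) {
        checkAction("start");
        // Hypothetical config directory and job class, for illustration only.
        List<String> jvmArgs = buildArgs("/opt/flink/conf",
                List.of("--job-classname", "com.example.MyJob"));
        System.out.println(resolveClass("standalonejob") + " " + String.join(" ", jvmArgs));
    }
}
```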

StandaloneApplicationClusterEntryPoint is analyzed below:

[Figure: source-code call flow of StandaloneApplicationClusterEntryPoint]

StandaloneApplicationClusterEntryPoint.main():

public static void main(String[] args) {
       // startup checks and logging
       EnvironmentInformation.logEnvironmentInfo(
               LOG, StandaloneApplicationClusterEntryPoint.class.getSimpleName(), args);
       SignalHandler.register(LOG);
       JvmShutdownSafeguard.installAsShutdownHook(LOG);

       final StandaloneApplicationClusterConfiguration clusterConfiguration =
               ClusterEntrypointUtils.parseParametersOrExit(
                       args,
                       new StandaloneApplicationClusterConfigurationParserFactory(),
                       StandaloneApplicationClusterEntryPoint.class);

       Configuration configuration = loadConfigurationFromClusterConfig(clusterConfiguration);
      /***************************/
       PackagedProgram program = null;
       try {
           /**
           * han_pf
           * In session mode, this object is created by the client
           */
           program = getPackagedProgram(clusterConfiguration, configuration);
       } catch (Exception e) {
           LOG.error("Could not create application program.", e);
           System.exit(1);
       }

       try {
           configureExecution(configuration, program);
       } catch (Exception e) {
           LOG.error("Could not apply application configuration.", e);
           System.exit(1);
       }
       /***************************/

       StandaloneApplicationClusterEntryPoint entrypoint =
               new StandaloneApplicationClusterEntryPoint(configuration, program);

       ClusterEntrypoint.runClusterEntrypoint(entrypoint);
   }
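The important thing about this main() is ordering. The PackagedProgram is built before the cluster starts (and the JVM exits if that fails), but the user's main() has not run yet. Here is a minimal sketch of that "build now, run later" ordering. It uses hypothetical stand-in classes, Program and ClusterEntryPoint, rather than Flink's real types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the ordering in StandaloneApplicationClusterEntryPoint.main(): the
// program object is built up front (the real code exits the JVM if that fails), but the user's
// main() is not run here. It is handed to the cluster entry point and only runs later, during
// startup (in Flink, inside the Dispatcher). Class names are simplified stand-ins.
class DeferredProgramSketch {

    /** Stand-in for PackagedProgram: knows what to run, has not run it yet. */
    static final class Program {
        private final Consumer<List<String>> userMain;

        Program(Consumer<List<String>> userMain) {
            if (userMain == null) {
                throw new IllegalArgumentException("Could not create application program.");
            }
            this.userMain = userMain;
        }

        void invoke(List<String> log) {
            userMain.accept(log);
        }
    }

    /** Stand-in for the cluster entry point that receives the program in its constructor. */
    static final class ClusterEntryPoint {
        final List<String> log = new ArrayList<>();
        private final Program program;

        ClusterEntryPoint(Program program) {
            this.program = program;
            log.add("entry point created");
        }

        void runCluster() {
            log.add("cluster components started");
            // Simplification: Flink reaches the user's main() only after Dispatcher leader election.
            program.invoke(log);
        }
    }

    public static void main(String[] args) {
        Program program = new Program(log -> log.add("user main() ran"));
        ClusterEntryPoint entryPoint = new ClusterEntryPoint(program);
        System.out.println(entryPoint.log); // [entry point created]
        entryPoint.runCluster();
        System.out.println(entryPoint.log);
        // [entry point created, cluster components started, user main() ran]
    }
}
```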

As the source-analysis flowchart above shows, the user program's main() has not been invoked by the time ClusterEntrypoint.runClusterEntrypoint() is called. It is ultimately invoked while the JobManager starts its Dispatcher component.
After the DefaultDispatcherRunner object is created, leader election runs. Once it succeeds, DefaultDispatcherRunner.grantLeadership() is called back:

 @Override
    public void grantLeadership(UUID leaderSessionID) {
        runActionIfRunning(
                () -> {
                    LOG.info(
                            "{} was granted leadership with leader id {}. Creating new {}.",
                            getClass().getSimpleName(),
                            leaderSessionID,
                            DispatcherLeaderProcess.class.getSimpleName());
                    /**
                    * han_pf
                    * Start the Dispatcher
                    */
                    startNewDispatcherLeaderProcess(leaderSessionID);
                });
    }
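The callback pattern can be sketched under a simplified, synchronous model. Each grant carries a new leader session id, grants are ignored once the runner is closed (like runActionIfRunning()), and the previous leader process is stopped before a new one is started. Runner and LeaderProcess are hypothetical names. The real runner starts the new process asynchronously after the old one has terminated, whereas this sketch does everything inline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical, synchronous sketch of the grantLeadership() pattern: a new leader session id
// replaces the previous leader process, but only while the runner is still running.
class LeaderGrantSketch {

    static final class LeaderProcess {
        final UUID leaderSessionId;
        private boolean running;

        LeaderProcess(UUID leaderSessionId) {
            this.leaderSessionId = leaderSessionId;
        }

        void start() { running = true; }

        void close() { running = false; }

        boolean isRunning() { return running; }
    }

    static final class Runner {
        private final Object lock = new Object();
        private boolean running = true;
        private LeaderProcess current;
        final List<UUID> grantedIds = new ArrayList<>();

        void grantLeadership(UUID leaderSessionId) {
            synchronized (lock) {
                if (!running) {
                    return; // like runActionIfRunning(): ignore grants after close()
                }
                if (current != null) {
                    current.close(); // stop the previous leader process first
                }
                current = new LeaderProcess(leaderSessionId);
                grantedIds.add(leaderSessionId);
                current.start();
            }
        }

        LeaderProcess currentProcess() {
            synchronized (lock) {
                return current;
            }
        }

        void close() {
            synchronized (lock) {
                running = false;
                if (current != null) {
                    current.close();
                }
            }
        }
    }

    public static void main(String[] args) {
        Runner runner = new Runner();
        runner.grantLeadership(UUID.randomUUID());
        LeaderProcess first = runner.currentProcess();
        runner.grantLeadership(UUID.randomUUID());
        System.out.println(first.isRunning() + " " + runner.currentProcess().isRunning()); // false true
    }
}
```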

startNewDispatcherLeaderProcess() goes on to call AbstractDispatcherLeaderProcess.start(), which in turn runs the subclass's onStart() method:

   public final void start() {
        runIfStateIs(State.CREATED, this::startInternal);
    }

    private void startInternal() {
        log.info("Start {}.", getClass().getSimpleName());
        state = State.RUNNING;
        /**
        * han_pf
        * Call the subclass's onStart(). There are two implementations,
        * SessionDispatcherLeaderProcess and JobDispatcherLeaderProcess; here the session
        * variant applies, so SessionDispatcherLeaderProcess is the one to look at.
        */
        onStart();
    }
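The start()/startInternal()/onStart() chain is a template method guarded by a state check. The sketch below shows the pattern with hypothetical class names. Calling start() a second time has no effect because the state is no longer CREATED.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the template-method pattern in AbstractDispatcherLeaderProcess: a
// final start() acts only in the CREATED state, switches to RUNNING, and then delegates to
// the abstract onStart() hook that each subclass implements. Names are simplified stand-ins.
class TemplateStartSketch {

    enum State { CREATED, RUNNING, STOPPED }

    abstract static class LeaderProcessBase {
        private final Object lock = new Object();
        private State state = State.CREATED;
        final List<String> events = new ArrayList<>();

        final void start() {
            synchronized (lock) {
                if (state != State.CREATED) {
                    return; // cf. runIfStateIs(State.CREATED, this::startInternal)
                }
                state = State.RUNNING;
                onStart();
            }
        }

        State state() {
            synchronized (lock) {
                return state;
            }
        }

        protected abstract void onStart();
    }

    /** Roughly what the session variant does: recover jobs, then create the dispatcher. */
    static final class SessionLikeProcess extends LeaderProcessBase {
        @Override
        protected void onStart() {
            events.add("recover jobs, then create dispatcher");
        }
    }

    /** Roughly what the job variant does: create the dispatcher for one known JobGraph. */
    static final class JobLikeProcess extends LeaderProcessBase {
        @Override
        protected void onStart() {
            events.add("create dispatcher for a single JobGraph");
        }
    }

    public static void main(String[] args) {
        LeaderProcessBase process = new SessionLikeProcess();
        process.start();
        process.start(); // ignored: no longer in CREATED
        System.out.println(process.events.size() + " " + process.state()); // 1 RUNNING
    }
}
```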

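The onStart() shown below belongs to JobDispatcherLeaderProcess, which serves per-job clusters. In Application mode, the leader process is a SessionDispatcherLeaderProcess. Its onStart() first recovers persisted jobs and then makes the same dispatcherGatewayServiceFactory.create() call, and that call is where the modes diverge: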

    protected void onStart() {
        final DispatcherGatewayService dispatcherService =
                /**
                * han_pf
                * Application mode takes a different branch here: ApplicationDispatcherGatewayServiceFactory instead of DefaultDispatcherGatewayServiceFactory
                */
                dispatcherGatewayServiceFactory.create(
                        DispatcherId.fromUuid(getLeaderSessionId()),
                        Collections.singleton(jobGraph),
                        ThrowingJobGraphWriter.INSTANCE);

        completeDispatcherSetup(dispatcherService);
    }
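Whichever leader process is used, the mode-specific behavior comes from the factory injected into it, not from the leader-process code itself. The sketch below illustrates that injection. Its types are hypothetical and return descriptive strings instead of real Dispatcher objects.

```java
import java.util.List;

// Hypothetical sketch: the leader process code is the same in every mode; what differs is the
// gateway-service factory injected into it when the entry point wires up its components
// (compare DefaultDispatcherGatewayServiceFactory vs ApplicationDispatcherGatewayServiceFactory).
// Return values are plain strings purely for illustration.
class FactoryInjectionSketch {

    interface GatewayServiceFactory {
        String create(String fencingToken, List<String> recoveredJobs);
    }

    /** Session-style behavior: a plain Dispatcher. */
    static final class DefaultFactory implements GatewayServiceFactory {
        @Override
        public String create(String fencingToken, List<String> recoveredJobs) {
            return "Dispatcher(" + fencingToken + ", jobs=" + recoveredJobs.size() + ")";
        }
    }

    /** Application-style behavior: a Dispatcher that also bootstraps the user's main(). */
    static final class ApplicationFactory implements GatewayServiceFactory {
        @Override
        public String create(String fencingToken, List<String> recoveredJobs) {
            return "Dispatcher(" + fencingToken + ", jobs=" + recoveredJobs.size()
                    + ") + ApplicationBootstrap";
        }
    }

    /** The leader process only sees the interface, like dispatcherGatewayServiceFactory.create(). */
    static final class LeaderProcess {
        private final GatewayServiceFactory factory;

        LeaderProcess(GatewayServiceFactory factory) {
            this.factory = factory;
        }

        String onStart(String leaderSessionId, List<String> recoveredJobs) {
            return factory.create(leaderSessionId, recoveredJobs);
        }
    }

    public static void main(String[] args) {
        List<String> jobs = List.of("job-a");
        System.out.println(new LeaderProcess(new DefaultFactory()).onStart("leader-1", jobs));
        // Dispatcher(leader-1, jobs=1)
        System.out.println(new LeaderProcess(new ApplicationFactory()).onStart("leader-1", jobs));
        // Dispatcher(leader-1, jobs=1) + ApplicationBootstrap
    }
}
```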

ApplicationDispatcherGatewayServiceFactory:

  public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            JobGraphWriter jobGraphWriter) {

        final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);

        final Dispatcher dispatcher;
        try {
            dispatcher =
                    dispatcherFactory.createDispatcher(
                            rpcService,
                            fencingToken,
                            recoveredJobs,
                            /**
                            * han_pf
                            * Entry point for running main() in Application mode
                            */
                            (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                    new ApplicationDispatcherBootstrap(
                                            application,
                                            recoveredJobIds,
                                            configuration,
                                            dispatcherGateway,
                                            scheduledExecutor,
                                            errorHandler),
                            PartialDispatcherServicesWithJobGraphStore.from(
                                    partialDispatcherServices, jobGraphWriter));
        } catch (Exception e) {
            throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
        }

        dispatcher.start();

        return DefaultDispatcherGatewayService.from(dispatcher);
    }
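The lambda is a factory, not a bootstrap. In the version quoted here, the Dispatcher calls it during its own start-up, which lets the bootstrap receive a gateway to that Dispatcher. The real factory also receives a ScheduledExecutor and a FatalErrorHandler. The sketch below is simplified and synchronous, and its stand-in types are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, synchronous sketch of the bootstrap-factory hand-off: the Dispatcher is given
// a factory and only creates the bootstrap once it exists, passing itself in as the gateway.
class BootstrapFactorySketch {

    interface DispatcherGateway {
        void submitJob(String jobName);
    }

    interface DispatcherBootstrap {}

    @FunctionalInterface
    interface DispatcherBootstrapFactory {
        DispatcherBootstrap create(DispatcherGateway gateway);
    }

    static final class Dispatcher implements DispatcherGateway {
        final List<String> submitted = new ArrayList<>();
        private final DispatcherBootstrapFactory bootstrapFactory;
        DispatcherBootstrap bootstrap;

        Dispatcher(DispatcherBootstrapFactory bootstrapFactory) {
            this.bootstrapFactory = bootstrapFactory;
        }

        void start() {
            // The bootstrap is created only now, so it can use this Dispatcher as its gateway.
            bootstrap = bootstrapFactory.create(this);
        }

        @Override
        public void submitJob(String jobName) {
            submitted.add(jobName);
        }
    }

    /** Stand-in for ApplicationDispatcherBootstrap: "runs main()", which submits via the gateway. */
    static final class ApplicationBootstrap implements DispatcherBootstrap {
        ApplicationBootstrap(DispatcherGateway gateway) {
            gateway.submitJob("job-from-user-main");
        }
    }

    public static void main(String[] args) {
        Dispatcher dispatcher = new Dispatcher(ApplicationBootstrap::new);
        System.out.println(dispatcher.submitted); // []
        dispatcher.start();
        System.out.println(dispatcher.submitted); // [job-from-user-main]
    }
}
```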

Calling new ApplicationDispatcherBootstrap() creates the bootstrap object, and its constructor starts the user program:

public ApplicationDispatcherBootstrap(
            final PackagedProgram application,
            final Collection<JobID> recoveredJobIds,
            final Configuration configuration,
            final DispatcherGateway dispatcherGateway,
            final ScheduledExecutor scheduledExecutor,
            final FatalErrorHandler errorHandler) {
        this.configuration = checkNotNull(configuration);
        this.recoveredJobIds = checkNotNull(recoveredJobIds);
        this.application = checkNotNull(application);
        this.errorHandler = checkNotNull(errorHandler);
        /**
        * han_pf
        * Run the user program
        */
        this.applicationCompletionFuture =
                fixJobIdAndRunApplicationAsync(dispatcherGateway, scheduledExecutor);

        this.clusterShutdownFuture = runApplicationAndShutdownClusterAsync(dispatcherGateway);
    }
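The constructor sets up two futures. The first is the application run, started asynchronously so that Dispatcher start-up is not blocked. The second is a cluster-shutdown step chained onto the first. The sketch below shows that chaining with CompletableFuture. It assumes a single-thread executor and uses plain status strings in place of Flink's ApplicationStatus and DispatcherGateway.shutDownCluster().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the two futures set up in the ApplicationDispatcherBootstrap
// constructor: the user program starts asynchronously, and the cluster-shutdown step is
// chained onto its completion. Status strings are simplified stand-ins.
class BootstrapFuturesSketch {

    static final class Bootstrap {
        final CompletableFuture<Void> applicationCompletionFuture;
        final CompletableFuture<String> clusterShutdownFuture;

        Bootstrap(Runnable userMain, ExecutorService executor) {
            // Start the user program without blocking the caller (the Dispatcher start-up).
            this.applicationCompletionFuture = CompletableFuture.runAsync(userMain, executor);
            // Runs whether the application succeeded or failed and derives a final status.
            this.clusterShutdownFuture =
                    applicationCompletionFuture.handle(
                            (ignored, error) -> error == null ? "SUCCEEDED" : "FAILED");
        }
    }

    /** Runs one application to completion and returns the status the cluster would shut down with. */
    static String runToShutdown(Runnable userMain) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            return new Bootstrap(userMain, executor).clusterShutdownFuture.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runToShutdown(() -> System.out.println("user main() running")));
        System.out.println(runToShutdown(() -> {
            throw new IllegalStateException("job failed");
        }));
    }
}
```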


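fixJobIdAndRunApplicationAsync() delegates to runApplicationAsync(), which schedules runApplicationEntryPoint() on the scheduledExecutor:

    private void runApplicationEntryPoint(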
            final CompletableFuture<List<JobID>> jobIdsFuture,
            final Set<JobID> tolerateMissingResult,
            final DispatcherGateway dispatcherGateway,
            final ScheduledExecutor scheduledExecutor,
            final boolean enforceSingleJobExecution) {
        try {
            final List<JobID> applicationJobIds = new ArrayList<>(recoveredJobIds);

            final PipelineExecutorServiceLoader executorServiceLoader =
                    new EmbeddedExecutorServiceLoader(
                            applicationJobIds, dispatcherGateway, scheduledExecutor);
            /**
            * han_pf
            * The same method that the client-side CliFrontend.executeProgram() calls.
            */
            ClientUtils.executeProgram(
                    executorServiceLoader,
                    configuration,
                    application,
                    enforceSingleJobExecution,
                    true /* suppress sysout */);

            if (applicationJobIds.isEmpty()) {
                jobIdsFuture.completeExceptionally(
                        new ApplicationExecutionException(
                                "The application contains no execute() calls."));
            } else {
                jobIdsFuture.complete(applicationJobIds);
            }
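        } catch (Throwable t) {
            // ... (omitted) error handling; in general the failure is propagated by
            // completing jobIdsFuture exceptionally
        }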
    }
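The bookkeeping in runApplicationEntryPoint() is simple. The executor handed to the user program records the id of every job submitted through execute(), starting from the recovered ids, and the future fails if main() never called execute(). The sketch below uses hypothetical stand-ins: string job ids, and a RecordingExecutor in place of what EmbeddedExecutorServiceLoader provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical sketch of runApplicationEntryPoint()'s bookkeeping: job ids submitted by the
// user program are collected, and the future fails if main() never called execute().
class JobIdCollectionSketch {

    /** Stand-in for the embedded executor: each execute() "submits" a job and records its id. */
    static final class RecordingExecutor {
        private final List<String> jobIds;

        RecordingExecutor(List<String> jobIds) {
            this.jobIds = jobIds;
        }

        String execute(String jobName) {
            String id = jobName + "-" + (jobIds.size() + 1);
            jobIds.add(id);
            return id;
        }
    }

    static CompletableFuture<List<String>> runEntryPoint(
            List<String> recoveredJobIds, Consumer<RecordingExecutor> userMain) {
        CompletableFuture<List<String>> jobIdsFuture = new CompletableFuture<>();
        try {
            // Start from the recovered ids, as the real code does with recoveredJobIds.
            List<String> applicationJobIds = new ArrayList<>(recoveredJobIds);
            userMain.accept(new RecordingExecutor(applicationJobIds));
            if (applicationJobIds.isEmpty()) {
                jobIdsFuture.completeExceptionally(
                        new IllegalStateException("The application contains no execute() calls."));
            } else {
                jobIdsFuture.complete(applicationJobIds);
            }
        } catch (RuntimeException e) {
            jobIdsFuture.completeExceptionally(e);
        }
        return jobIdsFuture;
    }

    public static void main(String[] args) {
        System.out.println(runEntryPoint(List.of(), exec -> {
            exec.execute("wordcount");
            exec.execute("topspeed");
        }).join()); // [wordcount-1, topspeed-2]
        System.out.println(runEntryPoint(List.of(), exec -> {}).isCompletedExceptionally()); // true
    }
}
```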

At this point, the user program's main() method runs on the cluster side, and the StreamGraph and JobGraph conversion happens there.

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
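Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and provides only information storage services.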
