Reading the Flink Source Code for Design Insights: Graph Abstraction and Layering

This article was first published on the author's (泊浮目) Yuque: https://www.yuque.com/17sing

Version  Date       Notes
1.0      2022.1.26  First published

0. Preface

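A while ago, a teammate asked me: "Why does Flink go through so many graph transformations between our code and a truly executable state? What is the benefit?" I had the same question when I first read this part of the design. I was busy with something else at the time, so I skimmed some materials and moved on. Now that the question has come up again, let's work it out properly in this article.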

The source code in this article is based on Flink 1.14.0.

1. Layered Design

This figure comes from Jark's blog: http://wuchong.me/blog/2016/05/03/flink-internals-overview/

The above is Flink's graph hierarchy. In the rest of this article we will examine each layer in turn and see why it exists.

1.1 The OptimizedPlan of the Batch API

In this subsection we follow how a DataSet program is converted from a Plan into an OptimizedPlan. To give readers a frame of reference, here are a few terms:

  • DataSet: the user-facing batch API.
  • Plan: a plan describing how DataSources, DataSinks, and Operations interact.
  • OptimizedPlan: the optimized execution plan.

Code entry point:

|-- CliFrontend#main
  \-- parseAndRun
  \-- runApplication
  \-- getPackagedProgram
  \-- buildProgram
  \-- executeProgram
|-- ClientUtils#executeProgram
|-- PackagedProgram#invokeInteractiveModeForExecution
  \-- callMainMethod // invokes the entry point of the user program
|-- ExecutionEnvironment#execute
  \-- executeAsync // creates the Plan
|-- PipelineExecutorFactory#execute
|-- EmbeddedExecutor#execute
  \-- submitAndGetJobClientFuture
|-- PipelineExecutorUtils#getJobGraph
|-- FlinkPipelineTranslationUtil#getJobGraph
|-- FlinkPipelineTranslator#translateToJobGraph // for a Plan, the implementation first derives an OptimizedPlan and then the JobGraph; for a StreamGraph, it produces the JobGraph directly
|-- PlanTranslator#translateToJobGraph
  \-- compilePlan

Let's look at this code:

    private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
        Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
        OptimizedPlan optimizedPlan = optimizer.compile(plan);

        JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
        return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
    }

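The flow is clear: the Plan is first compiled into an OptimizedPlan, which is then compiled into a JobGraph. For the OptimizedPlan step, let's look at Optimizer#compile, starting with the Javadoc on the method: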


    /**
     * Translates the given program to an OptimizedPlan. The optimized plan describes for each
     * operator which strategy to use (such as hash join versus sort-merge join), what data exchange
     * method to use (local pipe forward, shuffle, broadcast), what exchange mode to use (pipelined,
     * batch), where to cache intermediate results, etc,
     *
     * <p>The optimization happens in multiple phases:
     *
     * <ol>
     *   <li>Create optimizer dag implementation of the program.
     *       <p><tt>OptimizerNode</tt> representations of the PACTs, assign parallelism and compute
     *       size estimates.
     *   <li>Compute interesting properties and auxiliary structures.
     *   <li>Enumerate plan alternatives. This cannot be done in the same step as the interesting
     *       property computation (as opposed to the Database approaches), because we support plans
     *       that are not trees.
     * </ol>
     *
     * @param program The program to be translated.
     * @param postPasser The function to be used for post passing the optimizer's plan and setting
     *     the data type specific serialization routines.
     * @return The optimized plan.
     * @throws CompilerException Thrown, if the plan is invalid or the optimizer encountered an
     *     inconsistent situation during the compilation process.
     */
    private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser)

The Javadoc says the optimization happens in several phases:

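  1. Create the optimizer's DAG representation of the program: OptimizerNodes that represent the PACTs, with parallelism assigned and size estimates computed.
  2. Compute interesting properties and auxiliary data structures.
  3. Enumerate the plan alternatives.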

In the implementation, a series of Visitors is created that traverse the program to carry out these steps.

1.1.1 GraphCreatingVisitor

First, a GraphCreatingVisitor is created to process the original Plan. It turns each operator into an OptimizerNode and connects the OptimizerNodes via DagConnections. A DagConnection is an edge model with a source and a target, so it can represent an OptimizerNode's inputs and outputs. This step does the following:

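  1. Create an OptimizerNode for each operator. This node is closer to an execution description (it estimates data sizes, where the data flow branches and joins, and so on).
  2. Connect the nodes with DagConnections (edges).
  3. Derive strategies from the hints. These cover which strategy an operator runs with (e.g. hash join or sort-merge join), and the data exchange strategy between operators (local pipe forward, shuffle, or broadcast). They also cover the data exchange mode between operators (pipelined or batch).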
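To make the node/edge model concrete, here is a minimal sketch in plain Java. Op, OptNode, Edge and GraphCreatingSketch are hypothetical simplifications of Flink's operators, OptimizerNode and DagConnection, not the real classes. The memo table reflects that a Plan is a DAG, so an operator that is consumed twice must still map to a single node.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical user-level operator: a name plus the operators it reads from.
class Op {
    final String name;
    final List<Op> inputs = new ArrayList<>();

    Op(String name, Op... ins) {
        this.name = name;
        for (Op in : ins) {
            inputs.add(in);
        }
    }
}

// Simplified stand-in for OptimizerNode.
class OptNode {
    final String name;
    final List<Edge> incoming = new ArrayList<>();
    final List<Edge> outgoing = new ArrayList<>();

    OptNode(String name) {
        this.name = name;
    }
}

// Simplified stand-in for DagConnection: an edge with a source and a target.
class Edge {
    final OptNode source;
    final OptNode target;

    Edge(OptNode source, OptNode target) {
        this.source = source;
        this.target = target;
    }
}

class GraphCreatingSketch {
    // Memo table: an operator consumed by several successors maps to exactly one node.
    private final Map<Op, OptNode> created = new IdentityHashMap<>();

    OptNode translate(Op op) {
        OptNode existing = created.get(op);
        if (existing != null) {
            return existing;
        }
        OptNode node = new OptNode(op.name);
        created.put(op, node);
        for (Op in : op.inputs) {
            OptNode source = translate(in); // translate the input first
            Edge edge = new Edge(source, node); // then connect it to this node
            source.outgoing.add(edge);
            node.incoming.add(edge);
        }
        return node;
    }

    int nodeCount() {
        return created.size();
    }
}
```

Take a diamond: one source read by two maps that are then joined. Here translate yields four nodes, and the source node ends up with two outgoing edges.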

1.1.2 IdAndEstimatesVisitor

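As the name suggests, this visitor assigns an id to each operator and estimates its data volume. The estimation is implemented in OptimizerNode#computeOutputEstimates, an abstract method. The implementation in DataSourceNode is worth a look: it derives the estimates from a set of properties of the upstream data source (such as row count and size). However, this code sits awkwardly there. The author's intent seems to be file-based sources, as the comment says: "see, if we have a statistics object that can tell us a bit about the file".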
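The id-and-estimate pass is essentially a post-order (inputs-first) DAG traversal. The sketch below assumes a hypothetical EstNode class. It also uses a made-up, union-like estimation rule: the output is the sum of the inputs, or unknown if any input is unknown. Flink's real rules live in each node's computeOutputEstimates override.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical node; -1 means "unknown", as when a source has no statistics.
class EstNode {
    final String name;
    final long sourceRows; // only used when the node has no inputs
    final List<EstNode> inputs = new ArrayList<>();
    int id = -1;
    long estimatedRows = -1;

    EstNode(String name, long sourceRows, EstNode... ins) {
        this.name = name;
        this.sourceRows = sourceRows;
        for (EstNode in : ins) {
            inputs.add(in);
        }
    }
}

class IdAndEstimatesSketch {
    private int nextId = 1;
    private final Set<EstNode> visited = new HashSet<>();

    void visit(EstNode node) {
        if (!visited.add(node)) {
            return; // already reached through another path of the DAG
        }
        for (EstNode in : node.inputs) {
            visit(in); // inputs first: estimates flow from sources towards sinks
        }
        node.id = nextId++;
        if (node.inputs.isEmpty()) {
            node.estimatedRows = node.sourceRows; // source: take statistics if present
            return;
        }
        // Hypothetical union-like rule: sum of the inputs, unknown if any input is unknown.
        long sum = 0;
        for (EstNode in : node.inputs) {
            if (in.estimatedRows < 0) {
                sum = -1;
                break;
            }
            sum += in.estimatedRows;
        }
        node.estimatedRows = sum;
    }
}
```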

1.1.3 UnionParallelismAndForwardEnforcer

This step ensures that a UnionNode's parallelism is aligned with its downstream operator. This avoids a wrong data distribution and, with it, inaccurate results (see https://github.com/apache/flink/pull/5742).
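Here is a hedged sketch of what aligning a union with its successor could look like. PNode, Conn and Ship are illustrative assumptions, as is taking the first successor's parallelism. The point is that the union adopts the downstream parallelism and forwards its output 1:1, so no extra redistribution happens right after it.

```java
import java.util.ArrayList;
import java.util.List;

enum Ship { UNSET, FORWARD }

// Hypothetical node: only the fields this step looks at.
class PNode {
    final String name;
    final boolean isUnion;
    int parallelism;
    final List<Conn> outgoing = new ArrayList<>();

    PNode(String name, boolean isUnion, int parallelism) {
        this.name = name;
        this.isUnion = isUnion;
        this.parallelism = parallelism;
    }
}

// Hypothetical outgoing connection to a successor.
class Conn {
    final PNode target;
    Ship ship = Ship.UNSET;

    Conn(PNode target) {
        this.target = target;
    }
}

class UnionEnforcerSketch {
    static void enforce(PNode node) {
        if (!node.isUnion || node.outgoing.isEmpty()) {
            return;
        }
        // Assumes all successors share one parallelism; the first one is used here.
        node.parallelism = node.outgoing.get(0).target.parallelism;
        for (Conn c : node.outgoing) {
            c.ship = Ship.FORWARD; // union output goes 1:1 to the successor, no reshuffle
        }
    }
}
```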

1.1.4 BranchesVisitor

This visitor computes the downstream sub-DAGs whose branches do not close (unclosed branches). See the definition:


    /**
     * Description of an unclosed branch. An unclosed branch is when the data flow branched (one
     * operator's result is consumed by multiple targets), but these different branches (targets)
     * have not been joined together.
     */
    public static final class UnclosedBranchDescriptor {
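To illustrate what "unclosed" means, the sketch below tracks, for each node, which outputs of each branching node reach it. A branch closes once all of a branching node's outputs have met again. BNode and BranchesSketch are hypothetical, and this bookkeeping is a simplification of what UnclosedBranchDescriptor records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical DAG node that knows both its inputs and its consumers.
class BNode {
    final String name;
    final List<BNode> inputs = new ArrayList<>();
    final List<BNode> outputs = new ArrayList<>();

    BNode(String name, BNode... ins) {
        this.name = name;
        for (BNode in : ins) {
            inputs.add(in);
            in.outputs.add(this); // register this node as a consumer of its input
        }
    }
}

class BranchesSketch {
    // node -> (branching node -> indices of that node's outputs that lead here)
    private final Map<BNode, Map<BNode, Set<Integer>>> memo = new HashMap<>();

    // Assumes no operator reads the same input twice (indexOf finds the first edge only).
    Map<BNode, Set<Integer>> openBranches(BNode node) {
        Map<BNode, Set<Integer>> cached = memo.get(node);
        if (cached != null) {
            return cached;
        }
        Map<BNode, Set<Integer>> open = new HashMap<>();
        for (BNode in : node.inputs) {
            // inherit the branches that are still open at the input
            for (Map.Entry<BNode, Set<Integer>> e : openBranches(in).entrySet()) {
                open.computeIfAbsent(e.getKey(), k -> new HashSet<>()).addAll(e.getValue());
            }
            // the input itself branches: remember which of its outputs we came through
            if (in.outputs.size() > 1) {
                open.computeIfAbsent(in, k -> new HashSet<>()).add(in.outputs.indexOf(node));
            }
        }
        // a branch is closed once every output of the branching node has re-joined here
        open.entrySet().removeIf(e -> e.getValue().size() == e.getKey().outputs.size());
        memo.put(node, open);
        return open;
    }
}
```

Take a source read by two operators that are later joined. The branch is still open at each of the two operators and disappears at the join.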

1.1.5 InterestingPropertyVisitor

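Based on each node's properties, this visitor computes the interesting properties of the node's inputs, which feed into the later cost estimation.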

For the algorithm, see node.computeInterestingPropertiesForInputs, implemented in:

  • WorksetIterationNode
  • TwoInputNode
  • SingleInputNode
  • BulkIterationNode

After that, the alternative execution plans are generated and compared by cost:

        // the final step is now to generate the actual plan alternatives
        List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);

Here, OptimizerNodes are turned into PlanNodes. PlanNode is the final optimized node type and carries more node properties. PlanNodes are connected by Channels. A Channel is also an edge model, and it fixes how data is exchanged between two nodes through ShipStrategyType and DataExchangeMode.

ShipStrategyType is the strategy for transferring data between two nodes, for example whether the data is partitioned, and whether by hash or by range. DataExchangeMode is the mode of exchange between two nodes, either PIPELINED or BATCH. It corresponds to ExecutionMode, which determines the DataExchangeMode, i.e. whether data is sent downstream directly or written to disk first.
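The idea of enumerating alternatives and keeping the cheapest one can be sketched with a toy cost model that only counts bytes shipped over the network. ShipStrategy, JoinCandidate and CostBasedChoice are hypothetical names. The real optimizer compares many more strategies, including local strategies such as hash vs. sort-merge, using its own cost estimator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

enum ShipStrategy { FORWARD, PARTITION_HASH, BROADCAST }

// One hypothetical alternative for a join: how each input is shipped, plus its cost.
class JoinCandidate {
    final String name;
    final ShipStrategy left;
    final ShipStrategy right;
    final double networkCost;

    JoinCandidate(String name, ShipStrategy left, ShipStrategy right, double networkCost) {
        this.name = name;
        this.left = left;
        this.right = right;
        this.networkCost = networkCost;
    }
}

class CostBasedChoice {
    // Toy cost model: bytes sent over the network. Flink's own cost model is far richer.
    static List<JoinCandidate> alternatives(double leftBytes, double rightBytes, int parallelism) {
        List<JoinCandidate> alts = new ArrayList<>();
        // re-partition both inputs by the join key: every byte is shipped once
        alts.add(new JoinCandidate("hash-partition both",
                ShipStrategy.PARTITION_HASH, ShipStrategy.PARTITION_HASH, leftBytes + rightBytes));
        // replicate one input to every parallel instance, keep the other one local
        alts.add(new JoinCandidate("broadcast left",
                ShipStrategy.BROADCAST, ShipStrategy.FORWARD, leftBytes * parallelism));
        alts.add(new JoinCandidate("broadcast right",
                ShipStrategy.FORWARD, ShipStrategy.BROADCAST, rightBytes * parallelism));
        return alts;
    }

    static JoinCandidate cheapest(List<JoinCandidate> alts) {
        return alts.stream()
                .min(Comparator.comparingDouble(c -> c.networkCost))
                .orElseThrow(IllegalStateException::new);
    }
}
```

Consider a 10 MB left input, a 1 GB right input and parallelism 4. Broadcasting the left side ships about 40 MB, versus about 1.01 GB for re-partitioning both sides, so cheapest picks "broadcast left".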

1.1.6 PlanFinalizer.createFinalPlan

PlanFinalizer.createFinalPlan() roughly adds the nodes to sources, sinks, and allNodes. It may also set things like the memory used by each node's task.
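Below is a minimal sketch of such a finalizing pass. It assumes a hypothetical FNode with a needsMemory flag and splits a memory budget naively and evenly. Flink's actual memory assignment is more involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical plan node; needsMemory marks e.g. a sort or a hash table.
class FNode {
    final String name;
    final boolean needsMemory;
    final List<FNode> inputs = new ArrayList<>();
    long memoryBytes;

    FNode(String name, boolean needsMemory, FNode... ins) {
        this.name = name;
        this.needsMemory = needsMemory;
        for (FNode in : ins) {
            inputs.add(in);
        }
    }
}

class FinalizerSketch {
    final List<FNode> sources = new ArrayList<>();
    final List<FNode> sinks = new ArrayList<>();
    final Set<FNode> allNodes = new LinkedHashSet<>();

    void createFinalPlan(List<FNode> sinkNodes, long totalMemoryBytes) {
        sinks.addAll(sinkNodes);
        for (FNode sink : sinkNodes) {
            collect(sink);
        }
        // Hypothetical policy: split the memory budget evenly across memory-consuming nodes.
        long consumers = allNodes.stream().filter(n -> n.needsMemory).count();
        for (FNode n : allNodes) {
            if (n.needsMemory) {
                n.memoryBytes = totalMemoryBytes / consumers; // consumers >= 1 in this branch
            }
        }
    }

    private void collect(FNode node) {
        if (!allNodes.add(node)) {
            return; // shared node, already collected
        }
        if (node.inputs.isEmpty()) {
            sources.add(node);
        }
        for (FNode in : node.inputs) {
            collect(in);
        }
    }
}
```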

1.1.7 BinaryUnionReplacer

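As the name suggests, this replaces Union operations whose upstream is also a Union, merging them into one. In my view, this reduces the number of generated Nodes while keeping the output equivalent.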
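Flattening a cascade of binary unions into one n-ary union can be sketched as a simple recursion. UNode and UnionFlattener are hypothetical. The sketch ignores the case where an inner union also feeds other operators, which a real rewrite would have to respect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plan node: either a binary union or some other operator.
class UNode {
    final String name;
    final boolean binaryUnion;
    final List<UNode> inputs;

    private UNode(String name, boolean binaryUnion, List<UNode> inputs) {
        this.name = name;
        this.binaryUnion = binaryUnion;
        this.inputs = inputs;
    }

    static UNode op(String name) {
        return new UNode(name, false, new ArrayList<>());
    }

    static UNode union(UNode left, UNode right) {
        return new UNode("union", true, Arrays.asList(left, right));
    }

    static UNode naryUnion(List<UNode> inputs) {
        return new UNode("n-ary union", false, inputs);
    }
}

class UnionFlattener {
    // Collect the non-union inputs reachable through a tree of binary unions, in order.
    static List<UNode> flatten(UNode union) {
        List<UNode> result = new ArrayList<>();
        for (UNode in : union.inputs) {
            if (in.binaryUnion) {
                result.addAll(flatten(in)); // upstream union: splice its inputs in
            } else {
                result.add(in);
            }
        }
        return result;
    }

    // Replace a cascade of binary unions with a single n-ary union node.
    static UNode replace(UNode union) {
        return UNode.naryUnion(flatten(union));
    }
}
```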

1.1.8 RangePartitionRewriter

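With range partitioning, the data handled by each partition should be as balanced as possible, to make full use of compute resources and shorten job execution time. To this end, the optimizer provides a range-partition rewriter (RangePartitionRewriter). It optimizes the range-partitioning strategy so that data is distributed as evenly as possible, avoiding data skew.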

Distributing data evenly requires an estimate of the data source's distribution, but reading all the data for that is clearly infeasible. Flink therefore uses an improved version of reservoir sampling (see the paper Optimal Random Sampling from Distributed Streams Revisited). In the code it is implemented by org.apache.flink.api.java.sampling.ReservoirSamplerWithReplacement and org.apache.flink.api.java.sampling.ReservoirSamplerWithoutReplacement.
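For intuition, below is the classic single-stream reservoir sampling (Algorithm R), followed by picking range boundaries from the sorted sample. This is a swapped-in textbook technique, not Flink's distributed variant from the paper above. RangeBoundarySketch is a hypothetical helper, and boundaries assumes a non-empty sample.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;

class RangeBoundarySketch {
    // Algorithm R: a uniform random sample of up to k items from a stream of unknown length.
    static int[] reservoirSample(Iterator<Integer> stream, int k, Random rnd) {
        int[] reservoir = new int[k];
        int seen = 0;
        while (stream.hasNext()) {
            int x = stream.next();
            if (seen < k) {
                reservoir[seen] = x; // fill the reservoir with the first k items
            } else {
                int j = rnd.nextInt(seen + 1); // uniform in [0, seen]
                if (j < k) {
                    reservoir[j] = x; // keep the new item with probability k / (seen + 1)
                }
            }
            seen++;
        }
        return Arrays.copyOf(reservoir, Math.min(seen, k)); // fewer than k items: shrink
    }

    // Choose (partitions - 1) split points so each range holds about the same number of samples.
    static int[] boundaries(int[] sample, int partitions) {
        int[] sorted = sample.clone();
        Arrays.sort(sorted);
        int[] bounds = new int[partitions - 1];
        for (int i = 1; i < partitions; i++) {
            bounds[i - 1] = sorted[(int) ((long) i * sorted.length / partitions)];
        }
        return bounds;
    }
}
```

For example, a sample containing 0 to 99 split into 4 ranges yields the boundaries 25, 50 and 75.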

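It is worth noting that both Plan and OptimizerNode implement the Visitable interface. This is a classic application of the Visitor pattern and makes the code very flexible. As the Javadoc puts it, different visitors can perform different operations during a traversal, while the traversal code itself is written only once.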

package org.apache.flink.util;

import org.apache.flink.annotation.Internal;

/**
 * This interface marks types as visitable during a traversal. The central method <i>accept(...)</i>
 * contains the logic about how to invoke the supplied {@link Visitor} on the visitable object, and
 * how to traverse further.
 *
 * <p>This concept makes it easy to implement for example a depth-first traversal of a tree or DAG
 * with different types of logic during the traversal. The <i>accept(...)</i> method calls the
 * visitor and then send the visitor to its children (or predecessors). Using different types of
 * visitors, different operations can be performed during the traversal, while writing the actual
 * traversal code only once.
 *
 * @see Visitor
 */
@Internal
public interface Visitable<T extends Visitable<T>> {

    /**
     * Contains the logic to invoke the visitor and continue the traversal. Typically invokes the
     * pre-visit method of the visitor, then sends the visitor to the children (or predecessors) and
     * then invokes the post-visit method.
     *
     * <p>A typical code example is the following:
     *
     * <pre>{@code
     * public void accept(Visitor<Operator> visitor) {
     *     boolean descend = visitor.preVisit(this);
     *     if (descend) {
     *         if (this.input != null) {
     *             this.input.accept(visitor);
     *         }
     *         visitor.postVisit(this);
     *     }
     * }
     * }</pre>
     *
     * @param visitor The visitor to be called with this object as the parameter.
     * @see Visitor#preVisit(Visitable)
     * @see Visitor#postVisit(Visitable)
     */
    void accept(Visitor<T> visitor);
}
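The following self-contained sketch shows the idea of writing the traversal once and swapping operations through visitors. MiniVisitable and MiniVisitor are hypothetical re-declarations shaped like Flink's interfaces; they are not the real org.apache.flink.util types. Because plans are DAGs, each visitor uses a seen-set in preVisit so that shared nodes are visited only once.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical re-declarations shaped like Flink's Visitor/Visitable (not the real types).
interface MiniVisitor<T extends MiniVisitable<T>> {
    boolean preVisit(T node); // return false to skip this node's inputs and postVisit

    void postVisit(T node);
}

interface MiniVisitable<T extends MiniVisitable<T>> {
    void accept(MiniVisitor<T> visitor);
}

class VNode implements MiniVisitable<VNode> {
    final String name;
    final List<VNode> inputs = new ArrayList<>();

    VNode(String name, VNode... ins) {
        this.name = name;
        for (VNode in : ins) {
            inputs.add(in);
        }
    }

    // The traversal is written once: pre-visit, descend into the inputs, post-visit.
    @Override
    public void accept(MiniVisitor<VNode> visitor) {
        if (visitor.preVisit(this)) {
            for (VNode in : inputs) {
                in.accept(visitor);
            }
            visitor.postVisit(this);
        }
    }
}

// Visitor 1: records nodes in post-order (inputs before consumers), each node once.
class PostOrderNames implements MiniVisitor<VNode> {
    final List<String> order = new ArrayList<>();
    private final Set<VNode> seen = new HashSet<>();

    public boolean preVisit(VNode node) {
        return seen.add(node); // false for a node already reached via another path
    }

    public void postVisit(VNode node) {
        order.add(node.name);
    }
}

// Visitor 2: counts distinct nodes, reusing the same accept() traversal.
class NodeCounter implements MiniVisitor<VNode> {
    private final Set<VNode> seen = new HashSet<>();
    int count;

    public boolean preVisit(VNode node) {
        if (!seen.add(node)) {
            return false;
        }
        count++;
        return true;
    }

    public void postVisit(VNode node) {
        // nothing to do after the inputs
    }
}
```

Take a diamond: source feeds a and b, and join reads a and b. PostOrderNames records [source, a, b, join], while NodeCounter reuses the same accept logic to count 4 nodes.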

1.2 The StreamGraph of the Stream API

The entry function for building a StreamGraph is StreamGraphGenerator.generate(). It is reached from StreamExecutionEnvironment.execute(), the method that triggers program execution. Like the OptimizedPlan, the StreamGraph is built on the client side.

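In this process, the pipeline is first converted into a pipeline of Transformations and then mapped to a StreamGraph. This graph is independent of concrete execution; its core purpose is to express the logic of the computation.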

For why Transformation was introduced, see the community issue https://issues.apache.org/jira/browse/FLINK-2398. In essence, this layer was added to decouple the DataStream layer from the StreamGraph.

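The properties a Transformation cares about lean toward framework internals. Examples are name (the operator name), uid (used to map previously persisted state back to the operator when the job restarts), bufferTimeout, parallelism, outputType and slotSharingGroup. In addition, Transformations come in physical and virtual kinds, which is tied to how the next layer, the StreamGraph, is implemented.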

The StreamGraph has two core objects:

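  • StreamNode: can have multiple inputs and multiple outputs. StreamNodes are derived from Transformations. Physical Transformations become StreamNodes that eventually turn into physical operators, while virtual ones do not become real nodes; their information is attached to StreamEdges instead.
  • StreamEdge: the edge of the StreamGraph, connecting two StreamNodes. As noted above, a StreamNode can have multiple outgoing and incoming edges. A StreamEdge carries information such as the side output, the partitioner, and field-selection output (similar to choosing columns in a SQL SELECT).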
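The physical/virtual split can be sketched as follows. A virtual transformation (here a keyBy-like partitioning step) does not become a node; instead, its partitioner is carried onto the edge between the surrounding physical nodes. Tf, SEdge, StreamGraphSketch and the "forward" default are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical transformation: physical ones become nodes, virtual ones only carry a partitioner.
class Tf {
    final int id;
    final String name;
    final boolean virtual;
    final String partitioner; // only meaningful for virtual transformations
    final Tf input; // null for sources

    Tf(int id, String name, boolean virtual, String partitioner, Tf input) {
        this.id = id;
        this.name = name;
        this.virtual = virtual;
        this.partitioner = partitioner;
        this.input = input;
    }
}

// Hypothetical edge between two physical nodes, annotated with a partitioner.
class SEdge {
    final int sourceId;
    final int targetId;
    final String partitioner;

    SEdge(int sourceId, int targetId, String partitioner) {
        this.sourceId = sourceId;
        this.targetId = targetId;
        this.partitioner = partitioner;
    }
}

class StreamGraphSketch {
    final Map<Integer, String> nodes = new LinkedHashMap<>();
    final List<SEdge> edges = new ArrayList<>();
    private final Map<Integer, Tf> virtualNodes = new HashMap<>(); // never become nodes

    // Transformations must be added in topological order (inputs before consumers).
    void add(Tf t) {
        if (t.virtual) {
            virtualNodes.put(t.id, t); // no node; remembered for the next physical edge
            return;
        }
        nodes.put(t.id, t.name);
        if (t.input == null) {
            return;
        }
        Tf upstream = t.input;
        String partitioner = null;
        while (virtualNodes.containsKey(upstream.id)) {
            if (partitioner == null) {
                partitioner = upstream.partitioner; // nearest virtual node decides the edge
            }
            upstream = upstream.input; // skip over the virtual node to the real producer
        }
        edges.add(new SEdge(upstream.id, t.id, partitioner == null ? "forward" : partitioner));
    }
}
```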

The concrete conversion code lives in org.apache.flink.streaming.api.graph.StreamGraphGenerator, where each Transformation type has its own translation logic:

    static {
        @SuppressWarnings("rawtypes")
        Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
                tmp = new HashMap<>();
        tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
        tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
        tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
        tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
        tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
        tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
        tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
        tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
        tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
        tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
        tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
        tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
        tmp.put(
                TimestampsAndWatermarksTransformation.class,
                new TimestampsAndWatermarksTransformationTranslator<>());
        tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
        tmp.put(
                KeyedBroadcastStateTransformation.class,
                new KeyedBroadcastStateTransformationTranslator<>());
        translatorMap = Collections.unmodifiableMap(tmp);
    }
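The static map above is a class-to-translator registry: translation dispatches on the transformation's concrete class. Below is a minimal sketch of that pattern. Tx, TxTranslator and TranslatorRegistry are hypothetical types standing in for Transformation and TransformationTranslator.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical transformation hierarchy.
abstract class Tx {
    final String name;

    Tx(String name) {
        this.name = name;
    }
}

class OneInputTx extends Tx {
    OneInputTx(String name) {
        super(name);
    }
}

class UnionTx extends Tx {
    UnionTx(String name) {
        super(name);
    }
}

// Hypothetical translator interface: one implementation per transformation class.
interface TxTranslator<T extends Tx> {
    String translate(T transformation);
}

class TranslatorRegistry {
    private static final Map<Class<? extends Tx>, TxTranslator<? extends Tx>> TRANSLATORS;

    static {
        Map<Class<? extends Tx>, TxTranslator<? extends Tx>> tmp = new HashMap<>();
        TxTranslator<OneInputTx> oneInput = t -> "node:" + t.name;
        TxTranslator<UnionTx> union = t -> "virtual:" + t.name;
        tmp.put(OneInputTx.class, oneInput);
        tmp.put(UnionTx.class, union);
        TRANSLATORS = Collections.unmodifiableMap(tmp); // fixed after class initialization
    }

    @SuppressWarnings("unchecked")
    static String translate(Tx t) {
        // Exact-class lookup: a subclass without its own entry is not matched.
        TxTranslator<Tx> translator = (TxTranslator<Tx>) TRANSLATORS.get(t.getClass());
        if (translator == null) {
            throw new IllegalArgumentException("No translator for " + t.getClass().getName());
        }
        return translator.translate(t);
    }
}
```

Keeping the map unmodifiable after the static initializer mirrors translatorMap. In this sketch, an unregistered type fails loudly instead of being translated silently.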

1.3 The Stream/Batch-Unified JobGraph

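The code entry is almost identical to Section 1.1. The entry class for DataSet is ExecutionEnvironment, while for DataStream it is StreamExecutionEnvironment, and PlanTranslator is replaced by StreamGraphTranslator. So the StreamGraph-to-JobGraph conversion also happens on the client side, and its main job is optimization. A very important optimization here is Operator Chaining, which merges operators into one task where conditions allow, avoiding cross-thread and cross-network data transfer.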

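Whether operator chaining is applied can be controlled explicitly in the program. This can be done for the whole job via StreamExecutionEnvironment#disableOperatorChaining(), or per operator via disableChaining() or startNewChain().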
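Here is a hedged sketch of the chaining decision. The conditions in isChainable are a simplified subset of what Flink checks: one input edge, forward partitioning, equal parallelism, same slot sharing group, and chaining not disabled. ChainOp and ChainingSketch are assumptions for illustration, as is the rule that an edge is forward iff the parallelism matches.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified view of an operator for chaining decisions.
class ChainOp {
    final String name;
    final int parallelism;
    final String slotSharingGroup;
    final boolean chainingDisabled; // what disableChaining() would express in the real API

    ChainOp(String name, int parallelism, String slotSharingGroup, boolean chainingDisabled) {
        this.name = name;
        this.parallelism = parallelism;
        this.slotSharingGroup = slotSharingGroup;
        this.chainingDisabled = chainingDisabled;
    }
}

class ChainingSketch {
    // A simplified subset of the conditions checked before two operators are chained.
    static boolean isChainable(ChainOp up, ChainOp down, int downInputEdges,
                               boolean forwardPartitioner, boolean chainingEnabled) {
        return chainingEnabled
                && downInputEdges == 1 // downstream reads only from this upstream
                && forwardPartitioner // records stay within the same subtask
                && up.parallelism == down.parallelism
                && up.slotSharingGroup.equals(down.slotSharingGroup)
                && !up.chainingDisabled
                && !down.chainingDisabled;
    }

    // Greedily cut a linear pipeline into chains; each chain would become one JobVertex.
    static List<List<String>> chains(List<ChainOp> pipeline, boolean chainingEnabled) {
        List<List<String>> result = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (int i = 0; i < pipeline.size(); i++) {
            ChainOp op = pipeline.get(i);
            if (i > 0) {
                ChainOp prev = pipeline.get(i - 1);
                // Assumption: in a linear pipeline the edge is "forward" iff parallelism matches.
                boolean forward = prev.parallelism == op.parallelism;
                if (!isChainable(prev, op, 1, forward, chainingEnabled)) {
                    result.add(current);
                    current = new ArrayList<>();
                }
            }
            current.add(op.name);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }
}
```

For source(2) → map(2) → filter(2) → sink(1), this yields [[source, map, filter], [sink]]. With chaining disabled globally, every operator becomes its own chain.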

Next, let's see what a JobGraph really is, starting with its Javadoc:


/**
 * The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts.
 * All programs from higher level APIs are transformed into JobGraphs.
 *
 * <p>The JobGraph is a graph of vertices and intermediate results that are connected together to
 * form a DAG. Note that iterations (feedback edges) are currently not encoded inside the JobGraph
 * but inside certain special vertices that establish the feedback channel amongst themselves.
 *
 * <p>The JobGraph defines the job-wide configuration settings, while each vertex and intermediate
 * result define the characteristics of the concrete operation and intermediate data.
 */
public class JobGraph implements Serializable {

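It is a graph made of vertices and intermediate results. It is also a low-level representation made for the JobMaster (the JobManager in the Javadoc): programs from all higher-level APIs are transformed into JobGraphs. The objects to focus on next are JobVertex, JobEdge, and IntermediateDataSet. A JobVertex takes JobEdges as input and produces IntermediateDataSets as output.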

1.3.1 JobVertex

After optimization, several StreamNodes that meet the conditions may be fused into one JobVertex; that is, a JobVertex contains one or more operators. Interested readers can look at StreamingJobGraphGenerator#buildChainedInputsAndGetHeadInputs or read the related issue: https://issues.apache.org/jira/browse/FLINK-19434.

1.3.2 JobEdge

JobEdge is the edge connecting an IntermediateDataSet to a JobVertex and represents a data channel in the JobGraph. Its upstream is an IntermediateDataSet and its downstream is a JobVertex: data flows through the JobEdge from the IntermediateDataSet to the target JobVertex.

Here we need to look at the type of one of its member variables:


/**
 * A distribution pattern determines, which sub tasks of a producing task are connected to which
 * consuming sub tasks.
 *
 * <p>It affects how {@link ExecutionVertex} and {@link IntermediateResultPartition} are connected
 * in {@link EdgeManagerBuildUtil}
 */
public enum DistributionPattern {

    /** Each producing sub task is connected to each sub task of the consuming task. */
    ALL_TO_ALL,

    /** Each producing sub task is connected to one or more subtask(s) of the consuming task. */
    POINTWISE
}

This distribution pattern directly determines how Tasks are connected at execution time: pointwise or all-to-all (fully connected).
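The two patterns can be illustrated by listing which producer subtask feeds which consumer subtask. DistPattern and ConnectSketch are hypothetical names. The sketch uses contiguous ranges for POINTWISE; Flink's own rounding in EdgeManagerBuildUtil may differ.

```java
import java.util.ArrayList;
import java.util.List;

enum DistPattern { ALL_TO_ALL, POINTWISE }

class ConnectSketch {
    // Returns {producerSubtask, consumerSubtask} pairs for one edge between two vertices.
    static List<int[]> connect(DistPattern pattern, int producers, int consumers) {
        List<int[]> links = new ArrayList<>();
        if (pattern == DistPattern.ALL_TO_ALL) {
            // every producer subtask feeds every consumer subtask
            for (int p = 0; p < producers; p++) {
                for (int c = 0; c < consumers; c++) {
                    links.add(new int[] {p, c});
                }
            }
        } else if (producers >= consumers) {
            // POINTWISE fan-in (or 1:1): each consumer reads a contiguous block of producers
            for (int c = 0; c < consumers; c++) {
                int start = c * producers / consumers;
                int end = (c + 1) * producers / consumers;
                for (int p = start; p < end; p++) {
                    links.add(new int[] {p, c});
                }
            }
        } else {
            // POINTWISE fan-out: each producer feeds a contiguous block of consumers
            for (int p = 0; p < producers; p++) {
                int start = p * consumers / producers;
                int end = (p + 1) * consumers / producers;
                for (int c = start; c < end; c++) {
                    links.add(new int[] {p, c});
                }
            }
        }
        return links;
    }
}
```

For example, POINTWISE with 4 producers and 2 consumers gives consumer 0 subtasks 0 and 1, and consumer 1 subtasks 2 and 3. ALL_TO_ALL with 2 producers and 3 consumers creates 6 links.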

1.3.3 IntermediateDataSet

The IntermediateDataSet is a logical structure representing the output of a JobVertex, i.e. the dataset produced by the operators contained in that JobVertex. Here we need to look at ResultPartitionType:

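  • Blocking: as the name suggests, the downstream processes the data only after the upstream has finished. This partition can be consumed multiple times, and concurrently. It is not released automatically; the scheduler decides when to release it.
  • BlockingPersistent: similar to Blocking, but its lifecycle is specified by the user side. It is released by calling an API of the JobMaster or the ResourceManager rather than being controlled by the scheduler.
  • Pipelined: the streaming exchange mode, usable for both bounded and unbounded streams. Data in this partition type can be consumed only once by each consumer, and the partition may hold an arbitrary amount of data.
  • PipelinedBounded: unlike Pipelined, this partition holds only a bounded amount of data, so data and checkpoints are not delayed too long. It therefore suits streaming jobs (note that there are no checkpoint barriers in batch mode).
  • Pipelined_Approximate: a partition type introduced in 1.12 for fast failover of a single task. Interested readers can see the related issue: https://issues.apache.org/jira/browse/FLINK-18112.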

Different execution modes map to different result partition types, which in turn determine how data is exchanged at execution time.
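The properties in the list above can be summarized in a small enum. PartitionKind is a simplified model, not Flink's ResultPartitionType. Its forExecutionMode method (blocking edges in batch mode, bounded pipelined edges in streaming mode) is a rough assumption about the defaults.

```java
// Simplified model of the partition types listed above (not Flink's ResultPartitionType).
enum PartitionKind {
    BLOCKING(true, false, false),
    BLOCKING_PERSISTENT(true, true, false),
    PIPELINED(false, false, false),
    PIPELINED_BOUNDED(false, false, true),
    PIPELINED_APPROXIMATE(false, false, true);

    final boolean blocking; // downstream starts only after upstream has finished
    final boolean releasedByUser; // lifecycle controlled via an explicit API, not the scheduler
    final boolean boundedInFlight; // only a bounded amount of in-flight data is buffered

    PartitionKind(boolean blocking, boolean releasedByUser, boolean boundedInFlight) {
        this.blocking = blocking;
        this.releasedByUser = releasedByUser;
        this.boundedInFlight = boundedInFlight;
    }

    // Blocking results are fully materialized and can be read again; pipelined data is consumed once.
    boolean canBeConsumedRepeatedly() {
        return blocking;
    }

    // Hypothetical mapping from the job's execution mode to the partition type of its edges.
    static PartitionKind forExecutionMode(boolean batch) {
        return batch ? BLOCKING : PIPELINED_BOUNDED;
    }
}
```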

The number of IntermediateDataSets equals the number of outgoing edges that leave the StreamNodes (the operator chain) of this JobVertex, and can be one or more.

1.4 ExecutionGraph

After the JobManager receives the JobGraph and its dependency JARs from the client, it starts scheduling the job. The JobGraph, however, is still a logical graph and must be turned into a parallelized, schedulable execution graph.

This is done by the JobMaster: it is triggered through SchedulerBase, and the actual work is delegated to DefaultExecutionGraphBuilder#buildGraph. In the process, each JobVertex gets a corresponding ExecutionJobVertex (a logical concept) and ExecutionVertices. Each IntermediateDataSet gets a corresponding IntermediateResult (a logical concept) and IntermediateResultPartitions. Parallelism is realized through these classes.

Next, let's go through some details of the ExecutionGraph. Several logical concepts are involved, so I drew a figure for reference.


1.4.1 ExecutionJobVertex and ExecutionVertex

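ExecutionJobVertex corresponds one-to-one to a JobVertex in the JobGraph. It contains a group of ExecutionVertices whose count equals the parallelism of the StreamNodes in that JobVertex: as the figure above shows, with parallelism N there are N ExecutionVertices. In other words, each parallel execution instance is an ExecutionVertex. The ExecutionJobVertex also builds its outputs, the IntermediateResults.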

ExecutionJobVertex is therefore more of a logical concept.

1.4.2 IntermediateResult and IntermediateResultPartition

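IntermediateResult represents the output of an ExecutionJobVertex and corresponds one-to-one to an IntermediateDataSet in the JobGraph; it is also a logical concept. Likewise, an ExecutionJobVertex can have multiple intermediate results, depending on how many outgoing edges (JobEdges) its JobVertex has.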

An intermediate result contains multiple IntermediateResultPartitions. Their number equals the parallelism of the JobVertex, i.e. the operator parallelism. Each IntermediateResultPartition represents the output of one ExecutionVertex.
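The parallel expansion described in 1.4.1 and 1.4.2 can be sketched as follows. One vertex with parallelism N and K produced datasets becomes N parallel instances plus K results of N partitions each, and partition i belongs to instance i. All class names here are hypothetical stand-ins for ExecutionJobVertex, ExecutionVertex, IntermediateResult and IntermediateResultPartition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for IntermediateResultPartition: result index + producing subtask.
class ResultPart {
    final int resultIndex;
    final int partitionNumber;

    ResultPart(int resultIndex, int partitionNumber) {
        this.resultIndex = resultIndex;
        this.partitionNumber = partitionNumber;
    }
}

// Hypothetical stand-in for IntermediateResult: one partition per producing subtask.
class InterResult {
    final List<ResultPart> partitions = new ArrayList<>();
}

// Hypothetical stand-in for ExecutionVertex: one parallel instance of the vertex.
class ExecVertex {
    final String taskName;
    final int subtaskIndex;
    final List<ResultPart> produced = new ArrayList<>();

    ExecVertex(String taskName, int subtaskIndex) {
        this.taskName = taskName;
        this.subtaskIndex = subtaskIndex;
    }
}

// Hypothetical stand-in for ExecutionJobVertex, built from one JobVertex.
class ExecJobVertex {
    final List<ExecVertex> taskVertices = new ArrayList<>();
    final List<InterResult> producedResults = new ArrayList<>();

    // numProducedDataSets plays the role of the JobVertex's IntermediateDataSets.
    ExecJobVertex(String name, int parallelism, int numProducedDataSets) {
        for (int r = 0; r < numProducedDataSets; r++) {
            producedResults.add(new InterResult());
        }
        for (int i = 0; i < parallelism; i++) {
            ExecVertex vertex = new ExecVertex(name, i);
            // each parallel instance owns exactly one partition of every produced result
            for (int r = 0; r < numProducedDataSets; r++) {
                ResultPart part = new ResultPart(r, i);
                producedResults.get(r).partitions.add(part);
                vertex.produced.add(part);
            }
            taskVertices.add(vertex);
        }
    }
}
```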

1.4.3 Execution

At runtime, an ExecutionVertex corresponds to a Task. When it actually runs, the ExecutionVertex is wrapped in an Execution, which represents one execution attempt of that vertex.

How the JobGraph is submitted to the JobMaster is outside the scope of this article. Interested readers can follow the call stack of org.apache.flink.runtime.dispatcher.DispatcherGateway#submitJob.

1.4.5 From JobGraph to ExecutionGraph

With these key concepts introduced, let's look at how an ExecutionGraph is built. The main method to read is org.apache.flink.runtime.executiongraph.DefaultExecutionGraph#attachJobGraph.

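The first step is building the ExecutionJobVertex (see its constructor). It sets the parallelism, the slot sharing group, and the CoLocationGroup, and builds the IntermediateResults and IntermediateResultPartitions. It then creates ExecutionVertices according to the parallelism and checks whether any IntermediateResult is referenced more than once. Finally, splittable data sources are split.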

The second step is building the edges (see org.apache.flink.runtime.executiongraph.EdgeManagerBuildUtil#connectVertexToResult). According to the DistributionPattern, the ExecutionVertices are connected to the IntermediateResult and the connections are recorded in the EdgeManager. At runtime, the physical data-transfer channels between Tasks are established on this basis.
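Below is a minimal sketch of the order of operations described above, using a hypothetical JV/AttachSketch model. Vertices are processed in topological order. Each one registers its produced results, rejecting duplicate ids, and then connects to the results it consumes. The real attachJobGraph does much more (parallel instances, source splits, distribution patterns).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical JobVertex: the dataset ids it consumes and produces.
class JV {
    final String name;
    final List<String> consumes;
    final List<String> produces;

    JV(String name, List<String> consumes, List<String> produces) {
        this.name = name;
        this.consumes = consumes;
        this.produces = produces;
    }
}

class AttachSketch {
    final Map<String, String> resultOwner = new HashMap<>(); // dataset id -> producer name
    final List<String> edges = new ArrayList<>(); // "producer->consumer"

    // Vertices must arrive in topological order, so every input is registered before use.
    void attach(List<JV> sortedVertices) {
        for (JV v : sortedVertices) {
            // 1) register produced results, rejecting an id that is already taken
            for (String out : v.produces) {
                if (resultOwner.putIfAbsent(out, v.name) != null) {
                    throw new IllegalStateException("Result " + out + " registered twice");
                }
            }
            // 2) connect to the upstream results this vertex consumes
            for (String in : v.consumes) {
                String producer = resultOwner.get(in);
                if (producer == null) {
                    throw new IllegalStateException("Unknown input " + in);
                }
                edges.add(producer + "->" + v.name);
            }
        }
    }
}
```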

1.4.6 Appetizer: From ExecutionGraph to Actual Execution

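Once the JobMaster has generated the ExecutionGraph, the job enters the scheduling phase. This involves different scheduling strategies, resource requests, task deployment, and failover management. There is a lot to cover, so it will be discussed in a separate article. Curious readers can start with DefaultExecutionGraphDeploymentTest#setupScheduler: its code is fairly simple and shows the path from the ExecutionGraph to scheduling.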

    private SchedulerBase setupScheduler(JobVertex v1, int dop1, JobVertex v2, int dop2)
            throws Exception {
        v1.setParallelism(dop1);
        v2.setParallelism(dop2);

        v1.setInvokableClass(BatchTask.class);
        v2.setInvokableClass(BatchTask.class);

        DirectScheduledExecutorService executorService = new DirectScheduledExecutorService();

        // execution graph that executes actions synchronously
        final SchedulerBase scheduler =
                SchedulerTestingUtils.newSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(v1, v2),
                                ComponentMainThreadExecutorServiceAdapter.forMainThread())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory())
                        .setFutureExecutor(executorService)
                        .setBlobWriter(blobWriter)
                        .build();
        final ExecutionGraph eg = scheduler.getExecutionGraph();

        checkJobOffloaded((DefaultExecutionGraph) eg);

        // schedule, this triggers mock deployment
        scheduler.startScheduling();

        Map<ExecutionAttemptID, Execution> executions = eg.getRegisteredExecutions();
        assertEquals(dop1 + dop2, executions.size());

        return scheduler;
    }

2. Summary

In this article we have seen why each layer of graph exists:

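  • StreamGraph and OptimizedPlan: the move from the external API to internal APIs, producing the basic properties of the graph. For batch jobs, a series of optimizations is also applied here.
  • JobGraph: the graph shared by streaming and batch, where general optimizations such as operator chaining are applied.
  • ExecutionGraph: the execution-level graph, whose construction deals with many execution details. These include parallelism, validity of the checkpoint configuration, metrics setup, duplicate-reference checks, and splitting of splittable data sources.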

By layering its graphs, Flink places each optimization and check at the layer that suits it, which is an embodiment of the Single Responsibility Principle.
