Flink JobGraph: JobVertex

1. JobVertex

One building block of the JobGraph deserves special mention: JobVertex. A JobVertex produces intermediate data sets, is chained to other JobVertices through JobEdges, and is also the unit at which operators are chained together. In short, a JobVertex is the job-level abstraction of a task.
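To make the topology concrete, here is a minimal sketch of how vertices, intermediate data sets, and edges hang together. The `Mini*` classes below are illustrative stand-ins, not the real Flink API; the `connect` method loosely mirrors what `JobVertex#connectNewDataSetAsInput` does.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for Flink's JobVertex / IntermediateDataSet / JobEdge:
// a vertex produces intermediate data sets, and an edge connects one data set
// to the vertex that consumes it.
class MiniDataSet {
    final MiniVertex producer;
    MiniDataSet(MiniVertex producer) { this.producer = producer; }
}

class MiniEdge {
    final MiniDataSet source;  // upstream intermediate data set
    final MiniVertex target;   // downstream consuming vertex
    MiniEdge(MiniDataSet source, MiniVertex target) {
        this.source = source;
        this.target = target;
    }
}

class MiniVertex {
    final String name;
    final List<MiniDataSet> results = new ArrayList<>(); // produced data sets
    final List<MiniEdge> inputs = new ArrayList<>();     // incoming edges
    MiniVertex(String name) { this.name = name; }

    // Upstream creates a new result; downstream records an edge reading from it.
    static MiniEdge connect(MiniVertex upstream, MiniVertex downstream) {
        MiniDataSet ds = new MiniDataSet(upstream);
        upstream.results.add(ds);
        MiniEdge edge = new MiniEdge(ds, downstream);
        downstream.inputs.add(edge);
        return edge;
    }
}
```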

2. Source Code

Part 1: Fields

       // --------------------------------------------------------------------------------------------
    // Members that define the structure / topology of the graph
    // --------------------------------------------------------------------------------------------

    // The ID of this JobVertex; see the associated JobVertexID class
    // for how vertex IDs are generated.
    /** The ID of the vertex. */
    private final JobVertexID id;

    // Alternative (candidate) vertex IDs.
    /** The alternative IDs of the vertex. */
    private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>();

    // The operators contained in this JobVertex.
    /** The IDs of all operators contained in this vertex. */
    private final ArrayList<OperatorID> operatorIDs = new ArrayList<>();

    // Alternative (candidate) operator IDs.
    /** The alternative IDs of all operators contained in this vertex. */
    private final ArrayList<OperatorID> operatorIdsAlternatives = new ArrayList<>();

    // The intermediate data sets (IntermediateDataSet) produced by this JobVertex.
    /** List of produced data sets, one per writer */
    private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();

    // The edges through which this JobVertex reads its input:
    // one upstream JobEdge per reader.
    /** List of edges with incoming data. One per Reader. */
    private final ArrayList<JobEdge> inputs = new ArrayList<JobEdge>();

    // The number of subtasks this task is split into at runtime.
    /** Number of subtasks to split this task into at runtime.*/
    private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;

    // The maximum number of subtasks this task may be split into at runtime.
    /** Maximum number of subtasks to split this task into at runtime. */
    private int maxParallelism = -1;

    // The minimum resources this JobVertex requires.
    /** The minimum resource of the vertex */
    private ResourceSpec minResources = ResourceSpec.DEFAULT;

    // The preferred resources for this JobVertex.
    /** The preferred resource of the vertex */
    private ResourceSpec preferredResources = ResourceSpec.DEFAULT;

    // Custom configuration handed to this task at runtime.
    /** Custom configuration passed to the assigned task at runtime. */
    private Configuration configuration;

    // The class this task invokes at runtime.
    /** The class of the invokable. */
    private String invokableClassName;

    // Indicates whether this JobVertex is stoppable.
    /** Indicates whether this job vertex is stoppable or not. */
    private boolean isStoppable = false;

    // Optionally, the source that produces the input splits.
    /** Optionally, a source of input splits */
    private InputSplitSource<?> inputSplitSource;

    // The name of this JobVertex.
    /** The name of the vertex. This will be shown in runtime logs and will be in the runtime environment */
    private String name;

    // A slot sharing group allows subtasks from different JobVertices to run concurrently in one slot;
    // in particular, co-location requires the involved tasks to belong to the same sharing group.
    /** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */
    private SlotSharingGroup slotSharingGroup;

    // The group inside which this vertex's subtasks share slots (used for operator chaining).
    /** The group inside which the vertex subtasks share slots */
    private CoLocationGroup coLocationGroup;

    // The following fields are all recorded in the JSON plan.
    // The operator name.
    /** Optional, the name of the operator, such as 'Flat Map' or 'Join', to be included in the JSON plan */
    private String operatorName;

    // A description of this JobVertex's operator.
    /** Optional, the description of the operator, like 'Hash Join', or 'Sorted Group Reduce',
     * to be included in the JSON plan */
    private String operatorDescription;

    /** Optional, pretty name of the operator, to be displayed in the JSON plan */
    private String operatorPrettyName;

    // JSON describing the optimizer properties of the operator result; written into the JSON plan.
    /** Optional, the JSON for the optimizer properties of the operator result,
     * to be included in the JSON plan */
    private String resultOptimizerProperties;

    // The input dependency constraint used when scheduling this JobVertex:
    // 1. ANY: a subtask of this vertex may start as soon as any of its relevant upstream input subtasks has finished.
    // 2. ALL: subtasks start only after all relevant upstream input subtasks have finished.
    // These two policies matter when reasoning about consistency semantics.
    /** The input dependency constraint to schedule this vertex. */
    private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
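The ANY/ALL distinction can be sketched in a few lines. The enum below mirrors Flink's `InputDependencyConstraint`; the `canSchedule` helper is a hypothetical simplification of what a scheduler checks, not Flink's actual scheduling code.

```java
import java.util.List;

// Mirrors the two constraint values; the check itself is illustrative only.
enum Constraint { ANY, ALL }

class InputCheck {
    // inputsFinished holds one "has this input finished?" flag per upstream input.
    static boolean canSchedule(Constraint c, List<Boolean> inputsFinished) {
        if (c == Constraint.ANY) {
            return inputsFinished.contains(true);  // one finished input suffices
        }
        return !inputsFinished.contains(false);    // ALL: every input must be finished
    }
}
```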

Part 2: Methods

  1. The concrete task to be executed
  /**
     * Returns the invokable class which represents the task of this vertex
     * 
     * @param cl The classloader used to resolve user-defined classes
     * @return The invokable class, <code>null</code> if it is not set
     */
    // Resolves the concrete task class this JobVertex runs,
    // e.g. DataSinkTask or BatchTask.
    public Class<? extends AbstractInvokable> getInvokableClass(ClassLoader cl) {
        if (cl == null) {
            throw new NullPointerException("The classloader must not be null.");
        }
        if (invokableClassName == null) {
            return null;
        }

        try {
            return Class.forName(invokableClassName, true, cl).asSubclass(AbstractInvokable.class);
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("The user-code class could not be resolved.", e);
        }
        catch (ClassCastException e) {
            throw new RuntimeException("The user-code class is no subclass of " + AbstractInvokable.class.getName(), e);
        }
    }
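The lookup above is plain Java reflection: resolve the class by name against a given classloader, then narrow it to the expected supertype with `asSubclass`. The same pattern works with any JDK type; the sketch below uses `Number` instead of `AbstractInvokable` so it is self-contained (`ReflectiveLoader` is an illustrative name, not a Flink class).

```java
// Demonstrates the Class.forName(...).asSubclass(...) pattern used by
// JobVertex#getInvokableClass, with java.lang.Number as the supertype.
class ReflectiveLoader {
    static Class<? extends Number> loadNumberClass(String className, ClassLoader cl) {
        try {
            // initialize=true mirrors Class.forName(..., true, cl) in JobVertex
            return Class.forName(className, true, cl).asSubclass(Number.class);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("The class could not be resolved.", e);
        } catch (ClassCastException e) {
            throw new RuntimeException("The class is no subclass of java.lang.Number", e);
        }
    }
}
```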

2. Methods related to operator chaining

/**
     * Associates this vertex with a slot sharing group for scheduling. Different vertices in the same
     * slot sharing group can run one subtask each in the same slot.
     * 
     * @param grp The slot sharing group to associate the vertex with.
     */
    // Associates this JobVertex with a slot sharing group for scheduling:
    // different JobVertices in the same group can each run one subtask in the same slot.
    public void setSlotSharingGroup(SlotSharingGroup grp) {
        if (this.slotSharingGroup != null) {
            this.slotSharingGroup.removeVertexFromGroup(id);
        }

        this.slotSharingGroup = grp;
        if (grp != null) {
            grp.addVertexToGroup(id);
        }
    }

    /**
     * Gets the slot sharing group that this vertex is associated with. Different vertices in the same
     * slot sharing group can run one subtask each in the same slot. If the vertex is not associated with
     * a slot sharing group, this method returns {@code null}.
     * 
     * @return The slot sharing group to associate the vertex with, or {@code null}, if not associated with one.
     */
    // If this JobVertex has not been assigned a slot sharing group, null is returned.
    // By default, the slot sharing group is named "default".
    @Nullable
    public SlotSharingGroup getSlotSharingGroup() {
        return slotSharingGroup;
    }

    /**
     * Tells this vertex to strictly co locate its subtasks with the subtasks of the given vertex.
     * Strict co-location implies that the n'th subtask of this vertex will run on the same parallel computing
     * instance (TaskManager) as the n'th subtask of the given vertex.
     * 
     * NOTE: Co-location is only possible between vertices in a slot sharing group.
     * 
     * NOTE: This vertex must (transitively) depend on the vertex to be co-located with. That means that the
     * respective vertex must be a (transitive) input of this vertex.
     * 
     * @param strictlyCoLocatedWith The vertex whose subtasks to co-locate this vertex's subtasks with.
     * 
     * @throws IllegalArgumentException Thrown, if this vertex and the vertex to co-locate with are not in a common
     *                                  slot sharing group.
     * 
     * @see #setSlotSharingGroup(SlotSharingGroup)
     */
    // Strictly co-locates this JobVertex's subtasks with those of the given JobVertex:
    // the n-th subtask of this vertex runs on the same TaskManager (in the same shared slot) as the n-th subtask of the given vertex.
    // Two preconditions apply:
    // 1. Both vertices must belong to the same slot sharing group.
    // 2. This vertex must (transitively) depend on the given vertex, i.e. the given vertex is a (transitive, forward-connected) input of it.
    // Operator chaining uses this mechanism.
    public void setStrictlyCoLocatedWith(JobVertex strictlyCoLocatedWith) {
        if (this.slotSharingGroup == null || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) {
            throw new IllegalArgumentException("Strict co-location requires that both vertices are in the same slot sharing group.");
        }

        CoLocationGroup thisGroup = this.coLocationGroup;
        CoLocationGroup otherGroup = strictlyCoLocatedWith.coLocationGroup;
        // Four cases, depending on which vertices already have a co-location group:
        if (otherGroup == null) {
            if (thisGroup == null) { // neither vertex has a co-location group yet: create one containing both
                CoLocationGroup group = new CoLocationGroup(this, strictlyCoLocatedWith);
                this.coLocationGroup = group;
                strictlyCoLocatedWith.coLocationGroup = group;
            }
            else { // the given vertex has no co-location group: add it to this vertex's group
                thisGroup.addVertex(strictlyCoLocatedWith);
                strictlyCoLocatedWith.coLocationGroup = thisGroup;
            }
        }
        else {
            if (thisGroup == null) { // this vertex has no co-location group: join the given vertex's group
                otherGroup.addVertex(this);
                this.coLocationGroup = otherGroup;
            }
            else { // both vertices already have distinct co-location groups, so merge them
                thisGroup.mergeInto(otherGroup);
            }
        }
    }
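The four cases above can be modeled with a couple of toy classes. `CoGroup` and `CoVertex` below are simplified stand-ins for Flink's `CoLocationGroup` and `JobVertex` (the slot-sharing precondition is omitted); the branch structure in `coLocateWith` follows the method above.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the group-merging logic in setStrictlyCoLocatedWith.
class CoGroup {
    final List<CoVertex> vertices = new ArrayList<>();
    void add(CoVertex v) { vertices.add(v); v.group = this; }
    // Move every vertex of this group into the other group.
    void mergeInto(CoGroup other) {
        for (CoVertex v : new ArrayList<>(vertices)) other.add(v);
        vertices.clear();
    }
}

class CoVertex {
    CoGroup group;
    void coLocateWith(CoVertex other) {
        if (other.group == null) {
            if (this.group == null) {      // case 1: create a fresh group for both
                CoGroup g = new CoGroup();
                g.add(this);
                g.add(other);
            } else {                       // case 2: pull the other vertex into our group
                this.group.add(other);
            }
        } else if (this.group == null) {   // case 3: join the other vertex's group
            other.group.add(this);
        } else {                           // case 4: merge two distinct groups
            this.group.mergeInto(other.group);
        }
    }
}
```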
  3. ResultPartitionType
    Indicates how the intermediate data set produced by a JobVertex is partitioned and exchanged.
/**
     * Blocking partitions represent blocking data exchanges, where the data stream is first
     * fully produced and then consumed. This is an option that is only applicable to bounded
     * streams and can be used in bounded stream runtime and recovery.
     *
     * <p>Blocking partitions can be consumed multiple times and concurrently.
     *
     * <p>The partition is not automatically released after being consumed (like for example the
     * {@link #PIPELINED} partitions), but only released through the scheduler, when it determines
     * that the partition is no longer needed.
     */
    // Data is exchanged in a blocking fashion:
    // the result must be fully produced before it can be consumed.
    // This mode only applies to bounded streams and can be used both in normal bounded-stream execution and in recovery.
    // Note that a blocking partition can be consumed multiple times and concurrently.
    BLOCKING(false, false, false, false),

    /**
     * BLOCKING_PERSISTENT partitions are similar to {@link #BLOCKING} partitions, but have
     * a user-specified life cycle.
     *
     * <p>BLOCKING_PERSISTENT partitions are dropped upon explicit API calls to the
     * JobManager or ResourceManager, rather than by the scheduler.
     *
     * <p>Otherwise, the partition may only be dropped by safety-nets during failure handling
     * scenarios, like when the TaskManager exits or when the TaskManager looses connection
     * to JobManager / ResourceManager for too long.
     */
    // Similar to BLOCKING partitions, but with a user-specified life cycle:
    // a BLOCKING_PERSISTENT partition is dropped upon an explicit API call to the JobManager or ResourceManager, not by the scheduler.
    // Otherwise it is only dropped by safety nets during failure handling, e.g. when a TaskManager exits
    // or loses its connection to the JobManager/ResourceManager for too long.
    BLOCKING_PERSISTENT(false, false, false, true),

    /**
     * A pipelined streaming data exchange. This is applicable to both bounded and unbounded streams.
     *
     * <p>Pipelined results can be consumed only once by a single consumer and are automatically
     * disposed when the stream has been consumed.
     *
     * <p>This result partition type may keep an arbitrary amount of data in-flight, in contrast to
     * the {@link #PIPELINED_BOUNDED} variant.
     */
    // Data is exchanged in a pipelined fashion, applicable to both bounded and unbounded streams.
    // A pipelined partition can be consumed only once, by a single consumer,
    // and is automatically disposed once the stream has been consumed.
    PIPELINED(true, true, false, false),

    /**
     * Pipelined partitions with a bounded (local) buffer pool.
     *
     * <p>For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much
     * data is being buffered and checkpoint barriers are delayed. In contrast to limiting the
     * overall network buffer pool size, this, however, still allows to be flexible with regards
     * to the total number of partitions by selecting an appropriately big network buffer pool size.
     *
     * <p>For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
     * no checkpoint barriers.
     */
    // Like PIPELINED, but backed by a bounded (local) buffer pool.
    // For streaming jobs, a fixed buffer pool size limits how much data is buffered in flight,
    // so checkpoint barriers are not delayed for too long.
    // Unlike limiting the overall network buffer pool size, this approach still leaves the total
    // number of partitions flexible, by choosing an appropriately large network buffer pool.
    // For batch jobs there are no checkpoint barriers, so the unlimited PIPELINED variant is preferable.
    PIPELINED_BOUNDED(true, true, true, false);
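The four constructor booleans encode the differences between the variants. The enum below restates them with named flags; the flag names (pipelined, back-pressured, bounded, persistent) are my reading of the constructor arguments above and should be treated as illustrative, not authoritative.

```java
// Sketch of how the four constructor flags distinguish the partition types.
enum PartitionKind {
    BLOCKING(false, false, false, false),
    BLOCKING_PERSISTENT(false, false, false, true),
    PIPELINED(true, true, false, false),
    PIPELINED_BOUNDED(true, true, true, false);

    final boolean pipelined, backPressured, bounded, persistent;

    PartitionKind(boolean pipelined, boolean backPressured, boolean bounded, boolean persistent) {
        this.pipelined = pipelined;
        this.backPressured = backPressured;
        this.bounded = bounded;
        this.persistent = persistent;
    }

    // Blocking results can be consumed repeatedly; pipelined ones only once.
    boolean isReconsumable() { return !pipelined; }
}
```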

3. Full Source

See JobVertex.java in the Flink repository for the complete source.
