Flink-1.10 源碼筆記 JobGraph生成

Flink源碼 - 生成JobGraph

什么是jobGraph

如果沒(méi)了解StreamGraph單擊我

JobGraph:StreamGraph經(jīng)過(guò)優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個(gè)符合條件的節(jié)點(diǎn) chain 在一起作為一個(gè)節(jié)點(diǎn),這樣可以減少數(shù)據(jù)在節(jié)點(diǎn)之間流動(dòng)所需要的序列化/反序列化/傳輸消耗。

在jobgraph中有一個(gè)組成“元素”:JobVertex:jobVertex用于產(chǎn)生intermediateDataset,并通過(guò)jobEdge串聯(lián)不同的jobVertex同時(shí)也是將operator chain的"關(guān)鍵點(diǎn)"。 jobVertex是從job層面對(duì)task進(jìn)行抽象。

在獲取到StreamGraph后,繼續(xù)分析,如何通過(guò)StreamGraph來(lái)生成JobGraph,StreamGraph和JobGraph都是在Client端生成的,我們可以通過(guò)debug斷點(diǎn)觀察Grpah的生成過(guò)程

JobGraph的入口在StreamingJobGraphGenerator.createJobGraph()方法中
    //創(chuàng)建JobGraph
    public static JobGraph createJobGraph(StreamGraph streamGraph) {
        return createJobGraph(streamGraph, null);
    }

    public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {

        //StreamingJobGraphGenerator 傳入StreamGraph,以及jobId,初始化StreamingJobGraphGenerator
        //并創(chuàng)建JobGraph對(duì)象
        //createJobGraph  具體JobGraph生成的邏輯

        return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
    }

進(jìn)入到StreamingJobGraphGenerator方法中,這個(gè)方法初始化了一些需要的參數(shù),并通過(guò),JobID和JobName創(chuàng)建了一個(gè)JobGraph,

private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {

        this.streamGraph = streamGraph;
        this.defaultStreamGraphHasher = new StreamGraphHasherV2();
        this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());

        this.jobVertices = new HashMap<>();
        // 已經(jīng)構(gòu)建的JobVertex的id集合
        this.builtVertices = new HashSet<>();
        // 保存chain信息,部署時(shí)用來(lái)構(gòu)建 OperatorChain,startNodeId -> (currentNodeId -> StreamConfig)
        this.chainedConfigs = new HashMap<>();
        // 所有節(jié)點(diǎn)的配置信息,id -> StreamConfig
        this.vertexConfigs = new HashMap<>();
        // 保存每個(gè)節(jié)點(diǎn)的名字,id -> chainedName
        this.chainedNames = new HashMap<>();
        this.chainedMinResources = new HashMap<>();
        this.chainedPreferredResources = new HashMap<>();
        this.chainedInputOutputFormats = new HashMap<>();
        // 物理邊集合(排除了chain內(nèi)部的邊), 按創(chuàng)建順序排序
        this.physicalEdgesInOrder = new ArrayList<>();

        jobGraph = new JobGraph(jobID, streamGraph.getJobName());
    }

在調(diào)用createJobGraph這個(gè)方法時(shí)候開(kāi)始構(gòu)建JobGraph圖,進(jìn)入到createJobGraph方法中

在這里可以分為6 步

  1. 廣度遍歷StreamGraph,為每個(gè)SteramNode生成hash值,這個(gè)算法能夠保證如果拓補(bǔ)圖沒(méi)有發(fā)生改變,每次生成的hash值都是一樣的,同時(shí)會(huì)生成用戶指定hash值(即map.setUidHash("hash")的值),
    這里是根據(jù)StreamGraph的配置生成的,給每個(gè)StreamNode生成一個(gè)長(zhǎng)度為16的字節(jié)數(shù)組的hash值,這個(gè)散列值用于生成JobGraph的JobVertex的id,在checkpoint恢復(fù)的時(shí)候會(huì)根據(jù)jobVertexId為依據(jù),所以在重啟的任務(wù)的時(shí)候,對(duì)于對(duì)于相同的任務(wù),其各JobVertexID能夠保持不變,而StreamGraph中各個(gè)StreamNode的ID,就是其包含的StreamTransformation的ID,而StreamTransformation的ID是在對(duì)數(shù)據(jù)流中的數(shù)據(jù)進(jìn)行轉(zhuǎn)換的過(guò)程中,通過(guò)一個(gè)靜態(tài)的累加器生成的,比如有多個(gè)數(shù)據(jù)源時(shí),每個(gè)數(shù)據(jù)源添加的順序不一致,則有可能導(dǎo)致相同數(shù)據(jù)處理邏輯的任務(wù),就會(huì)對(duì)應(yīng)于不同的ID,所以為了得到確定的ID,在進(jìn)行JobVertexID的產(chǎn)生時(shí),需要以一種確定的方式來(lái)確定其值,要么是通過(guò)用戶為每個(gè)ID直接指定對(duì)應(yīng)的一個(gè)散列值,要么參考StreamGraph中的一些特征,為每個(gè)JobVertex產(chǎn)生一個(gè)確定的ID。
    如果用戶指定了hash值,則會(huì)基于用戶的指定的值生成一個(gè)長(zhǎng)度為16的字節(jié)數(shù)組,如果用戶沒(méi)有指定,則會(huì)由flink生成一個(gè)長(zhǎng)度為16的字節(jié)數(shù)組
  2. 構(gòu)建任務(wù)鏈,屬于Flink對(duì)我們的任務(wù)進(jìn)行調(diào)優(yōu),在這里會(huì)遍歷所有的source節(jié)點(diǎn)開(kāi)始構(gòu)建chain,當(dāng)條件滿足的時(shí)候會(huì)將兩個(gè)operator放到一個(gè)線程并行執(zhí)行,這也可以減少網(wǎng)絡(luò)的傳輸,并且在chain中的operator之間傳輸數(shù)據(jù)也不需要進(jìn)行序列化和反序列化,可以提交任務(wù)執(zhí)行效率
  3. 將每個(gè)JobVertex的入邊集合也序列化到該JobVertex的StreamConfig中(出邊在構(gòu)建chain的時(shí)候已經(jīng)寫(xiě)入conf中).
    4.為每個(gè)JobVertex指定 slot sharing group,以及CoLocationGroup
    5.設(shè)置托管內(nèi)存分?jǐn)?shù)(權(quán)重比)
    6.配置檢查點(diǎn),保存點(diǎn)恢復(fù)模式以及添加用戶分布式緩存的文件
private JobGraph createJobGraph() {

        // 創(chuàng)建之前進(jìn)行預(yù)檢查
        preValidate();

        // make sure that all vertices start immediately
        //設(shè)置調(diào)度模式,    --  todo 默認(rèn)是所有job都啟動(dòng)
        jobGraph.setScheduleMode(streamGraph.getScheduleMode());

        // Generate deterministic hashes for the nodes in order to identify them across
        // submission iff they didn't change.
        /**
         * todo defaultStreamGraphHasher 使用的是StreamGraphHasherV2類(lèi),
         *      legacyStreamGraphHashers使用的是StreamGraphUserHashHasher類(lèi)
         *      兩個(gè)類(lèi)都實(shí)現(xiàn)了StreamGraphHasher接口,在使用userHash的時(shí)候,是調(diào)用
         *      我們?cè)谒阕雍竺嬲{(diào)用.setUidHash("hash")傳入的值生成的
         */
        // todo 廣度優(yōu)先遍歷遍歷StreamGraph,并且為每個(gè)SteamNode生成散列值, 這里的散列值產(chǎn)生算法,可以保證如
        //   果提交的拓?fù)錄](méi)有改變,則每次生成的散列值都是一樣的。一個(gè)StreamNode的ID對(duì)應(yīng)一個(gè)散列值。
        Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

        // Generate legacy version hashes for backwards compatibility
        // 為了保持兼容性創(chuàng)建的hash, 這里是user 傳入的setUidHash
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }

        //主要從source構(gòu)建任務(wù)鏈
        setChaining(hashes, legacyHashes);

        // 將每個(gè)JobVertex的入邊集合也序列化到該JobVertex的StreamConfig中
        // (出邊集合已經(jīng)在setChaining的時(shí)候?qū)懭肓?
        setPhysicalEdges();

        //設(shè)置slot共享和coLocation。同一個(gè)coLocationGroup的task需要在同一個(gè)slot中運(yùn)行
        // CoLocationGroup =設(shè)置標(biāo)識(shí)共存組的鍵。具有相同共定位鍵的操作符將由調(diào)度器將它們對(duì)應(yīng)的子任務(wù)放到相同的槽中
        setSlotSharingAndCoLocation();

        // 設(shè)置托管內(nèi)存分?jǐn)?shù)(權(quán)重比) 
        setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getMinResources(),
            id -> streamGraph.getStreamNode(id).getManagedMemoryWeight());


        // 配置檢查點(diǎn)
        configureCheckpointing();

        // 設(shè)置保存點(diǎn)恢復(fù)設(shè)置
        jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

        // 添加用戶分布式緩存
        JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);

        // set the ExecutionConfig last when it has been finalized
        try {
            //設(shè)置 job execution 配置
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        }
        catch (IOException e) {
            throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                    "This indicates that non-serializable types (like custom serializers) were registered");
        }

        return jobGraph;
    }

主要分析一下setChaining方法中的邏輯

setChaining這里調(diào)用了createChain方法,傳入了source節(jié)點(diǎn)和節(jié)點(diǎn)在chain的索引位置,以及一個(gè)OperatorChainInfo對(duì)象,這個(gè)對(duì)用

    private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
        // 從每個(gè)source節(jié)點(diǎn)構(gòu)建 任務(wù)鏈, 如果有多個(gè)source遍歷每個(gè)source調(diào)用createChain方法
        for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
            createChain(
                    sourceNodeId,   //source 節(jié)點(diǎn)id
                    0,   // chain的索引位置 初始為0
                // 一個(gè)私有類(lèi) 遞歸調(diào)用期間幫助維護(hù)操作符鏈的信息
                //  sourceNodeId : chain的起始node id 為source
                //  hashes, legacyHashes : 是前面產(chǎn)生的StreamNode的hash值
                    new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, streamGraph));
        }
    }

createChain方法

1.如果startNodeId已經(jīng)創(chuàng)建chain,那么直接返回一個(gè)空l(shuí)ist
2.如果沒(méi)有構(gòu)建,則開(kāi)始構(gòu)建chian

2.1 首先會(huì)循環(huán)當(dāng)前StreamNode的所有StreamEdge(StreamEdge記錄了下游節(jié)點(diǎn)id,根據(jù)StreamGraph可以獲 取指定id的StreamNode),通過(guò)isChainable方法去判斷兩個(gè)節(jié)點(diǎn)是否能夠chain在一起,如果能chain在一 起,將edge存入chainableOutputs集合,否則放入nonChainableOutputs集合
2.2 循環(huán)將chainableOutputs集合,如果有值,遞歸的調(diào)用createChain方法,當(dāng)循環(huán)將chainableOutputs集 合循環(huán)完,會(huì)循環(huán)nonChainableOutputs集合遞歸調(diào)用createChain方法,開(kāi)始創(chuàng)建新的chain,這時(shí)候是構(gòu) 建以下游StreamNode為頂點(diǎn)開(kāi)始構(gòu)建chain
2.3 如果當(dāng)前節(jié)點(diǎn)是chain的起始節(jié)點(diǎn),會(huì)創(chuàng)建一個(gè)JobVertex,然后循環(huán)chain的出邊(outEdge)調(diào)用connect 方法,將它指向的出邊(outEdge)通過(guò)JobEdge進(jìn)行連接,并將配置信息通過(guò)StreamConfig提供的 setTransitiveChainedTaskConfigs方法將配置信息保存到JobVertex的configuration中
如果是chain的中間節(jié)點(diǎn),那么會(huì)將配置信息寫(xiě)入到StreamConfig中,并寫(xiě)入到chainedConfigs中,其數(shù)據(jù) 結(jié)構(gòu)為 Map<Integer, Map<Integer, StreamConfig>> key為 chain的頂點(diǎn),value為每個(gè)中間節(jié)點(diǎn) 配置信息

private List<StreamEdge> createChain(Integer currentNodeId, int chainIndex, OperatorChainInfo chainInfo) {
        Integer startNodeId = chainInfo.getStartNodeId();
        // builtVertices存放了已經(jīng)被構(gòu)建了的StreamNode ID,避免重復(fù)操作
        if (!builtVertices.contains(startNodeId)) {

            // 過(guò)渡用的出邊集合, 用來(lái)生成最終的 JobEdge,
            // 注意:存在某些StreamNode會(huì)連接到一起,比如source->map->flatMap,如果這幾個(gè)StreamNode連接到一起,
            // 則transitiveOutEdges是不包括 chain 內(nèi)部的邊,既不包含source->map的StreamEdge的
            List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

            // 可以與當(dāng)前節(jié)點(diǎn)鏈接的StreamEdge
            List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
            // 不可以與當(dāng)前節(jié)點(diǎn)鏈接的StreamEdge
            List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

            // 獲取當(dāng)前處理node
            StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
            // 將當(dāng)前節(jié)點(diǎn)的出邊分成 chainable 和 nonChainable 兩類(lèi)
            // 分類(lèi)可以被chain的edge和不可被chain的edge,使用isChainable的方法判斷
            // 如果能chain 放入chainableOutputs集合,否則放入nonChainableOutputs集合
            for (StreamEdge outEdge : currentNode.getOutEdges()) {
                if (isChainable(outEdge, streamGraph)) {
                    chainableOutputs.add(outEdge);
                } else {
                    nonChainableOutputs.add(outEdge);
                }
            }

            // todo 遞歸調(diào)用
            // 對(duì)于每個(gè)可連接的StreamEdge,遞歸調(diào)用其目標(biāo)StreamNode,startNodeId保持不變,但是chainIndex會(huì)加1
            for (StreamEdge chainable : chainableOutputs) {
                // 如果是可被chain的StreamEdge,遞歸調(diào)用createChain方法
                // 注意currentNode是chainable.getTargetId(),也就是(downStreamNode)下游節(jié)點(diǎn)id
                // 遞歸直到currentNode的out edge為不可chain的edge,會(huì)執(zhí)行下一段for循環(huán),不可chain的邊被加入transitiveOutEdges,最終返回到遞歸最外層
                // 這樣以來(lái),transitiveOutEdges收集齊了整個(gè)chain所有的出邊
                transitiveOutEdges.addAll(
                        createChain(chainable.getTargetId(), chainIndex + 1, chainInfo));
            }

            // 對(duì)于每個(gè)不可連接的StreamEdge,則將對(duì)應(yīng)的StreamEdge就是當(dāng)前鏈的一個(gè)輸出StreamEdge,所以會(huì)添加到
            // transitiveOutEdges這個(gè)集合中 然后遞歸調(diào)用其Target node,注意,startNodeID變成了nonChainable這個(gè)
            // StreamEdge的輸出節(jié)點(diǎn)id,chainIndex也賦值為0,說(shuō)明重新開(kāi)始一條鏈的建立
            for (StreamEdge nonChainable : nonChainableOutputs) {
                // 將不可被chain的StreamEdge,添加到transitiveOutEdges集合中
                transitiveOutEdges.add(nonChainable);
                // 調(diào)用createChain,構(gòu)建新的chain
                // todo  這里傳入StreamNodeId 都是下游StreamNodeId, 并創(chuàng)建新的OperatorChainInfo對(duì)象;
                createChain(nonChainable.getTargetId(), 0, chainInfo.newChain(nonChainable.getTargetId()));
            }

            //生成當(dāng)前節(jié)點(diǎn)的顯示名,如:"Keyed Aggregation -> Sink: Unnamed"
            chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
            // 設(shè)置chain的最小資源
            chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
            // 設(shè)置chain的首選資源
            chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

            OperatorID currentOperatorId = chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));


            if (currentNode.getInputFormat() != null) {
                getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
            }
            if (currentNode.getOutputFormat() != null) {
                getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
            }


            // 如果當(dāng)前節(jié)點(diǎn)是起始節(jié)點(diǎn),則直接創(chuàng)建 JobVertex 并返回 StreamConfig, 否則先創(chuàng)建一個(gè)空的 StreamConfig
            // createJobVertex 函數(shù)就是根據(jù) StreamNode 創(chuàng)建對(duì)應(yīng)的 JobVertex, 并返回了空的 StreamConfig
            // 如果currentNodeId和startNodeId相等,說(shuō)明需要?jiǎng)?chuàng)建一個(gè)新的chain,會(huì)生成一個(gè)JobVertex
            StreamConfig config = currentNodeId.equals(startNodeId)
                    ? createJobVertex(startNodeId, chainInfo)
                    : new StreamConfig(new Configuration());

            // 設(shè)置 JobVertex 的 StreamConfig, 基本上是序列化 StreamNode 中的配置到 StreamConfig 中.
            // 其中包括 序列化器, StreamOperator, Checkpoint 等相關(guān)配置
            // 設(shè)置的Vertex屬性到config中
            setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

            // 如果相等 說(shuō)明處理到了當(dāng)前節(jié)點(diǎn),也就是說(shuō)chain的子節(jié)點(diǎn)處理完了 需要?jiǎng)?chuàng)建Edge將兩個(gè)JobVertex進(jìn)行連接
            // startNodeId是 chain的開(kāi)始節(jié)點(diǎn),  currentNodeId 是當(dāng)前節(jié)點(diǎn),  
            // todo 如果開(kāi)始節(jié)點(diǎn)是1 當(dāng)前節(jié)點(diǎn)是4, 那么他的邏輯順序則是  1 => 4  ,  1 => 3  ,  1 => 2 ,  1=>1
            //      當(dāng) 當(dāng)前節(jié)點(diǎn)等于開(kāi)始節(jié)點(diǎn)的時(shí)候, 說(shuō)明chain中間的節(jié)點(diǎn)處理完了, 需要處理 chain的頂點(diǎn), 即chain的開(kāi)始節(jié)點(diǎn)
            if (currentNodeId.equals(startNodeId)) {

                //如果是chain的起始節(jié)點(diǎn)。(不是chain的中間節(jié)點(diǎn),會(huì)被標(biāo)記成 chain start)
                // 意味著一個(gè)新chain的開(kāi)始
                config.setChainStart();
                config.setChainIndex(0);
                config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
                //我們也會(huì)把物理出邊寫(xiě)入配置, 部署時(shí)會(huì)用到
                config.setOutEdgesInOrder(transitiveOutEdges);
                config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

                //  將當(dāng)前節(jié)點(diǎn)(headOfChain)與所有出邊相連
                // 對(duì)于每一個(gè)chain,把它和指向下一個(gè)chain的出邊連接起來(lái)
                for (StreamEdge edge : transitiveOutEdges) {
                    //通過(guò)StreamEdge構(gòu)建出JobEdge,創(chuàng)建 IntermediateDataSet ,用來(lái)將JobVertex和JobEdge相連
                    connect(startNodeId, edge);
                }
                // 將chain中所有子節(jié)點(diǎn)的StreamConfig寫(xiě)入到 headOfChain 節(jié)點(diǎn)的 CHAINED_TASK_CONFIG 配置中
                config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

            } else {
                //如果是 chain 中的子節(jié)點(diǎn)
                chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());

                config.setChainIndex(chainIndex);
                // 獲取到被chain的節(jié)點(diǎn)
                StreamNode node = streamGraph.getStreamNode(currentNodeId);
                config.setOperatorName(node.getOperatorName());
                // 將當(dāng)前節(jié)點(diǎn)的StreamConfig添加到該chain的config集合中
                // 關(guān)聯(lián)chain內(nèi)節(jié)點(diǎn)的配置信息到chain的起始節(jié)點(diǎn)上
                chainedConfigs.get(startNodeId).put(currentNodeId, config);
            }

            config.setOperatorID(currentOperatorId);
            //如果節(jié)點(diǎn)的輸出StreamEdge已經(jīng)為空,則說(shuō)明是鏈的結(jié)尾
            if (chainableOutputs.isEmpty()) {
                config.setChainEnd();
            }
            // 返回連往chain外部的出邊集合
            return transitiveOutEdges;

        } else {
            //startNodeId 如果已經(jīng)構(gòu)建過(guò),則直接返回
            return new ArrayList<>();
        }
    }

看一下createChain內(nèi)調(diào)用的一些方法

isChainable

這個(gè)方法用于判斷兩個(gè)StreamEdge的源(即 源產(chǎn)生的operator<并不一定是 source operator>)是否可以chain一起進(jìn)行執(zhí)行,只有當(dāng)所有的條件都滿足的時(shí)候才返回true(即可以chain)

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        /** 獲取StreamEdge的源和目標(biāo)StreamNode */
        StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
        /**
         * 1、下游節(jié)點(diǎn)只有一個(gè)輸入
         * 4、上下游節(jié)點(diǎn)在一個(gè)槽位共享組內(nèi)
         * 5、下游節(jié)點(diǎn)的連接策略是 ALWAYS
         * 6、上游節(jié)點(diǎn)的連接策略是 HEAD 或者 ALWAYS
         * 7、sourceFactory不能是YieldingOperatorFactory
         * 8、edge 的分區(qū)函數(shù)是 ForwardPartitioner 的實(shí)例
         * 9、shuffle模式不能是batch模式
         * 10、上下游節(jié)點(diǎn)的并行度相等
         * 11、可以進(jìn)行節(jié)點(diǎn)連接操作
         */
        return downStreamVertex.getInEdges().size() == 1
                && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
                && (edge.getPartitioner() instanceof ForwardPartitioner)
                && edge.getShuffleMode() != ShuffleMode.BATCH
                && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                && streamGraph.isChainingEnabled();
    }
createJobVertex

JobVertex只有在chain的頂點(diǎn)創(chuàng)建,chain的其余節(jié)點(diǎn)以配置信息的形式記錄在JobVertex中
這個(gè)方法首先會(huì)獲取StreamNode(chain的開(kāi)始節(jié)點(diǎn)),并創(chuàng)建一個(gè)JobVertexId,是根據(jù)當(dāng)前StreamNode的hash創(chuàng)建的, 獲取到chain頂點(diǎn)的operator hash對(duì) (用戶與flink生成),然后根據(jù)JobVertexName和JobVertexId以及hash對(duì)生成JobVertex,生成jobVertex后設(shè)置一些參數(shù),將jobVertex存入到j(luò)obGreaph中,到這里,一個(gè)JobVertex就生成完了

private StreamConfig createJobVertex(
            Integer streamNodeId,
            OperatorChainInfo chainInfo) {

        JobVertex jobVertex;
        // 獲取開(kāi)始節(jié)點(diǎn)
        StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);

        byte[] hash = chainInfo.getHash(streamNodeId);

        if (hash == null) {
            throw new IllegalStateException("Cannot find node hash. " +
                    "Did you generate them before calling this method?");
        }
        // 通過(guò)hash創(chuàng)建JobVertexId
        JobVertexID jobVertexId = new JobVertexID(hash);

        // 獲取任務(wù)鏈 operator的哈希值
        List<Tuple2<byte[], byte[]>> chainedOperators = chainInfo.getChainedOperatorHashes(streamNodeId);
        List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
        if (chainedOperators != null) {
            for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
                OperatorID userDefinedOperatorID = chainedOperator.f1 == null ? null : new OperatorID(chainedOperator.f1);
                operatorIDPairs.add(OperatorIDPair.of(new OperatorID(chainedOperator.f0), userDefinedOperatorID));
            }
        }
        // 如果chain是 輸人輸出格式, 創(chuàng)建InputOutputFormatVertex對(duì)象,   -- >  這個(gè)只是嘗試性的
        if (chainedInputOutputFormats.containsKey(streamNodeId)) {
            jobVertex = new InputOutputFormatVertex(
                    chainedNames.get(streamNodeId),
                    jobVertexId,
                    operatorIDPairs);

            chainedInputOutputFormats
                .get(streamNodeId)
                .write(new TaskConfig(jobVertex.getConfiguration()));
        } else { // 否則創(chuàng)建 JobVertex 對(duì)象
            jobVertex = new JobVertex(
                    chainedNames.get(streamNodeId),
                    jobVertexId,
                    operatorIDPairs);
        }
        for (OperatorCoordinator.Provider coordinatorProvider : chainInfo.getCoordinatorProviders()) {
            try {
                jobVertex.addOperatorCoordinator(new SerializedValue<>(coordinatorProvider));
            } catch (IOException e) {
                throw new FlinkRuntimeException(String.format(
                        "Coordinator Provider for node %s is not serializable.", chainedNames.get(streamNodeId)));
            }
        }

        jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));

        jobVertex.setInvokableClass(streamNode.getJobVertexClass());

        int parallelism = streamNode.getParallelism();

        if (parallelism > 0) {
            jobVertex.setParallelism(parallelism);
        } else {
            parallelism = jobVertex.getParallelism();
        }

        jobVertex.setMaxParallelism(streamNode.getMaxParallelism());

        if (LOG.isDebugEnabled()) {
            LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
        }

        // TODO: inherit InputDependencyConstraint from the head operator
        //  設(shè)置輸入狀態(tài)的情況下安排任務(wù)方式。  , 默認(rèn) 如果有可使用的輸入,則調(diào)度該任務(wù)
        jobVertex.setInputDependencyConstraint(streamGraph.getExecutionConfig().getDefaultInputDependencyConstraint());

        //將生成好的jobVertex存入jobVertices列表
        jobVertices.put(streamNodeId, jobVertex);
        //已經(jīng)構(gòu)建好的jobVertex Id列表
        builtVertices.add(streamNodeId);
        // 將jovVertex添加到JobGraph圖中
        jobGraph.addVertex(jobVertex);

        return new StreamConfig(jobVertex.getConfiguration());
    }

connect

這個(gè)方法主要用于 將兩個(gè)JobVertex連接在一起,首先會(huì)獲取到ShuffleMode(數(shù)據(jù)交互模式)生成resultPartitionType,然后通過(guò)不同的partition創(chuàng)建不同的JobEdge,如果是ForwardPartitioner或RescalePartitioner使用POINTWISE模式,否則使用ALL_TO_ALL(即 一個(gè)subTask對(duì)應(yīng)下游subTask是一對(duì)一還是一對(duì)多),最后connectNewDataSetAsInput方法,創(chuàng)建一個(gè)JobEdge,將兩個(gè)JobVertex連接在一起

    private void connect(Integer headOfChain, StreamEdge edge) {

        physicalEdgesInOrder.add(edge);

        Integer downStreamVertexID = edge.getTargetId();

        JobVertex headVertex = jobVertices.get(headOfChain);
        JobVertex downStreamVertex = jobVertices.get(downStreamVertexID);

        StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());

        // 每次循環(huán) input數(shù)量+1
        downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);

        //獲取 edge的分區(qū)模式 --
        StreamPartitioner<?> partitioner = edge.getPartitioner();

        //結(jié)果分區(qū)的類(lèi)型
        ResultPartitionType resultPartitionType;
        // 最終返回  PIPELINED_BOUNDED 或者  BLOCKING
        // 如果沒(méi)有定義,會(huì)進(jìn)行判斷全局?jǐn)?shù)據(jù)交換模式, 最終確定分區(qū)模式
        switch (edge.getShuffleMode()) {
            case PIPELINED:
                resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
                break;
            case BATCH:
                resultPartitionType = ResultPartitionType.BLOCKING;
                break;
            case UNDEFINED:
                resultPartitionType = determineResultPartitionType(partitioner);
                break;
            default:
                throw new UnsupportedOperationException("Data exchange mode " +
                    edge.getShuffleMode() + " is not supported yet.");
        }

        JobEdge jobEdge;
        // 根據(jù)不同的分區(qū)模式創(chuàng)建JobEdge
        // 如果分區(qū)模式為ForwardPartitioner或RescalePartitioner;
        // 則使用 POINTWISE分配模式,  否則使用ALL_TO_ALL
        if (isPointwisePartitioner(partitioner)) {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                headVertex,
                // 每個(gè)生產(chǎn)子任務(wù)都連接到消費(fèi)任務(wù)的一個(gè)或多個(gè)子任務(wù)
                DistributionPattern.POINTWISE,
                resultPartitionType);
        } else {
            jobEdge = downStreamVertex.connectNewDataSetAsInput(
                    headVertex,
                    // 每個(gè)生產(chǎn)子任務(wù)都與消費(fèi)任務(wù)的子任務(wù)相連接
                    DistributionPattern.ALL_TO_ALL,
                    resultPartitionType);
        }
        // set strategy name so that web interface can show it.
        // 設(shè)置分發(fā)策略模式,用于 web顯示
        jobEdge.setShipStrategyName(partitioner.toString());

        if (LOG.isDebugEnabled()) {
            LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
                    headOfChain, downStreamVertexID);
        }
    }
downStreamVertex.connectNewDataSetAsInput方法

首先由input的JobVertex(即上游),創(chuàng)建一個(gè)IntermediateDataSet(簡(jiǎn)稱(chēng) dataSet),并將自己添加到dataSet的producer中,,然后當(dāng)前JobVertex(即下游)構(gòu)建一個(gè)JobEdge,并將edge添加到inputs的列表中,并將dataSet的consumer進(jìn)行賦值為JobEdge, 這是JobEdge已經(jīng)構(gòu)建完畢了

JobEdge中的dataSet中有兩個(gè)參數(shù)(producer,consumer),分別對(duì)應(yīng)生產(chǎn)者(上游JobVertex)和消費(fèi)者(下游JobVertex),通過(guò)這種形式將兩個(gè)JobVertex進(jìn)行連接

    public JobEdge connectNewDataSetAsInput(
            JobVertex input,
            DistributionPattern distPattern,
            ResultPartitionType partitionType) {

        // 由上游節(jié)點(diǎn) 創(chuàng)建一個(gè)IntermediateDataSet
        // todo 中間數(shù)據(jù)集是操作符 - sour操作或任何中間操作 - 產(chǎn)生的數(shù)據(jù)集
        //      中間數(shù)據(jù)集可能被其他操作符讀取、具體化或丟棄。
        IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);

        // 由此可以看出, 他的輸入就是上游節(jié)點(diǎn)的一個(gè)輸出
        // JobEdge的輸入就是當(dāng)前節(jié)點(diǎn)
        // 構(gòu)建一個(gè)jobEdge
        JobEdge edge = new JobEdge(dataSet, this, distPattern);
        // jobEdge 連接edge
        this.inputs.add(edge);
        dataSet.addConsumer(edge);
        return edge;
    }

構(gòu)建JobGraph最主要的邏輯就是構(gòu)建chain,源碼中剩下方法就是設(shè)置了一些JobGraph的參數(shù),比如設(shè)置slot共享組,設(shè)置托管內(nèi)存分?jǐn)?shù),配置檢查點(diǎn)等內(nèi)容,最終會(huì)將JobGraph構(gòu)建完成并返回

下面是一段簡(jiǎn)單的代碼,我們看一下他的生成JobGraph的過(guò)程

簡(jiǎn)單的代碼

在StreamGraph向JobGraph轉(zhuǎn)換的時(shí)候,會(huì)將兩個(gè)或多個(gè)算子chain一起進(jìn)行執(zhí)行,這是Flink對(duì)我們的程序進(jìn)行的一個(gè)優(yōu)化,

StreamGraph向JobGraph轉(zhuǎn)換

現(xiàn)在我看一下 具體任務(wù)執(zhí)行后的樣子

任務(wù)運(yùn)行時(shí)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容