Flink源碼 - 生成JobGraph
什么是jobGraph
JobGraph:StreamGraph經(jīng)過(guò)優(yōu)化后生成了 JobGraph,提交給 JobManager 的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個(gè)符合條件的節(jié)點(diǎn) chain 在一起作為一個(gè)節(jié)點(diǎn),這樣可以減少數(shù)據(jù)在節(jié)點(diǎn)之間流動(dòng)所需要的序列化/反序列化/傳輸消耗。
在jobgraph中有一個(gè)組成“元素”:JobVertex:jobVertex用于產(chǎn)生intermediateDataset,并通過(guò)jobEdge串聯(lián)不同的jobVertex同時(shí)也是將operator chain的"關(guān)鍵點(diǎn)"。 jobVertex是從job層面對(duì)task進(jìn)行抽象。
在獲取到StreamGraph后,繼續(xù)分析,如何通過(guò)StreamGraph來(lái)生成JobGraph,StreamGraph和JobGraph都是在Client端生成的,我們可以通過(guò)debug斷點(diǎn)觀察Grpah的生成過(guò)程
JobGraph的入口在StreamingJobGraphGenerator.createJobGraph()方法中
//創(chuàng)建JobGraph
public static JobGraph createJobGraph(StreamGraph streamGraph) {
return createJobGraph(streamGraph, null);
}
public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
//StreamingJobGraphGenerator 傳入StreamGraph,以及jobId,初始化StreamingJobGraphGenerator
//并創(chuàng)建JobGraph對(duì)象
//createJobGraph 具體JobGraph生成的邏輯
return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}
進(jìn)入到StreamingJobGraphGenerator方法中,這個(gè)方法初始化了一些需要的參數(shù),并通過(guò),JobID和JobName創(chuàng)建了一個(gè)JobGraph,
private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {
this.streamGraph = streamGraph;
this.defaultStreamGraphHasher = new StreamGraphHasherV2();
this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());
this.jobVertices = new HashMap<>();
// 已經(jīng)構(gòu)建的JobVertex的id集合
this.builtVertices = new HashSet<>();
// 保存chain信息,部署時(shí)用來(lái)構(gòu)建 OperatorChain,startNodeId -> (currentNodeId -> StreamConfig)
this.chainedConfigs = new HashMap<>();
// 所有節(jié)點(diǎn)的配置信息,id -> StreamConfig
this.vertexConfigs = new HashMap<>();
// 保存每個(gè)節(jié)點(diǎn)的名字,id -> chainedName
this.chainedNames = new HashMap<>();
this.chainedMinResources = new HashMap<>();
this.chainedPreferredResources = new HashMap<>();
this.chainedInputOutputFormats = new HashMap<>();
// 物理邊集合(排除了chain內(nèi)部的邊), 按創(chuàng)建順序排序
this.physicalEdgesInOrder = new ArrayList<>();
jobGraph = new JobGraph(jobID, streamGraph.getJobName());
}
在調(diào)用createJobGraph這個(gè)方法時(shí)候開(kāi)始構(gòu)建JobGraph圖,進(jìn)入到createJobGraph方法中
在這里可以分為6 步
- 廣度遍歷StreamGraph,為每個(gè)SteramNode生成hash值,這個(gè)算法能夠保證如果拓補(bǔ)圖沒(méi)有發(fā)生改變,每次生成的hash值都是一樣的,同時(shí)會(huì)生成用戶指定hash值(即map.setUidHash("hash")的值),
這里是根據(jù)StreamGraph的配置生成的,給每個(gè)StreamNode生成一個(gè)長(zhǎng)度為16的字節(jié)數(shù)組的hash值,這個(gè)散列值用于生成JobGraph的JobVertex的id,在checkpoint恢復(fù)的時(shí)候會(huì)根據(jù)jobVertexId為依據(jù),所以在重啟的任務(wù)的時(shí)候,對(duì)于對(duì)于相同的任務(wù),其各JobVertexID能夠保持不變,而StreamGraph中各個(gè)StreamNode的ID,就是其包含的StreamTransformation的ID,而StreamTransformation的ID是在對(duì)數(shù)據(jù)流中的數(shù)據(jù)進(jìn)行轉(zhuǎn)換的過(guò)程中,通過(guò)一個(gè)靜態(tài)的累加器生成的,比如有多個(gè)數(shù)據(jù)源時(shí),每個(gè)數(shù)據(jù)源添加的順序不一致,則有可能導(dǎo)致相同數(shù)據(jù)處理邏輯的任務(wù),就會(huì)對(duì)應(yīng)于不同的ID,所以為了得到確定的ID,在進(jìn)行JobVertexID的產(chǎn)生時(shí),需要以一種確定的方式來(lái)確定其值,要么是通過(guò)用戶為每個(gè)ID直接指定對(duì)應(yīng)的一個(gè)散列值,要么參考StreamGraph中的一些特征,為每個(gè)JobVertex產(chǎn)生一個(gè)確定的ID。
如果用戶指定了hash值,則會(huì)基于用戶的指定的值生成一個(gè)長(zhǎng)度為16的字節(jié)數(shù)組,如果用戶沒(méi)有指定,則會(huì)由flink生成一個(gè)長(zhǎng)度為16的字節(jié)數(shù)組- 構(gòu)建任務(wù)鏈,屬于Flink對(duì)我們的任務(wù)進(jìn)行調(diào)優(yōu),在這里會(huì)遍歷所有的source節(jié)點(diǎn)開(kāi)始構(gòu)建chain,當(dāng)條件滿足的時(shí)候會(huì)將兩個(gè)operator放到一個(gè)線程并行執(zhí)行,這也可以減少網(wǎng)絡(luò)的傳輸,并且在chain中的operator之間傳輸數(shù)據(jù)也不需要進(jìn)行序列化和反序列化,可以提交任務(wù)執(zhí)行效率
- 將每個(gè)JobVertex的入邊集合也序列化到該JobVertex的StreamConfig中(出邊在構(gòu)建chain的時(shí)候已經(jīng)寫(xiě)入conf中).
4.為每個(gè)JobVertex指定 slot sharing group,以及CoLocationGroup
5.設(shè)置托管內(nèi)存分?jǐn)?shù)(權(quán)重比)
6.配置檢查點(diǎn),保存點(diǎn)恢復(fù)模式以及添加用戶分布式緩存的文件
private JobGraph createJobGraph() {
// 創(chuàng)建之前進(jìn)行預(yù)檢查
preValidate();
// make sure that all vertices start immediately
//設(shè)置調(diào)度模式, -- todo 默認(rèn)是所有job都啟動(dòng)
jobGraph.setScheduleMode(streamGraph.getScheduleMode());
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
/**
* todo defaultStreamGraphHasher 使用的是StreamGraphHasherV2類(lèi),
* legacyStreamGraphHashers使用的是StreamGraphUserHashHasher類(lèi)
* 兩個(gè)類(lèi)都實(shí)現(xiàn)了StreamGraphHasher接口,在使用userHash的時(shí)候,是調(diào)用
* 我們?cè)谒阕雍竺嬲{(diào)用.setUidHash("hash")傳入的值生成的
*/
// todo 廣度優(yōu)先遍歷遍歷StreamGraph,并且為每個(gè)SteamNode生成散列值, 這里的散列值產(chǎn)生算法,可以保證如
// 果提交的拓?fù)錄](méi)有改變,則每次生成的散列值都是一樣的。一個(gè)StreamNode的ID對(duì)應(yīng)一個(gè)散列值。
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
// 為了保持兼容性創(chuàng)建的hash, 這里是user 傳入的setUidHash
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
//主要從source構(gòu)建任務(wù)鏈
setChaining(hashes, legacyHashes);
// 將每個(gè)JobVertex的入邊集合也序列化到該JobVertex的StreamConfig中
// (出邊集合已經(jīng)在setChaining的時(shí)候?qū)懭肓?
setPhysicalEdges();
//設(shè)置slot共享和coLocation。同一個(gè)coLocationGroup的task需要在同一個(gè)slot中運(yùn)行
// CoLocationGroup =設(shè)置標(biāo)識(shí)共存組的鍵。具有相同共定位鍵的操作符將由調(diào)度器將它們對(duì)應(yīng)的子任務(wù)放到相同的槽中
setSlotSharingAndCoLocation();
// 設(shè)置托管內(nèi)存分?jǐn)?shù)(權(quán)重比)
setManagedMemoryFraction(
Collections.unmodifiableMap(jobVertices),
Collections.unmodifiableMap(vertexConfigs),
Collections.unmodifiableMap(chainedConfigs),
id -> streamGraph.getStreamNode(id).getMinResources(),
id -> streamGraph.getStreamNode(id).getManagedMemoryWeight());
// 配置檢查點(diǎn)
configureCheckpointing();
// 設(shè)置保存點(diǎn)恢復(fù)設(shè)置
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
// 添加用戶分布式緩存
JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
// set the ExecutionConfig last when it has been finalized
try {
//設(shè)置 job execution 配置
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
}
catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
"This indicates that non-serializable types (like custom serializers) were registered");
}
return jobGraph;
}
主要分析一下setChaining方法中的邏輯
setChaining這里調(diào)用了createChain方法,傳入了source節(jié)點(diǎn)和節(jié)點(diǎn)在chain的索引位置,以及一個(gè)OperatorChainInfo對(duì)象,這個(gè)對(duì)用
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
// 從每個(gè)source節(jié)點(diǎn)構(gòu)建 任務(wù)鏈, 如果有多個(gè)source遍歷每個(gè)source調(diào)用createChain方法
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
createChain(
sourceNodeId, //source 節(jié)點(diǎn)id
0, // chain的索引位置 初始為0
// 一個(gè)私有類(lèi) 遞歸調(diào)用期間幫助維護(hù)操作符鏈的信息
// sourceNodeId : chain的起始node id 為source
// hashes, legacyHashes : 是前面產(chǎn)生的StreamNode的hash值
new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, streamGraph));
}
}
createChain方法
1.如果startNodeId已經(jīng)創(chuàng)建chain,那么直接返回一個(gè)空l(shuí)ist
2.如果沒(méi)有構(gòu)建,則開(kāi)始構(gòu)建chian2.1 首先會(huì)循環(huán)當(dāng)前StreamNode的所有StreamEdge(StreamEdge記錄了下游節(jié)點(diǎn)id,根據(jù)StreamGraph可以獲 取指定id的StreamNode),通過(guò)isChainable方法去判斷兩個(gè)節(jié)點(diǎn)是否能夠chain在一起,如果能chain在一 起,將edge存入chainableOutputs集合,否則放入nonChainableOutputs集合
2.2 循環(huán)將chainableOutputs集合,如果有值,遞歸的調(diào)用createChain方法,當(dāng)循環(huán)將chainableOutputs集 合循環(huán)完,會(huì)循環(huán)nonChainableOutputs集合遞歸調(diào)用createChain方法,開(kāi)始創(chuàng)建新的chain,這時(shí)候是構(gòu) 建以下游StreamNode為頂點(diǎn)開(kāi)始構(gòu)建chain
2.3 如果當(dāng)前節(jié)點(diǎn)是chain的起始節(jié)點(diǎn),會(huì)創(chuàng)建一個(gè)JobVertex,然后循環(huán)chain的出邊(outEdge)調(diào)用connect 方法,將它指向的出邊(outEdge)通過(guò)JobEdge進(jìn)行連接,并將配置信息通過(guò)StreamConfig提供的 setTransitiveChainedTaskConfigs方法將配置信息保存到JobVertex的configuration中
如果是chain的中間節(jié)點(diǎn),那么會(huì)將配置信息寫(xiě)入到StreamConfig中,并寫(xiě)入到chainedConfigs中,其數(shù)據(jù) 結(jié)構(gòu)為 Map<Integer, Map<Integer, StreamConfig>> key為 chain的頂點(diǎn),value為每個(gè)中間節(jié)點(diǎn) 配置信息
private List<StreamEdge> createChain(Integer currentNodeId, int chainIndex, OperatorChainInfo chainInfo) {
Integer startNodeId = chainInfo.getStartNodeId();
// builtVertices存放了已經(jīng)被構(gòu)建了的StreamNode ID,避免重復(fù)操作
if (!builtVertices.contains(startNodeId)) {
// 過(guò)渡用的出邊集合, 用來(lái)生成最終的 JobEdge,
// 注意:存在某些StreamNode會(huì)連接到一起,比如source->map->flatMap,如果這幾個(gè)StreamNode連接到一起,
// 則transitiveOutEdges是不包括 chain 內(nèi)部的邊,既不包含source->map的StreamEdge的
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
// 可以與當(dāng)前節(jié)點(diǎn)鏈接的StreamEdge
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
// 不可以與當(dāng)前節(jié)點(diǎn)鏈接的StreamEdge
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
// 獲取當(dāng)前處理node
StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
// 將當(dāng)前節(jié)點(diǎn)的出邊分成 chainable 和 nonChainable 兩類(lèi)
// 分類(lèi)可以被chain的edge和不可被chain的edge,使用isChainable的方法判斷
// 如果能chain 放入chainableOutputs集合,否則放入nonChainableOutputs集合
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
// todo 遞歸調(diào)用
// 對(duì)于每個(gè)可連接的StreamEdge,遞歸調(diào)用其目標(biāo)StreamNode,startNodeId保持不變,但是chainIndex會(huì)加1
for (StreamEdge chainable : chainableOutputs) {
// 如果是可被chain的StreamEdge,遞歸調(diào)用createChain方法
// 注意currentNode是chainable.getTargetId(),也就是(downStreamNode)下游節(jié)點(diǎn)id
// 遞歸直到currentNode的out edge為不可chain的edge,會(huì)執(zhí)行下一段for循環(huán),不可chain的邊被加入transitiveOutEdges,最終返回到遞歸最外層
// 這樣以來(lái),transitiveOutEdges收集齊了整個(gè)chain所有的出邊
transitiveOutEdges.addAll(
createChain(chainable.getTargetId(), chainIndex + 1, chainInfo));
}
// 對(duì)于每個(gè)不可連接的StreamEdge,則將對(duì)應(yīng)的StreamEdge就是當(dāng)前鏈的一個(gè)輸出StreamEdge,所以會(huì)添加到
// transitiveOutEdges這個(gè)集合中 然后遞歸調(diào)用其Target node,注意,startNodeID變成了nonChainable這個(gè)
// StreamEdge的輸出節(jié)點(diǎn)id,chainIndex也賦值為0,說(shuō)明重新開(kāi)始一條鏈的建立
for (StreamEdge nonChainable : nonChainableOutputs) {
// 將不可被chain的StreamEdge,添加到transitiveOutEdges集合中
transitiveOutEdges.add(nonChainable);
// 調(diào)用createChain,構(gòu)建新的chain
// todo 這里傳入StreamNodeId 都是下游StreamNodeId, 并創(chuàng)建新的OperatorChainInfo對(duì)象;
createChain(nonChainable.getTargetId(), 0, chainInfo.newChain(nonChainable.getTargetId()));
}
//生成當(dāng)前節(jié)點(diǎn)的顯示名,如:"Keyed Aggregation -> Sink: Unnamed"
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
// 設(shè)置chain的最小資源
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
// 設(shè)置chain的首選資源
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
OperatorID currentOperatorId = chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));
if (currentNode.getInputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
}
if (currentNode.getOutputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
}
// 如果當(dāng)前節(jié)點(diǎn)是起始節(jié)點(diǎn),則直接創(chuàng)建 JobVertex 并返回 StreamConfig, 否則先創(chuàng)建一個(gè)空的 StreamConfig
// createJobVertex 函數(shù)就是根據(jù) StreamNode 創(chuàng)建對(duì)應(yīng)的 JobVertex, 并返回了空的 StreamConfig
// 如果currentNodeId和startNodeId相等,說(shuō)明需要?jiǎng)?chuàng)建一個(gè)新的chain,會(huì)生成一個(gè)JobVertex
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, chainInfo)
: new StreamConfig(new Configuration());
// 設(shè)置 JobVertex 的 StreamConfig, 基本上是序列化 StreamNode 中的配置到 StreamConfig 中.
// 其中包括 序列化器, StreamOperator, Checkpoint 等相關(guān)配置
// 設(shè)置的Vertex屬性到config中
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
// 如果相等 說(shuō)明處理到了當(dāng)前節(jié)點(diǎn),也就是說(shuō)chain的子節(jié)點(diǎn)處理完了 需要?jiǎng)?chuàng)建Edge將兩個(gè)JobVertex進(jìn)行連接
// startNodeId是 chain的開(kāi)始節(jié)點(diǎn), currentNodeId 是當(dāng)前節(jié)點(diǎn),
// todo 如果開(kāi)始節(jié)點(diǎn)是1 當(dāng)前節(jié)點(diǎn)是4, 那么他的邏輯順序則是 1 => 4 , 1 => 3 , 1 => 2 , 1=>1
// 當(dāng) 當(dāng)前節(jié)點(diǎn)等于開(kāi)始節(jié)點(diǎn)的時(shí)候, 說(shuō)明chain中間的節(jié)點(diǎn)處理完了, 需要處理 chain的頂點(diǎn), 即chain的開(kāi)始節(jié)點(diǎn)
if (currentNodeId.equals(startNodeId)) {
//如果是chain的起始節(jié)點(diǎn)。(不是chain的中間節(jié)點(diǎn),會(huì)被標(biāo)記成 chain start)
// 意味著一個(gè)新chain的開(kāi)始
config.setChainStart();
config.setChainIndex(0);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
//我們也會(huì)把物理出邊寫(xiě)入配置, 部署時(shí)會(huì)用到
config.setOutEdgesInOrder(transitiveOutEdges);
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
// 將當(dāng)前節(jié)點(diǎn)(headOfChain)與所有出邊相連
// 對(duì)于每一個(gè)chain,把它和指向下一個(gè)chain的出邊連接起來(lái)
for (StreamEdge edge : transitiveOutEdges) {
//通過(guò)StreamEdge構(gòu)建出JobEdge,創(chuàng)建 IntermediateDataSet ,用來(lái)將JobVertex和JobEdge相連
connect(startNodeId, edge);
}
// 將chain中所有子節(jié)點(diǎn)的StreamConfig寫(xiě)入到 headOfChain 節(jié)點(diǎn)的 CHAINED_TASK_CONFIG 配置中
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
//如果是 chain 中的子節(jié)點(diǎn)
chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
// 獲取到被chain的節(jié)點(diǎn)
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
// 將當(dāng)前節(jié)點(diǎn)的StreamConfig添加到該chain的config集合中
// 關(guān)聯(lián)chain內(nèi)節(jié)點(diǎn)的配置信息到chain的起始節(jié)點(diǎn)上
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(currentOperatorId);
//如果節(jié)點(diǎn)的輸出StreamEdge已經(jīng)為空,則說(shuō)明是鏈的結(jié)尾
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
// 返回連往chain外部的出邊集合
return transitiveOutEdges;
} else {
//startNodeId 如果已經(jīng)構(gòu)建過(guò),則直接返回
return new ArrayList<>();
}
}
看一下createChain內(nèi)調(diào)用的一些方法
isChainable
這個(gè)方法用于判斷兩個(gè)StreamEdge的源(即 源產(chǎn)生的operator<并不一定是 source operator>)是否可以chain一起進(jìn)行執(zhí)行,只有當(dāng)所有的條件都滿足的時(shí)候才返回true(即可以chain)
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
/** 獲取StreamEdge的源和目標(biāo)StreamNode */
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
/**
* 1、下游節(jié)點(diǎn)只有一個(gè)輸入
* 4、上下游節(jié)點(diǎn)在一個(gè)槽位共享組內(nèi)
* 5、下游節(jié)點(diǎn)的連接策略是 ALWAYS
* 6、上游節(jié)點(diǎn)的連接策略是 HEAD 或者 ALWAYS
* 7、sourceFactory不能是YieldingOperatorFactory
* 8、edge 的分區(qū)函數(shù)是 ForwardPartitioner 的實(shí)例
* 9、shuffle模式不能是batch模式
* 10、上下游節(jié)點(diǎn)的并行度相等
* 11、可以進(jìn)行節(jié)點(diǎn)連接操作
*/
return downStreamVertex.getInEdges().size() == 1
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled();
}
createJobVertex
JobVertex只有在chain的頂點(diǎn)創(chuàng)建,chain的其余節(jié)點(diǎn)以配置信息的形式記錄在JobVertex中
這個(gè)方法首先會(huì)獲取StreamNode(chain的開(kāi)始節(jié)點(diǎn)),并創(chuàng)建一個(gè)JobVertexId,是根據(jù)當(dāng)前StreamNode的hash創(chuàng)建的, 獲取到chain頂點(diǎn)的operator hash對(duì) (用戶與flink生成),然后根據(jù)JobVertexName和JobVertexId以及hash對(duì)生成JobVertex,生成jobVertex后設(shè)置一些參數(shù),將jobVertex存入到j(luò)obGreaph中,到這里,一個(gè)JobVertex就生成完了
private StreamConfig createJobVertex(
Integer streamNodeId,
OperatorChainInfo chainInfo) {
JobVertex jobVertex;
// 獲取開(kāi)始節(jié)點(diǎn)
StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
byte[] hash = chainInfo.getHash(streamNodeId);
if (hash == null) {
throw new IllegalStateException("Cannot find node hash. " +
"Did you generate them before calling this method?");
}
// 通過(guò)hash創(chuàng)建JobVertexId
JobVertexID jobVertexId = new JobVertexID(hash);
// 獲取任務(wù)鏈 operator的哈希值
List<Tuple2<byte[], byte[]>> chainedOperators = chainInfo.getChainedOperatorHashes(streamNodeId);
List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
if (chainedOperators != null) {
for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
OperatorID userDefinedOperatorID = chainedOperator.f1 == null ? null : new OperatorID(chainedOperator.f1);
operatorIDPairs.add(OperatorIDPair.of(new OperatorID(chainedOperator.f0), userDefinedOperatorID));
}
}
// 如果chain是 輸人輸出格式, 創(chuàng)建InputOutputFormatVertex對(duì)象, -- > 這個(gè)只是嘗試性的
if (chainedInputOutputFormats.containsKey(streamNodeId)) {
jobVertex = new InputOutputFormatVertex(
chainedNames.get(streamNodeId),
jobVertexId,
operatorIDPairs);
chainedInputOutputFormats
.get(streamNodeId)
.write(new TaskConfig(jobVertex.getConfiguration()));
} else { // 否則創(chuàng)建 JobVertex 對(duì)象
jobVertex = new JobVertex(
chainedNames.get(streamNodeId),
jobVertexId,
operatorIDPairs);
}
for (OperatorCoordinator.Provider coordinatorProvider : chainInfo.getCoordinatorProviders()) {
try {
jobVertex.addOperatorCoordinator(new SerializedValue<>(coordinatorProvider));
} catch (IOException e) {
throw new FlinkRuntimeException(String.format(
"Coordinator Provider for node %s is not serializable.", chainedNames.get(streamNodeId)));
}
}
jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));
jobVertex.setInvokableClass(streamNode.getJobVertexClass());
int parallelism = streamNode.getParallelism();
if (parallelism > 0) {
jobVertex.setParallelism(parallelism);
} else {
parallelism = jobVertex.getParallelism();
}
jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
if (LOG.isDebugEnabled()) {
LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
}
// TODO: inherit InputDependencyConstraint from the head operator
// 設(shè)置輸入狀態(tài)的情況下安排任務(wù)方式。 , 默認(rèn) 如果有可使用的輸入,則調(diào)度該任務(wù)
jobVertex.setInputDependencyConstraint(streamGraph.getExecutionConfig().getDefaultInputDependencyConstraint());
//將生成好的jobVertex存入jobVertices列表
jobVertices.put(streamNodeId, jobVertex);
//已經(jīng)構(gòu)建好的jobVertex Id列表
builtVertices.add(streamNodeId);
// 將jovVertex添加到JobGraph圖中
jobGraph.addVertex(jobVertex);
return new StreamConfig(jobVertex.getConfiguration());
}
connect
這個(gè)方法主要用于 將兩個(gè)JobVertex連接在一起,首先會(huì)獲取到ShuffleMode(數(shù)據(jù)交互模式)生成resultPartitionType,然后通過(guò)不同的partition創(chuàng)建不同的JobEdge,如果是ForwardPartitioner或RescalePartitioner使用POINTWISE模式,否則使用ALL_TO_ALL(即 一個(gè)subTask對(duì)應(yīng)下游subTask是一對(duì)一還是一對(duì)多),最后connectNewDataSetAsInput方法,創(chuàng)建一個(gè)JobEdge,將兩個(gè)JobVertex連接在一起
private void connect(Integer headOfChain, StreamEdge edge) {
physicalEdgesInOrder.add(edge);
Integer downStreamVertexID = edge.getTargetId();
JobVertex headVertex = jobVertices.get(headOfChain);
JobVertex downStreamVertex = jobVertices.get(downStreamVertexID);
StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
// 每次循環(huán) input數(shù)量+1
downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);
//獲取 edge的分區(qū)模式 --
StreamPartitioner<?> partitioner = edge.getPartitioner();
//結(jié)果分區(qū)的類(lèi)型
ResultPartitionType resultPartitionType;
// 最終返回 PIPELINED_BOUNDED 或者 BLOCKING
// 如果沒(méi)有定義,會(huì)進(jìn)行判斷全局?jǐn)?shù)據(jù)交換模式, 最終確定分區(qū)模式
switch (edge.getShuffleMode()) {
case PIPELINED:
resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
break;
case BATCH:
resultPartitionType = ResultPartitionType.BLOCKING;
break;
case UNDEFINED:
resultPartitionType = determineResultPartitionType(partitioner);
break;
default:
throw new UnsupportedOperationException("Data exchange mode " +
edge.getShuffleMode() + " is not supported yet.");
}
JobEdge jobEdge;
// 根據(jù)不同的分區(qū)模式創(chuàng)建JobEdge
// 如果分區(qū)模式為ForwardPartitioner或RescalePartitioner;
// 則使用 POINTWISE分配模式, 否則使用ALL_TO_ALL
if (isPointwisePartitioner(partitioner)) {
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
// 每個(gè)生產(chǎn)子任務(wù)都連接到消費(fèi)任務(wù)的一個(gè)或多個(gè)子任務(wù)
DistributionPattern.POINTWISE,
resultPartitionType);
} else {
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
// 每個(gè)生產(chǎn)子任務(wù)都與消費(fèi)任務(wù)的子任務(wù)相連接
DistributionPattern.ALL_TO_ALL,
resultPartitionType);
}
// set strategy name so that web interface can show it.
// 設(shè)置分發(fā)策略模式,用于 web顯示
jobEdge.setShipStrategyName(partitioner.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
headOfChain, downStreamVertexID);
}
}
downStreamVertex.connectNewDataSetAsInput方法
首先由input的JobVertex(即上游),創(chuàng)建一個(gè)IntermediateDataSet(簡(jiǎn)稱(chēng) dataSet),并將自己添加到dataSet的producer中,,然后當(dāng)前JobVertex(即下游)構(gòu)建一個(gè)JobEdge,并將edge添加到inputs的列表中,并將dataSet的consumer進(jìn)行賦值為JobEdge, 這是JobEdge已經(jīng)構(gòu)建完畢了
JobEdge中的dataSet中有兩個(gè)參數(shù)(producer,consumer),分別對(duì)應(yīng)生產(chǎn)者(上游JobVertex)和消費(fèi)者(下游JobVertex),通過(guò)這種形式將兩個(gè)JobVertex進(jìn)行連接
public JobEdge connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
ResultPartitionType partitionType) {
// 由上游節(jié)點(diǎn) 創(chuàng)建一個(gè)IntermediateDataSet
// todo 中間數(shù)據(jù)集是操作符 - sour操作或任何中間操作 - 產(chǎn)生的數(shù)據(jù)集
// 中間數(shù)據(jù)集可能被其他操作符讀取、具體化或丟棄。
IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
// 由此可以看出, 他的輸入就是上游節(jié)點(diǎn)的一個(gè)輸出
// JobEdge的輸入就是當(dāng)前節(jié)點(diǎn)
// 構(gòu)建一個(gè)jobEdge
JobEdge edge = new JobEdge(dataSet, this, distPattern);
// jobEdge 連接edge
this.inputs.add(edge);
dataSet.addConsumer(edge);
return edge;
}
構(gòu)建JobGraph最主要的邏輯就是構(gòu)建chain,源碼中剩下方法就是設(shè)置了一些JobGraph的參數(shù),比如設(shè)置slot共享組,設(shè)置托管內(nèi)存分?jǐn)?shù),配置檢查點(diǎn)等內(nèi)容,最終會(huì)將JobGraph構(gòu)建完成并返回
下面是一段簡(jiǎn)單的代碼,我們看一下他的生成JobGraph的過(guò)程

在StreamGraph向JobGraph轉(zhuǎn)換的時(shí)候,會(huì)將兩個(gè)或多個(gè)算子chain一起進(jìn)行執(zhí)行,這是Flink對(duì)我們的程序進(jìn)行的一個(gè)優(yōu)化,

現(xiàn)在我看一下 具體任務(wù)執(zhí)行后的樣子
