1. JobVertex
One element of the JobGraph deserves special mention: the JobVertex. A JobVertex produces intermediate datasets, is wired to other vertices through JobEdges, and is the unit at which operators are chained together. In short, a JobVertex is the job-level abstraction of a task.
2. Source code
Part 1: Fields
// --------------------------------------------------------------------------------------------
// Members that define the structure / topology of the graph
// --------------------------------------------------------------------------------------------
// The ID of the current JobVertex; the associated class is JobVertexID
/** The ID of the vertex. */
private final JobVertexID id;
// Alternative vertex IDs
/** The alternative IDs of the vertex. */
private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>();
// The operators contained in this JobVertex
/** The IDs of all operators contained in this vertex. */
private final ArrayList<OperatorID> operatorIDs = new ArrayList<>();
// Alternative operator IDs
/** The alternative IDs of all operators contained in this vertex. */
private final ArrayList<OperatorID> operatorIdsAlternatives = new ArrayList<>();
// The intermediate datasets produced by this JobVertex (IntermediateDataSet)
/** List of produced data sets, one per writer */
private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();
// The input channels this JobVertex reads from: one reader per upstream result, each associated with a JobEdge
/** List of edges with incoming data. One per Reader. */
private final ArrayList<JobEdge> inputs = new ArrayList<JobEdge>();
// The number of subtasks this task is split into at runtime
/** Number of subtasks to split this task into at runtime.*/
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
// The maximum number of subtasks this task may be split into at runtime
/** Maximum number of subtasks to split this task into at runtime. */
private int maxParallelism = -1;
// The minimum resources this JobVertex requires
/** The minimum resource of the vertex */
private ResourceSpec minResources = ResourceSpec.DEFAULT;
// The preferred resources for this JobVertex
/** The preferred resource of the vertex */
private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
// Configuration handed to this task at runtime
/** Custom configuration passed to the assigned task at runtime. */
private Configuration configuration;
// The class this task invokes at runtime
/** The class of the invokable. */
private String invokableClassName;
// Indicates whether this JobVertex can be stopped
/** Indicates whether this job vertex is stoppable or not. */
private boolean isStoppable = false;
// Optionally, the source that splits this vertex's input into input splits
/** Optionally, a source of input splits */
private InputSplitSource<?> inputSplitSource;
// The name of this JobVertex
/** The name of the vertex. This will be shown in runtime logs and will be in the runtime environment */
private String name;
// A slot sharing group allows subtasks from different JobVertices to run concurrently in the same slot;
// in particular, co-location requires the involved vertices to belong to the same sharing group
/** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */
private SlotSharingGroup slotSharingGroup;
// The co-location group inside which this vertex's subtasks share slots with matching subtasks of other vertices
/** The group inside which the vertex subtasks share slots */
private CoLocationGroup coLocationGroup;
// The following fields are all recorded in the JSON plan
// The operator name
/** Optional, the name of the operator, such as 'Flat Map' or 'Join', to be included in the JSON plan */
private String operatorName;
// A description of this operator
/** Optional, the description of the operator, like 'Hash Join', or 'Sorted Group Reduce',
* to be included in the JSON plan */
private String operatorDescription;
/** Optional, pretty name of the operator, to be displayed in the JSON plan */
private String operatorPrettyName;
// JSON recording the optimizer properties of the operator result; written into the JSON plan
/** Optional, the JSON for the optimizer properties of the operator result,
* to be included in the JSON plan */
private String resultOptimizerProperties;
// The input dependency constraint used when scheduling this vertex:
// 1. ANY: a subtask of this vertex can start as soon as any of its upstream input subtasks has produced consumable data
// 2. ALL: a subtask of this vertex starts only after all upstream input subtasks have finished producing
// Both strategies deserve attention when reasoning about the job's consistency semantics
/** The input dependency constraint to schedule this vertex. */
private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
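The difference between ANY and ALL can be made concrete with a small, self-contained sketch. `InputConstraintDemo` and `canSchedule` are illustrative names of my own, not Flink's scheduler code:

```java
import java.util.List;

// Simplified illustration of how the two constraints gate the scheduling
// of a consumer subtask (hypothetical types, not Flink APIs).
public class InputConstraintDemo {

    enum InputDependencyConstraint { ANY, ALL }

    /** Decides whether a consumer subtask may be scheduled, given which inputs are consumable. */
    static boolean canSchedule(InputDependencyConstraint constraint, List<Boolean> inputConsumable) {
        if (constraint == InputDependencyConstraint.ANY) {
            // ANY: a single consumable input is enough
            return inputConsumable.contains(true);
        } else {
            // ALL: every input must be consumable
            return !inputConsumable.contains(false);
        }
    }
}
```

With inputs `[true, false]`, ANY schedules the subtask while ALL still waits.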
Part 2: Methods
1. The concrete task to be executed
/**
* Returns the invokable class which represents the task of this vertex
*
* @param cl The classloader used to resolve user-defined classes
* @return The invokable class, <code>null</code> if it is not set
*/
// Resolves the concrete task class this JobVertex will execute at runtime,
// e.g. DataSinkTask or BatchTask
public Class<? extends AbstractInvokable> getInvokableClass(ClassLoader cl) {
    if (cl == null) {
        throw new NullPointerException("The classloader must not be null.");
    }
    if (invokableClassName == null) {
        return null;
    }

    try {
        return Class.forName(invokableClassName, true, cl).asSubclass(AbstractInvokable.class);
    }
    catch (ClassNotFoundException e) {
        throw new RuntimeException("The user-code class could not be resolved.", e);
    }
    catch (ClassCastException e) {
        throw new RuntimeException("The user-code class is no subclass of " + AbstractInvokable.class.getName(), e);
    }
}
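The resolution logic can be exercised outside of Flink with a self-contained sketch. `InvokableResolver`, `NoOpTask`, and this stand-in `AbstractInvokable` are hypothetical names, not Flink classes; only the `Class.forName(...).asSubclass(...)` pattern mirrors the method above:

```java
// Sketch of late-binding the task class by name, as getInvokableClass does.
public class InvokableResolver {

    // Hypothetical stand-in for Flink's AbstractInvokable base class.
    public abstract static class AbstractInvokable {
        public abstract void invoke() throws Exception;
    }

    // A trivial task used to demonstrate resolution.
    public static class NoOpTask extends AbstractInvokable {
        @Override
        public void invoke() {}
    }

    /** Resolve the class by name with the given loader and verify the subtype. */
    public static Class<? extends AbstractInvokable> resolve(String className, ClassLoader cl) {
        if (cl == null) {
            throw new NullPointerException("The classloader must not be null.");
        }
        if (className == null) {
            return null;  // mirrors JobVertex: unset name resolves to null
        }
        try {
            return Class.forName(className, true, cl).asSubclass(AbstractInvokable.class);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("The user-code class could not be resolved.", e);
        } catch (ClassCastException e) {
            throw new RuntimeException("The user-code class is no subclass of AbstractInvokable.", e);
        }
    }
}
```

The key design point is the `ClassLoader` parameter: the task class lives in the user's jar, so it must be resolved through the user-code classloader, not the framework's.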
2. Operator-chaining-related methods
/**
* Associates this vertex with a slot sharing group for scheduling. Different vertices in the same
* slot sharing group can run one subtask each in the same slot.
*
* @param grp The slot sharing group to associate the vertex with.
*/
// Associates this JobVertex with a slot sharing group for scheduling:
// different vertices in the same slot sharing group can each run one subtask in the same slot
public void setSlotSharingGroup(SlotSharingGroup grp) {
    if (this.slotSharingGroup != null) {
        this.slotSharingGroup.removeVertexFromGroup(id);
    }

    this.slotSharingGroup = grp;

    if (grp != null) {
        grp.addVertexToGroup(id);
    }
}
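The bookkeeping here is easy to miss: reassigning a group first removes the vertex from its old group, so a vertex is a member of at most one group. A minimal sketch with simplified stand-in types (`SlotSharingDemo` and its nested classes are illustrative, not Flink's):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of setSlotSharingGroup's leave-then-join bookkeeping.
public class SlotSharingDemo {

    static class SlotSharingGroup {
        final Set<String> vertexIds = new HashSet<>();
        void addVertexToGroup(String id) { vertexIds.add(id); }
        void removeVertexFromGroup(String id) { vertexIds.remove(id); }
    }

    static class Vertex {
        final String id;
        SlotSharingGroup slotSharingGroup;

        Vertex(String id) { this.id = id; }

        void setSlotSharingGroup(SlotSharingGroup grp) {
            if (slotSharingGroup != null) {
                slotSharingGroup.removeVertexFromGroup(id);  // leave the old group
            }
            slotSharingGroup = grp;
            if (grp != null) {
                grp.addVertexToGroup(id);                    // join the new one
            }
        }
    }
}
```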
/**
* Gets the slot sharing group that this vertex is associated with. Different vertices in the same
* slot sharing group can run one subtask each in the same slot. If the vertex is not associated with
* a slot sharing group, this method returns {@code null}.
*
* @return The slot sharing group to associate the vertex with, or {@code null}, if not associated with one.
*/
// If this JobVertex has not been assigned a slot sharing group, none is associated with it and this returns null
// By default the slot sharing group is named "default"
@Nullable
public SlotSharingGroup getSlotSharingGroup() {
    return slotSharingGroup;
}
/**
* Tells this vertex to strictly co locate its subtasks with the subtasks of the given vertex.
* Strict co-location implies that the n'th subtask of this vertex will run on the same parallel computing
* instance (TaskManager) as the n'th subtask of the given vertex.
*
* NOTE: Co-location is only possible between vertices in a slot sharing group.
*
* NOTE: This vertex must (transitively) depend on the vertex to be co-located with. That means that the
* respective vertex must be a (transitive) input of this vertex.
*
* @param strictlyCoLocatedWith The vertex whose subtasks to co-locate this vertex's subtasks with.
*
* @throws IllegalArgumentException Thrown, if this vertex and the vertex to co-locate with are not in a common
* slot sharing group.
*
* @see #setSlotSharingGroup(SlotSharingGroup)
*/
// Strictly co-locates this vertex's subtasks with those of the given vertex:
// the n-th subtask of this vertex runs on the same TaskManager, in the same shared slot, as the n-th subtask of the given vertex
// Two constraints apply:
// 1. Both vertices must belong to the same slot sharing group
// 2. This vertex's input must (transitively) depend on the given vertex, e.g. via a forward connection
// Operator chaining is handled through this mechanism
public void setStrictlyCoLocatedWith(JobVertex strictlyCoLocatedWith) {
    if (this.slotSharingGroup == null || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) {
        throw new IllegalArgumentException("Strict co-location requires that both vertices are in the same slot sharing group.");
    }

    CoLocationGroup thisGroup = this.coLocationGroup;
    CoLocationGroup otherGroup = strictlyCoLocatedWith.coLocationGroup;

    // the check above guarantees both vertices are in the same slot sharing group;
    // what follows handles the four combinations of existing co-location groups
    if (otherGroup == null) {
        if (thisGroup == null) { // neither vertex has a co-location group yet: create one containing both
            CoLocationGroup group = new CoLocationGroup(this, strictlyCoLocatedWith);
            this.coLocationGroup = group;
            strictlyCoLocatedWith.coLocationGroup = group;
        }
        else { // only this vertex has a co-location group: the given vertex joins it
            thisGroup.addVertex(strictlyCoLocatedWith);
            strictlyCoLocatedWith.coLocationGroup = thisGroup;
        }
    }
    else {
        if (thisGroup == null) { // only the given vertex has a co-location group: this vertex joins it
            otherGroup.addVertex(this);
            this.coLocationGroup = otherGroup;
        }
        else {
            // both already have distinct groups, so they need to be merged
            thisGroup.mergeInto(otherGroup);
        }
    }
}
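The four-way branch above boils down to: create, join, or merge groups. A self-contained sketch with simplified types (`CoLocationDemo`, `Group`, `Vertex` are illustrative names, not Flink's classes) shows the same shape:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the create/join/merge logic in setStrictlyCoLocatedWith.
public class CoLocationDemo {

    static class Group {
        final List<Vertex> vertices = new ArrayList<>();
        void add(Vertex v) { vertices.add(v); v.group = this; }
        void mergeInto(Group other) {
            for (Vertex v : vertices) { other.add(v); }
            vertices.clear();
        }
    }

    static class Vertex {
        Group group;
    }

    static void coLocate(Vertex a, Vertex b) {
        if (b.group == null) {
            if (a.group == null) {      // neither has a group: create one for both
                Group g = new Group();
                g.add(a); g.add(b);
            } else {                    // only a has a group: b joins it
                a.group.add(b);
            }
        } else if (a.group == null) {   // only b has a group: a joins it
            b.group.add(a);
        } else {                        // both have distinct groups: merge a's into b's
            a.group.mergeInto(b.group);
        }
    }
}
```

The merge case matters because co-location is transitive: once two previously independent groups are linked through any pair of vertices, all their members must end up in one group.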
3. ResultPartitionType
Describes how the intermediate dataset produced by a JobVertex is partitioned and exchanged with its consumers.
/**
* Blocking partitions represent blocking data exchanges, where the data stream is first
* fully produced and then consumed. This is an option that is only applicable to bounded
* streams and can be used in bounded stream runtime and recovery.
*
* <p>Blocking partitions can be consumed multiple times and concurrently.
*
* <p>The partition is not automatically released after being consumed (like for example the
* {@link #PIPELINED} partitions), but only released through the scheduler, when it determines
* that the partition is no longer needed.
*/
// Blocking data exchange: the data must be fully produced before it can be consumed
// Applicable only to bounded streams; usable both in normal bounded-stream execution and in recovery
// Note: a blocking partition can be consumed multiple times, and concurrently
// It is released by the scheduler once no longer needed, not automatically after consumption
BLOCKING(false, false, false, false),
/**
* BLOCKING_PERSISTENT partitions are similar to {@link #BLOCKING} partitions, but have
* a user-specified life cycle.
*
* <p>BLOCKING_PERSISTENT partitions are dropped upon explicit API calls to the
* JobManager or ResourceManager, rather than by the scheduler.
*
* <p>Otherwise, the partition may only be dropped by safety-nets during failure handling
* scenarios, like when the TaskManager exits or when the TaskManager looses connection
* to JobManager / ResourceManager for too long.
*/
// Similar to BLOCKING, but with a user-specified life cycle
// A BLOCKING_PERSISTENT partition is dropped via explicit API calls to the JobManager or ResourceManager, not by the scheduler
// Otherwise it is only cleaned up by safety nets during failure handling, e.g. when a TaskManager exits or loses its connection to the JobManager/ResourceManager for too long
BLOCKING_PERSISTENT(false, false, false, true),
/**
* A pipelined streaming data exchange. This is applicable to both bounded and unbounded streams.
*
* <p>Pipelined results can be consumed only once by a single consumer and are automatically
* disposed when the stream has been consumed.
*
* <p>This result partition type may keep an arbitrary amount of data in-flight, in contrast to
* the {@link #PIPELINED_BOUNDED} variant.
*/
// Pipelined data exchange, applicable to both bounded and unbounded streams
// A pipelined partition can be consumed only once, by a single consumer,
// and is automatically disposed once the stream has been consumed
PIPELINED(true, true, false, false),
/**
* Pipelined partitions with a bounded (local) buffer pool.
*
* <p>For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much
* data is being buffered and checkpoint barriers are delayed. In contrast to limiting the
* overall network buffer pool size, this, however, still allows to be flexible with regards
* to the total number of partitions by selecting an appropriately big network buffer pool size.
*
* <p>For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
* no checkpoint barriers.
*/
// Similar to PIPELINED, but backed by a bounded (local) buffer pool
// For streaming jobs, the fixed buffer pool size limits how much data is buffered in flight,
// which keeps checkpoint barrier alignment from being delayed for too long
// Unlike capping the overall network buffer pool, this still leaves the total number of
// partitions flexible: one simply chooses a suitably large network buffer pool
// For batch jobs there are no checkpoint barriers, so the unbounded PIPELINED variant is preferable
PIPELINED_BOUNDED(true, true, true, false);
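The four boolean constructor arguments encode the differences between the constants. The flag names below (`isPipelined`, `hasBackPressure`, `isBounded`, `isPersistent`) are my reading of the constructor parameters and may differ from the actual field names; the sketch only mirrors the values shown above:

```java
// Simplified mirror of the four ResultPartitionType constants and their flags.
enum PartitionTypeDemo {
    BLOCKING(false, false, false, false),
    BLOCKING_PERSISTENT(false, false, false, true),
    PIPELINED(true, true, false, false),
    PIPELINED_BOUNDED(true, true, true, false);

    final boolean isPipelined;     // streamed while produced, vs. fully materialized first
    final boolean hasBackPressure; // consumers can slow the producer down
    final boolean isBounded;       // in-flight data limited by a bounded local buffer pool
    final boolean isPersistent;    // life cycle managed by the user, not the scheduler

    PartitionTypeDemo(boolean isPipelined, boolean hasBackPressure,
                      boolean isBounded, boolean isPersistent) {
        this.isPipelined = isPipelined;
        this.hasBackPressure = hasBackPressure;
        this.isBounded = isBounded;
        this.isPersistent = isPersistent;
    }
}
```

Read this way, PIPELINED_BOUNDED differs from PIPELINED only in the bounded buffer pool, and BLOCKING_PERSISTENT differs from BLOCKING only in who releases the partition.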
3. Source
The full JobVertex.java source.