關(guān)于JobGraph的解讀
1.在flink里,JobGraph代表一個(gè)flink dataflow程序,最終無(wú)論是低級(jí)的api還是高級(jí)的api編寫(xiě)的program都會(huì)轉(zhuǎn)換為JobGraph;
2.JobGraph通過(guò)將graph的vertices和intermediate results以DAG的形式匯聚在一起,需要注意Iterations(feedback edges)不包括在JobGraph中,在一些特殊指定的vertices,彼此通過(guò)feeback channel關(guān)聯(lián)起來(lái);
3.通過(guò)JobGraph來(lái)進(jìn)行job級(jí)別的參數(shù)配置,能夠?yàn)槊總€(gè)vertex和intermediate result設(shè)置當(dāng)前operation和intermediate data的特點(diǎn)內(nèi)容;
關(guān)于源碼
一。定義的屬性或字段
/** List of task vertices included in this job graph. */
// 定義當(dāng)前job相關(guān)的vertex,采用鏈表的方式存儲(chǔ),采用vertextid作為key,保障vertexid在job唯一
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();
/** The job configuration attached to this job. */
// 定義當(dāng)前job的配置信息
private final Configuration jobConfiguration = new Configuration();
/** ID of this job. May be set if specific job id is desired (e.g. session management) */
// job id用于區(qū)別不同的job
private final JobID jobID;
/** Name of this job. */
// job執(zhí)行時(shí)的名稱(chēng)
private final String jobName;
/** The number of seconds after which the corresponding ExecutionGraph is removed at the
* job manager after it has been executed. */
// 指定對(duì)應(yīng)的ExecutionGraph在jobmanager執(zhí)行有效期多久,超時(shí)后會(huì)被removed。
private long sessionTimeout = 0;
/** flag to enable queued scheduling */
// job schedule使用采用queue的方式按序執(zhí)行
private boolean allowQueuedScheduling;
/** The mode in which the job is scheduled */
// job被執(zhí)行模式:默認(rèn)lazy模式
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
// --- checkpointing ---
// 以下進(jìn)行checkpoint設(shè)置
/** Job specific execution config */
// 執(zhí)行job execution相關(guān)配置
private SerializedValue<ExecutionConfig> serializedExecutionConfig;
/** The settings for the job checkpoints */
// 設(shè)置當(dāng)前job的checkpoint設(shè)置
private JobCheckpointingSettings snapshotSettings;
/** Savepoint restore settings. */
// 設(shè)置job的savepoint的恢復(fù)設(shè)置:常用語(yǔ)job恢復(fù),默認(rèn)為none
private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
// --- attached resources ---
// job執(zhí)行時(shí)需要的一些附加的設(shè)置
/** Set of JAR files required to run this job. */
// 當(dāng)用戶(hù)使用附加的jar時(shí) 可以通過(guò)操作userJars來(lái)指定額外jar的路徑
private final List<Path> userJars = new ArrayList<Path>();
/** Set of custom files required to run this job. */
// 當(dāng)job執(zhí)行需要一些額外的自定義files,通過(guò)userArtifacts來(lái)進(jìn)行配置
private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = new HashMap<>();
/** Set of blob keys identifying the JAR files required to run this job. */
private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();
/** List of classpaths required to run this job. */
// 指定job時(shí)需要指定的classpath
private List<URL> classpaths = Collections.emptyList();
二。關(guān)于方法或函數(shù)
在JobGraph有兩個(gè)構(gòu)造JobGprah的函數(shù)
構(gòu)造函數(shù)一:
/**
* Constructs a new job graph with the given job ID (or a random ID, if {@code null} is passed),
* the given name and the given execution configuration (see {@link ExecutionConfig}).
* The ExecutionConfig will be serialized and can't be modified afterwards.
*
* @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
* @param jobName The name of the job.
*/
// 通過(guò)指定的jobId和jobName構(gòu)造jobgraph:未指定jobid或jobname時(shí),都有對(duì)應(yīng)的默認(rèn)值
// 針對(duì)該job來(lái)設(shè)置execution config相關(guān)配置內(nèi)容,默認(rèn)直接產(chǎn)生初始的
// 1.關(guān)于jobid隨機(jī)生成:jobid可人工指定,也可自動(dòng)隨機(jī)生成
// 2.關(guān)于jobname未指定:直接使用“(unnamed job)”代替
// 3.設(shè)置ExecutionConfig:通常用來(lái)配置program執(zhí)行的行為,比較常用配置選項(xiàng):
// 3.1 parallelism:program執(zhí)行默認(rèn)的并行度;
// 3.2 retries:執(zhí)行時(shí)失敗重試的次數(shù)
// 3.3 delay:通常是和retires有關(guān),兩次重試需要的間隔,延遲時(shí)間
// 3.4 execution mode:通常是batch和pipelined。默認(rèn)是pipelined
// 3.5 是否開(kāi)啟“closure cleaner”:關(guān)于“closure cleaner”用于對(duì)function實(shí)現(xiàn)進(jìn)行預(yù)處理,比如“closure”是匿名的存在類(lèi)的內(nèi)部(內(nèi)部類(lèi)),它會(huì)移除一些未使用的“closure”的引用,來(lái)修復(fù)一些和序列化有關(guān)的問(wèn)題并減少“closure”的大小
// 3.6 注冊(cè)type和serializer:通常為了提升處理“泛型”和“pojo”的性能,不僅僅需要返回聲明的類(lèi)型,還包括這些類(lèi)型的subclass才會(huì)需要我們手動(dòng)進(jìn)行type和serializer注冊(cè)/聲明。
public JobGraph(JobID jobId, String jobName) {
this.jobID = jobId == null ? new JobID() : jobId;
this.jobName = jobName == null ? "(unnamed job)" : jobName;
try {
setExecutionConfig(new ExecutionConfig());
} catch (IOException e) {
// this should never happen, since an empty execution config is always serializable
throw new RuntimeException("bug, empty execution config is not serializable");
}
}
構(gòu)造函數(shù)二:
// 從source開(kāi)始來(lái)構(gòu)建topology
// 1.根據(jù)jobgraph中已被注冊(cè)的vertex構(gòu)建一個(gè)list用于
public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
// early out on empty lists
if (this.taskVertices.isEmpty()) {
return Collections.emptyList();
}
List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());
// start by finding the vertices with no input edges
// and the ones with disconnected inputs (that refer to some standalone data set)
// 首先找到j(luò)obgraph中已注冊(cè)的source vertex
{
// 通過(guò)set來(lái)將重復(fù)的vertex進(jìn)行去重 防止某個(gè)vertex重復(fù)被記錄 形成循環(huán)
// 故而使用LinkedHashMap來(lái)存放對(duì)應(yīng)的vertex 以鏈表的形式存儲(chǔ)
// 1.首先遍歷set中每個(gè)jobvertex
// 2.判斷該jobvertex是否有input 沒(méi)有的話(huà) 即認(rèn)為是source
// 3.將所有的source存放list中,同時(shí)將對(duì)應(yīng)的set中該jovvertex remove,防止重復(fù)記錄
Iterator<JobVertex> iter = remaining.iterator();
while (iter.hasNext()) {
JobVertex vertex = iter.next();
if (vertex.hasNoConnectedInputs()) {
sorted.add(vertex);
iter.remove();
}
}
}
int startNodePos = 0;
// traverse from the nodes that were added until we found all elements
// 在前面的迭代循環(huán)中提取所有的source以及孤立的dataset,接下來(lái)要對(duì)sort中剩余的非source或孤立的dataset進(jìn)行處理
// 1.通過(guò)移動(dòng)startNodePos的值 防止jobgraph形成循環(huán): 不超過(guò)list中記錄的source及孤立dataset個(gè)數(shù)
// 2.以下操作的目的就是將jobgraph中屬于source或孤立dataset提取出來(lái),最終在streamgraph基礎(chǔ)上形成jobgraph的topology
while (!remaining.isEmpty()) {
// first check if we have more candidates to start traversing from. if not, then the
// graph is cyclic, which is not permitted
// 首先要保證開(kāi)始遍歷的node位置仍處于list中,否則會(huì)導(dǎo)致對(duì)應(yīng)的jobgraph就變循環(huán)的,是不能形成DAG topology的
if (startNodePos >= sorted.size()) {
throw new InvalidProgramException("The job graph is cyclic.");
}
JobVertex current = sorted.get(startNodePos++);
addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
}
return sorted;
}
//
// 1.首先收到所有的source輸出的IntermediateDataSet
// 2.接著循環(huán)遍歷每個(gè)IntermediateDataSet對(duì)應(yīng)的consumer:JobEdge
// 3.再獲取對(duì)應(yīng)的JobEdge輸出的target,并檢查其target是否存在set中,不存在需要拋棄對(duì)應(yīng)的jobedge
// 3.1 針對(duì)在set中的target ,則需要獲取該target所有的input,同時(shí)需要移除重復(fù)的JobEdge
// 3.2 通過(guò)前面的3.1循環(huán)遍歷出set中vertex沒(méi)有predecessor的添加到target
// 3.3 循環(huán)迭代針對(duì)每個(gè)jobvertex進(jìn)行如上的處理 直至所有的jobvertex遍歷完
private void addNodesThatHaveNoNewPredecessors(JobVertex start, List<JobVertex> target, Set<JobVertex> remaining) {
// forward traverse over all produced data sets and all their consumers
// 以當(dāng)前的jobvertex為主,獲取其相關(guān)的輸出結(jié)果IntermediateDataset,
// 接著獲取每個(gè)IntermediateDataset對(duì)應(yīng)的consumer:JobEdge
// 最終獲取每個(gè)jobedge的輸出接收的目標(biāo)vertex:jobvertex(也就是我們需要遍歷提取的jobvertex)
// 接下來(lái)再對(duì)獲取到的jobvertex判斷是否存在set集合中
// 不存在的 則直接忽略
// 存在的則需要 需要進(jìn)行逆向檢查,從目標(biāo)target的JobVertex開(kāi)始逆向?qū)ふ摇伴L(zhǎng)輩級(jí)”jobvertex(需要剔除當(dāng)前jobvertex,因?yàn)槟繕?biāo)jobvertex就是從當(dāng)前jobvertex獲取的,沒(méi)有必要重復(fù)檢查)
for (IntermediateDataSet dataSet : start.getProducedDataSets()) {
for (JobEdge edge : dataSet.getConsumers()) {
// a vertex can be added, if it has no predecessors that are still in the 'remaining' set
JobVertex v = edge.getTarget();
if (!remaining.contains(v)) {
continue;
}
boolean hasNewPredecessors = false;
// 要檢查
for (JobEdge e : v.getInputs()) {
// skip the edge through which we came
// 防止jobedge重復(fù),主要還是為了剔除從當(dāng)前jobvertex流向目標(biāo)jobvertex,沒(méi)有必要重復(fù)檢查
if (e == edge) {
continue;
}
// 獲取該jobedge的source
// 獲取目標(biāo)jobvertex所有的關(guān)聯(lián)的jobedge對(duì)應(yīng)的IntermediateDataSets
// 這樣就可以獲取目標(biāo)jobvertex對(duì)應(yīng)的提供source的vertex
IntermediateDataSet source = e.getSource();
if (remaining.contains(source.getProducer())) {
hasNewPredecessors = true;
break;
}
}
// 添加存在set中且沒(méi)有predecessor的jobvertex,同時(shí)需要完成添加到target中并清除set中記錄
if (!hasNewPredecessors) {
target.add(v);
remaining.remove(v);
addNodesThatHaveNoNewPredecessors(v, target, remaining);
}
}
}
}
三。構(gòu)建jobgraph
// 從source開(kāi)始來(lái)構(gòu)建topology
// 1.根據(jù)jobgraph中已被注冊(cè)的vertex構(gòu)建一個(gè)list用于
public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
// early out on empty lists
if (this.taskVertices.isEmpty()) {
return Collections.emptyList();
}
List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());
// start by finding the vertices with no input edges
// and the ones with disconnected inputs (that refer to some standalone data set)
// 首先找到j(luò)obgraph中已注冊(cè)的source vertex
{
// 通過(guò)set來(lái)將重復(fù)的vertex進(jìn)行去重 防止某個(gè)vertex重復(fù)被記錄 形成循環(huán)
// 故而使用LinkedHashMap來(lái)存放對(duì)應(yīng)的vertex 以鏈表的形式存儲(chǔ)
// 1.首先遍歷set中每個(gè)jobvertex
// 2.判斷該jobvertex是否有input 沒(méi)有的話(huà) 即認(rèn)為是source
// 3.將所有的source存放list中,同時(shí)將對(duì)應(yīng)的set中該jovvertex remove,防止重復(fù)記錄
Iterator<JobVertex> iter = remaining.iterator();
while (iter.hasNext()) {
JobVertex vertex = iter.next();
if (vertex.hasNoConnectedInputs()) {
sorted.add(vertex);
iter.remove();
}
}
}
int startNodePos = 0;
// traverse from the nodes that were added until we found all elements
// 在前面的迭代循環(huán)中提取所有的source,接下來(lái)要對(duì)sort中剩余的非source進(jìn)行處理
// 1.通過(guò)移動(dòng)startNodePos的值 防止jobgraph形成循環(huán): 不超過(guò)list中記錄的source個(gè)數(shù)
// 2.
while (!remaining.isEmpty()) {
// first check if we have more candidates to start traversing from. if not, then the
// graph is cyclic, which is not permitted
if (startNodePos >= sorted.size()) {
throw new InvalidProgramException("The job graph is cyclic.");
}
JobVertex current = sorted.get(startNodePos++);
addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
}
return sorted;
}
//
// 1.首先收到所有的source輸出的IntermediateDataSet
// 2.接著循環(huán)遍歷每個(gè)IntermediateDataSet對(duì)應(yīng)的consumer:JobEdge
// 3.再獲取對(duì)應(yīng)的JobEdge輸出的target,并檢查其target是否存在set中,不存在需要拋棄對(duì)應(yīng)的jobedge
// 3.1 針對(duì)在set中的target ,則需要獲取該target所有的input,同時(shí)需要移除重復(fù)的JobEdge
// 3.2 通過(guò)前面的3.1循環(huán)遍歷出set中vertex沒(méi)有predecessor的添加到target
// 3.3 循環(huán)迭代針對(duì)每個(gè)jobvertex進(jìn)行如上的處理 直至所有的jobvertex遍歷完
private void addNodesThatHaveNoNewPredecessors(JobVertex start, List<JobVertex> target, Set<JobVertex> remaining) {
// forward traverse over all produced data sets and all their consumers
for (IntermediateDataSet dataSet : start.getProducedDataSets()) {
for (JobEdge edge : dataSet.getConsumers()) {
// a vertex can be added, if it has no predecessors that are still in the 'remaining' set
JobVertex v = edge.getTarget();
if (!remaining.contains(v)) {
continue;
}
boolean hasNewPredecessors = false;
for (JobEdge e : v.getInputs()) {
// skip the edge through which we came
// 防止jobedge重復(fù)
if (e == edge) {
continue;
}
// 獲取該jobedge的source
IntermediateDataSet source = e.getSource();
if (remaining.contains(source.getProducer())) {
hasNewPredecessors = true;
break;
}
}
// 添加存在set中且沒(méi)有predecessor的jobvertex,同時(shí)需要完成添加到target中并清除set中記錄
if (!hasNewPredecessors) {
target.add(v);
remaining.remove(v);
addNodesThatHaveNoNewPredecessors(v, target, remaining);
}
}
}
}
四。jobgraph結(jié)構(gòu)
jobgraph