Flink Internals: JobGraph

An overview of JobGraph

  • 1. In Flink, a JobGraph represents a Flink dataflow program. Every program, whether written against the low-level or the high-level API, is eventually translated into a JobGraph;

  • 2. A JobGraph assembles the graph's vertices and intermediate results into a DAG. Note that iterations (feedback edges) are not encoded in the JobGraph itself; instead, specially designated vertices are connected to each other through feedback channels;

  • 3. Job-level parameters are configured on the JobGraph itself, while each vertex and each intermediate result carries the settings that characterize its operation and its intermediate data;
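To make the vertex / intermediate-result structure in point 2 concrete, here is a minimal, self-contained sketch. These are not Flink's real classes; `Vertex`, `DataSet`, and `Edge` are simplified stand-ins for `JobVertex`, `IntermediateDataSet`, and `JobEdge`, showing only how a producer's output data set is wired to each consumer through an edge:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for JobVertex / IntermediateDataSet / JobEdge.
// A producing vertex writes a data set; each consumer is attached to
// that data set through an edge.
public class MiniJobGraph {
    public static class Vertex {
        public final String name;
        public final List<DataSet> produced = new ArrayList<>();
        public final List<Edge> inputs = new ArrayList<>();
        public Vertex(String name) { this.name = name; }
    }

    public static class DataSet {
        public final Vertex producer;
        public final List<Edge> consumers = new ArrayList<>();
        public DataSet(Vertex producer) { this.producer = producer; }
    }

    public static class Edge {
        public final DataSet source;
        public final Vertex target;
        public Edge(DataSet source, Vertex target) { this.source = source; this.target = target; }
    }

    // Wire producer -> consumer: the producer gains a new data set,
    // and the consumer gains an input edge fed by that data set.
    public static void connect(Vertex producer, Vertex consumer) {
        DataSet ds = new DataSet(producer);
        producer.produced.add(ds);
        Edge e = new Edge(ds, consumer);
        ds.consumers.add(e);
        consumer.inputs.add(e);
    }

    // A "source" is a vertex with no connected inputs.
    public static boolean isSource(Vertex v) { return v.inputs.isEmpty(); }

    public static void main(String[] args) {
        Vertex source = new Vertex("source");
        Vertex map = new Vertex("map");
        Vertex sink = new Vertex("sink");
        connect(source, map);
        connect(map, sink);
        System.out.println(isSource(source)); // true
        System.out.println(isSource(map));    // false
    }
}
```

The same producer-to-consumer wiring reappears later when the topological sort walks `getProducedDataSets()` and `getConsumers()`.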

Walking through the source

1. Fields

/** List of task vertices included in this job graph. */
// Vertices of this job, stored in a LinkedHashMap so iteration follows insertion order; the JobVertexID key guarantees each vertex ID is unique within the job
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();

/** The job configuration attached to this job. */
// Configuration of this job
private final Configuration jobConfiguration = new Configuration();

/** ID of this job. May be set if specific job id is desired (e.g. session management) */
// The job ID distinguishes this job from others; it may be set explicitly (e.g. for session management)
private final JobID jobID;

/** Name of this job. */
// Human-readable name used when the job is executed
private final String jobName;

/** The number of seconds after which the corresponding ExecutionGraph is removed at the
 * job manager after it has been executed. */
// How many seconds the corresponding ExecutionGraph is kept at the JobManager after execution finishes; once the timeout expires it is removed.
private long sessionTimeout = 0;

/** flag to enable queued scheduling */
// Flag enabling queued scheduling: tasks may queue for slots instead of failing immediately when not enough resources are available
private boolean allowQueuedScheduling;

/** The mode in which the job is scheduled */
// Scheduling mode of the job; defaults to lazy (LAZY_FROM_SOURCES)
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;

// --- checkpointing ---
// The fields below cover execution and checkpoint settings
/** Job specific execution config */
// Serialized ExecutionConfig governing this job's execution
private SerializedValue<ExecutionConfig> serializedExecutionConfig;

/** The settings for the job checkpoints */
// Checkpointing settings for this job
private JobCheckpointingSettings snapshotSettings;

/** Savepoint restore settings. */
// Savepoint restore settings, typically used when recovering a job; defaults to none()
private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();

// --- attached resources ---
// Additional resources attached to the job for execution
/** Set of JAR files required to run this job. */
// Paths of extra user JARs required to run the job; added through userJars
private final List<Path> userJars = new ArrayList<Path>();

/** Set of custom files required to run this job. */
// Extra custom files needed at runtime, configured through userArtifacts as distributed-cache entries
private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = new HashMap<>();

/** Set of blob keys identifying the JAR files required to run this job. */
private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();

/** List of classpaths required to run this job. */
// Classpath entries required to run the job
private List<URL> classpaths = Collections.emptyList();
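The choice of LinkedHashMap for `taskVertices` is worth pausing on: iteration follows insertion order (which later helps the topological sort behave deterministically), and keying by ID means re-registering the same ID replaces rather than duplicates. A stand-alone illustration, using plain strings as stand-in vertex IDs:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class VertexMapDemo {
    // Mirrors the `taskVertices` field: insertion-ordered, keyed by a unique ID.
    public static Map<String, String> buildMap() {
        Map<String, String> taskVertices = new LinkedHashMap<>();
        taskVertices.put("id-source", "Source: collect");
        taskVertices.put("id-map", "Map");
        // Same key again: the value is replaced, no duplicate entry appears.
        taskVertices.put("id-source", "Source: collect (re-added)");
        return taskVertices;
    }

    public static void main(String[] args) {
        // Insertion order is preserved: [id-source, id-map]
        System.out.println(buildMap().keySet());
    }
}
```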

2. Methods

JobGraph offers several overloaded constructors; the central one, taking a job ID and a name, is shown below.
The constructor:

/**
 * Constructs a new job graph with the given job ID (or a random ID, if {@code null} is passed),
 * the given name and the given execution configuration (see {@link ExecutionConfig}).
 * The ExecutionConfig will be serialized and can't be modified afterwards.
 *
 * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
 * @param jobName The name of the job.
 */
 // Constructs a JobGraph from the given jobId and jobName; both fall back to defaults when not specified.
 // An initial ExecutionConfig is created and attached to the job.
 // 1. jobId: may be supplied explicitly, otherwise a random JobID is generated
 // 2. jobName: defaults to "(unnamed job)" when null
 // 3. ExecutionConfig: configures how the program executes. Commonly used options:
 //  3.1 parallelism: the default parallelism of the program;
 //  3.2 retries: the number of restart attempts on failure
 //  3.3 delay: the interval to wait between two retries
 //  3.4 execution mode: batch or pipelined; the default is pipelined
 //  3.5 the "closure cleaner": pre-processes function implementations, e.g. closures written as anonymous inner classes, removing unused references inside the closure; this fixes some serialization problems and shrinks the closure's size
 // 3.6 type/serializer registration: improves performance for generic types and POJOs; manual registration is needed when not only the declared types but also their subclasses occur at runtime.
public JobGraph(JobID jobId, String jobName) {
   this.jobID = jobId == null ? new JobID() : jobId;
   this.jobName = jobName == null ? "(unnamed job)" : jobName;

   try {
      setExecutionConfig(new ExecutionConfig());
   } catch (IOException e) {
      // this should never happen, since an empty execution config is always serializable
      throw new RuntimeException("bug, empty execution config is not serializable");
   }
}
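The constructor eagerly serializes the ExecutionConfig, which is why the Javadoc says it "can't be modified afterwards": later mutations of the original object never reach the stored bytes. Conceptually, Flink's `SerializedValue<T>` is a byte-array snapshot of a `Serializable` object; the following simplified stand-in (not Flink's actual class) shows the snapshot-and-restore round trip:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Simplified stand-in for SerializedValue<T>: snapshot a Serializable
// object into bytes now, restore an independent copy later.
public class SerializedSnapshot {
    public static byte[] serialize(Serializable value) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(value);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @SuppressWarnings("unchecked")
    public static <T> T deserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Any default-state Serializable stands in for the "empty execution
        // config" that the constructor's catch block assumes is always serializable.
        byte[] snapshot = serialize(42);
        Integer restored = deserialize(snapshot);
        System.out.println(restored); // 42
    }
}
```

This also explains the constructor's catch block: serializing a freshly constructed, empty config has no reason to fail, so an IOException there would indicate a bug.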

The topological sort, getVerticesSortedTopologicallyFromSources:

// Builds the topology starting from the source vertices
// 1. Collects the vertices registered in the JobGraph into a list ordered so that every vertex appears only after all of its predecessors
public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
   // early out on empty lists
   if (this.taskVertices.isEmpty()) {
      return Collections.emptyList();
   }

   List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
   Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());

   // start by finding the vertices with no input edges
   // and the ones with disconnected inputs (that refer to some standalone data set)
   // First, find the source vertices (and vertices with only disconnected inputs) registered in the JobGraph
   {
     // `remaining` is a LinkedHashSet: it de-duplicates vertices, so no vertex can be recorded twice, while still preserving insertion order
     // 1. Iterate over every JobVertex in the set
     // 2. A vertex with no connected inputs is treated as a source
     // 3. Append each source to the sorted list and remove it from the set, so it cannot be visited again
      Iterator<JobVertex> iter = remaining.iterator();
      while (iter.hasNext()) {
         JobVertex vertex = iter.next();

         if (vertex.hasNoConnectedInputs()) {
            sorted.add(vertex);
            iter.remove();
         }
      }
   }

   int startNodePos = 0;

   // traverse from the nodes that were added until we found all elements
   // The loop above extracted the sources and the standalone data sets; now process the remaining vertices
   // 1. startNodePos advances through `sorted`; if it ever overtakes the list, no progress is possible and the graph must be cyclic
   // 2. Each pass appends every vertex whose predecessors have all been emitted, eventually yielding the JobGraph topology built on top of the StreamGraph
   while (!remaining.isEmpty()) {

      // first check if we have more candidates to start traversing from. if not, then the
      // graph is cyclic, which is not permitted
      // Make sure there is still an emitted node left to traverse from; otherwise the JobGraph is cyclic and cannot form a DAG topology
      if (startNodePos >= sorted.size()) {
         throw new InvalidProgramException("The job graph is cyclic.");
      }

      JobVertex current = sorted.get(startNodePos++);
      addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
   }

   return sorted;
}

// addNodesThatHaveNoNewPredecessors:
// 1. Collect the IntermediateDataSets produced by the given vertex
// 2. Iterate over each IntermediateDataSet's consumers (JobEdges)
// 3. For each JobEdge, take its target vertex; if the target is no longer in the set, skip the edge
//  3.1 For a target still in the set, inspect all of its inputs, skipping the edge we arrived through
//  3.2 If none of the target's other producers is still in the set, the target has no pending predecessors and is appended
//  3.3 Recurse on each appended vertex until every reachable vertex has been processed
private void addNodesThatHaveNoNewPredecessors(JobVertex start, List<JobVertex> target, Set<JobVertex> remaining) {
  
   // forward traverse over all produced data sets and all their consumers
   // Starting from the given vertex, walk its produced IntermediateDataSets,
   // then each data set's consumers (JobEdges),
   // and finally each edge's target JobVertex: the candidates to emit next.
   // For each candidate, check whether it is still in the `remaining` set:
   // if not, ignore it;
   // if so, look backwards from the target for predecessors that have not been emitted yet
   // (skipping the edge we arrived through, since its producer is the current vertex and was already emitted)
   for (IntermediateDataSet dataSet : start.getProducedDataSets()) {
      for (JobEdge edge : dataSet.getConsumers()) {

         // a vertex can be added, if it has no predecessors that are still in the 'remaining' set
         JobVertex v = edge.getTarget();
         if (!remaining.contains(v)) {
            continue;
         }

         boolean hasNewPredecessors = false;
          
         // Check whether the target still has an unemitted predecessor
         for (JobEdge e : v.getInputs()) {
            // skip the edge through which we came
            // Skip the edge we came through: its producer is the current vertex, so there is no need to re-check it
            if (e == edge) {
               continue;
            }

            // Get the IntermediateDataSet feeding this edge;
            // its producer is one of the target vertex's upstream vertices
            IntermediateDataSet source = e.getSource();
            if (remaining.contains(source.getProducer())) {
               hasNewPredecessors = true;
               break;
            }
         }
         
         // A vertex still in the set with no pending predecessors is appended to the result, removed from the set, and recursed on
         if (!hasNewPredecessors) {
            target.add(v);
            remaining.remove(v);
            addNodesThatHaveNoNewPredecessors(v, target, remaining);
         }
      }
   }
}
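The two methods above amount to a topological sort with cycle detection: seed the result with vertices that have no connected inputs, then emit each remaining vertex once all of its predecessors have been emitted. The sketch below reproduces the same behavior in a self-contained form, but as an equivalent Kahn-style in-degree formulation on a plain string adjacency list rather than a line-for-line port of the Flink code:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopoSortSketch {
    // edges: vertex -> list of successor vertices.
    // Returns vertices in topological order; throws on a cyclic graph,
    // mirroring getVerticesSortedTopologicallyFromSources.
    public static List<String> sortFromSources(Map<String, List<String>> edges) {
        // Collect all vertices and count each vertex's incoming edges.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : edges.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String target : e.getValue()) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }

        // Phase 1: seed with the "sources" (no connected inputs).
        List<String> sorted = new ArrayList<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                sorted.add(e.getKey());
            }
        }

        // Phase 2: walk forward from each emitted vertex; a successor is
        // emitted once all of its predecessors have been emitted. If we run
        // out of candidates before covering the graph, it is cyclic.
        int startNodePos = 0;
        while (sorted.size() < inDegree.size()) {
            if (startNodePos >= sorted.size()) {
                throw new IllegalStateException("The graph is cyclic.");
            }
            String current = sorted.get(startNodePos++);
            for (String succ : edges.getOrDefault(current, List.of())) {
                if (inDegree.merge(succ, -1, Integer::sum) == 0) {
                    sorted.add(succ);
                }
            }
        }
        return sorted;
    }

    public static void main(String[] args) {
        Map<String, List<String>> dag = new LinkedHashMap<>();
        dag.put("source", List.of("map"));
        dag.put("map", List.of("sink"));
        System.out.println(sortFromSources(dag)); // [source, map, sink]
    }
}
```

The in-degree counter plays the role that the `remaining` set plays in the Flink code: both track which predecessors have not been emitted yet, and `startNodePos` overtaking the sorted list is the cycle signal in both versions.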

3. Building the JobGraph

The topology itself is assembled by the two methods shown above: getVerticesSortedTopologicallyFromSources seeds the result list with the source vertices, and addNodesThatHaveNoNewPredecessors walks forward from each emitted vertex, appending every vertex whose predecessors have all been emitted, until the whole graph is covered or a cycle is detected.

4. JobGraph structure

(figure: JobGraph structure diagram)

5. Source

For the complete listing, see JobGraph.java in the Flink repository.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容