MapReduce Source Code Analysis (Part 1): The Job Submission Process

Preface

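MapReduce is a programming framework for distributed computing programs and the core framework for developing "Hadoop-based data analysis applications". Its core job is to combine the business-logic code written by the user with its own built-in default components into a complete distributed program that runs concurrently on a Hadoop cluster.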

[Figure: execution flowchart]

I. waitForCompletion

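In the driver class of a MapReduce program, we create a Configuration object and use it to obtain a Job instance. After setting the Mapper class, the Reducer class, the jar class and other properties on the job, we call waitForCompletion(true) to submit the job and wait for it to finish. The boolean argument decides whether the job's progress is monitored and printed.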

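import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// MyMapper, MyReduce, TQ, TQSortComparator, TQPartitioner and TQGroupComparator
// are the author's own classes and are not shown in this article.
public class MyJob {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        if (args.length != 2) {
            System.out.println("Please pass an input path and an output path");
            System.exit(1);
        }
        String inputPath = args[0];
        String outputPath = args[1];
        // compare strings by content (isEmpty), not by reference (==)
        if (inputPath == null || inputPath.isEmpty() || outputPath == null || outputPath.isEmpty()) {
            System.out.println("Invalid arguments");
            System.exit(1);
        }
        Configuration conf = new Configuration(true);

        Path out = new Path(outputPath);
        if (out.getFileSystem(conf).exists(out)) {
            System.out.println("HDFS output directory already exists");
            System.exit(1);
        }

        // build the Job
        Job job = Job.getInstance(conf);
        // locate the job jar through the main class
        job.setJarByClass(MyJob.class);
        // job name
        job.setJobName("job");
        // input and output paths
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, out);
        // first comes the input format (TextInputFormat if not set)
        //job.setInputFormatClass();

        // map task
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(TQ.class);
        job.setMapOutputValueClass(IntWritable.class);
        // then the sort comparator
        job.setSortComparatorClass(TQSortComparator.class);
        // partitioning rule
        job.setPartitionerClass(TQPartitioner.class);
        // grouping comparator
        job.setGroupingComparatorClass(TQGroupComparator.class);
        // reduce task
        job.setReducerClass(MyReduce.class);
        // number of reduce tasks
        job.setNumReduceTasks(2);
        // submit the job, wait for it, and exit non-zero if it failed
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}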

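Now let's step into Job.waitForCompletion(), which takes a boolean argument. It first checks the job's state: if the job is still in the DEFINE state, it is submitted via submit(). Then, depending on the argument, the method either monitors and prints the job's progress or silently polls until the job completes.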

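When verbose is true, the job's progress is polled once per second (the default interval) and reported to the console whenever it has changed. When the job completes successfully, the job counters are displayed; otherwise, the error that caused the job to fail is logged to the console.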

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    // Check the job state first: if it is still DEFINE, submit the job to the cluster via submit()
    if (state == JobState.DEFINE) {
      submit();
    }
    // If verbose is true, monitor the job and print its progress
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
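The verbose branch (poll at a fixed interval, report only when progress changes) can be modelled without Hadoop. The sketch below is a simplified, hypothetical model rather than the real monitorAndPrintJob(): FakeJob and its progress samples are invented, and the real method tracks map and reduce progress separately and prints counters at the end.

```java
import java.util.ArrayList;
import java.util.List;

class ProgressMonitorSketch {

    /** Hypothetical stand-in for a running job: each poll returns the next progress sample. */
    static final class FakeJob {
        private final int[] samples;
        private int next = 0;

        FakeJob(int... samples) {
            this.samples = samples;
        }

        boolean isComplete() {
            return next >= samples.length;
        }

        /** Returns the current progress (0-100) and advances the simulation by one step. */
        int pollProgress() {
            return samples[next++];
        }
    }

    /** Polls every intervalMillis and records/prints progress only when it has changed. */
    static List<Integer> monitor(FakeJob job, long intervalMillis) {
        List<Integer> reported = new ArrayList<>();
        int last = -1;
        while (!job.isComplete()) {
            int progress = job.pollProgress();
            if (progress != last) {
                System.out.println("progress: " + progress + "%");
                reported.add(progress);
                last = progress;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag and stop polling
                break;
            }
        }
        return reported;
    }

    public static void main(String[] args) {
        // 10 ms instead of the one-second default, just to keep the demo short
        monitor(new FakeJob(0, 0, 30, 30, 100), 10);
    }
}
```

With the samples 0, 0, 30, 30, 100 only three lines are printed, because repeated values are suppressed.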

II. The submit() method

The main work inside waitForCompletion is the call to submit(), so next we focus on that core method.

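This method submits the job to the cluster. It first checks the job's state again with ensureState(JobState.DEFINE): a job that is not in the DEFINE state cannot be submitted. setUseNewAPI() records that the job uses the new MapReduce API, i.e. the Mapper and Reducer classes in the org.apache.hadoop.mapreduce package rather than the old classes in the mapred package.
submit() performs two important steps:
First, connect() initializes the job's Cluster member. This object wraps the cluster information set through the Configuration and internally creates the actual communication protocol object that is used for the final submission.
Second, getJobSubmitter() uses the information held by cluster (here, its file system and client) to obtain a JobSubmitter, which performs the final submission and returns the job's status. submitter.submitJobInternal(Job.this, cluster) is then run inside ugi.doAs(...), i.e. as the submitting user, to submit the current job to the cluster; the returned JobStatus is stored in the status member. submitJobInternal is the core of JobSubmitter. Once submission succeeds, state is set to JobState.RUNNING to mark that the job has entered the running phase, and finally the URL for tracking the job is logged.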

 /**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    // Use the information held by cluster (its file system and client) to obtain a JobSubmitter, which performs the final submission and returns the job status
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }
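The DEFINE/RUNNING bookkeeping in waitForCompletion() and submit() amounts to a small state machine. Below is a hypothetical, Hadoop-free model (MiniJob is invented); it assumes, as in Hadoop's Job.ensureState, that a state mismatch raises IllegalStateException.

```java
class JobStateSketch {

    enum JobState { DEFINE, RUNNING }

    /** Hypothetical miniature of Job that keeps only the state handling. */
    static final class MiniJob {
        JobState state = JobState.DEFINE;
        boolean useNewApi = false;
        int submissions = 0;

        private void ensureState(JobState expected) {
            if (state != expected) {
                throw new IllegalStateException("Job in state " + state + " instead of " + expected);
            }
        }

        /** Like submit(): only a job still in DEFINE may be submitted. */
        void submit() {
            ensureState(JobState.DEFINE);
            useNewApi = true;   // stands in for setUseNewAPI()
            submissions++;      // stands in for connect() + submitJobInternal(...)
            state = JobState.RUNNING;
        }

        /** Like the guard at the top of waitForCompletion(). */
        void submitIfStillDefined() {
            if (state == JobState.DEFINE) {
                submit();
            }
        }
    }

    public static void main(String[] args) {
        MiniJob job = new MiniJob();
        job.submitIfStillDefined();
        job.submitIfStillDefined(); // no-op: the job is already RUNNING
        System.out.println(job.state + " after " + job.submissions + " submission(s)");
    }
}
```

The guard is why calling waitForCompletion() on an already submitted job does not submit it twice, while calling submit() directly a second time fails.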
  

III. The submitJobInternal method

As shown, calling submit() on a Job object creates a JobSubmitter instance internally and then calls that instance's submitJobInternal() method.

JobSubmitter is the class that finally submits the job to the cluster.

submitJobInternal proceeds as follows:

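1. checkSpecs(job) validates the job's output specification: the output path must be configured and must not already exist. The output path is configured through mapreduce.output.fileoutputformat.outputdir.

2. It then gets the Configuration held by the job and calls addMRFrameworkToDistributedCache(conf), which adds the MapReduce framework path (MAPREDUCE_APPLICATION_FRAMEWORK_PATH), if one is configured, to the distributed cache.

3. The static method JobSubmissionFiles.getStagingDir() returns the staging directory, where the resources needed to run the job are stored. With the default settings it is /tmp/hadoop-yarn/staging/<user>/.staging, e.g. /tmp/hadoop-yarn/staging/root/.staging for the root user.

4. It gets the IP address and host name of the submitting machine and records them in the Configuration.

5. It obtains a new JobID, sets it on the job, and builds the submit directory as <staging directory>/<job ID>. It then applies a series of security-related settings: the user name, delegation tokens for the submit directory, and a secret key used to authenticate shuffle transfers.

6. copyAndConfigureFiles copies the resources the job needs, including the job JAR. The JAR is copied with a high replication factor (controlled by mapreduce.client.submit.file.replication, default 10), so that while the tasks run there are many copies of it across the cluster for NodeManagers to access.

7. writeSplits() (a very important step) computes the input splits for the job, writes the split data file job.split and the split metadata file job.splitmetainfo, and determines the number of map tasks (one per split).

8. writeConf() writes the job configuration XML file (job.xml) into the submit directory.

9. Finally, submitClient.submitJob() submits the job; on YARN this calls submitApplication on the ResourceManager.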
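To make steps 3, 5, 7 and 8 concrete, the sketch below builds the paths that end up in the staging area. It is a string-only illustration with hypothetical helper names: the staging root and user are parameters, the job ID format job_<timestamp>_<sequence> (sequence zero-padded to four digits) is how Hadoop prints JobIDs, and the file names job.jar, job.split, job.splitmetainfo and job.xml are assumed to match what JobSubmissionFiles produces.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

class StagingLayoutSketch {

    /** Job IDs look like job_<clusterTimestamp>_<sequence>, the sequence padded to four digits. */
    static String jobId(long clusterTimestamp, int sequence) {
        return String.format(Locale.ROOT, "job_%d_%04d", clusterTimestamp, sequence);
    }

    /** Step 3: <staging root>/<user>/.staging */
    static String stagingDir(String stagingRoot, String user) {
        return stagingRoot + "/" + user + "/.staging";
    }

    /** Step 5: one sub-directory per job inside the staging directory. */
    static String submitJobDir(String stagingDir, String jobId) {
        return stagingDir + "/" + jobId;
    }

    /** Files written by steps 6-8: job JAR, split data, split metadata, job configuration. */
    static List<String> submitFiles(String submitJobDir) {
        return Arrays.asList(
                submitJobDir + "/job.jar",
                submitJobDir + "/job.split",
                submitJobDir + "/job.splitmetainfo",
                submitJobDir + "/job.xml");
    }

    public static void main(String[] args) {
        String dir = submitJobDir(stagingDir("/tmp/hadoop-yarn/staging", "root"), jobId(1600000000000L, 1));
        submitFiles(dir).forEach(System.out::println);
    }
}
```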

The source code is as follows:

JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
         
          int keyLen = CryptoUtils.isShuffleEncrypted(conf) 
              ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, 
                  MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
              : SHUFFLE_KEY_LENGTH;
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(keyLen);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }

      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
  }
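The try/finally at the end of submitJobInternal deletes the submit directory whenever no JobStatus came back, so a failed submission leaves no half-written files in the staging area. The sketch below reproduces only that pattern against a hypothetical in-memory file set; the Function standing in for submitClient.submitJob and the file names are illustrative assumptions.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

class StagingCleanupSketch {

    /** Hypothetical in-memory stand-in for the staging file system. */
    final Set<String> files = new HashSet<>();

    /** Writes the submit files, calls the submitter, and cleans up if no status was returned. */
    String submit(String submitJobDir, Function<String, String> submitClient) {
        String status = null;
        try {
            files.add(submitJobDir + "/job.split");
            files.add(submitJobDir + "/job.xml");
            status = submitClient.apply(submitJobDir);
            if (status == null) {
                throw new IllegalStateException("Could not launch job");
            }
            return status;
        } finally {
            if (status == null) {
                // same idea as jtFs.delete(submitJobDir, true)
                files.removeIf(f -> f.startsWith(submitJobDir + "/"));
            }
        }
    }
}
```

Because the check is on status rather than on a caught exception, the cleanup also runs when the submitter itself throws.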

IV. The writeSplits method

      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

With the new API, writeNewSplits() is called:

  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    // with the new API (org.apache.hadoop.mapreduce), call writeNewSplits
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }

writeNewSplits() uses reflection to instantiate the InputFormat class configured on the job, calls its getSplits() method, and, once it has the split information, calls JobSplitWriter.createSplitFiles() to write it to submitJobDir/job.split.

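The InputFormat, which defines how the input is read and split, is obtained through reflection; if the user does not specify one, TextInputFormat is used by default.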
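The "instantiate the configured class by reflection, fall back to a default" step can be sketched in plain Java. InputFormatLike, TextLike and WholeFileLike are hypothetical classes invented for this sketch; Hadoop itself calls ReflectionUtils.newInstance(job.getInputFormatClass(), conf), where getInputFormatClass() returns TextInputFormat when nothing was configured.

```java
import java.util.Collections;
import java.util.List;

class ReflectiveFormatSketch {

    /** Hypothetical, heavily reduced InputFormat: turns a file path into split descriptions. */
    interface InputFormatLike {
        List<String> getSplits(String path);
    }

    /** Plays the role of the default (TextInputFormat in Hadoop). */
    static class TextLike implements InputFormatLike {
        public List<String> getSplits(String path) {
            return Collections.singletonList(path + " (line-oriented)");
        }
    }

    /** A user-chosen alternative, e.g. one that never splits a file. */
    static class WholeFileLike implements InputFormatLike {
        public List<String> getSplits(String path) {
            return Collections.singletonList(path + " (whole file)");
        }
    }

    /** Instantiates the configured class through its no-arg constructor, or the default if none was set. */
    static InputFormatLike newInputFormat(Class<? extends InputFormatLike> configured) {
        Class<? extends InputFormatLike> cls = (configured != null) ? configured : TextLike.class;
        try {
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + cls.getName(), e);
        }
    }
}
```

Reflection is what lets the framework call getSplits() on a class it only knows by name from the configuration.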

V. The getSplits() method in FileInputFormat

The code to pay attention to here is:

....
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);
....
        if (isSplitable(job, path)) {
          // block size of the file
          long blockSize = file.getBlockSize();
          // compute the split size
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
          // bytes of the file not yet assigned to a split
          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }
          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
....
        }

Before this loop, the code obtains minSize and maxSize and then computes the split size from them.

The rule is:

splitSize = computeSplitSize(blockSize, minSize, maxSize) = Math.max(minSize, Math.min(maxSize, blockSize))

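So the split size equals the block size unless one of the limits overrides it, and, somewhat counter-intuitively, to make splits larger than a block you raise minSize above the block size, while to make them smaller than a block you lower maxSize below the block size. The two limits are read from mapreduce.input.fileinputformat.split.minsize and mapreduce.input.fileinputformat.split.maxsize.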
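The formula and the tuning rule can be checked with a few numbers. This sketch copies the one-line formula into plain Java and assumes a 128 MB block size; the class name is hypothetical. In a real job the limits would be set through the two properties above (or FileInputFormat.setMinInputSplitSize / setMaxInputSplitSize).

```java
class SplitSizeSketch {

    static final long MB = 1024L * 1024L;

    /** Same formula as FileInputFormat.computeSplitSize. */
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long block = 128 * MB;
        // defaults (minSize 1 byte, maxSize Long.MAX_VALUE): split size = block size
        System.out.println(computeSplitSize(block, 1, Long.MAX_VALUE) / MB + " MB");
        // lowering maxSize below the block size gives smaller splits
        System.out.println(computeSplitSize(block, 1, 64 * MB) / MB + " MB");
        // raising minSize above the block size gives larger splits
        System.out.println(computeSplitSize(block, 256 * MB, Long.MAX_VALUE) / MB + " MB");
    }
}
```

The three lines printed are 128 MB, 64 MB and 256 MB.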

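The condition of the while loop checks whether the ratio of the remaining bytes to the split size exceeds SPLIT_SLOP, a constant of 1.1 defined in FileInputFormat. While more than 110% of a split size remains, another full-size split is cut; once 110% or less remains, all remaining bytes become the final split, which avoids producing a tiny last split.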

private static final double SPLIT_SLOP = 1.1;
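The effect of SPLIT_SLOP is easiest to see by replaying the loop on bare numbers. The sketch below keeps only the offset/length arithmetic of the getSplits() excerpt (no block locations or hosts); the class name is hypothetical, and sizes are in MB with an assumed 128 MB split size to keep the output readable.

```java
import java.util.ArrayList;
import java.util.List;

class SplitLoopSketch {

    static final double SPLIT_SLOP = 1.1;

    /** Returns {offset, length} pairs using the same loop as FileInputFormat.getSplits. */
    static List<long[]> split(long length, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {length - bytesRemaining, splitSize});
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            // the tail (at most 1.1 x splitSize) becomes one last split
            splits.add(new long[] {length - bytesRemaining, bytesRemaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        for (long fileSize : new long[] {300, 260}) {
            StringBuilder sb = new StringBuilder(fileSize + " MB ->");
            for (long[] s : split(fileSize, 128)) {
                sb.append(" [offset ").append(s[0]).append(", length ").append(s[1]).append(']');
            }
            System.out.println(sb);
        }
    }
}
```

A 300 MB file yields splits of 128, 128 and 44 MB, whereas a 260 MB file yields only two splits, 128 and 132 MB, because 132/128 ≈ 1.03 does not exceed 1.1.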

Each split is then created and added to the splits list.

Note the five components of a split:

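  • Path of the file
  • Start offset
  • Split length
  • Hosts that store replicas of the block containing the split's start (getHosts())
  • Hosts that have that block cached in memory (getCachedHosts())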
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
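How the two host fields are chosen can be sketched as well: the block that contains the split's start offset supplies both host lists. Block and Split below are hypothetical, simplified stand-ins for Hadoop's BlockLocation and FileSplit, blockIndex() mirrors the lookup done by getBlockIndex(), and the host names are made up.

```java
class SplitLocalitySketch {

    /** Simplified BlockLocation: byte range plus replica hosts and in-memory cache hosts. */
    static final class Block {
        final long offset;
        final long length;
        final String[] hosts;
        final String[] cachedHosts;

        Block(long offset, long length, String[] hosts, String[] cachedHosts) {
            this.offset = offset;
            this.length = length;
            this.hosts = hosts;
            this.cachedHosts = cachedHosts;
        }
    }

    /** The five parts of a split listed above. */
    static final class Split {
        final String path;
        final long start;
        final long length;
        final String[] hosts;
        final String[] cachedHosts;

        Split(String path, long start, long length, String[] hosts, String[] cachedHosts) {
            this.path = path;
            this.start = start;
            this.length = length;
            this.hosts = hosts;
            this.cachedHosts = cachedHosts;
        }
    }

    /** Index of the block whose byte range contains offset. */
    static int blockIndex(Block[] blocks, long offset) {
        for (int i = 0; i < blocks.length; i++) {
            if (offset >= blocks[i].offset && offset < blocks[i].offset + blocks[i].length) {
                return i;
            }
        }
        throw new IllegalArgumentException("Offset " + offset + " is outside of file");
    }

    /** Builds a split whose locality hints come from the block holding its first byte. */
    static Split makeSplit(String path, long start, long length, Block[] blocks) {
        Block b = blocks[blockIndex(blocks, start)];
        return new Split(path, start, length, b.hosts, b.cachedHosts);
    }
}
```

The scheduler can then try to run the map task for that split on one of these hosts, so the task reads its input locally.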

Back in JobSubmitter.writeNewSplits(), the splits are sorted by size so that the largest splits come first:

private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }
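The ordering step is just an array sort with a descending-length comparator. Here is a Hadoop-free sketch; the Split class is a hypothetical stand-in that carries only a name and a length, and Comparator.comparingLong(...).reversed() plays the role of SplitComparator.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class SplitSortSketch {

    /** Hypothetical split: just a name and a length. */
    static final class Split {
        final String name;
        final long length;

        Split(String name, long length) {
            this.name = name;
            this.length = length;
        }
    }

    /** Copies the list into an array and sorts it so the biggest splits come first. */
    static Split[] biggestFirst(List<Split> splits) {
        Split[] array = splits.toArray(new Split[0]);
        Arrays.sort(array, Comparator.comparingLong((Split s) -> s.length).reversed());
        return array;
    }

    public static void main(String[] args) {
        Split[] sorted = biggestFirst(Arrays.asList(
                new Split("a.txt@256", 44), new Split("a.txt@0", 128), new Split("b.txt@0", 132)));
        for (Split s : sorted) {
            System.out.println(s.name + " " + s.length);
        }
    }
}
```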

createSplitFiles() is then called with the list of splits to create the split files: it writes the split data to job.split and the split metadata to job.splitmetainfo. This function does the core work.

public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir, 
      Configuration conf, FileSystem fs, T[] splits) 
  throws IOException, InterruptedException {
    // create the job.split file and open it as an output stream
    FSDataOutputStream out = createFile(fs, 
        JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
    // write the information of every split (an InputSplit instance) into job.split;
    // the metadata of each split is returned in the info array
    SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
    out.close();
    // write the split metadata in info into the job.splitmetainfo file
    writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir), 
        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
        info);
  }
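The idea behind the two files can be illustrated with java.io alone: splits are written one after another into a single stream, and for each split a small metadata record remembers where its bytes start, how much input it covers and where it is located. This is a hypothetical sketch; the byte layout used here (writeUTF of the path, then start and length) is invented and is not Hadoop's actual job.split format.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class SplitFileSketch {

    /** Hypothetical split as written to the data file. */
    static final class Split {
        final String path;
        final long start;
        final long length;
        final String[] hosts;

        Split(String path, long start, long length, String[] hosts) {
            this.path = path;
            this.start = start;
            this.length = length;
            this.hosts = hosts;
        }
    }

    /** Hypothetical metadata record, one per split (the role job.splitmetainfo plays). */
    static final class MetaInfo {
        final long offsetInSplitFile;
        final long inputDataLength;
        final String[] locations;

        MetaInfo(long offsetInSplitFile, long inputDataLength, String[] locations) {
            this.offsetInSplitFile = offsetInSplitFile;
            this.inputDataLength = inputDataLength;
            this.locations = locations;
        }
    }

    /** Writes every split into splitFile and returns the metadata for each one. */
    static List<MetaInfo> writeSplits(List<Split> splits, ByteArrayOutputStream splitFile) {
        List<MetaInfo> info = new ArrayList<>();
        try (DataOutputStream out = new DataOutputStream(splitFile)) {
            for (Split s : splits) {
                long offset = out.size(); // where this split's bytes begin
                out.writeUTF(s.path);
                out.writeLong(s.start);
                out.writeLong(s.length);
                info.add(new MetaInfo(offset, s.length, s.hosts));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return info;
    }
}
```

Keeping the small metadata separate means a reader can locate one split's bytes by its offset without parsing the whole data file.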

VI. submitJob: actually submitting the job

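submitClient.submitJob() calls the submitJob method of the YARN client (YARNRunner when running on YARN), which constructs the information needed to start the MapReduce ApplicationMaster: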

// Construct necessary information to start the MR AM
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    try {
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);

It then calls resMgrDelegate.submitApplication(appContext) to submit the application to the ResourceManager.
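The layering described here (JobSubmitter talks to a client protocol; the YARN implementation turns the job into an application and hands it to the ResourceManager) can be modelled with two interfaces. Everything in this sketch is hypothetical: ClientProtocolLike, ResourceManagerLike, AppContext and YarnRunnerLike are not Hadoop types, and the real ApplicationSubmissionContext carries much more information.

```java
class SubmitDelegationSketch {

    /** Hypothetical reduced client protocol, the role played by submitClient. */
    interface ClientProtocolLike {
        String submitJob(String jobId, String jobSubmitDir);
    }

    /** Hypothetical reduced ResourceManager client. */
    interface ResourceManagerLike {
        String submitApplication(AppContext context);
    }

    /** Hypothetical, much smaller stand-in for ApplicationSubmissionContext. */
    static final class AppContext {
        final String jobId;
        final String jobSubmitDir;

        AppContext(String jobId, String jobSubmitDir) {
            this.jobId = jobId;
            this.jobSubmitDir = jobSubmitDir;
        }
    }

    /** Plays YARNRunner's part: build the AM context, then submit it to the ResourceManager. */
    static final class YarnRunnerLike implements ClientProtocolLike {
        private final ResourceManagerLike resMgr;

        YarnRunnerLike(ResourceManagerLike resMgr) {
            this.resMgr = resMgr;
        }

        @Override
        public String submitJob(String jobId, String jobSubmitDir) {
            // "Construct necessary information to start the MR AM"
            AppContext appContext = new AppContext(jobId, jobSubmitDir);
            // "Submit to ResourceManager"
            return resMgr.submitApplication(appContext);
        }
    }
}
```

Because JobSubmitter only sees the client interface, the same submission code can target a different backend (such as local execution) by swapping the implementation.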

The next article will analyze the source code for job initialization. Stay tuned.

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
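Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and provides only information storage services.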
