MapReduce

提交

提交源碼

//true代表打印執(zhí)行過程
boolean b = job.waitForCompletion(true);

Job.java

public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,                                            ClassNotFoundException {
//Job現(xiàn)在狀態(tài)還是DEFINE,會(huì)執(zhí)行submit
    if (state == JobState.DEFINE) {
      submit();
    if (verbose) {
//監(jiān)控打印job的信息,執(zhí)行完臨時(shí)目錄的內(nèi)容就刪掉了
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
    }
public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    //確認(rèn)狀態(tài)
    ensureState(JobState.DEFINE);
    //設(shè)置新舊api兼容
    setUseNewAPI();
    //設(shè)置連接,如果沒設(shè)置用戶,doAs設(shè)置當(dāng)前主機(jī)為登錄用戶
    connect();
    //設(shè)置提交job的fs,客戶端,狀態(tài)等信息
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(),          cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
       //墨跡半天終于提交了
        return       submitter.submitJobInternal(Job.this, cluster);
    //提交流程結(jié)束,JobState改變?yōu)閞unning
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());

提交進(jìn)JobSubmitter.java

JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //檢查輸出路徑 
    checkSpecs(job);
    //進(jìn)去看一眼,全是site.xml,存的都是配置文件
    Configuration conf = job.getConfiguration();
//字面意思,添加MR框架去分布式緩存
//大概就是把配置文件發(fā)給每個(gè)maptask和//reducetask    addMRFrameworkToDistributedCache(conf);
    //會(huì)生成一個(gè)臨時(shí)目錄,可為什么在d盤啊
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    //生成了唯一標(biāo)識(shí)jobid
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
//在剛才生成的臨時(shí)目錄下定義路徑,準(zhǔn)備生成
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
   //配置suffleid/qeque什么的
   ...
   //這個(gè)方法一執(zhí)行,剛才的定義的路徑就生成文件夾了
  //進(jìn)到JobSubmitter里了,執(zhí)行rUploader.uploadResources
   //然后進(jìn)JobResourceUploader里,mkdir生成了文件夾
  //uploadJobJar,集群模式?jīng)]jar,要提交到集群,所以先提交到文件夾里
   copyAndConfigureFiles(job, submitJobDir);
   //路徑是文件夾路徑/job.xml
   Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
     //切片源碼,執(zhí)行完切片文件放入剛才文件夾
      int maps = writeSplits(job, submitJobDir);
     //設(shè)置切片個(gè)數(shù),切片賦給MRJobConfig.NUM_MAPS了
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
     ...
  //把xml放進(jìn)來了,都是配置信息
   writeConf(conf, submitJobFile);
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
    //設(shè)置job為的status變?yōu)閞unning
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());

切片

切片源碼

JobSubmitter.java

  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    //jobSubmitDir 存儲(chǔ)路徑(就是生成那個(gè)臨時(shí)文件夾)
    //jConf是job的信息,id什么的
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    //hadoop3.x的切片方式,我就進(jìn)入這個(gè)方法。
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
   //hadoop2.x的切片方式
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }
 //hadoop3.x具體切片
  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
   //切片了!!!!!!!!!!!
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

InputFormat有很多實(shí)現(xiàn)類。默認(rèn)進(jìn)入FileInputFormat.java實(shí)現(xiàn)類
FileInputFormat.java

 public static long getMaxSplitSize(JobContext context) {
   //SPLIT_MAXSIZE不設(shè)置沒有
    return context.getConfiguration().getLong(SPLIT_MAXSIZE, 
                                              Long.MAX_VALUE);
  }
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
   //切片最小的size,getFormatMinSplitSize()返回1
   //getMinSplitSize()設(shè)置在yarn-default里,默認(rèn)值是0
   //mapreduce.input.fileinputformat.split.minsize
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    //切片最大的size
    long maxSize = getMaxSplitSize(job);
    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
   ...
    循環(huán)遍歷文件列表進(jìn)行切片,說明是按照文件進(jìn)行切分
    for (FileStatus file: files) {
             if (ignoreDirs && file.isDirectory()) {
        continue;
      }
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        //文件是否可以切割
        if (isSplitable(job, path)) {
          //獲取塊大小
          long blockSize = file.getBlockSize();
           //獲取切片大小
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
          //一個(gè)文件的大小
          long bytesRemaining = length;
          //SPLIT_SLOP=1.1 
          //如果文件大于切片大小的1.1倍就切片
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }
          //感覺就是循環(huán)切完片了,如果還剩下點(diǎn),就放在最后一片
            if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
      }else { 
          // not splitable
      }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
  }
圖片.png

圖片.png

圖片.png

提交


圖片.png

切片
圖片.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容