Source-code analysis of the Hadoop WordCount job submission flow

1. Overview

The job submission flow is concentrated in JobSubmitter.submitJobInternal. It checks the validity of the output directory, records submission info (host and user), obtains a JobID, copies the files the job needs (job.jar, job.xml, the split files, etc.) to HDFS, and finally performs the actual submission. WordCount is used as the running example below.

2. WordCount driver code

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //step 1: get the Job object
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        String input = "data/wc.data";
        String output = "out/";
        FileUtils.deleteTarget(output, configuration);

        //step 2: set the jar information
        job.setJarByClass(WordCountApp.class);

        //step 3: set the user-defined mapper and reducer
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        //step 4: set the map-phase output key and value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //step 5: set the reduce-phase output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //step 6: set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        //step 7: submit the job
        boolean result = job.waitForCompletion(true); // enters the submission flow, then polls the job status in a loop
        System.exit(result ? 0 : 1);
    }

3. Walkthrough of the main submission code

  • waitForCompletion does two things, submitting the job and monitoring its status; here we focus on the submission path
public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      // submit the job
      submit();
    }
    if (verbose) {
      // monitor the job and print its status in real time
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
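The non-verbose branch above is just a sleep-and-poll loop. A minimal standalone sketch of that pattern follows; the BooleanSupplier stands in for Job.isComplete() and the interval for Job.getCompletionPollInterval(), so this is an illustration of the loop, not the real Hadoop API:

```java
import java.util.function.BooleanSupplier;

// Standalone sketch of the non-verbose branch of waitForCompletion():
// sleep for the configured poll interval until the job reports complete.
public class PollUntilDone {

    static void waitUntilComplete(BooleanSupplier isComplete,
                                  long pollIntervalMillis)
            throws InterruptedException {
        while (!isComplete.getAsBoolean()) {
            Thread.sleep(pollIntervalMillis); // same sleep-and-retry loop
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // simulate a job that "completes" after ~50 ms
        long deadline = System.currentTimeMillis() + 50;
        waitUntilComplete(() -> System.currentTimeMillis() >= deadline, 10);
        System.out.println("job complete");
    }
}
```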
  • submit mainly does two things: 1. connect to the cluster; 2. submit the job
public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    // connect to the cluster (local or YARN)
    connect();
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        // submit the job
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }
  • Connecting to the cluster builds a Cluster instance from the conf; the important part is the Cluster's initialization
private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }
public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }

private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null; 
        try {
          if (jobTrackAddr == null) {
            // depending on your configuration, this actually creates a YARNRunner or a LocalJobRunner
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }
          // initialize the Cluster's internal fields
          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } 
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }
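The loop above tries each ClientProtocolProvider in turn and keeps the first one whose create() returns non-null. That selection pattern can be sketched on its own; the Provider interface and provider() factory below are simplified stand-ins (the real providers return a YARNRunner for "yarn" or a LocalJobRunner for "local"):

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the first-non-null provider selection in Cluster.initialize().
// Provider is a simplified stand-in for ClientProtocolProvider.
public class ProviderPick {

    interface Provider {
        // returns a client "protocol" for the configured framework, or null
        String create(String framework);
    }

    // A toy provider that only matches its own framework name.
    static Provider provider(String name) {
        return framework -> name.equals(framework) ? name + "-client" : null;
    }

    // Iterate in order and stop at the first provider that returns
    // non-null, exactly like the break-on-first-match loop in initialize().
    static String pick(List<Provider> providers, String framework) {
        for (Provider p : providers) {
            String client = p.create(framework);
            if (client != null) {
                return client;
            }
        }
        // mirrors the IOException thrown when no provider matched
        throw new IllegalStateException("Cannot initialize Cluster");
    }

    public static void main(String[] args) {
        List<Provider> providers =
            Arrays.asList(provider("local"), provider("yarn"));
        System.out.println(pick(providers, "yarn")); // yarn-client
    }
}
```

In the real code the provider list comes from a ServiceLoader over ClientProtocolProvider implementations, and each provider consults mapreduce.framework.name in the conf to decide whether to return a client.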
  • The YARNRunner constructor
public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
      ClientCache clientCache) {
    this.conf = conf;
    try {
      this.resMgrDelegate = resMgrDelegate;
      this.clientCache = clientCache;
      this.defaultFileContext = FileContext.getFileContext(this.conf);
    } catch (UnsupportedFileSystemException ufe) {
      throw new RuntimeException("Error in instantiating YarnClient", ufe);
    }
  }
  • The ResourceMgrDelegate constructor
public ResourceMgrDelegate(YarnConfiguration conf) {
    super(ResourceMgrDelegate.class.getName());
    this.conf = conf;
    // actually creates a YarnClientImpl
    this.client = YarnClient.createYarnClient();
    init(conf);
    // actually creates the rmClient
    start();
  }
  • Next, look at JobSubmitter.submitJobInternal
JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {
    
    // check the output spec: the output dir must be set and must not already exist
    //validate the jobs output specs 
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);
    // get the staging path that holds files used during job execution; default /tmp/hadoop-yarn/staging/hadoop/.staging, configurable via yarn.app.mapreduce.am.staging-dir
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    // record the submit host name and address
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
     // get a new JobID; this is an RPC call
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    // the job submit dir: /tmp/hadoop-yarn/staging/root/.staging/job_xxxxxx
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
         
          int keyLen = CryptoUtils.isShuffleEncrypted(conf) 
              ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, 
                  MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
              : SHUFFLE_KEY_LENGTH;
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(keyLen);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      // copy required files to the cluster: -files, -libjars, -archives, and your job jar
      copyAndConfigureFiles(job, submitJobDir);
      
      // path for job.xml
      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
     
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      // write the split files (job.split, job.splitmetainfo) to the submit dir;
      // the return value is the number of map tasks
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      // set the queue name
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      // write this job's configuration to job.xml in the submit dir
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      // the real submission to the RM; underneath, the application goes through rmClient
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          // delete the submit dir
          jtFs.delete(submitJobDir, true);
      }
    }
  }
  • writeSplits delegates the main work to writeNewSplits
private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    // instantiate the configured InputFormat via reflection; the default is TextInputFormat
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    // the InputFormat computes the splits over the input files
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    // write the split files into the submit dir
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }
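The size-descending sort done by SplitComparator can be sketched on plain lengths (Long values standing in for InputSplit.getLength()); putting the biggest splits first lets the longest map tasks start earliest:

```java
import java.util.Arrays;
import java.util.Comparator;

// Sketch of the SplitComparator sort in writeNewSplits: splits are
// ordered by size, largest first.
public class SplitSort {

    static Long[] sortBySizeDesc(Long[] lengths) {
        Long[] sorted = lengths.clone();
        Arrays.sort(sorted, Comparator.reverseOrder());
        return sorted;
    }

    public static void main(String[] args) {
        Long[] lengths = {64L, 128L, 32L};
        // [128, 64, 32]
        System.out.println(Arrays.toString(sortBySizeDesc(lengths)));
    }
}
```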
  • By default this goes through getSplits, which TextInputFormat inherits from FileInputFormat
public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    // minimum split size, default 1
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // maximum split size, default Long.MAX_VALUE
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // list the status of the input files
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      // if the file is not empty
      if (length != 0) {
        // get the file's block locations
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        // check whether the file is splittable
        if (isSplitable(job, path)) {
          // get the file's block size, default 128 MB
          long blockSize = file.getBlockSize();
          // compute splitSize: the middle value of (minSize, blockSize, maxSize)
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          // keep emitting full splits while bytesRemaining/splitSize > 1.1 (SPLIT_SLOP)
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }
          // add the last, possibly smaller, chunk as the final split
          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          // not splittable: the whole file becomes a single split
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.elapsedMillis());
    }
    return splits;
  }
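The arithmetic above determines both how big each split is and when the tail is folded into the last split. A standalone sketch of computeSplitSize and the SPLIT_SLOP loop, with constants mirroring the FileInputFormat defaults:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the split arithmetic in FileInputFormat.getSplits():
// splitSize is the "middle" of (minSize, blockSize, maxSize), and a file
// keeps yielding full splits while the remainder exceeds SPLIT_SLOP (1.1)
// times the split size; the tail becomes one final, smaller split.
public class SplitCalc {

    static final double SPLIT_SLOP = 1.1; // same constant as FileInputFormat

    // Mirrors: Math.max(minSize, Math.min(maxSize, blockSize))
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Returns the length of each split for a file of the given size.
    static List<Long> splitLengths(long fileLength, long splitSize) {
        List<Long> lengths = new ArrayList<>();
        long bytesRemaining = fileLength;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            lengths.add(splitSize);
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            lengths.add(bytesRemaining); // the final, possibly smaller split
        }
        return lengths;
    }

    public static void main(String[] args) {
        long blockSize = 128L << 20; // 128 MB default
        long splitSize = computeSplitSize(blockSize, 1L, Long.MAX_VALUE);
        // A 260 MB file yields 128 MB + 132 MB: the 132 MB remainder is
        // within the 1.1x slop, so it is NOT split again.
        System.out.println(splitLengths(260L << 20, splitSize));
    }
}
```

This is why a file slightly larger than one block (say 130 MB) still produces a single split: the slop factor avoids scheduling a tiny map task for the tail.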