MapReduce 原理和源碼-實(shí)戰(zhàn)

Local模式下的 MapReduce 計(jì)算步驟(圖解)

步驟詳解

  1. "main"線程中完成input切片和Job提交
  • 本地構(gòu)建submitJobDir臨時(shí)目錄
  • 根據(jù)InputPath文件數(shù)和blockSize大小進(jìn)行分片: InputSplit[]
  • 將分片信息和作業(yè)配置一起寫入到submitJobDir中;
  • 將submitJobDir下所有上傳(本地告知)MR作業(yè)Job,完成提交;
  1. "?"線程完成作業(yè)信息的接收和解析
  • 處理submitJobDir提交上來的信息;
  • 根據(jù)conf信息創(chuàng)建相關(guān)類
  1. "Thread-3"的Job.run()線程執(zhí)行Job的運(yùn)算
  • 根據(jù)分片構(gòu)建Map任務(wù)的List<RunnableWithThrowable>
  • 遍歷提交每個(gè)MapTask線程任務(wù);
    • 下接: "LocalJobRunner Map Task"線程的 JobRunner.MapTaskRunnable.run()
  • 根據(jù)job.reduces數(shù)量構(gòu)建Reduce的線程任務(wù)reduceRunnables: List<RunnableWithThrowable>
  • 遍歷并執(zhí)行每個(gè)ReduceTask線程任務(wù);
    • 下接: "pool-n-thread-1"線程中的 ReduceTaskRunnable.run()方法;
  1. "LocalJobRunner Map Task"線程: 完成一個(gè)分配Map計(jì)算;
  • 獲取map對應(yīng)分片信息和數(shù)據(jù),
  • 構(gòu)建MapTask任務(wù)
  • input.init加載分片輸入數(shù)據(jù);
  • mapper.run() 循環(huán)調(diào)用User定義的map()方法完成計(jì)算;
  • 將Map結(jié)果寫入中間shuffle文件并告知Tracker;
  1. "pool-3-thread-1"線程完成一次Reduce計(jì)算
  • 構(gòu)建ReduceTask任務(wù);
  • 構(gòu)建shuffleContext 洗牌任務(wù)并執(zhí)行shuffle.run()洗牌;
  • 基于shuffle后的結(jié)果數(shù)據(jù), 封裝進(jìn)KeyValueIterator迭代器中, 并執(zhí)行reducer.reduce()聚合計(jì)算;
  • 將reduce計(jì)算結(jié)果寫入保存到OutputPath路徑上;
  • 結(jié)束Job并釋放/消耗相應(yīng)資源;

"main"線程切片和提交

JobSubmitter.submitJobInternal源碼.png

構(gòu)建上傳目錄submitJobDir的原理源碼

源碼詳解:

JobSubmitter.submitJobInternal(){
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);{
        client.getStagingAreaDir();{
            Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, "/tmp/hadoop/mapred/staging"));
            user = ugi.getShortUserName() + rand.nextInt(Integer.MAX_VALUE);//username-randid;
            return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
        }
    }
    JobID jobId = submitClient.getNewJobID();{//LocalJobRunner.getNewJobID(): 以local+上面rand.nextInt(MAX)生成的隨機(jī)數(shù)+0/n, 作為jobId;
        return new org.apache.hadoop.mapreduce.JobID("local" + randid, ++jobid);
    }
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
}
LocalJobRunner.getStagingAreaDir()方法.png
生成submitJobDir的構(gòu)成.png

1.2 計(jì)算并寫入split信息: writeSplits()

job.split文件: 應(yīng)該是對應(yīng)每個(gè)FileSplit, 他的Zfile類型和存儲(chǔ)Path;

// job.split的內(nèi)容
org.apache.hadoop.mapreduce.lib.input.FileSplitZfile:/E:/ws/ws-idea/ruoze-study/g9-proj-0701/G9-03/hadoop-cdh-demo/input/wc/words_33M.data   

/org.apache.hadoop.mapreduce.lib.input.FileSplitZfile:/E:/ws/ws-idea/ruoze-study/g9-proj-0701/G9-03/hadoop-cdh-demo/input/wc/words_10M.data

// job.splitmetainfo的內(nèi)容, ?
4d45 5441 2d53 504c 0102 0109 6c6f 6361
6c68 6f73 7407 8c02 1000 0001 096c 6f63
616c 686f 7374 8fa2 8da0 0004 

MR Driver端的 Submit MR Job的邏輯

  • 先構(gòu)建本地 submitJobDir臨時(shí)目錄;
  • 再 根據(jù)InputPath 獲取文件分片 splits
  • 最后 分片和配置信息, 統(tǒng)一writeConf 寫入到 conf文件中;
  • submitJob() 將Job和配置, jars都上傳到遠(yuǎn)程MR服務(wù);
UserDriver.main(){
    
    job.waitForCompletion(true){//Job: 
        if (state == JobState.DEFINE) {
            submit();{
                setUseNewAPI();//設(shè)置新的Api
                connect();{//創(chuàng)建Cluster對象,用于表示與目標(biāo)FileSystem建立了連接;
                    if (cluster == null) {
                         cluster =  ugi.doAs(new PrivilegedExceptionAction<Cluster>() { return new Cluster(getConfiguration());  });
                    }
                }
                // 構(gòu)建JobSubmitter , 主要是封裝了: jtFs:目標(biāo)文件系統(tǒng)(本地/hdfs), submitClient(通信客戶端), hostAndPort: 地址端口;
                final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
                
                status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
                    run(){return submitter.submitJobInternal(Job.this, cluster);{//JobSubmitter.submitJobInternal()
                        // 校驗(yàn)Jobs 的輸出格式和合法性;
                        checkSpecs(job);
                        
                        //根據(jù)(哈希)算法創(chuàng)建本地 submitJobDir目錄,用于收集/存放/提交 Job所需配置和資源;
                        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
                        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
                        
                        try{
                            copyAndConfigureFiles(job, submitJobDir);
                            
                            /* 核心代碼: 根據(jù)file文件數(shù)量, 創(chuàng)建相關(guān)MR任務(wù)的分片/分區(qū): splits
                            *
                            */
                            int maps = writeSplits(job, submitJobDir);{//JobSubmitter.writeSplits()
                                if (jConf.getUseNewMapper()) {//mapred.mapper.new-api==true屬性時(shí),用新的api
                                    maps = writeNewSplits(job, jobSubmitDir);{
                                        List<InputSplit> splits = input.getSplits(job);{//FileInputFormat.getSplits()
                                            List<FileStatus> files = listStatus(job);{
                                                Path[] dirs = getInputPaths(job);{//FileInputFormat.getInputPaths()
                                                    String dirs = context.getConfiguration().get(INPUT_DIR, "");// FileInputFormat.setInputPaths() => mapreduce.input.fileinputformat.inputdir (INPUT_DIR) ,即傳入Input路勁;
                                                    Path[] result = new Path[list.length];
                                                    return result;
                                                }
                                                
                                                // 根據(jù)是否遞歸,遞歸獲取子文件
                                                boolean recursive = getInputDirRecursive(job);//INPUT_DIR_RECURSIVE參數(shù)(mapreduce.input.fileinputformat.input.dir.recursive)設(shè)定
                                            }
                                        }
                                        
                                        T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
                                        Arrays.sort(array, new SplitComparator());
                                        return array.length;
                                    }
                                }
                            }
                            conf.setInt(MRJobConfig.NUM_MAPS, maps);
                            
                            writeConf(conf, submitJobFile);{
                                FSDataOutputStream out = FileSystem.create(jtFs, jobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
                                conf.writeXml(out);
                            }
                            status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials());
                        }finally{
                            jtFs.delete(submitJobDir, true); //清除 submitJobDir臨時(shí)提交目錄;
                        }
                    }}
                });
            }
        }
        if (verbose) {//打印狀態(tài);
            monitorAndPrintJob();
        }
    }
}


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容