MapReduce execution steps in Local mode (illustrated)
Step-by-step breakdown
- The "main" thread performs input splitting and Job submission
    - Build the local submitJobDir staging directory
    - Compute the splits (InputSplit[]) from the files under InputPath and the blockSize
    - Write the split info together with the job configuration into submitJobDir
    - Hand everything under submitJobDir over to the MR Job (in local mode this is just a notification), completing the submission
- The "?" thread receives and parses the job information
    - Process the information submitted via submitJobDir
    - Instantiate the relevant classes from the conf
- The "Thread-3" Job.run() thread executes the job
    - Build the List<RunnableWithThrowable> of Map tasks from the splits
    - Iterate over and submit each MapTask runnable
    - Continues in: JobRunner.MapTaskRunnable.run() on a "LocalJobRunner Map Task" thread
    - Build the Reduce runnables reduceRunnables: List<RunnableWithThrowable> from the job.reduces count
    - Iterate over and execute each ReduceTask runnable
    - Continues in: ReduceTaskRunnable.run() on a "pool-n-thread-1" thread
- A "LocalJobRunner Map Task" thread completes one assigned Map computation
    - Fetch the split info and data for this map
    - Build the MapTask
    - input.init loads the split's input data
    - mapper.run() loops over the input, calling the user-defined map() method
    - Write the Map output to the intermediate shuffle files and notify the Tracker
- A "pool-3-thread-1" thread completes one Reduce computation
    - Build the ReduceTask
    - Build the shuffleContext and execute shuffle.run()
    - Wrap the shuffled result data in a KeyValueIterator and run reducer.reduce() for the aggregation
    - Write the reduce result to the OutputPath
    - Finish the Job and release the associated resources
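The threading model above can be sketched in plain Java without any Hadoop dependency. This is a minimal simulation, not the real LocalJobRunner API: map runnables submitted to a pool emit into an in-memory "shuffle" map, then a reduce pass aggregates per key (the class and method names here are illustrative).

```java
import java.util.*;
import java.util.concurrent.*;

public class LocalRunnerSketch {
    // Runs one "map task" per split on a pool, then a single reduce pass.
    static Map<String, Integer> run(List<String> splits) {
        // Intermediate "shuffle" store shared by the map tasks (key -> emitted values).
        ConcurrentMap<String, Queue<Integer>> shuffle = new ConcurrentHashMap<>();

        // "Thread-3" role: one Runnable per split, submitted to a pool, echoing
        // LocalJobRunner's List<RunnableWithThrowable> of map tasks.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Future<?>> futures = new ArrayList<>();
        for (String split : splits) {
            futures.add(pool.submit(() -> {
                for (String word : split.split(" ")) {                // mapper.run() loop
                    shuffle.computeIfAbsent(word, k -> new ConcurrentLinkedQueue<>())
                           .add(1);                                   // emit (word, 1)
                }
            }));
        }
        try {
            for (Future<?> f : futures) f.get();                      // wait for all map tasks
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        pool.shutdown();

        // Reduce role: aggregate the values collected for each key.
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, Queue<Integer>> e : shuffle.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) sum += v;                      // reducer.reduce()
            result.put(e.getKey(), sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // Three pretend InputSplits; prints {a=3, b=2, c=3}
        System.out.println(run(Arrays.asList("a b a", "b c", "a c c")));
    }
}
```

The real runner differs in the important ways the notes describe: the map output goes to spill files on disk, and the reduce side pulls it through a shuffle phase rather than reading a shared map.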
The "main" thread: splitting and submission

JobSubmitter.submitJobInternal源碼.png
Source code behind the construction of the submitJobDir upload directory
Source walkthrough:
JobSubmitter.submitJobInternal() {
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); {
        client.getStagingAreaDir(); {
            Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, "/tmp/hadoop/mapred/staging"));
            user = ugi.getShortUserName() + rand.nextInt(Integer.MAX_VALUE); // username + random id
            return fs.makeQualified(new Path(stagingRootDir, user + "/.staging")).toString();
        }
    }
    JobID jobId = submitClient.getNewJobID(); { // LocalJobRunner.getNewJobID(): "local" + the random number from rand.nextInt(MAX) above + a sequence number (0..n), used as the jobId
        return new org.apache.hadoop.mapreduce.JobID("local" + randid, ++jobid);
    }
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
}
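The path arithmetic above can be restated in plain Java. A small sketch, assuming the `job_local<randid>_<seq>` rendering of the JobID and the default staging root shown in the snippet (the helper names are mine, not Hadoop's):

```java
import java.nio.file.Paths;
import java.util.Random;

public class StagingDirSketch {
    // Mirrors getStagingAreaDir(): <stagingRoot>/<user><randid>/.staging
    static String stagingDir(String stagingRoot, String user, int randid) {
        return Paths.get(stagingRoot, user + randid, ".staging")
                    .toString().replace('\\', '/');
    }

    // Mirrors how the local JobID renders: job_local<randid>_<4-digit seq>
    static String jobId(int randid, int seq) {
        return String.format("job_local%d_%04d", randid, seq);
    }

    public static void main(String[] args) {
        int randid = new Random().nextInt(Integer.MAX_VALUE);
        String staging = stagingDir("/tmp/hadoop/mapred/staging", "hadoop", randid);
        // submitJobDir = <stagingDir>/<jobId>, e.g.
        // /tmp/hadoop/mapred/staging/hadoop123456789/.staging/job_local123456789_0001
        System.out.println(staging + "/" + jobId(randid, 1));
    }
}
```

Note that the same random id appears twice: once in the per-user staging directory name and once in the jobId, which is why a Local-mode run is easy to spot under /tmp.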

LocalJobRunner.getStagingAreaDir()方法.png

生成submitJobDir的構(gòu)成.png
1.2 Computing and writing the split info: writeSplits()
The job.split file: it appears to record, for each FileSplit, its class name and its storage Path (the stray bytes around "file:/" are serialization framing);
// contents of job.split
org.apache.hadoop.mapreduce.lib.input.FileSplitZfile:/E:/ws/ws-idea/ruoze-study/g9-proj-0701/G9-03/hadoop-cdh-demo/input/wc/words_33M.data
/org.apache.hadoop.mapreduce.lib.input.FileSplitZfile:/E:/ws/ws-idea/ruoze-study/g9-proj-0701/G9-03/hadoop-cdh-demo/input/wc/words_10M.data
// contents of job.splitmetainfo (binary; the leading bytes spell the "META-SPL" header)
4d45 5441 2d53 504c 0102 0109 6c6f 6361
6c68 6f73 7407 8c02 1000 0001 096c 6f63
616c 686f 7374 8fa2 8da0 0004
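The hex dump above is partly readable once decoded as ASCII: it opens with the "META-SPL" file header, and the split locations ("localhost") appear twice, once per split; I take the remaining non-printable bytes to be version, count, offset, and length fields, which is an assumption here. A quick decoder:

```java
public class SplitMetaDecode {
    // Decode a hex string to text, keeping printable ASCII and dotting the rest.
    static String decode(String hex) {
        hex = hex.replaceAll("\\s+", "");
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i + 1 < hex.length(); i += 2) {
            int b = Integer.parseInt(hex.substring(i, i + 2), 16);
            sb.append(b >= 0x20 && b < 0x7f ? (char) b : '.');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The job.splitmetainfo dump from above.
        String dump = "4d45 5441 2d53 504c 0102 0109 6c6f 6361"
                    + "6c68 6f73 7407 8c02 1000 0001 096c 6f63"
                    + "616c 686f 7374 8fa2 8da0 0004";
        // Starts with "META-SPL"; both "localhost" location entries are visible.
        System.out.println(decode(dump));
    }
}
```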
Submit-MR-Job logic on the MR Driver side
- First, build the local submitJobDir staging directory;
- Next, compute the file splits from InputPath;
- Then write the split and configuration info out via writeConf into the conf file;
- Finally, submitJob() uploads the Job, its configuration, and the jars to the remote MR service;
UserDriver.main() {
    job.waitForCompletion(true) { // Job:
        if (state == JobState.DEFINE) {
            submit(); {
                setUseNewAPI(); // switch to the new API
                connect(); { // create the Cluster object, representing a connection to the target FileSystem
                    if (cluster == null) {
                        cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() { return new Cluster(getConfiguration()); });
                    }
                }
                // Build the JobSubmitter, which mainly wraps: jtFs (the target filesystem, local/HDFS), submitClient (the communication client), hostAndPort (address and port)
                final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
                status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
                    run() { return submitter.submitJobInternal(Job.this, cluster); { // JobSubmitter.submitJobInternal()
                        // Validate the job's output spec
                        checkSpecs(job);
                        // Create the local submitJobDir directory, used to collect/stage/submit the config and resources the Job needs
                        Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
                        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
                        try {
                            copyAndConfigureFiles(job, submitJobDir);
                            /* Core code: compute the splits for the MR tasks from the input files */
                            int maps = writeSplits(job, submitJobDir); { // JobSubmitter.writeSplits()
                                if (jConf.getUseNewMapper()) { // when mapred.mapper.new-api == true, use the new API
                                    maps = writeNewSplits(job, jobSubmitDir); {
                                        List<InputSplit> splits = input.getSplits(job); { // FileInputFormat.getSplits()
                                            List<FileStatus> files = listStatus(job); {
                                                Path[] dirs = getInputPaths(job); { // FileInputFormat.getInputPaths()
                                                    String dirs = context.getConfiguration().get(INPUT_DIR, ""); // FileInputFormat.setInputPaths() => mapreduce.input.fileinputformat.inputdir (INPUT_DIR), i.e. the input path passed in
                                                    Path[] result = new Path[list.length];
                                                    return result;
                                                }
                                                // Recurse into subdirectories if configured
                                                boolean recursive = getInputDirRecursive(job); // set by INPUT_DIR_RECURSIVE (mapreduce.input.fileinputformat.input.dir.recursive)
                                            }
                                        }
                                        T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
                                        Arrays.sort(array, new SplitComparator());
                                        return array.length;
                                    }
                                }
                            }
                            conf.setInt(MRJobConfig.NUM_MAPS, maps);
                            writeConf(conf, submitJobFile); {
                                FSDataOutputStream out = FileSystem.create(jtFs, jobFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
                                conf.writeXml(out);
                            }
                            status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
                        } finally {
                            jtFs.delete(submitJobDir, true); // clean up the temporary submitJobDir
                        }
                    }}
                });
            }
        }
        if (verbose) { // print job status
            monitorAndPrintJob();
        }
    }
}
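The writeSplits() path above ends in FileInputFormat.getSplits(), which cuts each file at a computed split size. The max/min clamping formula and the 1.1 slop factor below restate FileInputFormat's arithmetic in a standalone sketch (the surrounding class is mine, and the 32 MB local block size in the example is an assumption, not something the notes state):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    static final double SPLIT_SLOP = 1.1; // last chunk may be up to 10% oversized

    // FileInputFormat.computeSplitSize: clamp blockSize between minSize and maxSize.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Split one file of `length` bytes into (offset, length) pairs.
    static List<long[]> split(long length, long blockSize, long minSize, long maxSize) {
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
        List<long[]> splits = new ArrayList<>();
        long remaining = length;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[]{length - remaining, splitSize});
            remaining -= splitSize;
        }
        if (remaining != 0) splits.add(new long[]{length - remaining, remaining});
        return splits;
    }

    public static void main(String[] args) {
        // A 33 MB file against a 32 MB block size stays one split, because
        // 33/32 = 1.03 is within the 1.1 slop; prints 1.
        long mb = 1024 * 1024;
        System.out.println(split(33 * mb, 32 * mb, 1, Long.MAX_VALUE).size());
    }
}
```

This is why, in the job.split dump earlier, each input file can contribute a single FileSplit even when it is slightly larger than a block.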