提交
提交源碼
// Passing true asks waitForCompletion to print the job's progress while it runs.
boolean b = job.waitForCompletion(true);
Job.java
/**
 * Submits the job if it has not been submitted yet, then blocks until it
 * finishes.
 *
 * @param verbose when {@code true}, progress is reported through
 *                {@code monitorAndPrintJob()}; when {@code false}, the
 *                method silently polls {@code isComplete()}
 * @return the value of {@code isSuccessful()} once the job is complete
 * @throws IOException            propagated from submission or status calls
 * @throws InterruptedException   propagated from submission or monitoring
 * @throws ClassNotFoundException propagated from submission
 */
public boolean waitForCompletion(boolean verbose
    ) throws IOException, InterruptedException, ClassNotFoundException {
  // A job that is still in the DEFINE state has not been submitted yet.
  // The submit guard must close here: waiting and the final return below
  // have to run for already-submitted jobs as well.
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    // Monitors the job and prints its information as it runs.
    // (Author's observation: once the job finishes, the contents of the
    // temporary staging directory are removed.)
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
        // Interruption of the sleep is ignored here; the loop simply
        // re-checks isComplete() and keeps waiting.
      }
    }
  }
  return isSuccessful();
}
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
//確認(rèn)狀態(tài)
ensureState(JobState.DEFINE);
//設(shè)置新舊api兼容
setUseNewAPI();
//設(shè)置連接,如果沒設(shè)置用戶,doAs設(shè)置當(dāng)前主機(jī)為登錄用戶
connect();
//設(shè)置提交job的fs,客戶端,狀態(tài)等信息
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
//墨跡半天終于提交了
return submitter.submitJobInternal(Job.this, cluster);
//提交流程結(jié)束,JobState改變?yōu)閞unning
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
提交進JobSubmitter.java
// Excerpt of the internal submission routine: validates the job, prepares a
// per-job submit directory, writes splits and configuration into it, and
// hands the job to submitClient. Lines marked "..." were elided by the note
// author, and the end of the method is not shown in this excerpt.
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {
// Author's note: checks the output path.
checkSpecs(job);
// Author's note: stepping into this object shows entries from the
// *-site.xml files, i.e. it holds the configuration.
Configuration conf = job.getConfiguration();
// Author's note: literally "add the MR framework to the distributed cache";
// roughly, ship the configuration files to every map task and reduce task.
// NOTE(review): the call addMRFrameworkToDistributedCache(conf); appears to have been folded into this comment line — confirm against upstream.
// Author's note: this creates a temporary (staging) directory; the author
// wondered why it ended up on drive D.
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
// Obtain a new job id from the submit client and record it on the job.
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
// Define the per-job path <staging dir>/<job id>; it is only a Path here.
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
// Author's note: the elided part configures things such as the shuffle id
// and the queue.
...
// Author's trace: once this call runs, the path defined above exists as a
// directory. It goes through rUploader.uploadResources into
// JobResourceUploader, where mkdir creates the directory. uploadJobJar: in
// cluster mode the cluster does not have the jar yet, so it is first placed
// in this directory.
copyAndConfigureFiles(job, submitJobDir);
// Author's note: the resulting path is <submit dir>/job.xml.
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
// Split computation; author's note: afterwards the split files are in the
// submit directory.
int maps = writeSplits(job, submitJobDir);
// Store the number of splits under MRJobConfig.NUM_MAPS.
conf.setInt(MRJobConfig.NUM_MAPS, maps);
...
// Author's note: writes the XML (all configuration information) to the
// job configuration file path.
writeConf(conf, submitJobFile);
//
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
// Hand the job to the submit client and keep the returned status.
// (Author's note: the job's status becomes running — TODO confirm.)
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
切片
切片源碼
JobSubmitter.java
/**
 * Computes the input splits for a job and returns how many were produced.
 *
 * <p>Dispatches on {@code getUseNewMapper()}: when it is true the work is
 * done by {@code writeNewSplits}, otherwise by {@code writeOldSplits}.
 * (The notes this excerpt comes from label the two paths "Hadoop 3.x" and
 * "Hadoop 2.x"; the code itself only checks the new-mapper flag.)
 *
 * @param job          the job whose configuration is consulted
 * @param jobSubmitDir directory the split computation is given as its
 *                     target (the per-job submit directory)
 * @return the number of splits reported by the chosen writer
 */
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  final JobConf jobConf = (JobConf) job.getConfiguration();
  if (!jobConf.getUseNewMapper()) {
    return writeOldSplits(jobConf, jobSubmitDir);
  }
  return writeNewSplits(job, jobSubmitDir);
}
//hadoop3.x具體切片
/**
 * Asks the job's InputFormat for its splits, sorts them, writes the split
 * files and returns the number of splits.
 *
 * @param job          the job whose InputFormat class and configuration
 *                     are used
 * @param jobSubmitDir directory passed to
 *                     {@code JobSplitWriter.createSplitFiles}
 * @return the number of splits written
 */
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  final Configuration jobConf = job.getConfiguration();
  // Instantiate the configured InputFormat reflectively.
  final InputFormat<?, ?> inputFormat =
      ReflectionUtils.newInstance(job.getInputFormatClass(), jobConf);

  // This is where the splits are actually computed.
  final List<InputSplit> computed = inputFormat.getSplits(job);
  final T[] ordered = (T[]) computed.toArray(new InputSplit[computed.size()]);

  // Order the splits with SplitComparator; per the original comment this
  // sorts by size so that the biggest go first.
  Arrays.sort(ordered, new SplitComparator());

  JobSplitWriter.createSplitFiles(
      jobSubmitDir, jobConf, jobSubmitDir.getFileSystem(jobConf), ordered);
  return ordered.length;
}
InputFormat有很多實現類。默認進入FileInputFormat.java實現類
FileInputFormat.java
/**
 * Reads the maximum split size from the job configuration.
 *
 * @param context the job context whose configuration is queried
 * @return the long stored under {@code SPLIT_MAXSIZE}, with
 *         {@code Long.MAX_VALUE} passed as the fallback (the author notes
 *         that SPLIT_MAXSIZE is not set by default)
 */
public static long getMaxSplitSize(JobContext context) {
  final long fallback = Long.MAX_VALUE;
  return context.getConfiguration().getLong(SPLIT_MAXSIZE, fallback);
}
// Excerpt of the split computation: for every input file it derives a split
// size and cuts the file into splits. The line marked "..." was elided by
// the note author, and the tail of the method (closing braces and the
// return) is not shown in this excerpt.
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
// Minimum split size: the larger of the format minimum and the job minimum.
// Author's note: getFormatMinSplitSize() returns 1, and getMinSplitSize()
// reads mapreduce.input.fileinputformat.split.minsize, default 0.
// NOTE(review): the author places this property in yarn-default; it is
// normally documented in mapred-default.xml — confirm.
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
// Maximum split size.
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
...
// The next line is the note author's prose, not code. It says the loop below
// walks the file list, i.e. splitting is done file by file.
循環(huán)遍歷文件列表進(jìn)行切片,說明是按照文件進(jìn)行切分
for (FileStatus file: files) {
// Skip directories when ignoreDirs is set.
if (ignoreDirs && file.isDirectory()) {
continue;
}
Path path = file.getPath();
long length = file.getLen();
// Only non-empty files are processed in this branch.
if (length != 0) {
BlockLocation[] blkLocations;
// Use the block locations carried by the status when available,
// otherwise ask the file system for them.
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
// Can this file be split?
if (isSplitable(job, path)) {
// Block size of the file.
long blockSize = file.getBlockSize();
// Split size derived from block size, minSize and maxSize.
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
// Bytes of this file not yet assigned to a split; starts at the file size.
long bytesRemaining = length;
// Author's note: SPLIT_SLOP is 1.1, so a full-size split is cut only
// while the remainder is more than 1.1 times the split size.
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
// Each split starts at offset length-bytesRemaining and is splitSize long.
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
// After the loop, whatever is left over becomes the final split.
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
}else {
// Not splittable; the branch body is empty in this excerpt.
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
}

圖片.png

圖片.png

圖片.png
提交

圖片.png
切片

圖片.png