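Preface
MapReduce is a programming framework for distributed computing programs and the core framework for developing "Hadoop-based data analysis applications". Its core function is to combine the business-logic code written by the user with the framework's built-in default components into a complete distributed program that runs in parallel on a Hadoop cluster.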

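I. waitForCompletion
In the driver class of a MapReduce program, we create a Configuration object, obtain a Job object from it, and set properties such as the Mapper class, the Reducer class and the class used to locate the job JAR. We then call waitForCompletion(true) to submit the job and wait for it to finish. The boolean argument determines whether the job's progress is monitored and printed.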
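import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// MyMapper, MyReduce, TQ, TQSortComparator, TQPartitioner and TQGroupComparator
// are user-defined classes in the same project.
public class MyJob {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.out.println("Usage: MyJob <inputPath> <outputPath>");
            System.exit(1);
        }
        String inputPath = args[0];
        String outputPath = args[1];
        // compare string contents with isEmpty(), not references with ==
        if (inputPath == null || inputPath.isEmpty() || outputPath == null || outputPath.isEmpty()) {
            System.out.println("Invalid arguments");
            System.exit(1);
        }
        Configuration conf = new Configuration(true);
        Path out = new Path(outputPath);
        // the output directory must not exist yet, otherwise the job is rejected at submission
        if (out.getFileSystem(conf).exists(out)) {
            System.out.println("HDFS output directory already exists");
            System.exit(1);
        }
        // create the job
        Job job = Job.getInstance(conf);
        // the class used to locate the job JAR
        job.setJarByClass(MyJob.class);
        // job name
        job.setJobName("job");
        // input and output paths
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, out);
        // the InputFormat comes first; TextInputFormat is used if none is set
        // job.setInputFormatClass(...);
        // map task
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(TQ.class);
        job.setMapOutputValueClass(IntWritable.class);
        // then the sort comparator
        job.setSortComparatorClass(TQSortComparator.class);
        // partitioning rule
        job.setPartitionerClass(TQPartitioner.class);
        // grouping comparator
        job.setGroupingComparatorClass(TQGroupComparator.class);
        // reduce task
        job.setReducerClass(MyReduce.class);
        // number of reduce tasks
        job.setNumReduceTasks(2);
        // submit the job, print its progress, and exit with the job's result
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}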
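Let's now step into Job.waitForCompletion(). It takes a single boolean argument. It first checks the job state: if the job is still in the DEFINE state, it is submitted via submit(). Then, depending on the argument, the method either monitors and prints the job's progress or waits silently.
When verbose is true, monitorAndPrintJob() polls the job's progress once per second (by default) and reports it to the console whenever it changes. When the job completes successfully, the job counters are displayed; otherwise, the error that caused the job to fail is logged to the console. When verbose is false, the method simply polls isComplete() at the interval returned by Job.getCompletionPollInterval().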
/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  // First check the job state; if it is DEFINE, submit the job to the cluster via submit()
  if (state == JobState.DEFINE) {
    submit();
  }
  // If the argument is true, monitor the job and print its progress
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
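Stripped of the Hadoop types, both branches are the same polling loop. The sketch below models it in plain Java under simplifying assumptions: `PollingMonitor`, `FakeJob` and `waitFor` are hypothetical names, progress is a single number rather than separate map/reduce percentages, and progress changes are collected into a list instead of printed. Unlike the Hadoop code above, which ignores `InterruptedException` and keeps waiting, the sketch restores the interrupt flag and stops.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.DoubleSupplier;

// Hypothetical model of the polling pattern in waitForCompletion(); not Hadoop code.
class PollingMonitor {

    // Fake job whose progress advances by 0.25 each time completion is checked.
    static final class FakeJob {
        private double progress = 0.0;

        boolean isComplete() {
            progress = Math.min(1.0, progress + 0.25);
            return progress >= 1.0;
        }

        double progress() {
            return progress;
        }
    }

    // Polls until isComplete is true; in verbose mode records each new progress value.
    static List<Double> waitFor(BooleanSupplier isComplete, DoubleSupplier progress,
                                boolean verbose, long pollIntervalMillis) {
        List<Double> reported = new ArrayList<>();
        double last = -1.0;
        while (!isComplete.getAsBoolean()) {
            if (verbose) {
                double p = progress.getAsDouble();
                if (p != last) {           // report only when progress changed
                    reported.add(p);
                    last = p;
                }
            }
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt, stop waiting
                break;
            }
        }
        return reported;
    }

    public static void main(String[] args) {
        FakeJob job = new FakeJob();
        System.out.println(waitFor(job::isComplete, job::progress, true, 10L)); // prints [0.25, 0.5, 0.75]
    }
}
```

In non-verbose mode the same loop runs but nothing is reported, which matches the silent `else` branch of waitForCompletion().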
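II. The submit() method
The main work inside waitForCompletion() is the call to submit(), so next we focus on that core method.
submit() submits the job to the cluster. It first checks the job state again: if it is not DEFINE, the job cannot be submitted (ensureState() throws an IllegalStateException). setUseNewAPI() specifies that the job uses the new MapReduce API, i.e., the Mapper and Reducer classes in the org.apache.hadoop.mapreduce package rather than the old classes in the mapred package.
submit() performs two important steps:
First, connect() initializes the Job's Cluster member. That object encapsulates the cluster information set through the Configuration, and internally it creates the actual communication protocol object (a ClientProtocol: YARNRunner on YARN, LocalJobRunner in local mode), which is used for the final job submission.
Second, getJobSubmitter() uses the cluster information held by cluster (here, its file system and client) to obtain a JobSubmitter, which performs the final submission to the cluster and returns the job's status. The call submitter.submitJobInternal(Job.this, cluster), the core of JobSubmitter, submits the current job to the cluster and stores the returned status in the status member; it runs inside ugi.doAs(), i.e., as the submitting user. After a successful submission, the state is set to JobState.RUNNING, meaning the job has entered the running phase, and finally the URL for tracking the job is printed to the console.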
/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  // Use the cluster information held by cluster (its file system and client) to obtain a
  // JobSubmitter, which performs the final submission and returns the job's status
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
    ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
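The DEFINE-to-RUNNING check in submit() is a small state machine. A minimal sketch of it follows; `JobStateDemo` and its members are hypothetical stand-ins for Job's internals, and the exception message is illustrative.

```java
// Hypothetical, stripped-down model of Job's DEFINE -> RUNNING state check (not Hadoop code).
class JobStateDemo {
    enum JobState { DEFINE, RUNNING }

    private JobState state = JobState.DEFINE;

    // Same idea as ensureState(): refuse to proceed from the wrong state.
    private void ensureState(JobState expected) {
        if (state != expected) {
            throw new IllegalStateException(
                    "Job in state " + state + " instead of " + expected);
        }
    }

    void submit() {
        ensureState(JobState.DEFINE);   // only a freshly defined job may be submitted
        // connect(), getJobSubmitter() and submitJobInternal() would run here
        state = JobState.RUNNING;       // set after a successful submission
    }

    JobState getState() {
        return state;
    }

    public static void main(String[] args) {
        JobStateDemo job = new JobStateDemo();
        job.submit();
        System.out.println(job.getState());
        try {
            job.submit();               // a second submission is rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

This is also why waitForCompletion() only calls submit() while the state is still DEFINE: a job that was already submitted with submit() is not submitted a second time.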
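III. The submitJobInternal method
As shown above, calling submit() on a Job object creates a JobSubmitter instance internally and then calls that instance's submitJobInternal() method.
The JobSubmitter is the component that ultimately submits the job to the cluster.
submitJobInternal() proceeds as follows:
1. checkSpecs(job) validates the job's output specification: the output path must be configured and must not exist yet. The output path is configured by mapreduce.output.fileoutputformat.outputdir.
2. It gets the Configuration held by the job and, if an application framework path (MRJobConfig.MAPREDUCE_APPLICATION_FRAMEWORK_PATH) is configured, adds it to the distributed cache.
3. The static method JobSubmissionFiles.getStagingDir() returns the staging directory where the job's resources are stored while it runs. For the user root, the default is /tmp/hadoop-yarn/staging/root/.staging.
4. It gets the IP address of the submitting host and stores the IP address and host name in the Configuration.
5. It obtains a new JobID, sets it on the job, and builds the job submission directory (the staging directory plus the job ID). It then obtains delegation tokens for that directory and generates a secret key used to authenticate shuffle transfers.
6. copyAndConfigureFiles() copies the resources the job needs to run, including the job JAR. The job JAR is copied with a high replication factor (controlled by mapreduce.client.submit.file.replication, default 10), so that many copies are available across the cluster for the node managers to access when the job's tasks run.
7. writeSplits() (very important) computes the input splits for the job. It writes the split data file job.split and the split metadata file job.splitmetainfo; the number of splits determines the number of map tasks.
8. writeConf() writes the job configuration XML file (job.xml).
9. Finally, submitClient.submitJob() submits the job; on YARN this results in a call to submitApplication on the resource manager.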
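Steps 3 and 5 to 8 all write into one per-job directory. The helper below spells out that layout as plain string manipulation; it is a hypothetical illustration, not a Hadoop API. It assumes the default staging root (`yarn.app.mapreduce.am.staging-dir`, `/tmp/hadoop-yarn/staging`), a made-up cluster timestamp, and that a job JAR was set; a real submission directory may also contain `files`, `libjars` and `archives` subdirectories.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical helper that spells out the staging-area layout described in steps 3-8.
class StagingLayout {
    // Assumed default of yarn.app.mapreduce.am.staging-dir.
    static final String DEFAULT_STAGING_ROOT = "/tmp/hadoop-yarn/staging";

    // <staging root>/<user>/.staging
    static String stagingDir(String stagingRoot, String user) {
        return stagingRoot + "/" + user + "/.staging";
    }

    // Job IDs look like job_<clusterTimestamp>_<sequence>, sequence zero-padded to 4 digits.
    static String jobId(long clusterTimestamp, int sequence) {
        return String.format(Locale.ROOT, "job_%d_%04d", clusterTimestamp, sequence);
    }

    // Files written into <staging dir>/<job id> by steps 6-8.
    static List<String> submitFiles(String submitJobDir) {
        return List.of(
                submitJobDir + "/job.jar",            // step 6: copyAndConfigureFiles
                submitJobDir + "/job.split",          // step 7: split data
                submitJobDir + "/job.splitmetainfo",  // step 7: split metadata
                submitJobDir + "/job.xml");           // step 8: writeConf
    }

    public static void main(String[] args) {
        String dir = stagingDir(DEFAULT_STAGING_ROOT, "root") + "/" + jobId(1700000000000L, 1);
        submitFiles(dir).forEach(System.out::println);
    }
}
```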
The source code is as follows:
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {
  //validate the jobs output specs
  checkSpecs(job);
  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);
    populateTokenCache(conf, job.getCredentials());
    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        int keyLen = CryptoUtils.isShuffleEncrypted(conf)
            ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
                MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
            : SHUFFLE_KEY_LENGTH;
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(keyLen);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    copyAndConfigureFiles(job, submitJobDir);
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);
    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);
    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }
    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }
    // Write job file to submit dir
    writeConf(conf, submitJobFile);
    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
IV. The writeSplits method
In submitJobInternal(), the splits are created here:
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
With the new API, writeNewSplits() is called:
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf)job.getConfiguration();
  int maps;
  // with the new API, writeNewSplits() is called
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}
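writeNewSplits() uses reflection to instantiate the InputFormat class we configured (the object input), calls that InputFormat's getSplits() method, and once the split information is available, calls JobSplitWriter.createSplitFiles() to write it to the file submitJobDir/job.split.
The InputFormat (the rule for formatting the input) is obtained by reflection; if the user does not set one, TextInputFormat is used by default.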
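The "configured class, else the default, then reflection" pattern can be sketched without Hadoop. In this sketch a `Map` stands in for `Configuration`, the key mirrors the one Hadoop uses for the InputFormat (`mapreduce.job.inputformat.class`), and `InputFormatLike`, `DefaultTextFormat` and `KeyValueFormat` are hypothetical stand-ins for `InputFormat`, `TextInputFormat` and a user-chosen format. Wrapping reflection failures in a `RuntimeException` follows what `ReflectionUtils.newInstance()` does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: look up a configured class name, fall back to a default,
// and instantiate it reflectively (the pattern behind getInputFormatClass()
// plus ReflectionUtils.newInstance()).
class ReflectiveInputFormatDemo {
    // Key name borrowed from Hadoop; here it is read from a plain Map.
    static final String INPUT_FORMAT_KEY = "mapreduce.job.inputformat.class";

    interface InputFormatLike {
        String name();
    }

    // Stand-in for TextInputFormat, the default.
    static class DefaultTextFormat implements InputFormatLike {
        public String name() { return "text"; }
    }

    // Stand-in for a format chosen by the user.
    static class KeyValueFormat implements InputFormatLike {
        public String name() { return "key-value"; }
    }

    static InputFormatLike newInputFormat(Map<String, String> conf) {
        String className = conf.getOrDefault(INPUT_FORMAT_KEY,
                DefaultTextFormat.class.getName());
        try {
            Class<? extends InputFormatLike> cls =
                    Class.forName(className).asSubclass(InputFormatLike.class);
            return cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // ReflectionUtils.newInstance() likewise wraps failures in a RuntimeException
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(newInputFormat(conf).name());   // prints text
        conf.put(INPUT_FORMAT_KEY, KeyValueFormat.class.getName());
        System.out.println(newInputFormat(conf).name());   // prints key-value
    }
}
```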
V. The getSplits() method in FileInputFormat
Pay attention to the following code:
....
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
....
if (isSplitable(job, path)) {
  // block size of the file
  long blockSize = file.getBlockSize();
  // compute the split size
  long splitSize = computeSplitSize(blockSize, minSize, maxSize);
  // bytes of the file not yet assigned to a split
  long bytesRemaining = length;
  while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                blkLocations[blkIndex].getHosts(),
                blkLocations[blkIndex].getCachedHosts()));
    bytesRemaining -= splitSize;
  }
  if (bytesRemaining != 0) {
    int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
               blkLocations[blkIndex].getHosts(),
               blkLocations[blkIndex].getCachedHosts()));
  }
} else { // not splitable
  ....
}
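Before the loop, the code obtains minSize and maxSize and then computes the split size.
The rule is:
splitSize = computeSplitSize(blockSize, minSize, maxSize) = Math.max(minSize, Math.min(maxSize, blockSize));
Note that the knobs work the opposite way from what their names suggest: to make splits larger than a block, raise minSize (mapreduce.input.fileinputformat.split.minsize) above the block size; to make splits smaller than a block, lower maxSize (mapreduce.input.fileinputformat.split.maxsize) below the block size.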
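A quick way to check the rule is to evaluate it for a few settings. The sketch assumes a 128 MB block size; `SplitSizeDemo` is a hypothetical class, while the formula is the one shown above. With the defaults (minSize effectively 1, maxSize `Long.MAX_VALUE`), the split size equals the block size.

```java
// Sketch of the split-size rule with an assumed 128 MB block size.
class SplitSizeDemo {
    static final long MB = 1024L * 1024L;

    // Same formula as FileInputFormat.computeSplitSize().
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long block = 128 * MB;
        // defaults: splitSize == blockSize
        System.out.println(computeSplitSize(block, 1L, Long.MAX_VALUE) / MB);       // prints 128
        // minSize above the block size gives larger splits
        System.out.println(computeSplitSize(block, 256 * MB, Long.MAX_VALUE) / MB); // prints 256
        // maxSize below the block size gives smaller splits
        System.out.println(computeSplitSize(block, 1L, 64 * MB) / MB);              // prints 64
    }
}
```

In a driver, the same two limits can be set with `FileInputFormat.setMinInputSplitSize(job, size)` and `FileInputFormat.setMaxInputSplitSize(job, size)`.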
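The condition of the while loop checks whether the ratio of the bytes still remaining in the file to the split size exceeds SPLIT_SLOP, a constant of 1.1 defined in FileInputFormat. In other words, as long as more than 110% of a split size remains, another full-size split is cut off; once the remainder is 110% of the split size or less, it becomes the final split on its own.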
private static final double SPLIT_SLOP = 1.1;
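The loop can be replayed on concrete numbers. The sketch below is a simplified re-implementation for a single splittable file: host lookups are dropped and each split is reduced to an {offset, length} pair; `SplitLoopDemo` is a hypothetical name. With a 260 MB file and 128 MB splits, it yields a 128 MB split followed by a 132 MB one, because 132/128 ≈ 1.03 does not exceed 1.1.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified re-implementation of the splitting loop for one splittable file.
// Host information is omitted; each split is just {offset, length}.
class SplitLoopDemo {
    private static final double SPLIT_SLOP = 1.1; // same constant as FileInputFormat

    static List<long[]> split(long length, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long bytesRemaining = length;
        // Cut full-size splits while more than 110% of a split remains.
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {length - bytesRemaining, splitSize});
            bytesRemaining -= splitSize;
        }
        // Whatever is left (at most 110% of splitSize) becomes the last split.
        if (bytesRemaining != 0) {
            splits.add(new long[] {length - bytesRemaining, bytesRemaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        for (long[] s : split(260 * mb, 128 * mb)) {
            System.out.println("offset=" + s[0] / mb + "MB length=" + s[1] / mb + "MB");
        }
    }
}
```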
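Each split's information is then created and added to the list of splits.
Note the five components of a split:
- Path: the file the split belongs to
- start offset within the file
- split length
- hosts storing the split's block (getHosts(), the DataNodes holding its replicas)
- hosts caching the block in memory (getCachedHosts())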
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
            blkLocations[blkIndex].getHosts(),
            blkLocations[blkIndex].getCachedHosts()));
Back in JobSubmitter.writeNewSplits(), the splits are sorted by size so that the largest split comes first:
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
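The ordering applied by `SplitComparator` (descending by split length) can be expressed with a standard `Comparator`. In the sketch, `Split` is a hypothetical stand-in for `InputSplit` that keeps only what the ordering and printing need; the source comment itself only states the goal that "the biggest go first".

```java
import java.util.Arrays;
import java.util.Comparator;

// Hypothetical sketch of "biggest split first" ordering.
class SplitSortDemo {
    // Minimal stand-in for InputSplit: only the length matters for ordering.
    static final class Split {
        final String file;
        final long start;
        final long length;

        Split(String file, long start, long length) {
            this.file = file;
            this.start = start;
            this.length = length;
        }

        long length() {
            return length;
        }

        @Override
        public String toString() {
            return file + "@" + start + "+" + length;
        }
    }

    // Descending by length, like JobSubmitter's SplitComparator.
    static final Comparator<Split> BIGGEST_FIRST =
            Comparator.comparingLong(Split::length).reversed();

    public static void main(String[] args) {
        Split[] splits = {
            new Split("a.txt", 0, 44),
            new Split("b.txt", 0, 132),
            new Split("a.txt", 128, 128)
        };
        Arrays.sort(splits, BIGGEST_FIRST);
        System.out.println(Arrays.toString(splits)); // prints [b.txt@0+132, a.txt@128+128, a.txt@0+44]
    }
}
```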
Next, createSplitFiles() writes the split list out as files: the split data goes to job.split and the split metadata goes to job.splitmetainfo. This function does the core work:
public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,
    Configuration conf, FileSystem fs, T[] splits)
    throws IOException, InterruptedException {
  // create the job.split file and open it as an output stream
  FSDataOutputStream out = createFile(fs,
      JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
  // write the information of every split (each an InputSplit instance) to job.split;
  // this also returns each split's metadata in the info array
  SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
  out.close();
  // write the split metadata in info to the job.splitmetainfo file
  writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
      new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
      info);
}
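Why two files? job.split holds the serialized splits, and each metadata entry records where its split starts in that file, together with the split length and its preferred hosts (the three fields of `JobSplit.SplitMetaInfo`). The sketch below imitates that pairing with an in-memory byte stream. The record layout (`writeUTF` path, then start and length) is invented for illustration and does not match Hadoop's actual job.split format; `SplitFileDemo` and its nested types are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the job.split / job.splitmetainfo pairing.
class SplitFileDemo {
    static final class Split {
        final String path; final long start; final long length; final String[] hosts;
        Split(String path, long start, long length, String[] hosts) {
            this.path = path; this.start = start; this.length = length; this.hosts = hosts;
        }
    }

    // Mirrors the three fields of JobSplit.SplitMetaInfo.
    static final class MetaInfo {
        final long startOffset;     // where the split's record begins in the split file
        final long inputDataLength; // size of the split's input data in bytes
        final String[] locations;   // hosts that hold the split's data
        MetaInfo(long startOffset, long inputDataLength, String[] locations) {
            this.startOffset = startOffset;
            this.inputDataLength = inputDataLength;
            this.locations = locations;
        }
    }

    // Writes every split into splitFile and returns one MetaInfo per split.
    static List<MetaInfo> writeSplits(List<Split> splits, ByteArrayOutputStream splitFile) {
        List<MetaInfo> info = new ArrayList<>();
        DataOutputStream out = new DataOutputStream(splitFile);
        try {
            for (Split s : splits) {
                long offset = splitFile.size(); // bytes already in the "file"
                out.writeUTF(s.path);
                out.writeLong(s.start);
                out.writeLong(s.length);
                info.add(new MetaInfo(offset, s.length, s.hosts));
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return info;
    }

    // A reader only needs one MetaInfo to jump straight to that split's record.
    static String readPathAt(byte[] splitFile, MetaInfo meta) {
        int offset = (int) meta.startOffset;
        try (DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(splitFile, offset, splitFile.length - offset))) {
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        List<MetaInfo> meta = writeSplits(List.of(
                new Split("/in/a.txt", 0, 134217728L, new String[] {"node1"}),
                new Split("/in/b.txt", 0, 1024L, new String[] {"node2"})), file);
        for (MetaInfo m : meta) {
            System.out.println(m.startOffset + " -> " + readPathAt(file.toByteArray(), m));
        }
    }
}
```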
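VI. submitJob: actually submitting the job
On YARN, submitClient is a YARNRunner, so submitClient.submitJob() calls YARNRunner.submitJob(), which builds the information needed to launch the MapReduce ApplicationMaster: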
// Construct necessary information to start the MR AM
ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
// Submit to ResourceManager
try {
ApplicationId applicationId =
resMgrDelegate.submitApplication(appContext);
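It then calls resMgrDelegate.submitApplication(appContext) to submit the application to the ResourceManager.
The next article will analyze the source code of job initialization. Stay tuned.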