
YARN:資源(linux資源隔離機(jī)制:運(yùn)算資源---運(yùn)算程序jar/配置文件/CPU/內(nèi)存/IO--從linux中開辟出諸如內(nèi)存、處理器的container虛擬容器類似docker、openstack)調(diào)度系統(tǒng),負(fù)責(zé)管理資源調(diào)度和任務(wù)分配
1、mr程序提交的客戶端所在的節(jié)點(diǎn)運(yùn)行/XX/XX/XX.jar,其中的main方法submit任務(wù)后會(huì)調(diào)用YARNRUNNER類,實(shí)現(xiàn)了clientprotocol(總RPC協(xié)議)。因此該類可以申請(qǐng)?zhí)峤灰粋€(gè)application并獲得application的資源路徑
2、resources manager返回資源提交的路徑和application_id。
3、客戶端提交job運(yùn)行所需要的資源文件
4、客戶端通過RPC告知resources manager資源提交完畢,申請(qǐng)運(yùn)行mr appmaster。由于一個(gè)公司有很多個(gè)作業(yè)在運(yùn)行和提交,因此resources manager有一個(gè)隊(duì)列,使用了FIFO調(diào)度策略
5、將用戶的請(qǐng)求初始化成一個(gè)task,并將task放進(jìn)隊(duì)列,等待nodemanager來獲取任務(wù)task
6、nodemanager通過心跳機(jī)制領(lǐng)取到task任務(wù)
7、領(lǐng)取到任務(wù)的nodemanager將產(chǎn)生一個(gè)容器container,包括cpu和ram。并且從集群中下載YARNRUNNER提交的資源到本地,并且啟動(dòng) appmaster,appmaster就能讀取到配置文件的相關(guān)信息,這里resources manager和nodemanager都不知道這些相關(guān)信息,只有自己的程序appmaster知道,比如切片信息、mapTask和reduceTask的數(shù)量。
8、申請(qǐng)運(yùn)行mapTask的容器container(這個(gè)過程類似客戶端申請(qǐng)容器container,名字默認(rèn)為yarnchild)。
9、其他nodemanager從resources manager領(lǐng)取到任務(wù),并且創(chuàng)建容器container.
10、發(fā)送程序啟動(dòng)腳本 java -cp...,啟動(dòng)程序通過cpu+ram+jar來執(zhí)行mapTask代碼并且輸出分區(qū)且有序的文件。容錯(cuò):如果有一個(gè)mapTask執(zhí)行失敗,請(qǐng)重新申請(qǐng)一個(gè)容器container。任務(wù)備份:一個(gè)mapTask執(zhí)行慢,將重新申請(qǐng)container執(zhí)行備份任務(wù),取較快者。
11、appmaster想RM申請(qǐng)指定數(shù)量的容器,運(yùn)行reduceTask程序。
12、reduce節(jié)點(diǎn)向所有map節(jié)點(diǎn)獲取相應(yīng)分區(qū)的數(shù)據(jù)并執(zhí)行,執(zhí)行完成后reduce節(jié)點(diǎn)會(huì)回收container容器
13、application運(yùn)行完畢后,mr appmaster會(huì)向RM注銷自己
本地模式和集群模式區(qū)別詳情見https://blog.csdn.net/ForgetThatNight/article/details/78570234#t17 3-2-2
可以在Windows上本地跑mapreduce程序,但是需要配置環(huán)境變量
package cn.itcast.bigdata.mr.wcdemo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 相當(dāng)于一個(gè)yarn集群的客戶端
* 需要在此封裝我們的mr程序的相關(guān)運(yùn)行參數(shù),指定jar包
* 最后提交給yarn
* @author
*
*/
public class WordcountDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//是否運(yùn)行為本地模式,就是看這個(gè)參數(shù)值是否為local,默認(rèn)就是local
/*conf.set("mapreduce.framework.name", "local");*/
本地模式運(yùn)行mr程序時(shí),輸入輸出的數(shù)據(jù)可以在本地,也可以在hdfs上
到底在哪里,就看以下兩行配置你用哪行,默認(rèn)就是file:///
/*conf.set("fs.defaultFS", "hdfs://mini1:9000/");*/
/*conf.set("fs.defaultFS", "file:///");*/
//運(yùn)行集群模式,就是把程序提交到y(tǒng)arn中去運(yùn)行
//要想運(yùn)行為集群模式,以下3個(gè)參數(shù)要指定為集群上的值
/*conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "mini1");
conf.set("fs.defaultFS", "hdfs://mini1:9000/");*/
Job job = Job.getInstance(conf);
job.setJar("c:/wc.jar");
//指定本程序的jar包所在的本地路徑
/*job.setJarByClass(WordcountDriver.class);*/
//指定本業(yè)務(wù)job要使用的mapper/Reducer業(yè)務(wù)類
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
//指定mapper輸出數(shù)據(jù)的kv類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定最終輸出的數(shù)據(jù)的kv類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定需要使用combiner,以及用哪個(gè)類作為combiner的邏輯
/*job.setCombinerClass(WordcountCombiner.class);*/
job.setCombinerClass(WordcountReducer.class);
//如果不設(shè)置InputFormat,它默認(rèn)用的是TextInputformat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);
//指定job的輸入原始文件所在目錄
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定job的輸出結(jié)果所在目錄
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//將job中配置的相關(guān)參數(shù),以及job所用的java類所在的jar包,提交給yarn去運(yùn)行
/*job.submit();*/
boolean res = job.waitForCompletion(true);
System.exit(res?0:1);
}
}