圖解mapreduce&yarn的工作機(jī)制

mapreduce&yarn的工作機(jī)制----吸星大法.png

YARN:資源(linux資源隔離機(jī)制:運(yùn)算資源---運(yùn)算程序jar/配置文件/CPU/內(nèi)存/IO--從linux中開辟出諸如內(nèi)存、處理器的container虛擬容器類似docker、openstack)調(diào)度系統(tǒng),負(fù)責(zé)管理資源調(diào)度和任務(wù)分配
1、mr程序提交的客戶端所在的節(jié)點(diǎn)運(yùn)行/XX/XX/XX.jar,其中的main方法submit任務(wù)后會(huì)調(diào)用YARNRUNNER類,實(shí)現(xiàn)了clientprotocol(總RPC協(xié)議)。因此該類可以申請(qǐng)?zhí)峤灰粋€(gè)application并獲得application的資源路徑
2、resources manager返回資源提交的路徑和application_id。
3、客戶端提交job運(yùn)行所需要的資源文件
4、客戶端通過RPC告知resources manager資源提交完畢,申請(qǐng)運(yùn)行mr appmaster。由于一個(gè)公司有很多個(gè)作業(yè)在運(yùn)行和提交,因此resources manager有一個(gè)隊(duì)列,使用了FIFO調(diào)度策略
5、將用戶的請(qǐng)求初始化成一個(gè)task,并將task放進(jìn)隊(duì)列,等待nodemanager來獲取任務(wù)task
6、nodemanager通過心跳機(jī)制領(lǐng)取到task任務(wù)
7、領(lǐng)取到任務(wù)的nodemanager將產(chǎn)生一個(gè)容器container,包括cpu和ram。并且從集群中下載YARNRUNNER提交的資源到本地,并且啟動(dòng) appmaster,appmaster就能讀取到配置文件的相關(guān)信息,這里resources manager和nodemanager都不知道這些相關(guān)信息,只有自己的程序appmaster知道,比如切片信息、mapTask和reduceTask的數(shù)量。
8、申請(qǐng)運(yùn)行mapTask的容器container(這個(gè)過程類似客戶端申請(qǐng)容器container,名字默認(rèn)為yarnchild)。
9、其他nodemanager從resources manager領(lǐng)取到任務(wù),并且創(chuàng)建容器container.
10、發(fā)送程序啟動(dòng)腳本 java -cp...,啟動(dòng)程序通過cpu+ram+jar來執(zhí)行mapTask代碼并且輸出分區(qū)且有序的文件。容錯(cuò):如果有一個(gè)mapTask執(zhí)行失敗,請(qǐng)重新申請(qǐng)一個(gè)容器container。任務(wù)備份:一個(gè)mapTask執(zhí)行慢,將重新申請(qǐng)container執(zhí)行備份任務(wù),取較快者。
11、appmaster想RM申請(qǐng)指定數(shù)量的容器,運(yùn)行reduceTask程序。
12、reduce節(jié)點(diǎn)向所有map節(jié)點(diǎn)獲取相應(yīng)分區(qū)的數(shù)據(jù)并執(zhí)行,執(zhí)行完成后reduce節(jié)點(diǎn)會(huì)回收container容器
13、application運(yùn)行完畢后,mr appmaster會(huì)向RM注銷自己

本地模式和集群模式區(qū)別詳情見https://blog.csdn.net/ForgetThatNight/article/details/78570234#t17 3-2-2

可以在Windows上本地跑mapreduce程序,但是需要配置環(huán)境變量

package cn.itcast.bigdata.mr.wcdemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 相當(dāng)于一個(gè)yarn集群的客戶端
 * 需要在此封裝我們的mr程序的相關(guān)運(yùn)行參數(shù),指定jar包
 * 最后提交給yarn
 * @author
 *
 */
public class WordcountDriver {
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        
        //是否運(yùn)行為本地模式,就是看這個(gè)參數(shù)值是否為local,默認(rèn)就是local
        /*conf.set("mapreduce.framework.name", "local");*/
        
        本地模式運(yùn)行mr程序時(shí),輸入輸出的數(shù)據(jù)可以在本地,也可以在hdfs上
        到底在哪里,就看以下兩行配置你用哪行,默認(rèn)就是file:///
        /*conf.set("fs.defaultFS", "hdfs://mini1:9000/");*/
        /*conf.set("fs.defaultFS", "file:///");*/
        
        
        
        //運(yùn)行集群模式,就是把程序提交到y(tǒng)arn中去運(yùn)行
        //要想運(yùn)行為集群模式,以下3個(gè)參數(shù)要指定為集群上的值
        /*conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "mini1");
        conf.set("fs.defaultFS", "hdfs://mini1:9000/");*/
        Job job = Job.getInstance(conf);
        
        job.setJar("c:/wc.jar");
        //指定本程序的jar包所在的本地路徑
        /*job.setJarByClass(WordcountDriver.class);*/
        
        //指定本業(yè)務(wù)job要使用的mapper/Reducer業(yè)務(wù)類
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        
        //指定mapper輸出數(shù)據(jù)的kv類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        //指定最終輸出的數(shù)據(jù)的kv類型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        //指定需要使用combiner,以及用哪個(gè)類作為combiner的邏輯
        /*job.setCombinerClass(WordcountCombiner.class);*/
        job.setCombinerClass(WordcountReducer.class);
        
        //如果不設(shè)置InputFormat,它默認(rèn)用的是TextInputformat.class
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
        CombineTextInputFormat.setMinInputSplitSize(job, 2097152);
        
        //指定job的輸入原始文件所在目錄
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定job的輸出結(jié)果所在目錄
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        //將job中配置的相關(guān)參數(shù),以及job所用的java類所在的jar包,提交給yarn去運(yùn)行
        /*job.submit();*/
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容