BigData~03:Hadoop03 ~ MapReduce

要是你是個Java開發(fā),那么你肯定聽說過MapReduce,下面就來看看這個東東吧

一、簡介

  1. MapReduce:一個分布式計算框架;
  2. MapReduce存在意義:大大減小用戶開發(fā)分布式計算的開發(fā)難度,從而提高分布式計算的開發(fā)效率。因為MapReduce把分布式業(yè)務的各種任務調(diào)度操作和計算程序與HDFS程序的銜接工作封裝了起來,開發(fā)人員只需要關注自己的業(yè)務邏輯;
  3. MapReduce結構:
    • MRAppMaster:MapReduce Application Master,負責整個程序的過程中MapTask和ReduceTask調(diào)度和狀態(tài)協(xié)調(diào)的工作;
    • MapTask:負責Map階段的整個數(shù)據(jù)處理流程;
    • ReduceTask:負責Reduce階段的整個數(shù)據(jù)處理流程;

二、MapReduce運行流程

  1. 當我們通過Hadoop運行jar,運行到最后的submit/waitForCompletion之后,Job底層會去獲取我們要處理的文件的規(guī)模(FileInputFormat中的getSplits方法:獲取所有的文件,累加所有的文件大小),根據(jù)我們在Job上的參數(shù)配置,計算形成這個任務執(zhí)行規(guī)劃(任務分片規(guī)劃:job.split(一個切片對應一個MapTask),規(guī)劃元信息:job.splitmetainfo,輸入輸出配置:job.xml),然后把這個規(guī)劃內(nèi)容除去原信息和我們上傳的jar提交給YARN;
  2. YARN根據(jù)ResourceManager對各個NodeManager的記錄情況,選出一個比較清閑的NodeManager作為MRAppMaster,YARN就把規(guī)劃發(fā)送給MRAppMaster,MRAppMaster就會根據(jù)規(guī)劃啟動相應個數(shù)的MapTask進程,進程啟動后,這些進程會使用FileInputFormat打開HDFS的流讀取文件數(shù)據(jù),同時把讀取到的數(shù)據(jù)傳入Mapper的map方法中,map方法處理后會把結果交給OutPutCollector,OutPutCollector會把這些數(shù)據(jù)寫入本地文件;文件中的數(shù)據(jù)是排序且分區(qū)的,有幾個ReduceTask就有幾個分區(qū),分區(qū)是由[Hash]Partitioner組件完成的;
  3. 當MRAppMaster檢測到所有的MapTask完成之后,ResourceManager就會根據(jù)規(guī)劃啟動(有可能提前啟動)相應臺數(shù)的ReduceTask,這些ReduceTask就會去獲取MapTask輸出文件的特定分區(qū)數(shù)據(jù),把所有MapTask的特定分區(qū)全部獲取到之后進行歸并排序,然后按照相同key的KV為一個組,調(diào)用Reducer的reduce()方法進行邏輯運算;其中,每個MapTask的內(nèi)容都會按照分區(qū)被對應的ReduceTask讀?。籖educe的結果就會到OutputFormat對象中,OutputFormat會根據(jù)當前的對接口的實現(xiàn)決定把文件做怎樣的輸出,可能是網(wǎng)絡IO,也可能是HDFS等等,要看OutputFormat的實現(xiàn)是什么樣子;

三、集成過程

  1. 引入Maven:Common/CommonLib/HDFS/HDFSLib/MapReduce/MapReduceLib/YARN/YARNLib
  2. 編寫MapTask程序:實現(xiàn)類org.apache.hadoop.mapreduce.Mapper子類的map方法
    • Mapper泛型:
      • KEYIN: 默認情況下,是mr框架所讀到的一行Long類型文本的起始偏移量,但是在hadoop中有自己的更精簡的序列化接口,所以不直接用Long,而用LongWritable;
      • VALUEIN:默認情況下,是mr框架所讀到的一行String類型文本的內(nèi)容,用Text;
      • KEYOUT:是用戶自定義邏輯處理完成之后輸出String類型數(shù)據(jù)中的key,在此處是單詞,用Text;
      • VALUEOUT:是用戶自定義邏輯處理完成之后輸出Integer類型數(shù)據(jù)中的value,在此處是單詞次數(shù),用IntWritable;
    • MapTask會對每一行輸入數(shù)據(jù)調(diào)用一次我們自定義的map()方法;
    • map階段的業(yè)務邏輯就寫在自定義的map()方法中;
    • 在整個MapTask運行完畢之前,MapTask會把每次的結果保存在一個臨時文件中;
  3. 編寫ReduceTask程序:實現(xiàn)類org.apache.hadoop.mapreduce.Reducer子類的reduce方法
    • Reducer泛型:
      • KEYIN, VALUEIN對應Mapper輸出的KEYOUT,VALUEOUT類型;
      • KEYOUT, VALUEOUT 是自定義reduce邏輯處理結果的輸出數(shù)據(jù)類型;
      • KEYOUT是單詞;
      • VLAUEOUT是總次數(shù);
  4. 編寫YARN程序來采集程序運行相關的參數(shù),將下面代碼寫在main方法中:
    if (args == null || args.length == 0) {
        args = new String[2];
        args[0] = "hdfs://master:9000/wordcount/input/wordcount.txt";
        args[1] = "hdfs://master:9000/wordcount/output8";
    }
    
    Configuration conf = new Configuration();
    
    /**
    // 設置Hadoop用戶名,在win上開發(fā)需要設置
    //conf.set("HADOOP_USER_NAME", "hadoop");
    //conf.set("dfs.permissions.enabled", "false");
    **/
    
    /*conf.set("mapreduce.framework.name", "yarn");
    conf.set("yarn.resoucemanager.hostname", "mini1");*/
    Job job = Job.getInstance(conf);
    
    /*job.setJar("/home/shreker/wordcount.jar");*/
    //指定本程序的jar包所在的本地路徑
    job.setJarByClass(WordcountDriver.class);
    
    //指定本業(yè)務job要使用的mapper/Reducer業(yè)務類
    job.setMapperClass(WordcountMapper.class);
    job.setReducerClass(WordcountReducer.class);
    
    //指定mapper輸出數(shù)據(jù)的kv類型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    
    //指定Reduce最終輸出的數(shù)據(jù)的kv類型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    
    // 指定需要緩存一個文件到所有的maptask運行節(jié)點工作目錄
    /* job.addArchiveToClassPath(archive); */// 緩存jar包到task運行節(jié)點的classpath中
    /* job.addFileToClassPath(file); */// 緩存普通文件到task運行節(jié)點的classpath中
    /* job.addCacheArchive(uri); */// 緩存壓縮包文件到task運行節(jié)點的工作目錄
    /* job.addCacheFile(uri) */// 緩存普通文件到task運行節(jié)點的工作目錄
    
    //指定job的輸入原始文件所在目錄
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    //指定job的輸出結果所在目錄
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    //將job中配置的相關參數(shù),以及job所用的java類所在的jar包,提交給yarn去運行
    /*job.submit();*/
    boolean successful = job.waitForCompletion(true);
    System.exit(successful ? 0 : 1);
    
  5. 把這幾個類達成一個jar;
  6. 啟動HDFS和YARN集群;
  7. 上傳到集群的任何一個節(jié)點上;
  8. 在HDFS上創(chuàng)建一個文件夾,把要處理的文件上傳到這個文件夾下;
  9. 運行jar(-cp:-classpath)
    # java程序依賴運行
    java -cp <jar-path>[:<jar-path>[:<jar-path>...]] <main-package-class> <main-class-params>
    # hadoop中jar的使用
    hadoop <jar-path> <main-package-class> <main-class-params>
    

四、MapReduce數(shù)據(jù)流向

  1. HDFS數(shù)據(jù)流
  2. InputFormat:主要定義讀取數(shù)據(jù)的方式,如一次讀取多少字節(jié),還是一次讀取一行等
  3. RecordReader:通過read方法開始從數(shù)據(jù)流中讀取數(shù)據(jù);
  4. Mapper:數(shù)據(jù)流經(jīng)我們編寫的map方法;
  5. OutPutCollector:map方法輸出數(shù)據(jù)收集器;
  6. MapOutputBuffer(環(huán)形緩沖區(qū))
    • mr.sort.size:100;
  7. Partitioner:默認使用HashPartitioner,即按照key的Hash值分區(qū);
  8. SpillRecord:
    • 每次溢出產(chǎn)生一個文件,每個文件都是分好區(qū)的;
    • 數(shù)據(jù)處理完成后,合并所有的文件,結果文件也是分區(qū)合并的(使用歸并排序);
  9. Combiner:本質(zhì)還是一個Reducer,作用是減少網(wǎng)絡IO,即提前進行reduce操作;但是注意,不是所有的操作都適合使用Combiner,只有在不影響業(yè)務結果的情況下才能使用;
  10. Reducer:對所有的數(shù)據(jù)進行匯總統(tǒng)計工作,注意:每個ReduceTask都是去獲取所有Map輸出文件的同一個分區(qū)獲取數(shù)據(jù);
  11. OutputFormat:定義目標文件;
  12. RecordWriter:根據(jù)目標文件進行寫入;

五、自定義MapTask分區(qū)

  1. 實現(xiàn)一個類繼承自org.apache.hadoop.mapreduce.Partitioner,封裝分區(qū)的業(yè)務邏輯;
  2. 在Job中設置分區(qū)個數(shù)(job.setNumReduceTasks),和分區(qū)的實現(xiàn)類(job.setPartitionerClass);

六、MapTask任務切片規(guī)劃的機制

  • 任務切片規(guī)劃講的是,任務的切片的生成過程
  • 切片的原則:
    1. 簡單地按照文件的內(nèi)容長度進行切片;
    2. 切片大小,默認等于block大??;
    3. 切片時不考慮數(shù)據(jù)集整體,而是逐個針對每一個文件單獨切片;
  • 當程序運行到submit/waitForCompletion的時候,InputFormat類中的getSplit()切片簡要過程如下:
    1. 獲取到要處理的數(shù)據(jù)所在文件夾;
    2. 遍歷這個文件夾下的每個文件,對每個文件做切片運算,主要是欺騙大小的計算,如:computeSplitSize::Math.max(minSize, Math.min(goalSize, blockSize)),并把切片運算結果記錄在job.split文件中,并計算所有的文件并做切片運算,都記錄在job.split文件中;
    3. 需要注意的是,在做切片的時候還有很多的細節(jié),比如,在切片的最后的時候,還要判斷剩下的是不是只有一點(block的1.1倍),如果只有一點,那么根據(jù)簡單切片原則的最后這個切片就直接歸在倒數(shù)第二個切片上,這算是對切片的一些優(yōu)化內(nèi)容,如果想知道的更詳細可以跟蹤源碼查看,這里就不再贅述了;

七、序列化反序列化

  1. 我們在編寫Java程序的時候序列化一般都是實現(xiàn)接口java.io.Serializable,但是這個接口有個缺點,就是會把集成體系結構中所有的數(shù)據(jù)都會進行序列化;
  2. 在MapReduce體系中,有自己一套序列化和反序列化的工具
    • String → Text
    • int → IntWritable
    • long → LongWritable
    • ...
    • 自定義類 → 實現(xiàn)接口org.apache.hadoop.io.WritableComparable,參考這里;
      • Comparable接口主要用來比對Key是否是同一個Key;

八、小文件處理

  1. 在Job上設置InputFormatClass,這個僅僅只是把多個小文件看做是一個切片,沒有進行物理的合并,在數(shù)據(jù)讀取的時候進行:
    job.setInputFormatClass(CombineTextInputFormat.class);
    
  2. 同時限制切片的上線和下限
    CombineTextInputFormat.setMaxInputSplitSize(<job>, <max-bytes-size>);
    CombineTextInputFormat.setMinInputSplitSize(<job>, <min-bytes-size>);
    
  3. MapTask的個數(shù)與CPU的核心個數(shù)一樣的時候,處理效率最高

九、超大文件處理

  • 如果要處理的文件比較大,那么就可以通過設置提高blocksize的大小

十、并行度

  • MapTask的并行度:由Job切片數(shù)決定
  • ReduceTask的并行度:可以手動設置,默認值是1:
    job.setNumReduceTasks(3);
    

十一、Combiner

  • Combiner是一種特殊的Reducer,因為事先Combiner組件要繼承Reducer,該組件位于Map和Reduce之間;
  • Combiner的意義:局部Reduce,以減少網(wǎng)絡IO,提供系統(tǒng)性能;
  • Combiner的實現(xiàn):
    1. 實現(xiàn)一個繼承Reducer的類,這個類就是一個Combiner;
    2. 在Job對象上設置Combiner類:
      job.setCombinerClass(<Combiner>.class)
      
  • 注意:
    1. 不是任何地方都能夠使用Combiner,前提是不能影響業(yè)務邏輯;
    2. Combiner輸出的類型要與Reduce輸入一致,輸入與Map輸出一致;

十二、Shuffle

  • shuffle:指的是Map把數(shù)據(jù)傳遞給Reduce的這個流程,其核心就是:數(shù)據(jù)分區(qū)和排序;
  • shuffle流程再現(xiàn):
    1. OutPutCollector收集MapTask輸出的KV對,放到環(huán)形緩沖區(qū)中;
    2. 環(huán)形緩沖區(qū)數(shù)據(jù)的不斷流入而溢出本地磁盤文件,每次溢出形成一個文件,其中調(diào)用組件Partitioner對數(shù)據(jù)進行分區(qū)以及針對key的排序,多個溢出文件會被合并成大的溢出文件,如果設置了Combiner組件就會進行Combine操作;
    3. ReduceTask根據(jù)自己的分區(qū)號,去各個MapTask機器上取相應的結果分區(qū)數(shù)據(jù);
    4. ReduceTask會取到同一個分區(qū)的來自不同MapTask的結果文件,ReduceTask會利用歸并排序將這些文件再進行合并;
    5. 合并成大文件后,Shuffle的過程也就結束了,后面進入Reduce流程;

十三、MapReduce運行模式

  • 本地運行:如果我們在win上運行程序,默認情況下吧程序提交到一個本地的模擬器上,使用線程模擬YarnChild;如果需要指定的話,在Configuration對象上設置mapreduce.framework.name值可以是:"local","yarn"等,還有就是fs.defaultFS,用來指定輸入輸出的數(shù)據(jù)在哪;
  • 集群運行模式,設置以下參數(shù)(當然,如果是打jar在Linux上執(zhí)行,倒也沒有必要設置,因為Hadoop會自動設置):
    1. mapreduce.framework.nameyarn
    2. yarn.resourcemanager.hostname為對應的IP或者主機名
    3. fs.defaultFShdfs://<host>:9000
  • 本地運行集群
    • 這樣的方式直接運行時會報錯的,需要修改源碼,因為在本地運行集群的時候,Job的相關參數(shù)設置的時候是win形式的配置,當這些配置在運行的時候解析就會出現(xiàn)問題。

可以在其中完成一些SQL完成的示例比如JOIN的邏輯

* 疑問

  1. Mapper的泛型和Reducer的泛型可以不對應嗎?
    • Mapper的輸出和Reducer的輸入必須對應;
  2. MapTask完成后才分區(qū)還是一邊執(zhí)行MapTask一邊分區(qū)?
    • MapOutputBuffer的每次溢出都會產(chǎn)生一個文件,這個文件是分好區(qū)的,當所有的數(shù)據(jù)都完成處理的時候,就會把這些文件合并;所以,是一邊執(zhí)行MapTask一邊分區(qū);
  3. ReduceTask獲取所有的MapTask數(shù)據(jù)之后進行歸并排序嗎?如果每個MapTask的輸出文件都很大,每個數(shù)據(jù)分區(qū)也都很大呢。。。
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容