要是你是個Java開發(fā),那么你肯定聽說過MapReduce,下面就來看看這個東東吧
一、簡介
- MapReduce:一個分布式計算框架;
- MapReduce存在意義:大大減小用戶開發(fā)分布式計算的開發(fā)難度,從而提高分布式計算的開發(fā)效率。因為MapReduce把分布式業(yè)務的各種任務調(diào)度操作和計算程序與HDFS程序的銜接工作封裝了起來,開發(fā)人員只需要關注自己的業(yè)務邏輯;
- MapReduce結構:
- MRAppMaster:MapReduce Application Master,負責整個程序的過程中MapTask和ReduceTask調(diào)度和狀態(tài)協(xié)調(diào)的工作;
- MapTask:負責Map階段的整個數(shù)據(jù)處理流程;
- ReduceTask:負責Reduce階段的整個數(shù)據(jù)處理流程;
二、MapReduce運行流程
- 當我們通過Hadoop運行jar,運行到最后的submit/waitForCompletion之后,Job底層會去獲取我們要處理的文件的規(guī)模(FileInputFormat中的getSplits方法:獲取所有的文件,累加所有的文件大小),根據(jù)我們在Job上的參數(shù)配置,計算形成這個任務執(zhí)行規(guī)劃(任務分片規(guī)劃:job.split(一個切片對應一個MapTask),規(guī)劃元信息:job.splitmetainfo,輸入輸出配置:job.xml),然后把這個規(guī)劃內(nèi)容除去原信息和我們上傳的jar提交給YARN;
- YARN根據(jù)ResourceManager對各個NodeManager的記錄情況,選出一個比較清閑的NodeManager作為MRAppMaster,YARN就把規(guī)劃發(fā)送給MRAppMaster,MRAppMaster就會根據(jù)規(guī)劃啟動相應個數(shù)的MapTask進程,進程啟動后,這些進程會使用FileInputFormat打開HDFS的流讀取文件數(shù)據(jù),同時把讀取到的數(shù)據(jù)傳入Mapper的map方法中,map方法處理后會把結果交給OutPutCollector,OutPutCollector會把這些數(shù)據(jù)寫入本地文件;文件中的數(shù)據(jù)是排序且分區(qū)的,有幾個ReduceTask就有幾個分區(qū),分區(qū)是由[Hash]Partitioner組件完成的;
- 當MRAppMaster檢測到所有的MapTask完成之后,ResourceManager就會根據(jù)規(guī)劃啟動(有可能提前啟動)相應臺數(shù)的ReduceTask,這些ReduceTask就會去獲取MapTask輸出文件的特定分區(qū)數(shù)據(jù),把所有MapTask的特定分區(qū)全部獲取到之后進行歸并排序,然后按照相同key的KV為一個組,調(diào)用Reducer的reduce()方法進行邏輯運算;其中,每個MapTask的內(nèi)容都會按照分區(qū)被對應的ReduceTask讀?。籖educe的結果就會到OutputFormat對象中,OutputFormat會根據(jù)當前的對接口的實現(xiàn)決定把文件做怎樣的輸出,可能是網(wǎng)絡IO,也可能是HDFS等等,要看OutputFormat的實現(xiàn)是什么樣子;
三、集成過程
- 引入Maven:Common/CommonLib/HDFS/HDFSLib/MapReduce/MapReduceLib/YARN/YARNLib
- 編寫MapTask程序:實現(xiàn)類org.apache.hadoop.mapreduce.Mapper子類的map方法
- Mapper泛型:
- KEYIN: 默認情況下,是mr框架所讀到的一行Long類型文本的起始偏移量,但是在hadoop中有自己的更精簡的序列化接口,所以不直接用Long,而用LongWritable;
- VALUEIN:默認情況下,是mr框架所讀到的一行String類型文本的內(nèi)容,用Text;
- KEYOUT:是用戶自定義邏輯處理完成之后輸出String類型數(shù)據(jù)中的key,在此處是單詞,用Text;
- VALUEOUT:是用戶自定義邏輯處理完成之后輸出Integer類型數(shù)據(jù)中的value,在此處是單詞次數(shù),用IntWritable;
- MapTask會對每一行輸入數(shù)據(jù)調(diào)用一次我們自定義的map()方法;
- map階段的業(yè)務邏輯就寫在自定義的map()方法中;
- 在整個MapTask運行完畢之前,MapTask會把每次的結果保存在一個臨時文件中;
- Mapper泛型:
- 編寫ReduceTask程序:實現(xiàn)類org.apache.hadoop.mapreduce.Reducer子類的reduce方法
- Reducer泛型:
- KEYIN, VALUEIN對應Mapper輸出的KEYOUT,VALUEOUT類型;
- KEYOUT, VALUEOUT 是自定義reduce邏輯處理結果的輸出數(shù)據(jù)類型;
- KEYOUT是單詞;
- VLAUEOUT是總次數(shù);
- Reducer泛型:
- 編寫YARN程序來采集程序運行相關的參數(shù),將下面代碼寫在main方法中:
if (args == null || args.length == 0) { args = new String[2]; args[0] = "hdfs://master:9000/wordcount/input/wordcount.txt"; args[1] = "hdfs://master:9000/wordcount/output8"; } Configuration conf = new Configuration(); /** // 設置Hadoop用戶名,在win上開發(fā)需要設置 //conf.set("HADOOP_USER_NAME", "hadoop"); //conf.set("dfs.permissions.enabled", "false"); **/ /*conf.set("mapreduce.framework.name", "yarn"); conf.set("yarn.resoucemanager.hostname", "mini1");*/ Job job = Job.getInstance(conf); /*job.setJar("/home/shreker/wordcount.jar");*/ //指定本程序的jar包所在的本地路徑 job.setJarByClass(WordcountDriver.class); //指定本業(yè)務job要使用的mapper/Reducer業(yè)務類 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); //指定mapper輸出數(shù)據(jù)的kv類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //指定Reduce最終輸出的數(shù)據(jù)的kv類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 指定需要緩存一個文件到所有的maptask運行節(jié)點工作目錄 /* job.addArchiveToClassPath(archive); */// 緩存jar包到task運行節(jié)點的classpath中 /* job.addFileToClassPath(file); */// 緩存普通文件到task運行節(jié)點的classpath中 /* job.addCacheArchive(uri); */// 緩存壓縮包文件到task運行節(jié)點的工作目錄 /* job.addCacheFile(uri) */// 緩存普通文件到task運行節(jié)點的工作目錄 //指定job的輸入原始文件所在目錄 FileInputFormat.setInputPaths(job, new Path(args[0])); //指定job的輸出結果所在目錄 FileOutputFormat.setOutputPath(job, new Path(args[1])); //將job中配置的相關參數(shù),以及job所用的java類所在的jar包,提交給yarn去運行 /*job.submit();*/ boolean successful = job.waitForCompletion(true); System.exit(successful ? 0 : 1); - 把這幾個類達成一個jar;
- 啟動HDFS和YARN集群;
- 上傳到集群的任何一個節(jié)點上;
- 在HDFS上創(chuàng)建一個文件夾,把要處理的文件上傳到這個文件夾下;
- 運行jar(-cp:-classpath)
# java程序依賴運行 java -cp <jar-path>[:<jar-path>[:<jar-path>...]] <main-package-class> <main-class-params> # hadoop中jar的使用 hadoop <jar-path> <main-package-class> <main-class-params>
四、MapReduce數(shù)據(jù)流向
- HDFS數(shù)據(jù)流
- InputFormat:主要定義讀取數(shù)據(jù)的方式,如一次讀取多少字節(jié),還是一次讀取一行等
- RecordReader:通過read方法開始從數(shù)據(jù)流中讀取數(shù)據(jù);
- Mapper:數(shù)據(jù)流經(jīng)我們編寫的map方法;
- OutPutCollector:map方法輸出數(shù)據(jù)收集器;
- MapOutputBuffer(環(huán)形緩沖區(qū))
- mr.sort.size:100;
- Partitioner:默認使用HashPartitioner,即按照key的Hash值分區(qū);
- SpillRecord:
- 每次溢出產(chǎn)生一個文件,每個文件都是分好區(qū)的;
- 數(shù)據(jù)處理完成后,合并所有的文件,結果文件也是分區(qū)合并的(使用歸并排序);
- Combiner:本質(zhì)還是一個Reducer,作用是減少網(wǎng)絡IO,即提前進行reduce操作;但是注意,不是所有的操作都適合使用Combiner,只有在不影響業(yè)務結果的情況下才能使用;
- Reducer:對所有的數(shù)據(jù)進行匯總統(tǒng)計工作,注意:每個ReduceTask都是去獲取所有Map輸出文件的同一個分區(qū)獲取數(shù)據(jù);
- OutputFormat:定義目標文件;
- RecordWriter:根據(jù)目標文件進行寫入;
五、自定義MapTask分區(qū)
- 實現(xiàn)一個類繼承自
org.apache.hadoop.mapreduce.Partitioner,封裝分區(qū)的業(yè)務邏輯; - 在Job中設置分區(qū)個數(shù)(
job.setNumReduceTasks),和分區(qū)的實現(xiàn)類(job.setPartitionerClass);
六、MapTask任務切片規(guī)劃的機制
- 任務切片規(guī)劃講的是,任務的切片的生成過程
- 切片的原則:
- 簡單地按照文件的內(nèi)容長度進行切片;
- 切片大小,默認等于block大??;
- 切片時不考慮數(shù)據(jù)集整體,而是逐個針對每一個文件單獨切片;
- 當程序運行到submit/waitForCompletion的時候,InputFormat類中的getSplit()切片簡要過程如下:
- 獲取到要處理的數(shù)據(jù)所在文件夾;
- 遍歷這個文件夾下的每個文件,對每個文件做切片運算,主要是欺騙大小的計算,如:
computeSplitSize::Math.max(minSize, Math.min(goalSize, blockSize)),并把切片運算結果記錄在job.split文件中,并計算所有的文件并做切片運算,都記錄在job.split文件中; - 需要注意的是,在做切片的時候還有很多的細節(jié),比如,在切片的最后的時候,還要判斷剩下的是不是只有一點(block的1.1倍),如果只有一點,那么根據(jù)簡單切片原則的最后這個切片就直接歸在倒數(shù)第二個切片上,這算是對切片的一些優(yōu)化內(nèi)容,如果想知道的更詳細可以跟蹤源碼查看,這里就不再贅述了;
七、序列化反序列化
- 我們在編寫Java程序的時候序列化一般都是實現(xiàn)接口
java.io.Serializable,但是這個接口有個缺點,就是會把集成體系結構中所有的數(shù)據(jù)都會進行序列化; - 在MapReduce體系中,有自己一套序列化和反序列化的工具
- String → Text
- int → IntWritable
- long → LongWritable
- ...
- 自定義類 → 實現(xiàn)接口
org.apache.hadoop.io.WritableComparable,參考這里;- Comparable接口主要用來比對Key是否是同一個Key;
八、小文件處理
- 在Job上設置InputFormatClass,這個僅僅只是把多個小文件看做是一個切片,沒有進行物理的合并,在數(shù)據(jù)讀取的時候進行:
job.setInputFormatClass(CombineTextInputFormat.class); - 同時限制切片的上線和下限
CombineTextInputFormat.setMaxInputSplitSize(<job>, <max-bytes-size>); CombineTextInputFormat.setMinInputSplitSize(<job>, <min-bytes-size>); - MapTask的個數(shù)與CPU的核心個數(shù)一樣的時候,處理效率最高
九、超大文件處理
- 如果要處理的文件比較大,那么就可以通過設置提高blocksize的大小
十、并行度
- MapTask的并行度:由Job切片數(shù)決定
- ReduceTask的并行度:可以手動設置,默認值是1:
job.setNumReduceTasks(3);
十一、Combiner
- Combiner是一種特殊的Reducer,因為事先Combiner組件要繼承Reducer,該組件位于Map和Reduce之間;
- Combiner的意義:局部Reduce,以減少網(wǎng)絡IO,提供系統(tǒng)性能;
- Combiner的實現(xiàn):
- 實現(xiàn)一個繼承Reducer的類,這個類就是一個Combiner;
- 在Job對象上設置Combiner類:
job.setCombinerClass(<Combiner>.class)
- 注意:
- 不是任何地方都能夠使用Combiner,前提是不能影響業(yè)務邏輯;
- Combiner輸出的類型要與Reduce輸入一致,輸入與Map輸出一致;
十二、Shuffle
- shuffle:指的是Map把數(shù)據(jù)傳遞給Reduce的這個流程,其核心就是:數(shù)據(jù)分區(qū)和排序;
- shuffle流程再現(xiàn):
- OutPutCollector收集MapTask輸出的KV對,放到環(huán)形緩沖區(qū)中;
- 環(huán)形緩沖區(qū)數(shù)據(jù)的不斷流入而溢出本地磁盤文件,每次溢出形成一個文件,其中調(diào)用組件Partitioner對數(shù)據(jù)進行分區(qū)以及針對key的排序,多個溢出文件會被合并成大的溢出文件,如果設置了Combiner組件就會進行Combine操作;
- ReduceTask根據(jù)自己的分區(qū)號,去各個MapTask機器上取相應的結果分區(qū)數(shù)據(jù);
- ReduceTask會取到同一個分區(qū)的來自不同MapTask的結果文件,ReduceTask會利用歸并排序將這些文件再進行合并;
- 合并成大文件后,Shuffle的過程也就結束了,后面進入Reduce流程;
十三、MapReduce運行模式
- 本地運行:如果我們在win上運行程序,默認情況下吧程序提交到一個本地的模擬器上,使用線程模擬YarnChild;如果需要指定的話,在Configuration對象上設置
mapreduce.framework.name值可以是:"local","yarn"等,還有就是fs.defaultFS,用來指定輸入輸出的數(shù)據(jù)在哪; - 集群運行模式,設置以下參數(shù)(當然,如果是打jar在Linux上執(zhí)行,倒也沒有必要設置,因為Hadoop會自動設置):
-
mapreduce.framework.name為yarn -
yarn.resourcemanager.hostname為對應的IP或者主機名 -
fs.defaultFS為hdfs://<host>:9000
-
- 本地運行集群
- 這樣的方式直接運行時會報錯的,需要修改源碼,因為在本地運行集群的時候,Job的相關參數(shù)設置的時候是win形式的配置,當這些配置在運行的時候解析就會出現(xiàn)問題。
可以在其中完成一些SQL完成的示例比如JOIN的邏輯
* 疑問
- Mapper的泛型和Reducer的泛型可以不對應嗎?
- Mapper的輸出和Reducer的輸入必須對應;
- MapTask完成后才分區(qū)還是一邊執(zhí)行MapTask一邊分區(qū)?
- MapOutputBuffer的每次溢出都會產(chǎn)生一個文件,這個文件是分好區(qū)的,當所有的數(shù)據(jù)都完成處理的時候,就會把這些文件合并;所以,是一邊執(zhí)行MapTask一邊分區(qū);
- ReduceTask獲取所有的MapTask數(shù)據(jù)之后進行歸并排序嗎?如果每個MapTask的輸出文件都很大,每個數(shù)據(jù)分區(qū)也都很大呢。。。