title: mapreduce調(diào)優(yōu)
date: 2016/12/2 9:16:39
tags: MapReduce
categories: 大數(shù)據(jù)
對應(yīng)用程序進(jìn)行調(diào)優(yōu)
避免輸入大量小文件。大量的小文件(不足一個block大小)作為輸入數(shù)據(jù)會產(chǎn)生很多的Map任務(wù)(默認(rèn)一個分片對應(yīng)一個Map任務(wù)),而每個Map任務(wù)實(shí)際工作量又非常小,系統(tǒng)要花更多的時間來將這些Map任務(wù)的輸出進(jìn)行整合。如果將大量的小文件進(jìn)行預(yù)處理合并成一個或幾個大文件,任務(wù)執(zhí)行的效率可能會提升幾十倍。可手動將小文件合并成大文件,或通過Hadoop的SequenceFile、CombineFileInputFormat將多個文件打包到一個輸入單元中,使得每個Map處理更多的數(shù)據(jù),從而提高性能。
輸入文件size巨大,但不是小文件。這種情況可以通過增大每個mapper的input size,即增大minSize或者增大blockSize來減少所需的mapper的數(shù)量。增大blockSize通常不可行,因?yàn)楫?dāng)HDFS被hadoop namenode -format之后,blockSize就已經(jīng)確定了(由格式化時dfs.block.size決定),如果要更改blockSize,需要重新格式化HDFS,這樣當(dāng)然會丟失已有的數(shù)據(jù)。所以通常情況下只能通過增大minSize,即增大mapred.min.split.size的值。
預(yù)判并過濾無用數(shù)據(jù)。可以使用一些過濾工具,在作業(yè)執(zhí)行之前將數(shù)據(jù)中無用的數(shù)據(jù)進(jìn)行過濾,可極大提高M(jìn)apReduce執(zhí)行效率。Bloom Filter是一種功能強(qiáng)大的過濾器,執(zhí)行效率高,時間復(fù)雜度為O(1),缺點(diǎn)是存在一定的誤判可能,詳細(xì)參考《Bloom Filter概念和原理》。當(dāng)將一個非常大的表和一個非常小的表進(jìn)行表連接操作時,可以使用Bloom Filter將小表數(shù)據(jù)作為Bloom Filter的輸入數(shù)據(jù),將大表的原始數(shù)據(jù)進(jìn)行過濾(過濾不通過的數(shù)據(jù)一定是不可用的,過濾通過的數(shù)據(jù)可能有用可能無用),可提高程序執(zhí)行的效率。
合理使用分布式緩存DistributedCache。DistributedCache可以將一些字典、jar包、配置文件等緩存到需要執(zhí)行map任務(wù)的節(jié)點(diǎn)中,避免map任務(wù)多次重復(fù)讀取這些資源,尤其在join操作時,使用DistributedCache緩存小表數(shù)據(jù)在map端進(jìn)行join操作,可避免shuffle、reduce等操作,提高程序運(yùn)行效率。
重用Writable類型。避免大量多次new這些Writable對象,這會花費(fèi)java垃圾收集器大量的清理工作,建議在map函數(shù)外定義這些Writable對象,如下所示:
class MyMapper … {
Text wordText = new Text();
IntWritable one = new IntWritable(1);
public void map(...) {
for (String word: words) {
wordText.set(word);
context.write(wordText, one);
}
}
}
- 合理設(shè)置Combiner。Combine階段處于Map端操作的最后一步,設(shè)置Combine操作可大大提高M(jìn)apReduce的執(zhí)行效率,前提是增加Combine不能改變最終的結(jié)果值,換句話說,不是所有的MapReduce程序都能添加Combine,如求平均數(shù)的MapReduce程序就不適合設(shè)置Combine操作。通常Combine函數(shù)與Reduce函數(shù)一致
對參數(shù)進(jìn)行調(diào)優(yōu)(基于hadoop2.6.0)
HDFS參數(shù)調(diào)優(yōu)(hdfs-site.xml)
dfs.namenode.handler.count:namenode用于處理RPC的線程數(shù),默認(rèn)值10,可根據(jù)NameNode所在節(jié)點(diǎn)機(jī)器配置適當(dāng)調(diào)大,如32、64;
dfs.datanode.handler.count:datanode上用于處理RPC的線程數(shù),2.6版本默認(rèn)值10,早期1.x版本默認(rèn)值為3,可根據(jù)datanode節(jié)點(diǎn)的配置適當(dāng)調(diào)整;
MapReduce參數(shù)調(diào)優(yōu)(mapred-site.xml)
mapreduce.tasktracker.map.tasks.maximum:每個nodemanager節(jié)點(diǎn)上可運(yùn)行的最大map任務(wù)數(shù),默認(rèn)值2,可根據(jù)實(shí)際值調(diào)整為10~100;
mapreduce.tasktracker.reduce.tasks.maximum:每個nodemanager節(jié)點(diǎn)上可運(yùn)行的最大reduce任務(wù)數(shù),默認(rèn)值2,可根據(jù)實(shí)際值調(diào)整為10~100;
mapreduce.output.fileoutputformat.compress:是否對任務(wù)輸出產(chǎn)生的結(jié)果進(jìn)行壓縮,默認(rèn)值false。對傳輸數(shù)據(jù)進(jìn)行壓縮,既可以減少文件的存儲空間,又可以加快數(shù)據(jù)在網(wǎng)絡(luò)不同節(jié)點(diǎn)之間的傳輸速度。
mapreduce.output.fileoutputformat.compress.type:輸出產(chǎn)生任務(wù)數(shù)據(jù)的壓縮方式,默認(rèn)值RECORD,可配置值有:NONE、RECORD、BLOCK
mapreduce.map.output.compress:map端壓縮
mapreduce.map.output.compress.codec:map壓縮格式
mapreduce.task.io.sort.mb:map任務(wù)輸出結(jié)果的內(nèi)存環(huán)形緩沖區(qū)大小,默認(rèn)值100M,可根據(jù)map節(jié)點(diǎn)的機(jī)器進(jìn)行配置,貌似不能超過值mapred.child.java.opts;
mapreduce.map.sort.spill.percent:map任務(wù)輸出環(huán)形緩沖區(qū)大小溢寫觸發(fā)最大比例,默認(rèn)值80%,這個值一般不建議修改;
mapreduce.reduce.shuffle.parallelcopies:reduce節(jié)點(diǎn)通過http拷貝map輸出結(jié)果數(shù)據(jù)到本地的最大工作線程數(shù),默認(rèn)值5,可根據(jù)節(jié)點(diǎn)機(jī)器配置適當(dāng)修改;
mapreduce.reduce.shuffle.input.buffer.percent:reduce節(jié)點(diǎn)在shuffle階段拷貝map輸出結(jié)果數(shù)據(jù)到本地時,內(nèi)存緩沖區(qū)大小所占JVM內(nèi)存的比例,默認(rèn)值0.7,一般不建議修改;
mapreduce.reduce.shuffle.merge.percent:reduce節(jié)點(diǎn)shuffle內(nèi)存緩沖區(qū)溢寫觸發(fā)最大比例,默認(rèn)值0.66,一般不建議修改;
mapred.child.java.opts:配置每個map或reduce使用的內(nèi)存數(shù)量,默認(rèn)值-Xmx200m,即200M。如果nodemanager所在節(jié)點(diǎn)
Map和Reduce個數(shù)設(shè)置
map的數(shù)量
map的數(shù)量通常是由hadoop集群的DFS塊大小確定的,也就是輸入文件的總塊數(shù),正常的map數(shù)量的并行規(guī)模大致是每一個Node是10~100個,對于CPU消耗較小的作業(yè)可以設(shè)置Map數(shù)量為300個左右,但是由于hadoop的沒一個任務(wù)在初始化時需要一定的時間,因此比較合理的情況是每個map執(zhí)行的時間至少超過1分鐘。具體的數(shù)據(jù)分片是這樣的,InputFormat在默認(rèn)情況下會根據(jù)hadoop集群的DFS塊大小進(jìn)行分片,每一個分片會由一個map任務(wù)來進(jìn)行處理,當(dāng)然用戶還是可以通過參數(shù)mapred.min.split.size參數(shù)在作業(yè)提交客戶端進(jìn)行自定義設(shè)置。還有一個重要參數(shù)就是mapred.map.tasks,這個參數(shù)設(shè)置的map數(shù)量僅僅是一個提示,只有當(dāng)InputFormat 決定了map任務(wù)的個數(shù)比mapred.map.tasks值小時才起作用。同樣,Map任務(wù)的個數(shù)也能通過使用JobConf 的conf.setNumMapTasks(int num)方法來手動地設(shè)置。這個方法能夠用來增加map任務(wù)的個數(shù),但是不能設(shè)定任務(wù)的個數(shù)小于Hadoop系統(tǒng)通過分割輸入數(shù)據(jù)得到的值。因此,如果你有一個大小是10TB的輸入數(shù)據(jù),并設(shè)置DFS塊大小為 128M,你必須設(shè)置至少82K個map任務(wù),除非你設(shè)置的mapred.map.tasks參數(shù)比這個數(shù)還要大。當(dāng)然為了提高集群的并發(fā)效率,可以設(shè)置一個默認(rèn)的map數(shù)量,當(dāng)用戶的map數(shù)量較小或者比本身自動分割的值還小時可以使用一個相對交大的默認(rèn)值,從而提高整體hadoop集群的效率。reduece的數(shù)量
reduce在運(yùn)行時往往需要從相關(guān)map端復(fù)制數(shù)據(jù)到reduce節(jié)點(diǎn)來處理,因此相比于map任務(wù)。reduce節(jié)點(diǎn)資源是相對比較缺少的,同時相對運(yùn)行較慢,正確的reduce任務(wù)的個數(shù)應(yīng)該是0.95或者1.75 *(節(jié)點(diǎn)數(shù) ×mapred.tasktracker.tasks.maximum參數(shù)值)。mapred.tasktracker.tasks.reduce.maximum的數(shù)量一般設(shè)置為各節(jié)點(diǎn)cpu core數(shù)量,或者數(shù)量減1,即能同時計(jì)算的slot數(shù)量。如果任務(wù)數(shù)是節(jié)點(diǎn)個數(shù)的0.95倍,那么所有的reduce任務(wù)能夠在 map任務(wù)的輸出傳輸結(jié)束后同時開始運(yùn)行。如果任務(wù)數(shù)是節(jié)點(diǎn)個數(shù)的1.75倍,那么高速的節(jié)點(diǎn)會在完成他們第一批reduce任務(wù)計(jì)算之后開始計(jì)算第二批 reduce任務(wù),這樣的情況更有利于負(fù)載均衡。同時需要注意增加reduce的數(shù)量雖然會增加系統(tǒng)的資源開銷,但是可以改善負(fù)載勻衡,降低任務(wù)失敗帶來的負(fù)面影響。同樣,Reduce任務(wù)也能夠與 map任務(wù)一樣,通過設(shè)定JobConf 的conf.setNumReduceTasks(int num)方法來增加任務(wù)個數(shù)。
cpu數(shù)量 = 服務(wù)器CPU總核數(shù) / 每個CPU的核數(shù)
服務(wù)器CPU總核數(shù) = more /proc/cpuinfo | grep 'processor' | wc -l
每個CPU的核數(shù) = more /proc/cpuinfo | grep 'cpu cores'reduce數(shù)量為0
有些作業(yè)不需要進(jìn)行歸約進(jìn)行處理,那么就可以設(shè)置reduce的數(shù)量為0來進(jìn)行處理,這種情況下用戶的作業(yè)運(yùn)行速度相對較高,map的輸出會直接寫入到 SetOutputPath(path)設(shè)置的輸出目錄,而不是作為中間結(jié)果寫到本地。同時Hadoop框架在寫入文件系統(tǒng)前并不對之進(jìn)行排序。
參考轉(zhuǎn)載
http://www.cnblogs.com/hanganglin/p/4563716.html
https://my.oschina.net/Chanthon/blog/150500