一般情況下,啟動(dòng)一個(gè)hive任務(wù)時(shí)hive會(huì)計(jì)算這個(gè)任務(wù)需要用到的map和reduce數(shù)量,通常map數(shù)和reduce數(shù)不需要調(diào)整。但是有時(shí)如果map或reduce的計(jì)算很復(fù)雜、單個(gè)map的執(zhí)行時(shí)間很長,且hive分配的map數(shù)或reduce比較少,集群還有大量計(jì)算資源沒有利用的情況下,可以通過增大map數(shù)或reduce數(shù),來提高任務(wù)并發(fā),縮短任務(wù)計(jì)算時(shí)長,提高計(jì)算效率。
1、hive on mr
1.1、如何調(diào)整map數(shù)
InputFormat 接口按照某個(gè)策略將輸入數(shù)據(jù)且分成若干個(gè) split,以便確定 Map Task 的個(gè)數(shù)即 Mapper 的個(gè)數(shù),在 MapReduce 框架中,一個(gè) split 就意味著需要一個(gè) Map Task;
當(dāng)hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat時(shí),hive會(huì)先計(jì)算splitSize ,然后通過splitSize、任務(wù)輸入的數(shù)據(jù)量大小和文件數(shù)來共同決定split數(shù)量,即map數(shù)。
splitSize = max{minSize,min{goalSize,blockSize}}
其中
- minSize:是
mapreduce.input.fileinputformat.split.minsize決定的 InputFormat的最小長度。 - goalSize:該值由
totalSize/numSplits來確定 InputSplit 的長度,它是根據(jù)用戶的期望的 InputSplit 個(gè)數(shù)計(jì)算出來的。numSplits為用戶設(shè)定的 Map Task 的個(gè)數(shù),默認(rèn)為1,可通過mapreduce.job.maps設(shè)置。totalSize是hive任務(wù)處理的數(shù)據(jù)量大小。 - blockSize:HDFS 中的文件存儲(chǔ)塊block的大小,可以通過
dfs.blocksize查看大小。
由上公式可知,在org.apache.hadoop.hive.ql.io.HiveInputFormat接口下,主要是mapreduce.input.fileinputformat.split.minsize和mapreduce.job.maps來決定map數(shù)
當(dāng)hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat時(shí),主要是如下四個(gè)參數(shù)起作用:
mapreduce.input.fileinputformat.split.minsizemapreduce.input.fileinputformat.split.maxsizemapreduce.input.fileinputformat.split.minsize.per.rack-
mapreduce.input.fileinputformat.split.minsize.per.node
這里切分的邏輯比較復(fù)雜,主要的流程大致如下:
- 首先處理每個(gè)Datanode的blockInfo,先按照>=maxsplitsize來切分split,剩余的再按照blockinfo>=minSplitSizeNode切分,其余的等和rack的其余blockinfo進(jìn)行合并。
- 其次對每個(gè)Rack進(jìn)行處理:先按照>=maxsplitsize來切分split,剩余的再按照blockinfo>=minSplitSizeRack切分,其余的等和overflow的其余blockinfo進(jìn)行合并。
- 對于overflow blockInfo直接根據(jù)maxsplitsize來進(jìn)行切分。
1.2 如何調(diào)整reduce數(shù)
hive on mr模式下reduce數(shù)主要受如下兩個(gè)參數(shù)影響:
-
hive.exec.reducers.bytes.per.reducer--每個(gè)reduce處理的數(shù)量量 -
hive.exec.reducers.max--hive任務(wù)最大reduce個(gè)數(shù)
reducer數(shù) = min(hive.exec.reducers.max,max(1,totalsize/hive.exec.reducers.bytes.per.reducer))
2、hive on tez
2.1、如何調(diào)整map數(shù)
在運(yùn)行hive on tez時(shí)會(huì)發(fā)現(xiàn)跟hive on mr的map數(shù)差異會(huì)比較大,主要原因在于 Tez 中對 inputSplit 做了 grouping 操作,將多個(gè) inputSplit 組合成更少的 groups,然后為每個(gè) group 生成一個(gè) mapper 任務(wù),而不是為每個(gè)inputSplit 生成一個(gè)mapper 任務(wù)。
可以通過調(diào)整如下參數(shù)來調(diào)整grouping數(shù):
tez.grouping.min-sizetez.grouping.max-size
2.2、如何調(diào)整reduce數(shù)
tez on tez模式下reduce數(shù)主要受如下兩個(gè)參數(shù)影響:
-
hive.exec.reducers.bytes.per.reducer--每個(gè)reduce處理的數(shù)量量 -
hive.exec.reducers.max--hive任務(wù)最大reduce個(gè)數(shù) hive.tez.auto.reducer.parallelism-
hive.tex.min.partition.factor
-hive.tez.max.partition.factor
reducer數(shù) = Max(1, Min(hive.exec.reducers.max, ReducerStage estimate/hive.exec.reducers.bytes.per.reducer))x hive.tez.max.partition.factor
3、其他影響map數(shù)和reduce數(shù)的情況
3.1 小文件合并
hive.merge.mapfiles = true #在Map-only的任務(wù)結(jié)束時(shí)合并小文件
hive.merge.mapredfiles = true #在Map-Reduce的任務(wù)結(jié)束時(shí)合并小文件
hive.merge.size.per.task = 256*1000*1000 #合并文件的大小
hive.merge.smallfiles.avgsize=16000000 #當(dāng)輸出文件的平均大小小于該值時(shí),啟動(dòng)一個(gè)獨(dú)立的map-reduce任務(wù)進(jìn)行文件merge
上面參數(shù)會(huì)在任務(wù)結(jié)束后,如果任務(wù)生成的小文件觸發(fā)了hive.merge.smallfiles.avgsize=會(huì)另外再啟reduce任務(wù)來合并小文件。
3.2 分桶表的影響
如果往分桶表里插入數(shù)據(jù),由于hdfs最終會(huì)每個(gè)桶一個(gè)文件,因此在當(dāng)分桶表設(shè)置了多少個(gè)桶,最終就會(huì)生成多少個(gè)reduce任務(wù)
3.3 只一個(gè)個(gè)redecu的情況
- 執(zhí)行全局聚合,例如語句:
select count(*),sum(field1) from tablea - 做笛卡爾集操作
- 執(zhí)行order by 操作
3.4 map階段不支持拆分的情況
有的文件格式并不支持切分,如果hive表的存儲(chǔ)的文件格式不支持切分,則在查詢該表時(shí),有多少文件就會(huì)產(chǎn)生多少map任務(wù)。例如,存儲(chǔ)格式是text,但是采用了gzip壓縮,這種情況下則不支持切分,讀表數(shù)據(jù)時(shí),該表對應(yīng)的存儲(chǔ)目錄下有多少個(gè)文件就會(huì)生成多少個(gè)map任務(wù),每個(gè)map任務(wù)處理一個(gè)文件的數(shù)據(jù)。
4、map數(shù)和reduce數(shù)設(shè)置多少最合適?
map數(shù)和reduce數(shù)并非越多越好,需要綜合多種情況來考慮
- 任務(wù)的計(jì)算復(fù)雜度:處理同樣數(shù)據(jù)量,計(jì)算邏輯越復(fù)雜,任務(wù)耗時(shí)會(huì)越長
- hdfs文件系統(tǒng)的元數(shù)據(jù)的壓力:如果生成的小文件很多,hdfs元數(shù)據(jù)會(huì)增長很快,會(huì)增加hdfs的元數(shù)據(jù)壓力
- 整合集群的計(jì)算資源
一般情況下不需要調(diào)整map數(shù)和reduce數(shù),當(dāng)單個(gè)map執(zhí)行時(shí)間過長時(shí),且map數(shù)不多的情況下,就需要通過調(diào)整map數(shù),通過提高map數(shù),提高并發(fā)來縮短單個(gè)map的執(zhí)行時(shí)間,從而縮短整合任務(wù)的計(jì)算時(shí)間。同樣reduce階段單個(gè)reduce處理數(shù)據(jù)量很大,耗時(shí)比較長時(shí),而分配的reduce數(shù)不多的情況下,也可以通過提高reduce數(shù)來減少每個(gè)reudce的計(jì)算時(shí)長。
那單個(gè)map執(zhí)行多少時(shí)間為宜:根據(jù)在大數(shù)據(jù)集群上的實(shí)踐經(jīng)驗(yàn),單個(gè)map宜在1-2分鐘執(zhí)行完是比較好,如果單個(gè)map執(zhí)行時(shí)間太短,會(huì)消耗比較多的時(shí)間在申請集群資源和初始化資源階段,反倒影響整體任務(wù)效率;如果單個(gè)map執(zhí)行時(shí)間太長,長時(shí)間占用集群資源,不能及時(shí)釋放資源,導(dǎo)致其他任務(wù)長時(shí)間等待,不利于集群計(jì)算資源的合理利用。
同樣,reduce任務(wù)階段,一般執(zhí)行會(huì)比較長,因此reduce階段不reduce數(shù)不宜太多,一般不超過集群cores數(shù)的50%,太多會(huì)占用大量集群資源,導(dǎo)致其他任務(wù)獲取不到資源而排隊(duì),同時(shí)也會(huì)生成過多的文件。