單個表每天數(shù)據(jù)有50億左右。需用二級分區(qū)優(yōu)化該表。
1、最初查詢
insert into table xx_parquet_v2 PARTITION(dt, uiappid) select %s from xxx where dt= %s;
錯誤:
Java Heap Space?;蛘逩C overhead limit exceeded。
原因:
Parquet和ORC是列式批處理文件格式。這些格式要求在寫入文件之前將批次的行(batches of rows)緩存在內(nèi)存中。在執(zhí)行INSERT語句時,動態(tài)分區(qū)目前的實(shí)現(xiàn)是:至少為每個動態(tài)分區(qū)目錄打開一個文件寫入器(file writer)。由于這些緩沖區(qū)是按分區(qū)維護(hù)的,因此在運(yùn)行時所需的內(nèi)存量隨著分區(qū)數(shù)量的增加而增加。所以經(jīng)常會導(dǎo)致mappers或reducers的OOM,具體取決于打開的文件寫入器(file writer)的數(shù)量。
通過INSERT語句插入數(shù)據(jù)到動態(tài)分區(qū)表中,也可能會超過HDFS同時打開文件數(shù)的限制。
如果沒有join或聚合,INSERT ... SELECT語句會被轉(zhuǎn)換為只有map任務(wù)的作業(yè)。mapper任務(wù)會讀取輸入記錄然后將它們發(fā)送到目標(biāo)分區(qū)目錄。在這種情況下,每個mapper必須為遇到的每個動態(tài)分區(qū)創(chuàng)建一個新的文件寫入器(file writer)。mapper在運(yùn)行時所需的內(nèi)存量隨著它遇到的分區(qū)數(shù)量的增加而增加。
詳細(xì)原因:https://blog.csdn.net/frank_jyp/article/details/81780821
2、第一次修改
set hive.optimize.sort.dynamic.partition = true,從新跑上述語句。
通過這個優(yōu)化,這個只有map任務(wù)的mapreduce會引入reduce過程,這樣動態(tài)分區(qū)的那個字段比如日期在傳到reducer時會被排序。由于分區(qū)字段是排序的,因此每個reducer只需要保持一個文件寫入器(file writer)隨時處于打開狀態(tài),在收到來自特定分區(qū)的所有行后,關(guān)閉記錄寫入器(record writer),從而減小內(nèi)存壓力。這種優(yōu)化方式在寫parquet文件時使用的內(nèi)存要相對少一些,但代價(jià)是要對分區(qū)字段進(jìn)行排序。
但reduce階段一直卡在99%,判斷是uiappid數(shù)據(jù)傾斜導(dǎo)致。驗(yàn)證數(shù)據(jù)傾斜:
# 找出uiappid條數(shù)大于1億條的uiappid
select uiappid, count(*) as t from xxx where dt=%s group by uiappid having t>100000000;
然后你會發(fā)現(xiàn)跑得特別慢。開啟map group優(yōu)化(Map端部分聚合,相當(dāng)于Combiner):
hive.map.aggr=true
設(shè)置上述參數(shù)即可。若是其他情況的group優(yōu)化,可參考hive.groupby.skewindata參數(shù)。
hive.groupby.skewindata=true
有數(shù)據(jù)傾斜的時候進(jìn)行負(fù)載均衡,當(dāng)hive.groupby.skewindata設(shè)定為 true,生成的查詢計(jì)劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結(jié)果集合會隨機(jī)分布到 Reduce 中,每個 Reduce 做部分聚合操作,并輸出結(jié)果,這樣處理的結(jié)果是相同的 Group By Key 有可能被分發(fā)到不同的 Reduce 中,從而達(dá)到負(fù)載均衡的目的;第二個 MR Job 再根據(jù)預(yù)處理的數(shù)據(jù)結(jié)果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最后完成最終的聚合操作。
3、第二次修改
分兩步:
1、第一步:找出條數(shù)大于1億的uiappid后,select時過濾調(diào)這些大的uiappid。通過這個優(yōu)化過,reduce階段單個key的數(shù)據(jù)都不超過1億條,可以快速得到結(jié)果。
set hive.optimize.sort.dynamic.partition = true;
insert into table xx_parquet_v2 PARTITION(dt='%s', uiappid) select %s from xxx where dt= %s and uiappid not in ('a','b');
2、第二步:再次將uiappid條數(shù)大于1億的數(shù)據(jù)插入表中。因?yàn)榇笥?億條的uiappid比較少,可以為每個mapper遇到的分區(qū)創(chuàng)建一個文件寫入器(file writer)。
insert into table xx_parquet_v2 PARTITION(dt='%s', uiappid) select %s from xxx where dt= %s and uiappid in ('a','b');
4、其他的配置
set mapreduce.map.memory.mb=6144;
set mapreduce.map.java.opts=-Xmx4096m; # map 內(nèi)存配置
set mapreduce.input.fileinputformat.split.maxsize=1024000000;
set mapreduce.input.fileinputformat.split.minsize=1024000000;
set mapred.max.split.size=1024000000;
set mapred.min.split.size.per.node=1024000000;
set mapred.min.split.size.per.rack=1024000000; # map文件大小配置
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
set parquet.memory.min.chunk.size=100000; # parquet文件格式配置
set hive.exec.dynamic.partition.mode=nonstrict; #配置動態(tài)分區(qū)
set mapreduce.reduce.memory.mb=8192;
set mapreduce.reduce.java.opts=-Xmx6144m; # 配置reduce內(nèi)存限制