總結(jié)了一下在以往工作中,對于Hive SQL調(diào)優(yōu)的一些實(shí)際應(yīng)用,是日常積累的一些優(yōu)化技巧,如有出入,歡迎在評論區(qū)留言探討~
一、EXPLAIN 查看執(zhí)行計(jì)劃
二、建表優(yōu)化
2.1 分區(qū)
- 分區(qū)表基本操作,partitioned
- 二級分區(qū)
- 動態(tài)分區(qū)
2.2 分桶
- 分桶表基本操作,clustered
- 分桶表主要是抽樣查詢,找出具有代表性的結(jié)果
2.3 選擇合適的文件格式和壓縮格式
- LZO,拉茲羅
- Snappy
- 壓縮速度快,壓縮比高
三、HiveSQL語法優(yōu)化
3.1 單表查詢優(yōu)化
-
列裁剪和分區(qū)裁剪,全表和全列掃描效率都很差,生產(chǎn)環(huán)境絕對不要使用
SELECT *,所謂列裁剪就是在查詢時只讀取需要的列,分區(qū)裁剪就是只讀取需要的分區(qū)- 與列裁剪優(yōu)化相關(guān)的配置項(xiàng)是
hive.optimize.cp,默認(rèn)是true - 與分區(qū)裁剪優(yōu)化相關(guān)的則是
hive.optimize.pruner,默認(rèn)是true - 在
HiveSQL解析階段對應(yīng)的則是ColumnPruner邏輯優(yōu)化器
- 與列裁剪優(yōu)化相關(guān)的配置項(xiàng)是
-
Group By 配置調(diào)整,
map階段會把同一個key發(fā)給一個reduce,當(dāng)一個key過大時就傾斜了,可以開啟map端預(yù)聚合,可以有效減少shuffle數(shù)據(jù)量并# 是否在map端聚合,默認(rèn)為true set hive.map.aggr = true; # 在map端聚合的條數(shù) set hive.groupby.mapaggr.checkintervel = 100000; # 在數(shù)據(jù)傾斜的時候進(jìn)行均衡負(fù)載(默認(rèn)是false),開啟后會有 兩個`mr任務(wù)`。 # 當(dāng)選項(xiàng)設(shè)定為true時,第一個 `mr任務(wù)` 會將map輸出的結(jié)果隨機(jī)分配到`reduce`, # 每個`reduce`會隨機(jī)分布到`reduce`上,這樣的處理結(jié)果是會使相同的`group by key`分到不同的`reduce`上。 # 第二個 `mr任務(wù)` 再根據(jù)預(yù)處理的結(jié)果按`group by key`分到`reduce`上, # 保證相同`group by key`的數(shù)據(jù)分到同一個`reduce`上。 # *切記!?。? # 這樣能解決數(shù)據(jù)傾斜,但是不能讓運(yùn)行速度更快 # 在數(shù)據(jù)量小的時候,開始數(shù)據(jù)傾斜負(fù)載均衡可能反而會導(dǎo)致時間變長 # 配置項(xiàng)畢竟是死的,單純靠它有時不能根本上解決問題 # 因此還是建議自行了解數(shù)據(jù)傾斜的細(xì)節(jié),并優(yōu)化查詢語句 set hive.groupby.skewindata = true; -
Vectorization,矢量計(jì)算技術(shù),通過設(shè)置批處理的增量大小為1024行單次來達(dá)到比單行單次更好的效率
# 開啟矢量計(jì)算 set hive.vectorized.execution.enabled = true; # 在reduce階段開始矢量計(jì)算 set hive.vectorized.execution.reduce.enabled = true; 多重模式,一次讀取多次插入,同一張表的插入操作優(yōu)化成先
from table再insertin/exists或者join用
left semi join代替(為什么替代擴(kuò)展一下~)
3.2 多表查詢優(yōu)化
-
CBO優(yōu)化,成本優(yōu)化器,代價最小的執(zhí)行計(jì)劃就是最好的執(zhí)行計(jì)劃
- join的時候表的順序關(guān)系,前面的表都會被加載到內(nèi)存中,后面的表進(jìn)行磁盤掃描
- 通過
hive.cbo.enable,自動優(yōu)化hivesql中多個join的執(zhí)行順序 - 可以通過查詢一下參數(shù),這些一般都是true,無需修改
set hive.cbo.enable = true; set hive.compute.query.using.stats = true; set hive.stats.fetch.column.stats = true; set hive.stats.fetch.partition.stats = true; -
謂詞下推(非常關(guān)鍵的一個優(yōu)化),將
sql語句中的where謂詞邏輯都盡可能提前執(zhí)行,減少下游處理的數(shù)據(jù)量,
在關(guān)系型數(shù)據(jù)庫如MySQL中,也有謂詞下推(Predicate Pushdown,PPD)的概念,
它就是將sql語句中的where謂詞邏輯都盡可能提前執(zhí)行,減少下游處理的數(shù)據(jù)量# 這個設(shè)置是默認(rèn)開啟的 # 如果關(guān)閉了但是cbo開啟,那么關(guān)閉依然不會生效 # 因?yàn)閏bo會自動使用更為高級的優(yōu)化計(jì)劃 # 與它對應(yīng)的邏輯優(yōu)化器是PredicatePushDown # 該優(yōu)化器就是將OperatorTree中的FilterOperator向上提 set hive.optimize.pdd = true; # 舉個例子 # 對forum_topic做過濾的where語句寫在子查詢內(nèi)部,而不是外部 select a.uid,a.event_type,b.topic_id,b.title from calendar_record_log a left outer join ( select uid,topic_id,title from forum_topic where pt_date = 20220108 and length(content) >= 100 ) b on a.uid = b.uid where a.pt_date = 20220108 and status = 0; -
Map Join,
map join是指將join操作兩方中比較小的表直接分發(fā)到各個map進(jìn)程的內(nèi)存中,在map中進(jìn)行join的操作。
map join特別適合大小表join的情況,Hive會將build table和probe table在map端直接完成join過程,消滅了reduce,減少shuffle,所以會減少開銷-
set hive.auto.convert.join = true,配置開啟,默認(rèn)是true -
注意?。?! 如果執(zhí)行
小表join大表,小表作為主連接的主表,所有數(shù)據(jù)都要寫出去,此時會走reduce階段,mapjoin會失效 -
大表join小表不受影響,上一條的原因主要是因?yàn)?code>小表join大表的時候,map階段不知道reduce的結(jié)果其他reduce是否有, - 所以必須在最后
reduce聚合的時候再處理,就產(chǎn)生了reduce的開銷
# 舉個例子 # 在最常見的`hash join`方法中,一般總有一張相對小的表和一張相對大的表, # 小表叫`build table`,大表叫`probe table` # Hive在解析帶join的SQL語句時,會默認(rèn)將最后一個表作為`probe table`, # 將前面的表作為`build table`并試圖將它們讀進(jìn)內(nèi)存 # 如果表順序?qū)懛矗琡probe table`在前面,引發(fā)`OOM(內(nèi)存不足)`的風(fēng)險就高了 # 在維度建模數(shù)據(jù)倉庫中,事實(shí)表就是`probe table`,維度表就是`build table` # 假設(shè)現(xiàn)在要將日歷記錄事實(shí)表和記錄項(xiàng)編碼維度表來`join` select a.event_type,a.event_code,a.event_desc,b.upload_time from calendar_event_code a inner join ( select event_type,upload_time from calendar_record_log where pt_date = 20220108 ) b on a.event_type = b.event_type; -
-
Map Join,大表和大表的
MapReduce任務(wù),可以使用SMB Join- 直接join耗時會很長,但是根據(jù)某字段分桶后,兩個大表每一個桶就是一個小文件,兩個表的每個小文件的分桶字段都應(yīng)該能夠一一對應(yīng)(hash值取模的結(jié)果)
- 總結(jié)就是分而治之,注意兩個大表的分桶字段和數(shù)量都應(yīng)該保持一致
set hive.optimize.bucketmapjoin = true; set hive.optimeize.bucketmapjoin.sortedmerge = true; hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; -
多表join時key相同,這種情況會將多個
join合并為一個mr 任務(wù)來處理# 舉個例子 # 如果下面兩個join的條件不相同 # 比如改成a.event_code = c.event_code # 就會拆成兩個MR job計(jì)算 select a.event_type,a.event_code,a.event_desc,b.upload_time from calendar_event_code a inner join ( select event_type,upload_time from calendar_record_log where pt_date = 20220108 ) b on a.event_type = b.event_type inner join ( select event_type,upload_time from calendar_record_log_2 where pt_date = 20220108 ) c on a.event_type = c.event_type; 笛卡爾積,在生產(chǎn)環(huán)境中嚴(yán)禁使用
3.3 其他查詢優(yōu)化
-
Sort By 代替 Order By,HiveQL中的
order by與其他sql方言中的功能一樣,就是將結(jié)果按某字段全局排序,這會導(dǎo)致所有map端數(shù)據(jù)都進(jìn)入一個reducer中,
在數(shù)據(jù)量大時可能會長時間計(jì)算不完。如果使用sort by,那么還是會視情況啟動多個reducer進(jìn)行排序,并且保證每個reducer內(nèi)局部有序。
為了控制map端數(shù)據(jù)分配到reducer的key,往往還要配合distribute by一同使用,如果不加distribute by的話,map端數(shù)據(jù)就會隨機(jī)分配到reducer# 舉個例子 select uid,upload_time,event_type,record_data from calendar_record_log where pt_date >= 20220108 and pt_date <= 20220131 distribute by uid sort by upload_time desc,event_type desc; Group By代替Distinct,當(dāng)要統(tǒng)計(jì)某一列的去重?cái)?shù)時,如果數(shù)據(jù)量很大,
count(distinct)就會非常慢,原因與order by類似,
count(distinct)邏輯只會有很少的reducer來處理。但是這樣寫會啟動兩個mr任務(wù)(單純distinct只會啟動一個),
所以要確保數(shù)據(jù)量大到啟動mr任務(wù)的overhead遠(yuǎn)小于計(jì)算耗時,才考慮這種方法,當(dāng)數(shù)據(jù)集很小或者key的傾斜比較明顯時,group by還可能會比distinct慢
四、數(shù)據(jù)傾斜
注意要和數(shù)據(jù)過量的情況區(qū)分開,數(shù)據(jù)傾斜是大部分任務(wù)都已經(jīng)執(zhí)行完畢,但是某一個任務(wù)或者少數(shù)幾個任務(wù),一直未能完成,甚至執(zhí)行失敗,
而數(shù)據(jù)過量,是大部分任務(wù)都執(zhí)行的很慢,這種情況需要通過擴(kuò)充執(zhí)行資源的方式來加快速度,大數(shù)據(jù)編程不怕數(shù)據(jù)量大,就怕數(shù)據(jù)傾斜,一旦數(shù)據(jù)傾斜,嚴(yán)重影響效率
4.1 單表攜帶了 Group By 字段的查詢
- 任務(wù)中存在
group by操作,同時聚合函數(shù)為count或sum,單個key導(dǎo)致的數(shù)據(jù)傾斜可以這樣通過設(shè)置開啟map端預(yù)聚合參數(shù)的方式來處理# 是否在map端聚合,默認(rèn)為true set hive.map.aggr = true; # 在map端聚合的條數(shù) set hive.groupby.mapaggr.checkintervel = 100000; # 有數(shù)據(jù)傾斜的時候開啟負(fù)載均衡,這樣會生成兩個mr任務(wù) set hive.groupby.skewindata = true; - 任務(wù)中存在
group by操作,同時聚合函數(shù)為count或sum,多個key導(dǎo)致的數(shù)據(jù)傾斜可以通過增加reduce的數(shù)量來處理- 增加分區(qū)可以減少不同分區(qū)之間的數(shù)據(jù)量差距,而且增加的分區(qū)時候不能是之前分區(qū)數(shù)量的倍數(shù),不然會導(dǎo)致取模結(jié)果相同繼續(xù)分在相同分區(qū)
- 第一種修改方式
# 每個reduce處理的數(shù)量 set hive.exec.reduce.bytes.per.reducer = 256000000; # 每個任務(wù)最大的reduce數(shù)量 set hive.exec.reducers.max = 1009; # 計(jì)算reducer數(shù)的公式,根據(jù)任務(wù)的需要調(diào)整每個任務(wù)最大的reduce數(shù)量 N = min(設(shè)置的最大數(shù),總數(shù)量數(shù)/每個reduce處理的數(shù)量)- 第二種修改方式
# 在hadoop的mapred-default.xml文件中修改 set mapreduce.job.reduces = 15;
4.2 兩表或多表的 join 關(guān)聯(lián)時,其中一個表較小,但是 key 集中
- 設(shè)置參數(shù)增加
map數(shù)量# join的key對應(yīng)記錄條數(shù)超過該數(shù)量,會進(jìn)行分拆 set hive.skewjoin.key = 1000; # 并設(shè)置該參數(shù)為true,默認(rèn)是false set hive.optimize.skewjoin = true; # 上面的參數(shù)如果開啟了會將計(jì)算數(shù)量超過閾值的key寫進(jìn)臨時文件,再啟動另外一個任務(wù)做map join # 可以通過設(shè)置這個參數(shù),控制第二個任務(wù)的mapper數(shù)量,默認(rèn)10000 set hive.skewjoin.mapjoin.map.tasks = 10000; - 使用
mapjoin,減少reduce從根本上解決數(shù)據(jù)傾斜,參考HiveSQL語法優(yōu)化 -> 多表查詢優(yōu)化 -> Map Join,大表和大表的MapReduce任務(wù),SMB Join
4.3 兩表或多表的 join 關(guān)聯(lián)時,有 Null值 或 無意義值
這種情況很常見,比如當(dāng)事實(shí)表是日志類數(shù)據(jù)時,往往會有一些項(xiàng)沒有記錄到,我們視情況會將它置為null,或者空字符串、-1等,
如果缺失的項(xiàng)很多,在做join時這些空值就會非常集中,拖累進(jìn)度,因此,若不需要空值數(shù)據(jù),就提前寫where語句過濾掉,
需要保留的話,將空值key用隨機(jī)方式打散,例如將用戶ID為null的記錄隨機(jī)改為負(fù)值:
select a.uid,a.event_type,b.nickname,b.age
from (
select
(case when uid is null then cast(rand()*-10240 as int) else uid end) as uid,
event_type from calendar_record_log
where pt_date >= 20220108
) a left outer join (
select uid,nickname,age from user_info where status = 4
) b on a.uid = b.uid;
4.4 兩表或多表的 join 關(guān)聯(lián)時,數(shù)據(jù)類型不統(tǒng)一
比如int類型和string類型進(jìn)行關(guān)聯(lián),關(guān)聯(lián)時候以小類型作為分區(qū),這里int、string會到一個reduceTask中,如果數(shù)據(jù)量多,會造成數(shù)據(jù)傾斜
# 可以通過轉(zhuǎn)換為同一的類型來處理
cast(user.id as string)
4.5 單獨(dú)處理傾斜key
這其實(shí)是上面處理空值方法的拓展,不過傾斜的key變成了有意義的,一般來講傾斜的key都很少,我們可以將它們抽樣出來,
對應(yīng)的行單獨(dú)存入臨時表中,然后打上一個較小的隨機(jī)數(shù)前綴(比如0~9),最后再進(jìn)行聚合
五、Hive Job 優(yōu)化
5.1 Hive Map 優(yōu)化
5.1.1 Map數(shù)量多少的影響
- Map數(shù)過大
-
map階段輸出文件太小,產(chǎn)生大量小文件 - 初始化和創(chuàng)建
map的開銷很大
-
- Map數(shù)太小
- 文件處理或查詢并發(fā)度小,
Job執(zhí)行時間過長 - 大量作業(yè)時,容易堵塞集群
- 文件處理或查詢并發(fā)度小,
5.1.2 控制Map數(shù)的原則
根據(jù)實(shí)際情況,控制map數(shù)量需要遵循兩個原則
- 第一是使大數(shù)據(jù)量利用合適的
map數(shù) - 第二是使單個
map任務(wù)處理合適的數(shù)據(jù)量
5.1.3 復(fù)雜文件適當(dāng)增加Map數(shù)
- 當(dāng)
input的文件都很大,任務(wù)邏輯復(fù)雜,map執(zhí)行非常慢的時候,可以考慮增加map數(shù),來使得每個map處理的數(shù)據(jù)量減少,從而提高任務(wù)的執(zhí)行效率 - 那么如何增加
map的數(shù)量呢?在map階段,文件先被切分成split塊,而后每一個split切片對應(yīng)一個Mapper任務(wù),
FileInputFormat這個類先對輸入文件進(jìn)行邏輯上的劃分,以128m為單位,將原始數(shù)據(jù)從邏輯上分割成若干個split,每個split切片對應(yīng)一個mapper任務(wù),
所以說減少切片的大小就可增加map數(shù)量 - 可以依據(jù)公式計(jì)算
computeSliteSize(Math.max(minSize, Math.min(maxSize, blockSize))) = blockSize = 128m - 執(zhí)行語句:
set mapreduce.input.fileinputformat.split.maxsize = 100;
5.1.4 小文件進(jìn)行合并減少M(fèi)ap數(shù)
為什么要進(jìn)行小文件合并?因?yàn)槿绻粋€任務(wù)有很多小文件(遠(yuǎn)遠(yuǎn)小于塊大小128m),則每個小文件也會被當(dāng)做一個塊,用一個map任務(wù)來完成,
而一個map任務(wù)啟動和初始化的時間遠(yuǎn)遠(yuǎn)大于邏輯處理的時間,就會造成很大的資源浪費(fèi),同時可執(zhí)行的map數(shù)是受限的
兩種方式合并小文件
- 在
Map執(zhí)行前合并小文件,減少map數(shù)量// 每個Map最大輸入大小(這個值決定了合并后文件的數(shù)量) set mapred.max.split.size = 256000000; // 一個節(jié)點(diǎn)上split的至少的大小(這個值決定了多個DataNode上的文件是否需要合并) set mapred.min.split.size.per.node = 100000000; // 一個交換機(jī)下split的至少的大小(這個值決定了多個交換機(jī)上的文件是否需要合并) set mapred.min.split.size.per.rack = 100000000; // 執(zhí)行Map前進(jìn)行小文件合并 set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; - 在
Map-Reduce任務(wù)執(zhí)行結(jié)束時合并小文件,減少小文件輸出// 設(shè)置map端輸出進(jìn)行合并,默認(rèn)為true set hive.merge.mapfiles = true; // 設(shè)置reduce端輸出進(jìn)行合并,默認(rèn)為false set hive.merge.mapredfiles = true; // 設(shè)置合并文件的大小,默認(rèn)是256 set hive.merge.size.per.task = 256 * 1000 * 1000; // 當(dāng)輸出文件的平均大小小于該值時,啟動一個獨(dú)立的`MapReduce任務(wù)`進(jìn)行文件`merge`。 set hive.merge.smallfiles.avgsize = 16000000;
5.1.5 Map端預(yù)聚合減少M(fèi)ap數(shù)量
- 相當(dāng)于在
map端執(zhí)行combiner,執(zhí)行命令:set hive.map.aggr = true; -
combiners是對map端的數(shù)據(jù)進(jìn)行適當(dāng)?shù)木酆?,其好處是減少了從map端到reduce端的數(shù)據(jù)傳輸量 - 其作用的本質(zhì),是將
map計(jì)算的結(jié)果進(jìn)行二次聚合,使Key-Value<List>中List的數(shù)據(jù)量變小,從而達(dá)到減少數(shù)據(jù)量的目的
5.1.6 推測執(zhí)行
- 在分布式集群環(huán)境下,因?yàn)槌绦駼ug(包括Hadoop本身的bug),負(fù)載不均衡或者資源分布不均等原因,會造成同一個作業(yè)的多個任務(wù)之間運(yùn)行速度不一致,
有些任務(wù)的運(yùn)行速度可能明顯慢于其他任務(wù)(比如一個作業(yè)的某個任務(wù)進(jìn)度只有50%,而其他所有任務(wù)已經(jīng)運(yùn)行完畢),則這些任務(wù)會拖慢作業(yè)的整體執(zhí)行進(jìn)度 - Hadoop采用了
推測執(zhí)行(Speculative Execution)機(jī)制,它根據(jù)一定的法則推測出拖后腿的任務(wù),并為這樣的任務(wù)啟動一個備份任務(wù),
讓該任務(wù)與原始任務(wù)同時處理同一份數(shù)據(jù),并最終選用最先成功運(yùn)行完成任務(wù)的計(jì)算結(jié)果作為最終結(jié)果 - 執(zhí)行命令:
set mapred.reduce.tasks.speculative.execution = true; # 默認(rèn)是true - 當(dāng)然,如果用戶對于運(yùn)行時的偏差非常敏感的話,那么可以將這些功能關(guān)閉掉,如果用戶因?yàn)檩斎霐?shù)據(jù)量很大而需要執(zhí)行長時間的
map task或者reduce task的話,
那么啟動推測執(zhí)行造成的浪費(fèi)是非常巨大的
5.1.7 合理控制Map數(shù)量的實(shí)際案例
假設(shè)一個SQL任務(wù):
SELECT COUNT(1)
FROM fx67ll_alarm_count_copy
WHERE alarm_date = "2021-01-08";
該任務(wù)的輸入目錄inputdir是:/group/fx67ll_data/fx67ll_data_etl/date/fx67ll_alarm_count_copy/alarm_date=2021-01-08,共有194個文件,
其中很多是遠(yuǎn)遠(yuǎn)小于128m的小文件,總大小約9G,正常執(zhí)行會用194個Map任務(wù),map總共消耗的計(jì)算資源:SLOTS_MILLIS_MAPS= 610,023
通過在Map執(zhí)行前合并小文件,減少M(fèi)ap數(shù)
# 前面三個參數(shù)確定合并文件塊的大小
# 大于文件塊大小128m的,按照128m來分隔
# 小于128m,大于100m的,按照100m來分隔
# 把那些小于100m的(包括小文件和分隔大文件剩下的),進(jìn)行合并,最終生成了74個塊
set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
合并后,用了74個map任務(wù),map消耗的計(jì)算資源:SLOTS_MILLIS_MAPS= 323,098,對于這個簡單SQL任務(wù),執(zhí)行時間上可能差不多,但節(jié)省了一半的計(jì)算資源
再假設(shè)這樣一個SQL任務(wù):
SELECT data_fx67ll,
COUNT(1),
COUNT(DISTINCT id),
SUM(CASE WHEN …),
SUM(CASE WHEN …),
SUM(…)
FROM fx67ll_device_info_zs
GROUP data_fx67ll
如果表fx67ll_device_info_zs只有一個文件,大小為120m,但包含幾千萬的記錄,如果用1個map去完成這個任務(wù),肯定是比較耗時的,
這種情況下,我們要考慮將這一個文件合理的拆分成多個
增加Reduce數(shù)量,來增加Map數(shù)量
set mapred.reduce.tasks=10;
CREATE TABLE fx67ll_device_info_zs_temp
AS
SELECT *
FROM fx67ll_device_info_zs
DISTRIBUTE BY RAND(123);
這樣會將fx67ll_device_info_zs表的記錄,隨機(jī)的分散到包含10個文件的fx67ll_device_info_zs_temp表中,
再用fx67ll_device_info_zs_temp代替上面sql中的fx67ll_device_info_zs表,
則會用10個map任務(wù)去完成,每個map任務(wù)處理大于12m(幾百萬記錄)的數(shù)據(jù),效率肯定會好很多
5.2 Hive Reduce 優(yōu)化
5.2.1 Reduce數(shù)量多少的影響
- 同
map一樣,啟動和初始化reduce也會消耗時間和資源 - 另外,有多少個
reduce,就會有多少個輸出文件,如果生成了很多個小文件,那么如果這些小文件作為下一個任務(wù)的輸入,則也會出現(xiàn)小文件過多的問題
5.2.2 控制Reduce數(shù)的原則
和map一樣,控制reduce數(shù)量需要遵循兩個原則
- 第一是使大數(shù)據(jù)量利用合適的
reduce數(shù) - 第二是使單個
reduce任務(wù)處理合適的數(shù)據(jù)量
5.2.3 Hive自己如何確定Reduce數(shù)
reduce個數(shù)的設(shè)定極大影響任務(wù)執(zhí)行效率,不指定reduce個數(shù)的情況下,Hive會猜測確定一個reduce個數(shù),基于以下兩個設(shè)定:
# 每個reduce任務(wù)處理的數(shù)據(jù)量,默認(rèn)為 1000^3=1G
hive.exec.reducers.bytes.per.reducer
# 每個任務(wù)最大的reduce數(shù),默認(rèn)為999
hive.exec.reducers.max
計(jì)算reducer數(shù)的公式很簡單N = min(參數(shù)2,總輸入數(shù)據(jù)量 / 參數(shù)1)
即,如果reduce的輸入(map的輸出)總大小不超過1G,那么只會有一個reduce任務(wù)
舉個例子:
SELECT alarm_date,
COUNT(1)
FROM fx67ll_alarm_count_copy
WHERE alarm_date = "2021-01-08"
GROUP BY alarm_date;
該任務(wù)的輸入目錄inputdir是:/group/fx67ll_data/fx67ll_data_etl/date/fx67ll_alarm_count_copy/alarm_date=2021-01-08,
總大小為9G多,因此這句有10個reduce
5.2.4 如何調(diào)整Reduce數(shù)量
注意?。?!實(shí)際開發(fā)中,reduce的個數(shù)一般通過程序自動推定,而不人為干涉,因?yàn)槿藶榭刂频脑挘绻褂貌划?dāng)很容易造成結(jié)果不準(zhǔn)確,且降低執(zhí)行效率
- 通過調(diào)整每個
reduce任務(wù)處理的數(shù)據(jù)量來調(diào)整reduce個數(shù),處理的數(shù)據(jù)量少了,任務(wù)數(shù)就多了# 設(shè)置每個reduce任務(wù)處理的數(shù)據(jù)量500M,默認(rèn)是1G set hive.exec.reducers.bytes.per.reducer = 500000000; SELECT alarm_date, COUNT(1) FROM fx67ll_alarm_count_copy WHERE alarm_date = "2021-01-08" GROUP BY alarm_date; 這次有20個reduce - 直接調(diào)整每個
Job中的最大reduce數(shù)量,過于簡單粗暴,慎用,盡量不要,雖然設(shè)置了reduce的個數(shù)看起來好像執(zhí)行速度變快了,但是實(shí)際并不是這樣的# 設(shè)置每個任務(wù)最大的reduce數(shù)為15個,默認(rèn)為999 set mapred.reduce.tasks = 15; SELECT alarm_date, COUNT(1) FROM fx67ll_alarm_count_copy WHERE alarm_date = "2021-01-08" GROUP BY alarm_date; 這次有15個reduce
5.2.5 推測執(zhí)行
參考map優(yōu)化的最后一項(xiàng)
5.2.6 什么情況下只有一個Reduce
很多時候你會發(fā)現(xiàn)任務(wù)中不管數(shù)據(jù)量多大,不管你有沒有設(shè)置調(diào)整reduce個數(shù)的參數(shù),任務(wù)中一直都只有一個reduce任務(wù),
其實(shí)只有一個reduce任務(wù)的情況,除了數(shù)據(jù)量小于hive.exec.reducers.bytes.per.reducer參數(shù)值的情況外,還有以下原因:
- 沒有
Group By的匯總,例如:SELECT alarm_date, COUNT(1) FROM fx67ll_alarm_count_copy WHERE alarm_date = "2021-01-08" GROUP BY alarm_date; 寫成 SELECT COUNT(1) FROM fx67ll_alarm_count_copy WHERE alarm_date = "2021-01-08"; 注意避免這樣情況的發(fā)生 - 用了
Order by排序,因?yàn)樗鼤?shù)據(jù)進(jìn)行全局排序,所以數(shù)據(jù)量特別大的時候效率非常低,盡量避免 - 有笛卡爾積,生產(chǎn)環(huán)境必須嚴(yán)格避免
5.3 Hive 任務(wù)整體優(yōu)化
5.3.1 Fetch抓取
Fetch抓取是指Hive在某些情況的查詢可以不必使用mr 任務(wù),例如在執(zhí)行一個簡單的select * from XX時,我們只需要簡單的進(jìn)行抓取對應(yīng)目錄下的數(shù)據(jù)即可。
在hive-default.xml.template中,hive.fetch.task.conversion(默認(rèn)是morn),老版本中默認(rèn)是minimal。
該屬性為morn時,在全局查找,字段查找,limit查找等都不走mr 任務(wù)
5.3.2 本地模式
Hive也可以不將任務(wù)提交到集群進(jìn)行運(yùn)算,而是直接在一臺節(jié)點(diǎn)上處理,因?yàn)橄颂峤坏郊旱膐verhead,所以比較適合數(shù)據(jù)量很小,且邏輯不復(fù)雜的任務(wù)。
設(shè)置hive.exec.mode.local.auto為true可以開啟本地模式,但任務(wù)的輸入數(shù)據(jù)總量必須小于hive.exec.mode.local.auto.inputbytes.max(默認(rèn)值128MB),
且mapper數(shù)必須小于hive.exec.mode.local.auto.tasks.max(默認(rèn)值4),reducer數(shù)必須為0或1,才會真正用本地模式執(zhí)行
5.3.3 并行執(zhí)行
Hive中互相沒有依賴關(guān)系的job間是可以并行執(zhí)行的,最典型的就是多個子查詢union all,
在集群資源相對充足的情況下,可以開啟并行執(zhí)行,即將參數(shù)hive.exec.parallel設(shè)為true,
另外hive.exec.parallel.thread.number可以設(shè)定并行執(zhí)行的線程數(shù),默認(rèn)為8,一般都夠用。
注意?。。]資源無法并行,且數(shù)據(jù)量小時開啟可能還沒不開啟快,所以建議數(shù)據(jù)量大時開啟
5.3.4 嚴(yán)格模式
要開啟嚴(yán)格模式,需要將參數(shù)hive.mapred.mode設(shè)為strict。
所謂嚴(yán)格模式,就是強(qiáng)制不允許用戶執(zhí)行3種有風(fēng)險的sql語句,一旦執(zhí)行會直接失敗,這3種語句是:
- 查詢分區(qū)表時不限定分區(qū)列的語句
- 兩表join產(chǎn)生了笛卡爾積的語句
- 用order by來排序但沒有指定limit的語句
5.3.5 JVM重用
- 主要用于處理小文件過多的時候
- 在
mr 任務(wù)中,默認(rèn)是每執(zhí)行一個task就啟動一個JVM,如果task非常小而碎,那么JVM啟動和關(guān)閉的耗時就會很長 - 可以通過調(diào)節(jié)參數(shù)
mapred.job.reuse.jvm.num.tasks來重用 - 例如將這個參數(shù)設(shè)成5,那么就代表同一個
mr 任務(wù)中順序執(zhí)行的5個task可以重復(fù)使用一個JVM,減少啟動和關(guān)閉的開銷,但它對不同mr 任務(wù)中的task無效
5.3.6 啟用壓縮
壓縮job的中間結(jié)果數(shù)據(jù)和輸出數(shù)據(jù),可以用少量CPU時間節(jié)省很多空間,壓縮方式一般選擇Snappy,效率最高。
要啟用中間壓縮,需要設(shè)定hive.exec.compress.intermediate為true,
同時指定壓縮方式hive.intermediate.compression.codec為org.apache.hadoop.io.compress.SnappyCodec。
另外,參數(shù)hive.intermediate.compression.type可以選擇對塊(BLOCK)還是記錄(RECORD)壓縮,BLOCK的壓縮率比較高。
輸出壓縮的配置基本相同,打開hive.exec.compress.output即可
5.3.7 采用合適的存儲格式
- 在Hive SQL的
create table語句中,可以使用stored as ...指定表的存儲格式。
Hive表支持的存儲格式有TextFile、SequenceFile、RCFile、Avro、ORC、Parquet等。
存儲格式一般需要根據(jù)業(yè)務(wù)進(jìn)行選擇,在我們的實(shí)操中,絕大多數(shù)表都采用TextFile與Parquet兩種存儲格式之一。 -
TextFile是最簡單的存儲格式,它是純文本記錄,也是Hive的默認(rèn)格式,雖然它的磁盤開銷比較大,查詢效率也低,但它更多地是作為跳板來使用。 -
RCFile、ORC、Parquet等格式的表都不能由文件直接導(dǎo)入數(shù)據(jù),必須由TextFile來做中轉(zhuǎn)。 -
Parquet和ORC都是Apache旗下的開源列式存儲格式。列式存儲比起傳統(tǒng)的行式存儲更適合批量OLAP查詢,并且也支持更好的壓縮和編碼。 - 我們選擇
Parquet的原因主要是它支持Impala查詢引擎,并且我們對update、delete和事務(wù)性操作需求很低。
六、Hive的小文件
6.1 什么情況下會產(chǎn)生小文件?
- 動態(tài)分區(qū)插入數(shù)據(jù),產(chǎn)生大量的小文件,從而導(dǎo)致map數(shù)量劇增
- reduce數(shù)量越多,小文件也越多,有多少個reduce,就會有多少個輸出文件,如果生成了很多小文件,那這些小文件作為下一次任務(wù)的輸入
- 數(shù)據(jù)源本身就包含大量的小文件
6.2 小文件有什么樣的危害?
- 從Hive的角度看,小文件會開很多map,一個map開一個java虛擬機(jī)jvm去執(zhí)行,所以這些任務(wù)的初始化,啟動,執(zhí)行會浪費(fèi)大量的資源,嚴(yán)重影響性能
- 在hdfs中,每個小文件對象約占150byte,如果小文件過多會占用大量內(nèi)存,這樣NameNode內(nèi)存容量嚴(yán)重制約了集群的擴(kuò)展
- 每個hdfs上的文件,會消耗128字節(jié)記錄其meta信息,所以大量小文件會占用大量內(nèi)存
6.3 如何避免小文件帶來的危害?
6.3.1 從小文件產(chǎn)生的途經(jīng)就可以從源頭上控制小文件數(shù)量
- 使用Sequencefile作為表存儲格式,不要用textfile,在一定程度上可以減少小文件
- 減少reduce的數(shù)量(可以使用參數(shù)進(jìn)行控制)
- 少用動態(tài)分區(qū),用時記得按distribute by分區(qū)
6.3.2 對于已有的小文件
- 使用hadoop archive命令把小文件進(jìn)行歸檔,采用archive命令不會減少文件存儲大小,只會壓縮NameNode的空間使用
- 重建表,建表時減少reduce數(shù)量
我是 fx67ll.com,如果您發(fā)現(xiàn)本文有什么錯誤,歡迎在評論區(qū)討論指正,感謝您的閱讀!
如果您喜歡這篇文章,歡迎訪問我的 本文github倉庫地址,為我點(diǎn)一顆Star,Thanks~ :)
轉(zhuǎn)發(fā)請注明參考文章地址,非常感謝?。?!