火山日常啰嗦
學(xué)習(xí)了一些大數(shù)據(jù)的相關(guān)框架后,發(fā)現(xiàn)應(yīng)用層的東西確實(shí)不難,真正難的都是底層原理,所以我查看了很多資料,借鑒了前人的方法再加上自己的理解,寫(xiě)下了這篇文章。
數(shù)據(jù)傾斜的直白概念:
數(shù)據(jù)傾斜就是數(shù)據(jù)的分布不平衡,某些地方特別多,某些地方又特別少,導(dǎo)致的在處理數(shù)據(jù)的時(shí)候,有些很快就處理完了,而有些又遲遲未能處理完,導(dǎo)致整體任務(wù)最終遲遲無(wú)法完成,這種現(xiàn)象就是數(shù)據(jù)傾斜。
針對(duì)mapreduce的過(guò)程來(lái)說(shuō)就是,有多個(gè)reduce,其中有一個(gè)或者若干個(gè)reduce要處理的數(shù)據(jù)量特別大,而其他的reduce處理的數(shù)據(jù)量則比較小,那么這些數(shù)據(jù)量小的reduce很快就可以完成,而數(shù)據(jù)量大的則需要很多時(shí)間,導(dǎo)致整個(gè)任務(wù)一直在等它而遲遲無(wú)法完成。
跑mr任務(wù)時(shí)常見(jiàn)的reduce的進(jìn)度總是卡在99%,這種現(xiàn)象很大可能就是數(shù)據(jù)傾斜造成的。
產(chǎn)生數(shù)據(jù)傾斜的原因:
1) key的分布不均勻或者說(shuō)某些key太集中。
上面就說(shuō)過(guò),reduce的數(shù)據(jù)量大小差異過(guò)大,而reduce的數(shù)據(jù)是分區(qū)的結(jié)果,分區(qū)是對(duì)key求hash值,根據(jù)hash值決定該key被分到某個(gè)分區(qū),進(jìn)而進(jìn)入到某個(gè)reduce,而如果key很集中或者相同,那么計(jì)算得到它們的hash值可能一樣,那么就會(huì)被分配到同一個(gè)reduce,就會(huì)造成這個(gè)reduce所要處理的數(shù)據(jù)量過(guò)大。
2) 業(yè)務(wù)數(shù)據(jù)自身的特性。
比如某些業(yè)務(wù)數(shù)據(jù)作為key的字段本就很集中,那么結(jié)果肯定會(huì)導(dǎo)致數(shù)據(jù)傾斜啊。
還有其他的一些原因,但是,根本原因還是key的分布不均勻,而其他的原因就是會(huì)造成key不均勻,進(jìn)而導(dǎo)致數(shù)據(jù)傾斜的后果,所以說(shuō)根本原因是key的分布不均勻。
既然有數(shù)據(jù)傾斜這種現(xiàn)象,就必須要有數(shù)據(jù)傾斜對(duì)應(yīng)的處理方案啊。
簡(jiǎn)單地說(shuō)數(shù)據(jù)傾斜這種現(xiàn)象導(dǎo)致的任務(wù)遲遲不能完成,耗費(fèi)了太多時(shí)間,極大地影響了性能,所以我們數(shù)據(jù)傾斜的解決方案設(shè)計(jì)思路就是往如何提高性能,即如何縮短任務(wù)的處理時(shí)間這方面考慮的,而要提高性能,就要讓key分布相對(duì)均衡,所以我們的終極目標(biāo)就是考慮如何預(yù)處理數(shù)據(jù)才能夠使得它的key分布均勻。
常見(jiàn)的數(shù)據(jù)傾斜處理方案:
1 設(shè)置參數(shù)
1)設(shè)置hive.map.aggr=true //開(kāi)啟map端部分聚合功能,就是將key相同的歸到一起,減少數(shù)據(jù)量,這樣就可以相對(duì)地減少進(jìn)入reduce的數(shù)據(jù)量,在一定程度上可以提高性能,當(dāng)然,如果數(shù)據(jù)的減少量微乎其微,那對(duì)性能的影響幾乎沒(méi)啥變化。
2)設(shè)置hive.groupby.skewindata=true //如果發(fā)生了數(shù)據(jù)傾斜就可以通過(guò)它來(lái)進(jìn)行負(fù)載均衡。當(dāng)選項(xiàng)設(shè)定為 true,生成的查詢(xún)計(jì)劃會(huì)有兩個(gè) MR Job。第一個(gè) MR Job 中,Map 的輸出結(jié)果集合會(huì)隨機(jī)分布到 Reduce 中,每個(gè) Reduce 做部分聚合操作,并輸出結(jié)果,這樣處理的結(jié)果是相同的Key 有可能被分發(fā)到不同的 Reduce 中,從而達(dá)到負(fù)載均衡的目的;第二個(gè) MR Job 再根據(jù)預(yù)處理的數(shù)據(jù)結(jié)果按照Key 分布到 Reduce 中(這個(gè)過(guò)程是按照key的hash值進(jìn)行分區(qū)的,不同于mr job1的隨機(jī)分配,這次可以保證相同的Key 被分布到同一個(gè) Reduce 中),最后完成最終的聚合操作。所以它主要就是先通過(guò)第一個(gè)mr job將key隨機(jī)分配到reduce,使得會(huì)造成數(shù)據(jù)傾斜的key可能被分配到不同的reduce上,從而達(dá)到負(fù)載均衡的目的。到第二個(gè)mr job中,因?yàn)榈谝粋€(gè)mr job已經(jīng)在reduce中對(duì)這些數(shù)據(jù)進(jìn)行了部分聚合(就像單詞統(tǒng)計(jì)的例子,a這個(gè)字母在不同的reduce中,已經(jīng)算出它在每個(gè)reduce中的個(gè)數(shù),但是最終的總的個(gè)數(shù)還沒(méi)算出來(lái),那么就將它傳到第二個(gè)mr job,這樣就可以得到總的單詞個(gè)數(shù)),所以這里直接進(jìn)行最后的聚合就可以了。
3)hive.exec.reducers.bytes.per.reducer=1000000000 (單位是字節(jié))
每個(gè)reduce能夠處理的數(shù)據(jù)量大小,默認(rèn)是1G
4)hive.exec.reducers.max=999
最大可以開(kāi)啟的reduce個(gè)數(shù),默認(rèn)是999個(gè)
在只配了hive.exec.reducers.bytes.per.reducer以及hive.exec.reducers.max的情況下,實(shí)際的reduce個(gè)數(shù)會(huì)根據(jù)實(shí)際的數(shù)據(jù)總量/每個(gè)reduce處理的數(shù)據(jù)量來(lái)決定。
5)mapred.reduce.tasks=-1
實(shí)際運(yùn)行的reduce個(gè)數(shù),默認(rèn)是-1,可以認(rèn)為指定,但是如果認(rèn)為在此指定了,那么就不會(huì)通過(guò)實(shí)際的總數(shù)據(jù)量/hive.exec.reducers.bytes.per.reducer來(lái)決定reduce的個(gè)數(shù)了。
2 sql語(yǔ)句優(yōu)化
給幾個(gè)具體的場(chǎng)景以及在這些場(chǎng)景下的處理方案:
1)進(jìn)行表的join這種業(yè)務(wù)操作時(shí),經(jīng)常會(huì)產(chǎn)生數(shù)據(jù)傾斜。
原因就是這些業(yè)務(wù)數(shù)據(jù)本就存在key會(huì)分布不均勻的風(fēng)險(xiǎn),所以我們join時(shí)不能使用普通的join(reduce端join)或者可以使用普通join,但是是優(yōu)化后的。
不使用普通join的原因:數(shù)據(jù)要進(jìn)入reduce,肯定要先進(jìn)行分區(qū),而分區(qū)就是根據(jù)key的hash值來(lái)進(jìn)行的,既然數(shù)據(jù)的key本身就是不均勻的了(即某些key很集中,或者干脆就是有很多相同的key,比如key為無(wú)效值null),那這樣分區(qū)的結(jié)果就是這些集中的key會(huì)被分到同一個(gè)分區(qū)中,而這些key的數(shù)量本就大,所以會(huì)產(chǎn)生數(shù)據(jù)傾斜。
既然不使用普通join,那么我們可以使用map join,對(duì)于原本就有數(shù)據(jù)傾斜的風(fēng)險(xiǎn)的業(yè)務(wù)數(shù)據(jù),我們可以使用map join來(lái)避免數(shù)據(jù)傾斜這種風(fēng)險(xiǎn),原因:
上面就說(shuō)過(guò)了,普通join就是因?yàn)榉謪^(qū)這一階段導(dǎo)致較為集中的key會(huì)被分到同一個(gè)分區(qū),進(jìn)而進(jìn)入同一個(gè)reduce,這樣機(jī)會(huì)產(chǎn)生數(shù)據(jù)傾斜。但是呢,使用map join的話,直接在map端就完成表的join操作(表的join得到的結(jié)果(一張新表)就是我們這次的目標(biāo)了),進(jìn)入map端的數(shù)據(jù)都是經(jīng)過(guò)split(分片)得到的,沒(méi)有根據(jù)key分區(qū)這一操作,所以數(shù)據(jù)都是相對(duì)均勻地分布在每個(gè)map task中的,所以就不會(huì)產(chǎn)生數(shù)據(jù)傾斜。
但是這種操作有個(gè)前提條件就是僅適用于小表join大表,而小表怎么定義它的大小,多小的表才算小表,這里有個(gè)參數(shù)可以確定的(但是這個(gè)參數(shù)名我暫時(shí)忘記了),如果小表的數(shù)據(jù)大小小于這個(gè)值,就可以使用map join,而是在這種情況下是自動(dòng)使用map join這種方案的。所以如果是大小表join,直接用map join,避免數(shù)據(jù)傾斜。
但是如果都是大表呢,不滿足小表join大表,那就無(wú)法使用map join,那該怎么辦呢?別擔(dān)心,有辦法:
分情況討論:
1)業(yè)務(wù)數(shù)據(jù)有數(shù)據(jù)傾斜的風(fēng)險(xiǎn),但是這些導(dǎo)致數(shù)據(jù)傾斜風(fēng)險(xiǎn)的key一般都是無(wú)效的(比如日志中user_id的值很容易丟失而成為null),那對(duì)于這些null值得數(shù)據(jù),我們最終關(guān)聯(lián)得到的結(jié)果表中一定不會(huì)有這樣的記錄的,因?yàn)樗鼈兊膗ser_id為null,而在某些業(yè)務(wù)需求中我們就是要分析與user_id有關(guān)的行為,所以當(dāng)user_id為null時(shí),它對(duì)應(yīng)的這些數(shù)據(jù)都是沒(méi)有意義的,所以根本不會(huì)出現(xiàn)在結(jié)果表中。所以當(dāng)業(yè)務(wù)數(shù)據(jù)很大,但是數(shù)據(jù)中的大部分(一般都是80%)可能都是無(wú)效數(shù)據(jù),那么我們就可以在join時(shí)過(guò)濾(清洗)掉它們,沒(méi)有了這些無(wú)效數(shù)據(jù),自然就不存在這么大量集中的key了,數(shù)據(jù)傾斜的風(fēng)險(xiǎn)也就消失了。
比如將日志和用戶(hù)表進(jìn)行關(guān)聯(lián),關(guān)聯(lián)條件是user_id相同,按照上述說(shuō)法,那么為了避免數(shù)據(jù)傾斜風(fēng)險(xiǎn),我們應(yīng)該這樣做:
方法1:(普通join)
select * from log a join users b on (a.user_id is not null and a.user_id = b.user_id );
這是屬于表的內(nèi)連接的,兩張表不滿足條件的記錄都不保留。
方法2:檢測(cè)到user_id是null時(shí)給它賦予一個(gè)新值(這個(gè)新值由一個(gè)字符串(比如我自己給它定一個(gè) hive)加上一個(gè)隨機(jī)數(shù)組成),這樣就可以將原來(lái)集中的key分散開(kāi)來(lái),也避免了數(shù)據(jù)傾斜的風(fēng)險(xiǎn)。而且因?yàn)檫@些數(shù)據(jù)本來(lái)就是無(wú)效數(shù)據(jù),根本不會(huì)出現(xiàn)在結(jié)果表中,所以,這樣處理user_id(由一個(gè)字符串(比如我自己給它定一個(gè) hive)加上一個(gè)隨機(jī)數(shù)),它也無(wú)法關(guān)聯(lián)的,因?yàn)橛行У臄?shù)據(jù)的user_id沒(méi)有這種形式的,所以就算這些無(wú)效數(shù)據(jù)出現(xiàn)在不同的reduce中還是不會(huì)影響結(jié)果的,我這樣處理只是為了將它們分散開(kāi)而已,所以用這種方法處理,結(jié)果表中也不會(huì)出現(xiàn)null這些無(wú)效數(shù)據(jù),跟過(guò)濾處理方案得到的結(jié)果是一樣的。(普通join)
select *
from log a
join users b
on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;
但是這兩種方案只是適用于大表join大表的內(nèi)連接,兩張表的無(wú)效數(shù)據(jù)都不保留。
但是如果對(duì)于左外連接或者右外連接這種情況,即使驅(qū)動(dòng)表中某些記錄在另一張表中沒(méi)有數(shù)據(jù)與它對(duì)應(yīng),但我們是依然需要保留驅(qū)動(dòng)表的這些數(shù)據(jù)的,那該怎么辦呢?其實(shí)很簡(jiǎn)單,只需要將上述方法得到的結(jié)果再與驅(qū)動(dòng)表的這些無(wú)數(shù)據(jù)取并集就可以了。
如下:
select * from log a
left outer join users b
on a.user_id is not null
and a.user_id = b.user_id
union all
select * from log a
where a.user_id is null;
2)雖然都是大表,但是呢對(duì)于某些業(yè)務(wù)數(shù)據(jù)而言,其有用的部分只占它所在表的很少一部分,那么我們就可以將它們先取出來(lái),得到的結(jié)果應(yīng)該是一張小表,那么就可以使用map join來(lái)避免數(shù)據(jù)傾斜了。
不同數(shù)據(jù)類(lèi)型關(guān)聯(lián)產(chǎn)生數(shù)據(jù)傾斜(普通join)
場(chǎng)景:用戶(hù)表中user_id字段為int,log表中user_id字段既有string類(lèi)型也有int類(lèi)型。
當(dāng)按照user_id進(jìn)行兩個(gè)表的Join操作時(shí),因?yàn)槲覀冊(cè)谶B接時(shí)要進(jìn)行user_id的比較,所以需要user_id的類(lèi)型都相同,如果我們選擇將log表中的String類(lèi)型轉(zhuǎn)換為int類(lèi)型,那么就可能會(huì)出現(xiàn)這種情況:String類(lèi)型轉(zhuǎn)換為int類(lèi)型得到的都是null值(這就是類(lèi)型轉(zhuǎn)換的問(wèn)題了,String類(lèi)型數(shù)據(jù)轉(zhuǎn)換為int類(lèi)型會(huì)失敗,數(shù)據(jù)丟失,就會(huì)賦null值),如果所有的String類(lèi)型的user_id都變成了null,那么就又出現(xiàn)了集中的key,分區(qū)后就又會(huì)導(dǎo)致數(shù)據(jù)傾斜。所以我們進(jìn)行類(lèi)型轉(zhuǎn)換時(shí)不能選擇將String類(lèi)型轉(zhuǎn)換為int,而應(yīng)該將int類(lèi)型轉(zhuǎn)換為String,因?yàn)閕nt轉(zhuǎn)換為String不會(huì)出問(wèn)題,int類(lèi)型原來(lái)的值是什么,轉(zhuǎn)換為String后對(duì)應(yīng)的字符串就會(huì)是什么,形式?jīng)]變,只是類(lèi)型變了而已。
解決方法:把int類(lèi)型轉(zhuǎn)換成字符串類(lèi)型
select * from users a
join logs b
on (a.usr_id = cast(b.user_id as string));
數(shù)據(jù)本身有數(shù)據(jù)傾斜風(fēng)險(xiǎn),通過(guò)count(distinct xxx)這種操作方式就產(chǎn)生數(shù)據(jù)傾斜后果
比如有一份日志,要你從日志中統(tǒng)計(jì)某天有多少個(gè)用戶(hù)訪問(wèn)網(wǎng)站,即統(tǒng)計(jì)有多少個(gè)不同的user_id;但是呢這個(gè)網(wǎng)站卻又恰巧遭到攻擊,日志中大部分都是同一個(gè)user_id的記錄,其他的user_id屬于正常訪問(wèn),訪問(wèn)量不會(huì)很大,在這種情況下,當(dāng)你直接使用count(distinct user_id)時(shí),這也是要跑mr任務(wù)的啊,這時(shí)這些大量的相同的user_id就是集中的key了,結(jié)果就是通過(guò)分區(qū)它們都被分到一個(gè)reduce中,就會(huì)造成這個(gè)reduce處理的數(shù)據(jù)特別大,而其中的reduce處理的數(shù)據(jù)都很小,所以就會(huì)造成數(shù)據(jù)傾斜。
那么要怎么優(yōu)化呢?
方法1:可以先找出這個(gè)user_id是什么,過(guò)濾掉它,然后通過(guò)count(distinct user_id)計(jì)算出剩余的那些user_id的個(gè)數(shù),最后再加1(這1個(gè)就是那個(gè)被過(guò)濾掉的user_id,雖然它有大量的記錄,但是ser_id相同的都是同一個(gè)用戶(hù),而我們要計(jì)算的就是用戶(hù)數(shù))
sql語(yǔ)句展示:
分組求和后降序排序,就可以得到這個(gè)數(shù)據(jù)量最大的user_id是什么,然后我們下一步操作時(shí)就過(guò)濾它,等計(jì)算完其他的再加上它這一個(gè)。
select user_id,count(user_id) from log group by user_id desc limit 2;
select count(distinct user_id)+1 as sum from log;
sum就是最終的結(jié)果--用戶(hù)數(shù)
方法2:我們可以先通過(guò)group by分組,然后再在分組得到的結(jié)果的基礎(chǔ)之上進(jìn)行count
sql語(yǔ)句展示:
select count(*) from (select user_id from log group by user_id) new_log;
總的來(lái)說(shuō)就是,數(shù)據(jù)傾斜的根源是key分布不均勻,所以應(yīng)對(duì)方案要么是從源頭解決(不讓數(shù)據(jù)分區(qū),直接在map端搞定),要么就是在分區(qū)時(shí)將這些集中卻無(wú)效的key過(guò)濾(清洗)掉,或者是想辦法將這些key打亂,讓它們進(jìn)入到不同的reduce中。
性能調(diào)優(yōu)是指通過(guò)調(diào)整使得機(jī)器處理任務(wù)的速度更快,所花的時(shí)間更少,而數(shù)據(jù)傾斜的處理是hive性能調(diào)優(yōu)的一部分,通過(guò)處理能夠大大減少任務(wù)的運(yùn)行時(shí)間。
除了數(shù)據(jù)傾斜的處理之外,hive的優(yōu)化還有其他方面的,例如where子句優(yōu)化:
select * from a left outer join b on (a.key=b.key) where a.date='2017-07-11' and b.date='2017-07-11';
這是一個(gè)左外連接。
這個(gè)sql語(yǔ)句執(zhí)行的結(jié)果是:得到的結(jié)果是表a與表b的連接表,且表中的記錄的date都是'2017-07-11'。
而這個(gè)sql語(yǔ)句的執(zhí)行過(guò)程是:逐條獲取到a表的記錄,然后掃描b表,尋找字段key值為a.key的記錄,找到后將b表的這條記錄連接到a表上,然后判斷連接后的這條記錄是否滿足條件a.date='2017-07-11' and b.date='2017-07-11',如果滿足,則顯示,否則,丟棄。
因?yàn)檫@是一個(gè)左外連接,且a為驅(qū)動(dòng)表,連接時(shí)在a中發(fā)現(xiàn)key而在b中沒(méi)有發(fā)現(xiàn)與之相等的key時(shí),b中的列將置為null,包括列date,一個(gè)不為null,一個(gè)為null,這樣后邊的where就沒(méi)有用了。
簡(jiǎn)答的說(shuō)這個(gè)方案的做法就是先按連接條件進(jìn)行連接,連接后再看where條件,如果不滿足就丟棄,那之前連接所做的那些功夫就浪費(fèi)了,白白耗費(fèi)了資源(cpu等),增加了運(yùn)行的總時(shí)間,如果有一種方案可以在未進(jìn)行連接之前就直接判斷出不滿足最終的條件,那么就可以直接丟棄它,這樣對(duì)于這樣的記錄就不要浪費(fèi)資源以及時(shí)間去連接了,這樣也是能提升性能的,下面就看看這種方案:
sql語(yǔ)句:
將剛才的where限制條件直接放到on里面,那么就變成了滿足這三個(gè)條件才會(huì)進(jìn)行連接,不滿足的直接過(guò)濾掉,就像上面所說(shuō)的,少了無(wú)效連接那一步,就相對(duì)地節(jié)約了時(shí)間,如果這樣的無(wú)效連接的記錄很多的話,那么采用這種改進(jìn)版的方案無(wú)疑能夠較大程度地提高性能。
select * from a left outer join b on (a.key=b.key and a.date='2017-07-11' and b.date='2017-07-11');
不管怎么說(shuō),我們?cè)谶\(yùn)行任務(wù)時(shí),總是希望能加快運(yùn)行速度,縮短運(yùn)行時(shí)間,更快地得到結(jié)果,即提升性能,這是我們的目的,這就是我們所謂的性能調(diào)優(yōu)。
關(guān)于小表join大表的補(bǔ)充:
表join時(shí)的操作是這樣的:
當(dāng)操作到驅(qū)動(dòng)表的某條記錄時(shí),就會(huì)全局掃描另一張表,尋找滿足條件的記錄,而當(dāng)掃描它時(shí),為了讀取速度更快,一般都選先將它加載到內(nèi)存,而內(nèi)存的大小是有限的,為了不占據(jù)過(guò)多的內(nèi)存或者避免內(nèi)存溢出,加載進(jìn)入內(nèi)存的表一般是小表,即數(shù)據(jù)量比較小,map join就是這樣做的。
即驅(qū)動(dòng)表不放進(jìn)內(nèi)存,而另一張表(即要連接到驅(qū)動(dòng)表的那一張表)就要先加載進(jìn)內(nèi)存,為了掃描速度更快,提高性能。
比如select * from a left outer join b on (a.key=b.key);
左外連接,驅(qū)動(dòng)表是a,表b的記錄是要被連接到表a的,所以每在a上連接一條記錄就要被全局掃描一次的表是b,所以表b應(yīng)先加載到內(nèi)存(前提是表b是小表,如果是大表的話,估計(jì)會(huì)產(chǎn)生oom異常--out of memory內(nèi)存溢出異常)。
select * from aa right outer join bb on (a.key=b.key);
右外連接,驅(qū)動(dòng)表是bb,aa應(yīng)先加載到內(nèi)存(前提是小表)。
ps:希望我的分享能幫助到有需要的伙伴哦。我不是大神的哦,如果文中有誤,還請(qǐng)大家不吝賜教,幫忙指正,謝謝了!??!