Spark難點(diǎn)解析:Join實(shí)現(xiàn)原理

Join背景介紹


SQL的所有操作,可以分為簡(jiǎn)單操作(如過(guò)濾where、限制次數(shù)limit等)和聚合操作(groupBy,join等)。

其中,join操作是最復(fù)雜、代價(jià)最大的操作類型,是大部分業(yè)務(wù)場(chǎng)景的性能瓶頸所在;所以,今天我們基于SparkSQL,來(lái)簡(jiǎn)要的聊一下SparkSQL所支持的幾種常見(jiàn)的Join算法以及其適用場(chǎng)景。

首先,我們需要知道數(shù)倉(cāng)中表格的分類:按照是否會(huì)經(jīng)常涉及到Join操作,可以簡(jiǎn)單分為低層次表和高層次表

低層次表:直接導(dǎo)入數(shù)倉(cāng)的表,列數(shù)少,與其他表存在外鍵依賴,查詢起來(lái)經(jīng)常會(huì)用到大量Join算法,查詢效率較低

高層次表:由低層次表加工而來(lái),使用SQL將需要join的表預(yù)先合并,形成“寬表”。寬表上查詢不需要大量Join,因此效率較高。但是,相對(duì)的是,寬表的數(shù)據(jù)存在大量冗余,同時(shí)生成滯后,查詢不及時(shí)。

Join使用的結(jié)論


Join常見(jiàn)分類&實(shí)現(xiàn)機(jī)制


當(dāng)前SparkSQL支持三種Join算法-shuffle hash join、broadcast hash join以及sort merge join。其中前兩者歸根到底都屬于hash join,只不過(guò)在hash join之前需要先shuffle還是先broadcast。所以,首先我們來(lái)看一下內(nèi)核hash join的機(jī)制。

Hash Join

先來(lái)看一個(gè)簡(jiǎn)單的SQL:select * from order,item where?item.id?= order.id

參與join的兩張表是item和order,join key分別是item.id以及order.id,假設(shè)這個(gè)Join采用的是hash join算法,整個(gè)過(guò)程會(huì)經(jīng)歷三步:

1. 確定Build Table(映射表、小表)以及Probe Table(探查表、大表)。其中Build Table用于構(gòu)建Hash Table,而Probe會(huì)遍歷自身所有key,映射到所生成的Hash Table上去匹配。

2. Build Table構(gòu)建Hash Table。依次讀取Build Table(item)的數(shù)據(jù),對(duì)于每一行數(shù)據(jù)根據(jù)join key(item.id)進(jìn)行hash,hash到對(duì)應(yīng)的Bucket,生成hash table中的一條記錄。數(shù)據(jù)緩存在內(nèi)存中,如果內(nèi)存放不下需要dump到外存。

3. Probe Table探測(cè)。依次掃描Probe Table(order)的數(shù)據(jù),使用相同的hash函數(shù)映射Hash Table中的記錄,映射成功之后再檢查join條件(item.id= order.i_id),如果匹配成功就可以將兩者join在一起。

Hash Join原理圖

兩點(diǎn)補(bǔ)充:

1 hash join的性能。從上面的原理圖可以看出,hash join對(duì)兩張表基本只掃描一次,算法效率是o(a+b),比起蠻力的笛卡爾積算法的a*b快了很多數(shù)量級(jí)。

2 為什么說(shuō)Build Table要盡量選擇小表呢?從原理上也看到了,構(gòu)建的Hash Table是需要被頻繁訪問(wèn)的,所以Hash Table最好能全部加載到內(nèi)存里,這也決定了hash join只適合至少一個(gè)小表join的場(chǎng)景。

看完了hash join的內(nèi)核,我們來(lái)看一下這種單機(jī)的算法,在大數(shù)據(jù)分布式情況下,應(yīng)該如何去做。目前成熟的有兩套算法:broadcast hash join和shuffler hash join。

Broadcast Hash Join

broadcast hash join是將其中一張小表廣播分發(fā)到另一張大表所在的分區(qū)節(jié)點(diǎn)上,分別并發(fā)地與其上的分區(qū)記錄進(jìn)行hash join。broadcast適用于小表很小,可以直接廣播的場(chǎng)景。

在執(zhí)行上,主要可以分為以下兩步:

1. broadcast階段:將小表廣播分發(fā)到大表所在的所有主機(jī)。分發(fā)方式可以有driver分發(fā),或者采用p2p方式。

2. hash join階段:在每個(gè)executor上執(zhí)行單機(jī)版hash join,小表映射,大表試探;

需要注意的是,Spark中對(duì)于可以廣播的小表,默認(rèn)限制是10M以下。(參數(shù)是spark.sql.autoBroadcastJoinThreshold

Broadcast Hash Join示意圖

Shuffle Hash Join

當(dāng)join的一張表很小的時(shí)候,使用broadcast hash join,無(wú)疑效率最高。但是隨著小表逐漸變大,廣播所需內(nèi)存、帶寬等資源必然就會(huì)太大,所以才會(huì)有默認(rèn)10M的資源限制。

所以,當(dāng)小表逐漸變大時(shí),就需要采用另一種Hash Join來(lái)處理:Shuffle Hash Join。

Shuffle Hash Join按照join key進(jìn)行分區(qū),根據(jù)key相同必然分區(qū)相同的原理,將大表join分而治之,劃分為小表的join,充分利用集群資源并行化執(zhí)行。

在執(zhí)行上,主要可以分為以下兩步:

1. shuffle階段:分別將兩個(gè)表按照join key進(jìn)行分區(qū),將相同join key的記錄重分布到同一節(jié)點(diǎn),兩張表的數(shù)據(jù)會(huì)被重分布到集群中所有節(jié)點(diǎn)。

2. hash join階段:每個(gè)分區(qū)節(jié)點(diǎn)上的數(shù)據(jù)單獨(dú)執(zhí)行單機(jī)hash join算法。

Shuffle Hash Join示意圖

剛才也說(shuō)過(guò),Hash Join適合至少有一個(gè)小表的情況,那如果兩個(gè)大表需要Join呢?這時(shí)候就需要Sort-Merge Join了。

Sort-Merge Join

SparkSQL對(duì)兩張大表join采用了全新的算法-sort-merge join,整個(gè)過(guò)程分為三個(gè)步驟:

1. shuffle階段:將兩張大表根據(jù)join key進(jìn)行重新分區(qū),兩張表數(shù)據(jù)會(huì)分布到整個(gè)集群,以便分布式并行處理

2. sort階段:對(duì)單個(gè)分區(qū)節(jié)點(diǎn)的兩表數(shù)據(jù),分別進(jìn)行排序

3. merge階段:對(duì)排好序的兩張分區(qū)表數(shù)據(jù)執(zhí)行join操作。join操作很簡(jiǎn)單,分別遍歷兩個(gè)有序序列,碰到相同join key就merge輸出,否則繼續(xù)取更小一邊的key。

Sort-Merge Join示意圖

仔細(xì)分析的話會(huì)發(fā)現(xiàn),sort-merge join的代價(jià)并不比shuffle hash join小,反而是多了很多。那為什么SparkSQL還會(huì)在兩張大表的場(chǎng)景下選擇使用sort-merge join算法呢?

這和Spark的shuffle實(shí)現(xiàn)有關(guān),目前spark的shuffle實(shí)現(xiàn)都適用sort-based shuffle算法,因此在經(jīng)過(guò)shuffle之后partition數(shù)據(jù)都是按照key排序的。因此理論上可以認(rèn)為數(shù)據(jù)經(jīng)過(guò)shuffle之后是不需要sort的,可以直接merge。

結(jié)論:如何優(yōu)化


經(jīng)過(guò)上文的分析,可以明確每種Join算法都有自己的適用場(chǎng)景。在優(yōu)化的時(shí)候,除了要根據(jù)業(yè)務(wù)場(chǎng)景選擇合適的join算法之外,還要注意以下幾點(diǎn):

1 數(shù)據(jù)倉(cāng)庫(kù)設(shè)計(jì)時(shí)最好避免大表與大表的join查詢。

2 SparkSQL也可以根據(jù)內(nèi)存資源、帶寬資源適量將參數(shù)spark.sql.autoBroadcastJoinThreshold調(diào)大,讓更多join實(shí)際執(zhí)行為broadcast hash join。


文集

Spark:理論與實(shí)踐

文章

五分鐘大數(shù)據(jù):Spark入門

Spark編程快速入門

Spark難點(diǎn)解析:Join實(shí)現(xiàn)原理

可視化發(fā)現(xiàn)Spark數(shù)據(jù)傾斜


參考鏈接:

SparkSQL – 有必要坐下來(lái)聊聊Join:http://hbasefly.com/2017/03/19/sparksql-basic-join/

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容