掌握spark 3.0中的查詢計(jì)劃

本文翻譯自Mastering Query Plans in Spark 3.0,能夠很好的幫助學(xué)習(xí)spark sql理解spark UI的計(jì)劃,決定翻譯記錄一下。
在Spark SQL中查詢計(jì)劃是理解查詢執(zhí)行的入口點(diǎn),它攜帶了大量的信息,并且能夠洞察查詢是怎么執(zhí)行的。在大的負(fù)載下或者執(zhí)行的任務(wù)很長的時(shí)候,這些信息很重要的。從查詢計(jì)劃的信息我們可以發(fā)現(xiàn)哪些是低效的并且能夠重寫查詢?nèi)ヌ峁└玫男阅堋?br> 對于不熟悉查詢計(jì)劃的人來說,乍一看,這些信息有點(diǎn)難懂。它是樹形結(jié)構(gòu),并且每個(gè)節(jié)點(diǎn)代表了一種操作,每種操作上提供了執(zhí)行的基本信息。spark官方文檔上涉及到查詢計(jì)劃的信息是比較少的。這邊文章的動(dòng)機(jī)就是讓我們熟悉物理計(jì)劃,我們接下來將會(huì)來看一下常用到的操作以及它們提供的信息以及他們是怎么執(zhí)行的。
這邊文章所涉及到的理論大部分是基于對源碼的研究和運(yùn)行優(yōu)化spark查詢計(jì)劃的實(shí)踐。

基本例子

我們考慮一個(gè)簡單的例子,一個(gè)查詢中涉及到filter以及aggregation,join操作的語句:

# in PySpark API:
query  = (
    questionsDF
    .filter(col('year') == 2019)
    .groupBy('user_id')
    .agg(
        count('*').alias('cnt')
    )
    .join(usersDF, 'user_id')
)

我們把例子中的usersDF是一組問問題的用戶,這些問題用questionsDF來表示。這些問題用year的這一列來進(jìn)行分區(qū),代表著哪一年問的問題。在這個(gè)查詢里,我們對2019年問問題的用戶感興趣,并且想知道每個(gè)人問了多少問題,而且我們想知道在輸出中我們想知道一些額外信息,這就是為什么我們在聚合之后進(jìn)行了usersDF的join操作。
這里有兩種基本的方式去查看物理計(jì)劃。第一種是在DataFrame上調(diào)用explain函數(shù),該函數(shù)展現(xiàn)這個(gè)計(jì)劃的文本化的展示:

這在spark 3.0有了一些優(yōu)化,explain函數(shù)帶有了一個(gè)新參數(shù) mode,這個(gè)參數(shù)的值可以是:formatted,cost,codegen。使用formatted模式將會(huì)把查詢計(jì)劃轉(zhuǎn)化為更加有組織的輸出(這里之展現(xiàn)了一部分):


在formatted計(jì)劃中,我們能看到裸數(shù),改裸數(shù)只是展現(xiàn)了操作的名字并帶有一個(gè)括號(hào)的數(shù)字。在數(shù)的下面,這里有一些數(shù)字對應(yīng)的細(xì)節(jié)描述。cost模式將會(huì)展示除了物理計(jì)劃之外的優(yōu)化的邏輯計(jì)劃,這些邏輯計(jì)劃帶有每個(gè)操作的統(tǒng)計(jì)信息,所以我們能看到在不同執(zhí)行階段的數(shù)據(jù)大小。最終codegen模式展現(xiàn)了將會(huì)執(zhí)行的生成的java代碼。
第二種方式是查看spark ui中的sql tab,這里有正在跑的和已經(jīng)完成了的查詢。通過點(diǎn)擊你要查看的查詢,我們可以看到物理計(jì)劃的文本表示。在下面這個(gè)圖片中,我們結(jié)合圖形表示,文本表示以及它們之間的對應(yīng)關(guān)系:

不同點(diǎn)是圖形表示的葉子節(jié)點(diǎn)在上面,根節(jié)點(diǎn)在下面,而文本表示的是反過來的。

CollapseCodegenStages

在物理計(jì)劃的圖形表示中,你能看到一些操作被組織成了一大塊藍(lán)色的矩形。這些大矩形對應(yīng)著codegen階段。這是發(fā)生在物理計(jì)劃的優(yōu)化階段。這個(gè)是叫做CollapseCodegenStages來負(fù)責(zé)優(yōu)化的,原理是把支持代碼生成的操作聚合到一起,通過消除虛擬函數(shù)的調(diào)用來加速。但是并不是所有的操作支持代碼生成。所以一些操作(如exchange操作)并不是大矩形的一部分。在我們的例子中,這里有三個(gè)codegen stages,對應(yīng)著三個(gè)大矩形,你能在操作的括號(hào)中看到codegen stage的id。從這個(gè)樹我們也可以分辨出一個(gè)操作是夠支持代碼生成,因?yàn)榧尤胫С执a生成的話,這里將會(huì)在對應(yīng)的操作的括號(hào)里有個(gè)星號(hào)。


我們簡單的分析一下在我們查詢中的每一個(gè)操作。

Scan parquet

scan parquet操作代表著從parquet文件中讀取數(shù)據(jù)。從明細(xì)信息中,我們能直接看到從這個(gè)數(shù)據(jù)源中我們選擇了哪些列。雖然我們沒指定具體的字段,但是這里也會(huì)應(yīng)用ColumnPruning規(guī)則,這個(gè)規(guī)則會(huì)確保只有真正字段才會(huì)從這個(gè)數(shù)據(jù)源中提取出來。我們也能看到有兩種filters:PartitionFilters和PushFilters。PartitionFilters應(yīng)用在數(shù)據(jù)源分區(qū)的字段上。這是非常重要的因?yàn)槲覀兡芴^我們不需要的數(shù)據(jù)。檢查對應(yīng)的filters是否傳播到正確的位置總是沒錯(cuò)的。這是因?yàn)槲覀儽M可能讀取少量的數(shù)據(jù),因?yàn)镮O是比較費(fèi)時(shí)的。在spark 2.4,這里還有一個(gè)代表實(shí)際讀取到的分區(qū)的partitionCount字段,這個(gè)字段在spark 3.0已經(jīng)去掉了。
PushFilters把字段直接下推到parquet文件中去,假如parquet文件過濾的列是按照過濾字段排序的話,這個(gè)規(guī)則就很有用了,因?yàn)檫@種情況下,我們能利用parquet內(nèi)部結(jié)構(gòu)去過濾數(shù)據(jù)。parquet文件是按照行組和每個(gè)行組的元數(shù)據(jù)文件組成的。這個(gè)元數(shù)據(jù)包含了每個(gè)行組的最大最小值,基于這個(gè)信息,我們就能判斷是否讀取這個(gè)行組。

Filter

Filter操作佷容易理解。它僅僅是代表過濾條件。但是這個(gè)操作怎么創(chuàng)建的并不是很明顯,因?yàn)樵诓樵冎兴⒉皇侵苯訉?yīng)著過濾條件。因?yàn)樗械膄ilters首先被Catalyst optimzer處理,改規(guī)則可能修改或者重新移動(dòng)她們。這里有好幾個(gè)規(guī)則在她們轉(zhuǎn)換為物理計(jì)劃前的邏輯計(jì)劃。我們列舉了一下:

  • PushDownPredicates-這個(gè)規(guī)則通過其他的操作把filter下推到離數(shù)據(jù)源更近的地方,但不是所有的操作都支持。比如,如果表達(dá)式不是確定性的,這就不行,假如我們使用類似first,last,collect_set,collect_list,rand等,filters操作就不能通過這些操作而進(jìn)行下推,因?yàn)檫@些函數(shù)是不確定性的。
  • CombineFilters-結(jié)合兩個(gè)臨近的操作合成一個(gè)(收集兩個(gè)filters條件合成一個(gè)更為復(fù)雜的的條件)
  • InferFiltersFromConstraints-這個(gè)規(guī)則實(shí)際上會(huì)創(chuàng)建新的filter操作,如從join操作(從inner join中創(chuàng)建一個(gè)joining key is not null)
  • PruneFilters-移除多余的filters(比如一個(gè)filters總是true)

Exchange

Exchange操作代表著shuffle操作,意味著物理數(shù)據(jù)的集群范圍內(nèi)的移動(dòng)。這個(gè)操作是很費(fèi)時(shí)的,因?yàn)樗鼤?huì)通過網(wǎng)絡(luò)移動(dòng)數(shù)據(jù)。查詢計(jì)劃的信息也包含了一些數(shù)據(jù)重新分區(qū)的細(xì)節(jié)。在我們的例子中,是hashPartitioning(user_id,200):



這意味著數(shù)據(jù)將會(huì)根據(jù)user_id列重新分區(qū)為200個(gè)分區(qū),有著同樣user_id的行將會(huì)屬于同一個(gè)分區(qū),將會(huì)分配到同一個(gè)executor上。為了確保只有200分區(qū),spark將會(huì)計(jì)算user_id的hashcode并且對200取模。這個(gè)結(jié)果就是不同的user_ids就會(huì)分到同一個(gè)分區(qū)。同時(shí)有些分區(qū)可能是空的。這里也有其他類型的分區(qū)值的去留意一下:

  • RoundRobinPartitioning-數(shù)據(jù)將會(huì)隨機(jī)分配到n個(gè)分區(qū)中,n在函數(shù)repartition(n)中指定
  • SinglePartition-所有數(shù)據(jù)將會(huì)分配到一個(gè)分區(qū)中,進(jìn)而到一個(gè)executor中。
  • RangePartitioning-這個(gè)用在對數(shù)據(jù)排序中,用在orderBy或者sort操作中

HashAggregate

這個(gè)代表著數(shù)據(jù)聚合,這個(gè)經(jīng)常是兩個(gè)操作,要么被Exchange分開或者不分開:



為什么這里有兩個(gè)HashAggregate操作的原因是第一個(gè)是部分聚合,它在每個(gè)executor上每個(gè)分區(qū)分別進(jìn)行聚合。在我們的例子中,你能看到partial_count(1)的function字段,最終的部分聚合結(jié)果就是第二個(gè)聚合。這個(gè)操作也展示了數(shù)據(jù)按照哪個(gè)分組的Keys字段。results字段展示了在聚合以后的可用的列。

BroadcastHashJoin & BroadcastExchange

BroadcastHashJoin(BHJ)代表著join算法的操作,除了這個(gè),還有SortMergeJoin和ShuffleHashJoin。BHJ總是伴隨著BroadcastExchange,這個(gè)代表著廣播shuffle-數(shù)據(jù)將會(huì)收集到driver端并且會(huì)被傳播到需要的executor上。

ColumnarToRow

這是在spark 3.0引入的新操作,用于列行之間的轉(zhuǎn)換

總結(jié)

在spark sql中的物理計(jì)劃由攜帶了有用信息的操作組成,正確理解每個(gè)操作能夠更好的洞察執(zhí)行,并且通過分析計(jì)劃,我們可以分析是夠是最優(yōu)的,必要的時(shí)候可以進(jìn)行優(yōu)化。
在這篇文章里,我們描述了在物理計(jì)劃中經(jīng)常用到的一組操作,雖然不是全部但是我們盡量去覆蓋經(jīng)常使用到的操作。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容