當(dāng)我看到Spark3.0版本對(duì)于SparkSQL性能優(yōu)化之后,不由自主的選擇去使用SparkSQL,在此分享下SparkSQL3.0新功能。對(duì)于SparkSQL提供的兩種開發(fā)方式:DSL/SQL,我更喜歡SQL方式,SQL方式不僅開發(fā)效率高,而且DSL實(shí)現(xiàn)特別復(fù)雜的功能,個(gè)人感覺不如使用SparkCore借助靈活算子實(shí)現(xiàn)。
AQE(Adaptive Query Execution)
AQE是Spark SQL中的一種優(yōu)化技術(shù),它利用運(yùn)行時(shí)統(tǒng)計(jì)信息來選擇最有效的查詢執(zhí)行計(jì)劃。AQE默認(rèn)情況下是禁用的。Spark SQL可以使用spark.sql.adaptive.enabled的傘形配置來控制是否打開/關(guān)閉它。 從Spark 3.0開始,AQE具有三個(gè)主要功能,包括合并后shuffle分區(qū),sort-merge join轉(zhuǎn)broadcast join以及傾斜優(yōu)化。
在此有個(gè)概念CBO(cost-based optimization)必須先熟悉,CBO框架翻譯就是基于成本的優(yōu)化,也是在Spark3.0版本才引入的,該框架收集并利用各種數(shù)據(jù)統(tǒng)計(jì)信息(如行數(shù),不同值的數(shù)量,NULL 值,最大/最小值等)來幫助 Spark 選擇更好的計(jì)劃。這些基于成本的優(yōu)化技術(shù)很好的例子就是選擇正確的 Join 類型(broadcast hash join vs. sort merge join),在 hash join 的時(shí)候選擇正確的連接順序,或在多個(gè) join 中調(diào)整 join 順序。然而,過時(shí)的統(tǒng)計(jì)信息和不完善的基數(shù)估計(jì)可能導(dǎo)致次優(yōu)查詢計(jì)劃。
1,Coalescing Post Shuffle Partitions
當(dāng)spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled配置都是true的時(shí)候,這個(gè)特性會(huì)基于map輸出統(tǒng)計(jì)的數(shù)據(jù)合并后shuffle分區(qū)數(shù)。這個(gè)特性簡化了運(yùn)行查詢時(shí)對(duì)shuffle分區(qū)數(shù)的調(diào)優(yōu),你不需要設(shè)置適當(dāng)?shù)姆謪^(qū)數(shù)來適配自己的數(shù)據(jù)集。一旦您通過spark.sql.adaptive.coalescePartitions.initialPartitionNum設(shè)置了足夠大的初始shuffle分區(qū)數(shù),Spark就可以在運(yùn)行時(shí)選擇合適的shuffle分區(qū)數(shù)。
當(dāng)在 Spark 中運(yùn)行sql來處理非常大的數(shù)據(jù)時(shí),shuffle 通常對(duì)查詢性能有非常重要的影響。Shuffle 是一個(gè)昂貴的操作符,因?yàn)樗枰诰W(wǎng)絡(luò)中移動(dòng)數(shù)據(jù),因此數(shù)據(jù)是按照下游操作符所要求的方式重新分布的。
shuffle 的一個(gè)關(guān)鍵屬性是分區(qū)的數(shù)量。分區(qū)的最佳數(shù)量取決于數(shù)據(jù),但是數(shù)據(jù)大小可能在不同的階段、不同的查詢之間有很大的差異,這使得這個(gè)數(shù)字很難調(diào)優(yōu):如果分區(qū)數(shù)太少,那么每個(gè)分區(qū)處理的數(shù)據(jù)大小可能非常大,處理這些大分區(qū)的任務(wù)可能需要將數(shù)據(jù)溢寫到磁盤(例如,涉及排序或聚合),從而減慢查詢速度;如果分區(qū)數(shù)太多,那么每個(gè)分區(qū)處理的數(shù)據(jù)大小可能非常小,并且將有大量的網(wǎng)絡(luò)數(shù)據(jù)獲取來讀取 shuffle 塊,這也會(huì)由于低效的 I/O 模式而減慢查詢速度。擁有大量的任務(wù)也會(huì)給 Spark 任務(wù)調(diào)度程序帶來更多的負(fù)擔(dān)。
要解決這個(gè)問題,我們可以在開始時(shí)設(shè)置相對(duì)較多的 shuffle 分區(qū)數(shù),然后在運(yùn)行時(shí)通過查看 shuffle 文件統(tǒng)計(jì)信息將相鄰的小分區(qū)合并為較大的分區(qū)。此種思想就是所謂的Coalescing Post Shuffle Partitions。
2.Converting sort-merge join to broadcast join
當(dāng)運(yùn)行時(shí)統(tǒng)計(jì)的join操作任何一端的數(shù)據(jù)小于broadcast hash join閾值(spark.sql.autoBroadcastJoinThreshold默認(rèn)10MB)時(shí),AQE將sort-merge join轉(zhuǎn)換為broadcast hash join。這不如首先規(guī)劃一個(gè)broadcast hash join高效,但這比繼續(xù)sort-merge join要好,這樣我們可以節(jié)省join雙方的排序,并在本地讀取shuffle文件節(jié)省網(wǎng)絡(luò)流量(如果spark.sql.adaptive.localShuffleReader=true)。
3.Optimizing Skew Join
數(shù)據(jù)傾斜會(huì)嚴(yán)重降低連接查詢的性能。該特性通過將傾斜任務(wù)拆分(并在需要時(shí)復(fù)制)為大小大致相同的任務(wù)來動(dòng)態(tài)處理sort-merge jion的傾斜。當(dāng)spark.sql.adaptive.enabled和spark.sql.adaptive.skewJoin.enabled配置均為true的情況下,動(dòng)態(tài)優(yōu)化數(shù)據(jù)傾斜的join才會(huì)生效。