0303 Performance Tuning

轉(zhuǎn)載請(qǐng)注明出處,謝謝合作~

該篇中的示例暫時(shí)只有 Scala 版本~

性能調(diào)優(yōu)

對(duì)于某些工作場(chǎng)景,可以通過(guò)在內(nèi)存中緩存數(shù)據(jù)或者開(kāi)啟一些試驗(yàn)功能來(lái)提升性能。

在內(nèi)存中緩存數(shù)據(jù)

Spark SQL 能做以一種內(nèi)存中的列式存儲(chǔ)的格式緩存數(shù)據(jù),可以通過(guò)調(diào)用 spark.catalog.cacheTable("tableName") 或者 dataFrame.cache() 方法實(shí)現(xiàn)。之后 Spark SQL 只需要掃描用到的列中的數(shù)據(jù),而且能夠自動(dòng)使用壓縮來(lái)節(jié)省內(nèi)存空間,緩解 GC 壓力??梢酝ㄟ^(guò)調(diào)用 spark.catalog.uncacheTable("tableName") 來(lái)刪除緩存。

內(nèi)存緩存的配置項(xiàng)可以通過(guò) SparkSession 對(duì)象的 setConf 方法或者執(zhí)行 SQL 中的 SET key=value 語(yǔ)句來(lái)設(shè)置。

Property Name Default Meaning Since Version
spark.sql.inMemoryColumnarStorage.compressed true 當(dāng)設(shè)置為 true 時(shí),Spark SQL 會(huì)根據(jù)數(shù)據(jù)統(tǒng)計(jì)信息為每一列自動(dòng)應(yīng)用一種壓縮格式。 1.0.1
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列式存儲(chǔ)中批次的大小。較大的批次可以提升內(nèi)存利用率和壓縮效率,但是會(huì)提高 OOM 的風(fēng)險(xiǎn)。 1.1.1

其他配置項(xiàng)

下列選項(xiàng)也可以用來(lái)對(duì)查詢的性能進(jìn)行調(diào)優(yōu)。由于越來(lái)越多的優(yōu)化都會(huì)自動(dòng)進(jìn)行,這些選項(xiàng)在未來(lái)的版本中可能被被廢棄。

Property Name Default Meaning Since Version
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 在讀取文件數(shù)據(jù)時(shí)單分區(qū)的最大字節(jié)數(shù)。該配置只在使用文件類型的數(shù)據(jù)源(比如 Parquet, JSON 和 ORC)時(shí)有效。 2.0.0
spark.sql.files.openCostInBytes 4194304 (4 MB) 讀取一個(gè)文件時(shí)的預(yù)計(jì)開(kāi)銷,由可以被同時(shí)掃描的字節(jié)數(shù)控制。該參數(shù)用在將多個(gè)文件歸入一個(gè)分區(qū)時(shí)起作用。被高估會(huì)有積極的效果,此時(shí)包含小文件的分區(qū)將會(huì)比包含大文件的分區(qū)(優(yōu)先調(diào)度)要快。該配置只在使用文件類型的數(shù)據(jù)源(比如 Parquet, JSON 和 ORC)時(shí)有效。 2.0.0
spark.sql.broadcastTimeout 300 廣播連接(Broadcast Join)中超時(shí)等待時(shí)間,單位為秒。 1.3.0
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 在連接操作中,如果一張表的大小低于該配置,它會(huì)被廣播到其他的節(jié)點(diǎn)。設(shè)置為 -1 表示禁用廣播連接。注意,目前只能夠通過(guò) ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令獲得 Hive 表的統(tǒng)計(jì)信息。 1.1.0
spark.sql.shuffle.partitions 200 連接操作或者聚合操作引起的 shuffle 行為的分區(qū)數(shù)。 1.1.0

SQL 查詢連接策略提示

目前的連接策略提示有 BROADCAST,MERGESHUFFLE_HASHSHUFFLE_REPLICATE_NL,在一張關(guān)系表和另一張關(guān)系表做連接操作時(shí),Spark 會(huì)根據(jù)連接策略提示采用不同的連接策略。例如,當(dāng)對(duì)表「t1」使用 BROADCAST 連接策略提示時(shí),Spark 會(huì)優(yōu)先采用廣播連接(根據(jù)是否含有 equi-join 鍵來(lái)決定采用 broadcast hash join 或者 broadcast nested loop join),即使的統(tǒng)計(jì)信息表明其大小超出了上述配置 spark.sql.autoBroadcastJoinThreshold 的限值。

如果連接操作的調(diào)用者和被調(diào)用者采用了不同的連接策略提示,優(yōu)先級(jí)從高到低為:BROADCASTMERGE、SHUFFLE_HASH、SHUFFLE_REPLICATE_NL。如果連接操作的調(diào)用者和被調(diào)用者都采用了 BROADCAST 策略提示或者 SHUFFLE_HASH 策略提示,Spark 會(huì)根據(jù)連接類型和表數(shù)據(jù)量大小做出選擇。

注意,由于指定的連接策略提示可能并不支持所有的連接類型,所以并不能保證 Spark 所采取的連接策略一定是被指定的那個(gè)。

spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()

詳情參見(jiàn)文檔 Join Hints。

SQL 查詢合并提示

合并提示可以讓 Spark SQL 用戶控制一些算子的輸出文件數(shù)量,這些算子包括 coalesce,repartitionrepartitionByRange。合并指引可以用來(lái)調(diào)優(yōu)性能,減少輸出文件的數(shù)量?!窩OALESCE」指引只有一個(gè)分區(qū)數(shù)量作為參數(shù)?!窻EPARTITION」的參數(shù)可以是分區(qū)數(shù)或者列名或者兩者一起?!窻EPARTITION_BY_RANGE」提示必須有一個(gè)列名參數(shù),分區(qū)數(shù)是可選的參數(shù)。

SELECT /*+ COALESCE(3) */ * FROM t
SELECT /*+ REPARTITION(3) */ * FROM t
SELECT /*+ REPARTITION(c) */ * FROM t
SELECT /*+ REPARTITION(3, c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t

詳情參見(jiàn)文檔 Partitioning Hints。

自適應(yīng)查詢引擎

自適應(yīng)查詢引擎(AQE)是 Spark SQL 當(dāng)中的一項(xiàng)優(yōu)化技術(shù),能夠充分利用運(yùn)行時(shí)統(tǒng)計(jì)信息來(lái)選取最高效的查詢計(jì)劃。AQE 默認(rèn)是關(guān)閉的,Spark SQL 可以使用保護(hù)傘配置參數(shù) spark.sql.adaptive.enabled 來(lái)控制是否開(kāi)啟。Spark 3.0 中的 AQE 主要有三種優(yōu)化手段,包括合并 shuffle 分區(qū),將 sort-merge 連接轉(zhuǎn)換為 broadcast 連接,優(yōu)化連接中的數(shù)據(jù)傾斜。

合并 shuffle 分區(qū)

當(dāng)參數(shù) spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 都被開(kāi)啟時(shí),通過(guò) map 端的統(tǒng)計(jì)信息合并 reduce 端的分區(qū)數(shù)。這種方式簡(jiǎn)化了查詢中 shuffle 分區(qū)數(shù)量的調(diào)優(yōu),不需要針對(duì)數(shù)據(jù)指定一個(gè)合理的分區(qū)數(shù),一旦通過(guò)參數(shù) spark.sql.adaptive.coalescePartitions.initialPartitionNum 設(shè)置了一個(gè)足夠大的初始 shuffle 分區(qū)數(shù),Spark 會(huì)在運(yùn)行時(shí)選取合理的分區(qū)數(shù)。

Property Name Default Meaning Since Version
spark.sql.adaptive.coalescePartitions.enabled true 當(dāng)該參數(shù)和參數(shù) spark.sql.adaptive.enabled 設(shè)置為 true 時(shí),Spark 會(huì)根據(jù)目標(biāo)大小(由參數(shù) spark.sql.adaptive.advisoryPartitionSizeInBytes 控制)合并連續(xù)的 shuffle 分區(qū),以避免過(guò)多的小分區(qū)。 3.0.0
spark.sql.adaptive.coalescePartitions.minPartitionNum Default Parallelism 合并分區(qū)后的最小分區(qū)數(shù)。如果沒(méi)有設(shè)置,默認(rèn)值為集群最小并行數(shù)。該參數(shù)只有在參數(shù) spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 參數(shù)都開(kāi)啟的情況下才生效。 3.0.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum 200 分區(qū)合并前的初始分區(qū)數(shù)。默認(rèn)情況下與參數(shù) spark.sql.shuffle.partitions 的值相同。該參數(shù)只有在參數(shù) spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled 參數(shù)都開(kāi)啟的情況下才生效。 3.0.0
spark.sql.adaptive.advisoryPartitionSizeInBytes 64 MB 自適應(yīng)優(yōu)化時(shí)建議的分區(qū)大小,需要參數(shù) spark.sql.adaptive.enabled 設(shè)置為 true。對(duì)于分區(qū)合并和分割數(shù)據(jù)傾斜分區(qū)起作用。 3.0.0

將 sort-merge 連接轉(zhuǎn)換為 broadcast 連接

當(dāng)連接操作的調(diào)用者或者被調(diào)用者的數(shù)據(jù)量小于廣播哈希連接的限值時(shí),AQE 會(huì)將 sort-merge 連接轉(zhuǎn)換為 broadcast 連接。這種方式并不比在開(kāi)始就指定廣播連接策略更高效,但是也比 sort-merge 連接好多了。優(yōu)化之后可以在一個(gè)節(jié)點(diǎn)上保存需要連接的所有數(shù)據(jù),讀取本地文件節(jié)省了網(wǎng)絡(luò)傳輸開(kāi)銷(需要參數(shù) spark.sql.adaptive.localShuffleReader.enabled 設(shè)置為 true)。

優(yōu)化連接中的數(shù)據(jù)傾斜

數(shù)據(jù)傾斜會(huì)嚴(yán)重降低連接查詢的效率,AQE 通過(guò)分割(或者折疊)將傾斜的數(shù)據(jù)分區(qū)組合成大小基本均勻的子任務(wù)來(lái)動(dòng)態(tài)處理 sort-merge 連接中的數(shù)據(jù)傾斜。當(dāng)參數(shù)spark.sql.adaptive.enabledspark.sql.adaptive.skewJoin.enabled 都開(kāi)啟時(shí)生效。

Property Name Default Meaning Since Version
spark.sql.adaptive.skewJoin.enabled true 當(dāng)該參數(shù)和參數(shù) spark.sql.adaptive.enabled 設(shè)置為 true 時(shí),Spark 通過(guò)分割(或者折疊)動(dòng)態(tài)處理數(shù)sort-merge 連接中的據(jù)傾斜。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor 10 當(dāng)一個(gè)分區(qū)的大小大于該參數(shù)的值乘以分區(qū)大小中位數(shù),同時(shí)大于參數(shù) spark.sql.adaptive.skewedPartitionThresholdInBytes 設(shè)置的值時(shí),該分區(qū)被判定為數(shù)據(jù)傾斜。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB 當(dāng)一個(gè)分區(qū)的大小大于該參數(shù)的值,同時(shí)大于參數(shù)的spark.sql.adaptive.skewJoin.skewedPartitionFactor 值乘以分區(qū)大小中位數(shù)時(shí),該分區(qū)被判定為數(shù)據(jù)傾斜。理想情況下該參數(shù)的值應(yīng)該比參數(shù) spark.sql.adaptive.advisoryPartitionSizeInBytes 的值大。 3.0.0
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容