轉(zhuǎn)載請(qǐng)注明出處,謝謝合作~
該篇中的示例暫時(shí)只有 Scala 版本~
性能調(diào)優(yōu)
- 在內(nèi)存中緩存數(shù)據(jù)(Caching Data In Memory)
- 其他配置項(xiàng)(Other Configuration Options)
- SQL 查詢連接策略提示(Join Strategy Hints for SQL Queries)
- SQL 查詢合并提示(Coalesce Hints for SQL Queries)
- 自適應(yīng)查詢引擎(Adaptive Query Execution)
- 合并 shuffle 分區(qū)(Coalescing Post Shuffle Partitions)
- 將 sort-merge 連接轉(zhuǎn)換為 broadcast 連接(Converting sort-merge join to broadcast join)
- 優(yōu)化連接中的數(shù)據(jù)傾斜(Optimizing Skew Join)
對(duì)于某些工作場(chǎng)景,可以通過(guò)在內(nèi)存中緩存數(shù)據(jù)或者開(kāi)啟一些試驗(yàn)功能來(lái)提升性能。
在內(nèi)存中緩存數(shù)據(jù)
Spark SQL 能做以一種內(nèi)存中的列式存儲(chǔ)的格式緩存數(shù)據(jù),可以通過(guò)調(diào)用 spark.catalog.cacheTable("tableName") 或者 dataFrame.cache() 方法實(shí)現(xiàn)。之后 Spark SQL 只需要掃描用到的列中的數(shù)據(jù),而且能夠自動(dòng)使用壓縮來(lái)節(jié)省內(nèi)存空間,緩解 GC 壓力??梢酝ㄟ^(guò)調(diào)用 spark.catalog.uncacheTable("tableName") 來(lái)刪除緩存。
內(nèi)存緩存的配置項(xiàng)可以通過(guò) SparkSession 對(duì)象的 setConf 方法或者執(zhí)行 SQL 中的 SET key=value 語(yǔ)句來(lái)設(shè)置。
| Property Name | Default | Meaning | Since Version |
|---|---|---|---|
spark.sql.inMemoryColumnarStorage.compressed |
true | 當(dāng)設(shè)置為 true 時(shí),Spark SQL 會(huì)根據(jù)數(shù)據(jù)統(tǒng)計(jì)信息為每一列自動(dòng)應(yīng)用一種壓縮格式。 | 1.0.1 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | 控制列式存儲(chǔ)中批次的大小。較大的批次可以提升內(nèi)存利用率和壓縮效率,但是會(huì)提高 OOM 的風(fēng)險(xiǎn)。 | 1.1.1 |
其他配置項(xiàng)
下列選項(xiàng)也可以用來(lái)對(duì)查詢的性能進(jìn)行調(diào)優(yōu)。由于越來(lái)越多的優(yōu)化都會(huì)自動(dòng)進(jìn)行,這些選項(xiàng)在未來(lái)的版本中可能被被廢棄。
| Property Name | Default | Meaning | Since Version |
|---|---|---|---|
spark.sql.files.maxPartitionBytes |
134217728 (128 MB) | 在讀取文件數(shù)據(jù)時(shí)單分區(qū)的最大字節(jié)數(shù)。該配置只在使用文件類型的數(shù)據(jù)源(比如 Parquet, JSON 和 ORC)時(shí)有效。 | 2.0.0 |
spark.sql.files.openCostInBytes |
4194304 (4 MB) | 讀取一個(gè)文件時(shí)的預(yù)計(jì)開(kāi)銷,由可以被同時(shí)掃描的字節(jié)數(shù)控制。該參數(shù)用在將多個(gè)文件歸入一個(gè)分區(qū)時(shí)起作用。被高估會(huì)有積極的效果,此時(shí)包含小文件的分區(qū)將會(huì)比包含大文件的分區(qū)(優(yōu)先調(diào)度)要快。該配置只在使用文件類型的數(shù)據(jù)源(比如 Parquet, JSON 和 ORC)時(shí)有效。 | 2.0.0 |
spark.sql.broadcastTimeout |
300 | 廣播連接(Broadcast Join)中超時(shí)等待時(shí)間,單位為秒。 | 1.3.0 |
spark.sql.autoBroadcastJoinThreshold |
10485760 (10 MB) | 在連接操作中,如果一張表的大小低于該配置,它會(huì)被廣播到其他的節(jié)點(diǎn)。設(shè)置為 -1 表示禁用廣播連接。注意,目前只能夠通過(guò) ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan 命令獲得 Hive 表的統(tǒng)計(jì)信息。 |
1.1.0 |
spark.sql.shuffle.partitions |
200 | 連接操作或者聚合操作引起的 shuffle 行為的分區(qū)數(shù)。 | 1.1.0 |
SQL 查詢連接策略提示
目前的連接策略提示有 BROADCAST,MERGE,SHUFFLE_HASH 和 SHUFFLE_REPLICATE_NL,在一張關(guān)系表和另一張關(guān)系表做連接操作時(shí),Spark 會(huì)根據(jù)連接策略提示采用不同的連接策略。例如,當(dāng)對(duì)表「t1」使用 BROADCAST 連接策略提示時(shí),Spark 會(huì)優(yōu)先采用廣播連接(根據(jù)是否含有 equi-join 鍵來(lái)決定采用 broadcast hash join 或者 broadcast nested loop join),即使的統(tǒng)計(jì)信息表明其大小超出了上述配置 spark.sql.autoBroadcastJoinThreshold 的限值。
如果連接操作的調(diào)用者和被調(diào)用者采用了不同的連接策略提示,優(yōu)先級(jí)從高到低為:BROADCAST、MERGE、SHUFFLE_HASH、SHUFFLE_REPLICATE_NL。如果連接操作的調(diào)用者和被調(diào)用者都采用了 BROADCAST 策略提示或者 SHUFFLE_HASH 策略提示,Spark 會(huì)根據(jù)連接類型和表數(shù)據(jù)量大小做出選擇。
注意,由于指定的連接策略提示可能并不支持所有的連接類型,所以并不能保證 Spark 所采取的連接策略一定是被指定的那個(gè)。
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
詳情參見(jiàn)文檔 Join Hints。
SQL 查詢合并提示
合并提示可以讓 Spark SQL 用戶控制一些算子的輸出文件數(shù)量,這些算子包括 coalesce,repartition 和 repartitionByRange。合并指引可以用來(lái)調(diào)優(yōu)性能,減少輸出文件的數(shù)量?!窩OALESCE」指引只有一個(gè)分區(qū)數(shù)量作為參數(shù)?!窻EPARTITION」的參數(shù)可以是分區(qū)數(shù)或者列名或者兩者一起?!窻EPARTITION_BY_RANGE」提示必須有一個(gè)列名參數(shù),分區(qū)數(shù)是可選的參數(shù)。
SELECT /*+ COALESCE(3) */ * FROM t
SELECT /*+ REPARTITION(3) */ * FROM t
SELECT /*+ REPARTITION(c) */ * FROM t
SELECT /*+ REPARTITION(3, c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t
詳情參見(jiàn)文檔 Partitioning Hints。
自適應(yīng)查詢引擎
自適應(yīng)查詢引擎(AQE)是 Spark SQL 當(dāng)中的一項(xiàng)優(yōu)化技術(shù),能夠充分利用運(yùn)行時(shí)統(tǒng)計(jì)信息來(lái)選取最高效的查詢計(jì)劃。AQE 默認(rèn)是關(guān)閉的,Spark SQL 可以使用保護(hù)傘配置參數(shù) spark.sql.adaptive.enabled 來(lái)控制是否開(kāi)啟。Spark 3.0 中的 AQE 主要有三種優(yōu)化手段,包括合并 shuffle 分區(qū),將 sort-merge 連接轉(zhuǎn)換為 broadcast 連接,優(yōu)化連接中的數(shù)據(jù)傾斜。
合并 shuffle 分區(qū)
當(dāng)參數(shù) spark.sql.adaptive.enabled 和 spark.sql.adaptive.coalescePartitions.enabled 都被開(kāi)啟時(shí),通過(guò) map 端的統(tǒng)計(jì)信息合并 reduce 端的分區(qū)數(shù)。這種方式簡(jiǎn)化了查詢中 shuffle 分區(qū)數(shù)量的調(diào)優(yōu),不需要針對(duì)數(shù)據(jù)指定一個(gè)合理的分區(qū)數(shù),一旦通過(guò)參數(shù) spark.sql.adaptive.coalescePartitions.initialPartitionNum 設(shè)置了一個(gè)足夠大的初始 shuffle 分區(qū)數(shù),Spark 會(huì)在運(yùn)行時(shí)選取合理的分區(qū)數(shù)。
| Property Name | Default | Meaning | Since Version |
|---|---|---|---|
spark.sql.adaptive.coalescePartitions.enabled |
true | 當(dāng)該參數(shù)和參數(shù) spark.sql.adaptive.enabled 設(shè)置為 true 時(shí),Spark 會(huì)根據(jù)目標(biāo)大小(由參數(shù) spark.sql.adaptive.advisoryPartitionSizeInBytes 控制)合并連續(xù)的 shuffle 分區(qū),以避免過(guò)多的小分區(qū)。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.minPartitionNum |
Default Parallelism | 合并分區(qū)后的最小分區(qū)數(shù)。如果沒(méi)有設(shè)置,默認(rèn)值為集群最小并行數(shù)。該參數(shù)只有在參數(shù) spark.sql.adaptive.enabled 和 spark.sql.adaptive.coalescePartitions.enabled 參數(shù)都開(kāi)啟的情況下才生效。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.initialPartitionNum |
200 | 分區(qū)合并前的初始分區(qū)數(shù)。默認(rèn)情況下與參數(shù) spark.sql.shuffle.partitions 的值相同。該參數(shù)只有在參數(shù) spark.sql.adaptive.enabled 和 spark.sql.adaptive.coalescePartitions.enabled 參數(shù)都開(kāi)啟的情況下才生效。 |
3.0.0 |
spark.sql.adaptive.advisoryPartitionSizeInBytes |
64 MB | 自適應(yīng)優(yōu)化時(shí)建議的分區(qū)大小,需要參數(shù) spark.sql.adaptive.enabled 設(shè)置為 true。對(duì)于分區(qū)合并和分割數(shù)據(jù)傾斜分區(qū)起作用。 |
3.0.0 |
將 sort-merge 連接轉(zhuǎn)換為 broadcast 連接
當(dāng)連接操作的調(diào)用者或者被調(diào)用者的數(shù)據(jù)量小于廣播哈希連接的限值時(shí),AQE 會(huì)將 sort-merge 連接轉(zhuǎn)換為 broadcast 連接。這種方式并不比在開(kāi)始就指定廣播連接策略更高效,但是也比 sort-merge 連接好多了。優(yōu)化之后可以在一個(gè)節(jié)點(diǎn)上保存需要連接的所有數(shù)據(jù),讀取本地文件節(jié)省了網(wǎng)絡(luò)傳輸開(kāi)銷(需要參數(shù) spark.sql.adaptive.localShuffleReader.enabled 設(shè)置為 true)。
優(yōu)化連接中的數(shù)據(jù)傾斜
數(shù)據(jù)傾斜會(huì)嚴(yán)重降低連接查詢的效率,AQE 通過(guò)分割(或者折疊)將傾斜的數(shù)據(jù)分區(qū)組合成大小基本均勻的子任務(wù)來(lái)動(dòng)態(tài)處理 sort-merge 連接中的數(shù)據(jù)傾斜。當(dāng)參數(shù)spark.sql.adaptive.enabled 和 spark.sql.adaptive.skewJoin.enabled 都開(kāi)啟時(shí)生效。
| Property Name | Default | Meaning | Since Version |
|---|---|---|---|
spark.sql.adaptive.skewJoin.enabled |
true | 當(dāng)該參數(shù)和參數(shù) spark.sql.adaptive.enabled 設(shè)置為 true 時(shí),Spark 通過(guò)分割(或者折疊)動(dòng)態(tài)處理數(shù)sort-merge 連接中的據(jù)傾斜。 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor |
10 | 當(dāng)一個(gè)分區(qū)的大小大于該參數(shù)的值乘以分區(qū)大小中位數(shù),同時(shí)大于參數(shù) spark.sql.adaptive.skewedPartitionThresholdInBytes 設(shè)置的值時(shí),該分區(qū)被判定為數(shù)據(jù)傾斜。 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes |
256MB | 當(dāng)一個(gè)分區(qū)的大小大于該參數(shù)的值,同時(shí)大于參數(shù)的spark.sql.adaptive.skewJoin.skewedPartitionFactor 值乘以分區(qū)大小中位數(shù)時(shí),該分區(qū)被判定為數(shù)據(jù)傾斜。理想情況下該參數(shù)的值應(yīng)該比參數(shù) spark.sql.adaptive.advisoryPartitionSizeInBytes 的值大。 |
3.0.0 |