Spark SQL CBO 基于代價(jià)的優(yōu)化

Spark CBO 背景

本文將介紹 CBO,它充分考慮了數(shù)據(jù)本身的特點(diǎn)(如大小、分布)以及操作算子的特點(diǎn)(中間結(jié)果集的分布及大?。┘按鷥r(jià),從而更好的選擇執(zhí)行代價(jià)最小的物理執(zhí)行計(jì)劃,即 SparkPlan。

Spark CBO 原理

CBO 原理是計(jì)算所有可能的物理計(jì)劃的代價(jià),并挑選出代價(jià)最小的物理執(zhí)行計(jì)劃。其核心在于評估一個(gè)給定的物理執(zhí)行計(jì)劃的代價(jià)。

物理執(zhí)行計(jì)劃是一個(gè)樹狀結(jié)構(gòu),其代價(jià)等于每個(gè)執(zhí)行節(jié)點(diǎn)的代價(jià)總合,如下圖所示。

image

而每個(gè)執(zhí)行節(jié)點(diǎn)的代價(jià),分為兩個(gè)部分

  • 該執(zhí)行節(jié)點(diǎn)對數(shù)據(jù)集的影響,或者說該節(jié)點(diǎn)輸出數(shù)據(jù)集的大小與分布

  • 該執(zhí)行節(jié)點(diǎn)操作算子的代價(jià)

每個(gè)操作算子的代價(jià)相對固定,可用規(guī)則來描述。而執(zhí)行節(jié)點(diǎn)輸出數(shù)據(jù)集的大小與分布,分為兩個(gè)部分:1) 初始數(shù)據(jù)集,也即原始表,其數(shù)據(jù)集的大小與分布可直接通過統(tǒng)計(jì)得到;2)中間節(jié)點(diǎn)輸出數(shù)據(jù)集的大小與分布可由其輸入數(shù)據(jù)集的信息與操作本身的特點(diǎn)推算。

所以,最終主要需要解決兩個(gè)問題

  • 如何獲取原始數(shù)據(jù)集的統(tǒng)計(jì)信息

  • 如何根據(jù)輸入數(shù)據(jù)集估算特定算子的輸出數(shù)據(jù)集

Statistics 收集

通過如下 SQL 語句,可計(jì)算出整個(gè)表的記錄總數(shù)以及總大小

<pre style="margin: 0px; padding: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important;">ANALYZE TABLE table_name COMPUTE STATISTICS;</pre>

從如下示例中,Statistics 一行可見, customer 表數(shù)據(jù)總大小為 37026233 字節(jié),即 35.3MB,總記錄數(shù)為 28萬,與事實(shí)相符。

<pre style="margin: 0px; padding: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important;">spark-sql> ANALYZE TABLE customer COMPUTE STATISTICS;
Time taken: 12.888 seconds

spark-sql> desc extended customer;
c_customer_sk bigint NULL
c_customer_id string NULL

Table Properties [transient_lastDdlTime=1536997324]
Statistics 37026233 bytes, 280000 rows

Time taken: 1.691 seconds, Fetched 36 row(s)</pre>

通過如下 SQL 語句,可計(jì)算出指定列的統(tǒng)計(jì)信息

<pre style="margin: 0px; padding: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important;">ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS [column1] [,column2] [,column3] [,column4] ... [,columnn];</pre>

從如下示例可見,customer 表的 c_customer_sk 列最小值為 1, 最大值為 280000,null 值個(gè)數(shù)為 0,不同值個(gè)數(shù)為 274368,平均列長度為 8,最大列長度為 8。

<pre style="margin: 0px; padding: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important;">spark-sql> ANALYZE TABLE customer COMPUTE STATISTICS FOR COLUMNS c_customer_sk, c_customer_id, c_current_cdemo_sk;
Time taken: 9.139 seconds
spark-sql> desc extended customer c_customer_sk;
col_name c_customer_sk
data_type bigint
comment NULL
min 1
max 280000
num_nulls 0
distinct_count 274368
avg_col_len 8
max_col_len 8
histogram NULL</pre>

除上述示例中的統(tǒng)計(jì)信息外,Spark CBO 還直接等高直方圖。在上例中,histogram 為 NULL。其原因是,spark.sql.statistics.histogram.enabled 默認(rèn)值為 false,也即 ANALYZE 時(shí)默認(rèn)不計(jì)算及存儲(chǔ) histogram。

下例中,通過 SET spark.sql.statistics.histogram.enabled=true; 啟用 histogram 后,完整的統(tǒng)計(jì)信息如下。

<pre style="margin: 0px; padding: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important;">spark-sql> ANALYZE TABLE customer COMPUTE STATISTICS FOR COLUMNS c_customer_sk,c_customer_id,c_current_cdemo_sk,c_current_hdemo_sk,c_current_addr_sk,c_first_shipto_date_sk,c_first_sales_date_sk,c_salutation,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_day,c_birth_month,c_birth_year,c_birth_country,c_login,c_email_address,c_last_review_date;
Time taken: 125.624 seconds

spark-sql> desc extended customer c_customer_sk;
col_name c_customer_sk
data_type bigint
comment NULL
min 1
max 280000
num_nulls 0
distinct_count 274368
avg_col_len 8
max_col_len 8
histogram height: 1102.3622047244094, num_of_bins: 254
bin_0 lower_bound: 1.0, upper_bound: 1090.0, distinct_count: 1089

...

bin_253 lower_bound: 278870.0, upper_bound: 280000.0, distinct_count: 1106</pre>

從上圖可見,生成的 histogram 為 equal-height histogram,且高度為 1102.36,bin 數(shù)為 254。其中 bin 個(gè)數(shù)可由 spark.sql.statistics.histogram.numBins 配置。對于每個(gè) bin,勻記錄其最小值,最大值,以及 distinct count。

值得注意的是,這里的 distinct count 并不是精確值,而是通過 HyperLogLog 計(jì)算出來的近似值。使用 HyperLogLog 的原因有二

  • 使用 HyperLogLog 計(jì)算 distinct count 速度快速

  • HyperLogLog 計(jì)算出的 distinct count 可以合并。例如可以直接將兩個(gè) bin 的 HyperLogLog 值合并算出這兩個(gè) bin 總共的 distinct count,而無須從重新計(jì)算,且合并結(jié)果的誤差可控

算子對數(shù)據(jù)集影響估計(jì)

對于中間算子,可以根據(jù)輸入數(shù)據(jù)集的統(tǒng)計(jì)信息以及算子的特性,可以估算出輸出數(shù)據(jù)集的統(tǒng)計(jì)結(jié)果。

image

本節(jié)以 Filter 為例說明算子對數(shù)據(jù)集的影響。

對于常見的 Column A < value B Filter,可通過如下方式估算輸出中間結(jié)果的統(tǒng)計(jì)信息

  • 若 B < A.min,則無數(shù)據(jù)被選中,輸出結(jié)果為空

  • 若 B > A.max,則全部數(shù)據(jù)被選中,輸出結(jié)果與 A 相同,且統(tǒng)計(jì)信息不變

  • 若 A.min < B < A.max,則被選中的數(shù)據(jù)占比為 (B.value - A.min) / (A.max - A.min),A.min 不變,A.max 更新為 B.value,A.ndv = A.ndv * (B.value - A.min) / (A.max - A.min)

image

上述估算的前提是,字段 A 數(shù)據(jù)均勻分布。但很多時(shí)候,數(shù)據(jù)分布并不均勻,且當(dāng)數(shù)據(jù)傾斜嚴(yán)重是,上述估算誤差較大。此時(shí),可充分利用 histogram 進(jìn)行更精確的估算

image

啟用 Historgram 后,F(xiàn)ilter Column A < value B的估算方法為

  • 若 B < A.min,則無數(shù)據(jù)被選中,輸出結(jié)果為空

  • 若 B > A.max,則全部數(shù)據(jù)被選中,輸出結(jié)果與 A 相同,且統(tǒng)計(jì)信息不變

  • 若 A.min < B < A.max,則被選中的數(shù)據(jù)占比為 height(<B) / height(All),A.min 不變,A.max = B.value,A.ndv = ndv(<B)

在上圖中,B.value = 15,A.min = 0,A.max = 32,bin 個(gè)數(shù)為 10。Filter 后 A.ndv = ndv(<B.value) = ndv(<15)。該值可根據(jù) A < 15 的 5 個(gè) bin 的 ndv 通過 HyperLogLog 合并而得,無須重新計(jì)算所有 A < 15 的數(shù)據(jù)。

算子代價(jià)估計(jì)

SQL 中常見的操作有 Selection(由 select 語句表示),F(xiàn)ilter(由 where 語句表示)以及笛卡爾乘積(由 join 語句表示)。其中代價(jià)最高的是 join。

Spark SQL 的 CBO 通過如下方法估算 join 的代價(jià)

<pre style="margin: 0px; padding: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important;">Cost = rows * weight + size * (1 - weight)
Cost = CostCPU * weight + CostIO * (1 - weight)</pre>

其中 rows 即記錄行數(shù)代表了 CPU 代價(jià),size 代表了 IO 代價(jià)。weight 由 *spark.sql.cbo.joinReorder.card.weight *決定,其默認(rèn)值為 0.7。

Build側(cè)選擇

對于兩表Hash Join,一般選擇小表作為build size,構(gòu)建哈希表,另一邊作為 probe side。未開啟 CBO 時(shí),根據(jù)表原始數(shù)據(jù)大小選擇 t2 作為build side

image

而開啟 CBO 后,基于估計(jì)的代價(jià)選擇 t1 作為 build side。更適合本例

image

優(yōu)化 Join 類型

Spark SQL 中,Join 可分為 Shuffle based Join 和 BroadcastJoin。Shuffle based Join 需要引入 Shuffle,代價(jià)相對較高。BroadcastJoin 無須 Join,但要求至少有一張表足夠小,能通過 Spark 的 Broadcast 機(jī)制廣播到每個(gè) Executor 中。

在不開啟 CBO 中,Spark SQL 通過 spark.sql.autoBroadcastJoinThreshold 判斷是否啟用 BroadcastJoin。其默認(rèn)值為 10485760 即 10 MB。

并且該判斷基于參與 Join 的表的原始大小。

在下圖示例中,Table 1 大小為 1 TB,Table 2 大小為 20 GB,因此在對二者進(jìn)行 join 時(shí),由于二者都遠(yuǎn)大于自動(dòng) BroatcastJoin 的閾值,因此 Spark SQL 在未開啟 CBO 時(shí)選用 SortMergeJoin 對二者進(jìn)行 Join。

而開啟 CBO 后,由于 Table 1 經(jīng)過 Filter 1 后結(jié)果集大小為 500 GB,Table 2 經(jīng)過 Filter 2 后結(jié)果集大小為 10 MB 低于自動(dòng) BroatcastJoin 閾值,因此 Spark SQL 選用 BroadcastJoin。

image

優(yōu)化多表 Join 順序

未開啟 CBO 時(shí),Spark SQL 按 SQL 中 join 順序進(jìn)行 Join。極端情況下,整個(gè) Join 可能是 left-deep tree。在下圖所示 TPC-DS Q25 中,多路 Join 存在如下問題,因此耗時(shí) 241 秒。

  • left-deep tree,因此所有后續(xù) Join 都依賴于前面的 Join 結(jié)果,各 Join 間無法并行進(jìn)行

  • 前面的兩次 Join 輸入輸出數(shù)據(jù)量均非常大,屬于大 Join,執(zhí)行時(shí)間較長

image

開啟 CBO 后, Spark SQL 將執(zhí)行計(jì)劃優(yōu)化如下

image

優(yōu)化后的 Join 有如下優(yōu)勢,因此執(zhí)行時(shí)間降至 71 秒

  • Join 樹不再是 left-deep tree,因此 Join 3 與 Join 4 可并行進(jìn)行,Join 5 與 Join 6 可并行進(jìn)行

  • 最大的 Join 5 輸出數(shù)據(jù)只有兩百萬條結(jié)果,Join 6 有 1.49 億條結(jié)果,Join 7相當(dāng)于小 Join

總結(jié)

  • 上文 Spark SQL 內(nèi)部原理 中介紹的 Optimizer 屬于 RBO,實(shí)現(xiàn)簡單有效。它屬于 LogicalPlan 的優(yōu)化,所有優(yōu)化均基于 LogicalPlan 本身的特點(diǎn),未考慮數(shù)據(jù)本身的特點(diǎn),也未考慮算子本身的代價(jià)

  • 本文介紹的 CBO 考慮了數(shù)據(jù)的統(tǒng)計(jì)特征,從而選擇總代價(jià)最低的物理執(zhí)行計(jì)劃。但物理執(zhí)行計(jì)劃是固定的,一旦選定,不可更改。未考慮運(yùn)行時(shí)信息

  • 下文將介紹 Spark SQL Adapptive Execution,它可根據(jù)運(yùn)行時(shí)信息動(dòng)態(tài)調(diào)整執(zhí)行計(jì)劃從而優(yōu)化執(zhí)行

關(guān)注我的公眾號(hào),后臺(tái)回復(fù)【JAVAPDF】獲取200頁面試題!

5萬人關(guān)注的大數(shù)據(jù)成神之路,不來了解一下嗎?

5萬人關(guān)注的大數(shù)據(jù)成神之路,真的不來了解一下嗎?

5萬人關(guān)注的大數(shù)據(jù)成神之路,確定真的不來了解一下嗎?

歡迎您關(guān)注《大數(shù)據(jù)成神之路》
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容