Spark CBO 背景
本文將介紹 CBO,它充分考慮了數(shù)據(jù)本身的特點(diǎn)(如大小、分布)以及操作算子的特點(diǎn)(中間結(jié)果集的分布及大?。┘按鷥r(jià),從而更好的選擇執(zhí)行代價(jià)最小的物理執(zhí)行計(jì)劃,即 SparkPlan。
Spark CBO 原理
CBO 原理是計(jì)算所有可能的物理計(jì)劃的代價(jià),并挑選出代價(jià)最小的物理執(zhí)行計(jì)劃。其核心在于評估一個(gè)給定的物理執(zhí)行計(jì)劃的代價(jià)。
物理執(zhí)行計(jì)劃是一個(gè)樹狀結(jié)構(gòu),其代價(jià)等于每個(gè)執(zhí)行節(jié)點(diǎn)的代價(jià)總合,如下圖所示。
而每個(gè)執(zhí)行節(jié)點(diǎn)的代價(jià),分為兩個(gè)部分
該執(zhí)行節(jié)點(diǎn)對數(shù)據(jù)集的影響,或者說該節(jié)點(diǎn)輸出數(shù)據(jù)集的大小與分布
該執(zhí)行節(jié)點(diǎn)操作算子的代價(jià)
每個(gè)操作算子的代價(jià)相對固定,可用規(guī)則來描述。而執(zhí)行節(jié)點(diǎn)輸出數(shù)據(jù)集的大小與分布,分為兩個(gè)部分:1) 初始數(shù)據(jù)集,也即原始表,其數(shù)據(jù)集的大小與分布可直接通過統(tǒng)計(jì)得到;2)中間節(jié)點(diǎn)輸出數(shù)據(jù)集的大小與分布可由其輸入數(shù)據(jù)集的信息與操作本身的特點(diǎn)推算。
所以,最終主要需要解決兩個(gè)問題
如何獲取原始數(shù)據(jù)集的統(tǒng)計(jì)信息
如何根據(jù)輸入數(shù)據(jù)集估算特定算子的輸出數(shù)據(jù)集
Statistics 收集
通過如下 SQL 語句,可計(jì)算出整個(gè)表的記錄總數(shù)以及總大小
<pre style="margin: 0px; padding: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important;">ANALYZE TABLE table_name COMPUTE STATISTICS;</pre>
從如下示例中,Statistics 一行可見, customer 表數(shù)據(jù)總大小為 37026233 字節(jié),即 35.3MB,總記錄數(shù)為 28萬,與事實(shí)相符。
<pre style="margin: 0px; padding: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important;">spark-sql> ANALYZE TABLE customer COMPUTE STATISTICS;
Time taken: 12.888 secondsspark-sql> desc extended customer;
c_customer_sk bigint NULL
c_customer_id string NULLTable Properties [transient_lastDdlTime=1536997324]
Statistics 37026233 bytes, 280000 rowsTime taken: 1.691 seconds, Fetched 36 row(s)</pre>
通過如下 SQL 語句,可計(jì)算出指定列的統(tǒng)計(jì)信息
<pre style="margin: 0px; padding: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important;">ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS [column1] [,column2] [,column3] [,column4] ... [,columnn];</pre>
從如下示例可見,customer 表的 c_customer_sk 列最小值為 1, 最大值為 280000,null 值個(gè)數(shù)為 0,不同值個(gè)數(shù)為 274368,平均列長度為 8,最大列長度為 8。
<pre style="margin: 0px; padding: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important;">spark-sql> ANALYZE TABLE customer COMPUTE STATISTICS FOR COLUMNS c_customer_sk, c_customer_id, c_current_cdemo_sk;
Time taken: 9.139 seconds
spark-sql> desc extended customer c_customer_sk;
col_name c_customer_sk
data_type bigint
comment NULL
min 1
max 280000
num_nulls 0
distinct_count 274368
avg_col_len 8
max_col_len 8
histogram NULL</pre>
除上述示例中的統(tǒng)計(jì)信息外,Spark CBO 還直接等高直方圖。在上例中,histogram 為 NULL。其原因是,spark.sql.statistics.histogram.enabled 默認(rèn)值為 false,也即 ANALYZE 時(shí)默認(rèn)不計(jì)算及存儲(chǔ) histogram。
下例中,通過 SET spark.sql.statistics.histogram.enabled=true; 啟用 histogram 后,完整的統(tǒng)計(jì)信息如下。
<pre style="margin: 0px; padding: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important;">spark-sql> ANALYZE TABLE customer COMPUTE STATISTICS FOR COLUMNS c_customer_sk,c_customer_id,c_current_cdemo_sk,c_current_hdemo_sk,c_current_addr_sk,c_first_shipto_date_sk,c_first_sales_date_sk,c_salutation,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_day,c_birth_month,c_birth_year,c_birth_country,c_login,c_email_address,c_last_review_date;
Time taken: 125.624 secondsspark-sql> desc extended customer c_customer_sk;
col_name c_customer_sk
data_type bigint
comment NULL
min 1
max 280000
num_nulls 0
distinct_count 274368
avg_col_len 8
max_col_len 8
histogram height: 1102.3622047244094, num_of_bins: 254
bin_0 lower_bound: 1.0, upper_bound: 1090.0, distinct_count: 1089...
bin_253 lower_bound: 278870.0, upper_bound: 280000.0, distinct_count: 1106</pre>
從上圖可見,生成的 histogram 為 equal-height histogram,且高度為 1102.36,bin 數(shù)為 254。其中 bin 個(gè)數(shù)可由 spark.sql.statistics.histogram.numBins 配置。對于每個(gè) bin,勻記錄其最小值,最大值,以及 distinct count。
值得注意的是,這里的 distinct count 并不是精確值,而是通過 HyperLogLog 計(jì)算出來的近似值。使用 HyperLogLog 的原因有二
使用 HyperLogLog 計(jì)算 distinct count 速度快速
HyperLogLog 計(jì)算出的 distinct count 可以合并。例如可以直接將兩個(gè) bin 的 HyperLogLog 值合并算出這兩個(gè) bin 總共的 distinct count,而無須從重新計(jì)算,且合并結(jié)果的誤差可控
算子對數(shù)據(jù)集影響估計(jì)
對于中間算子,可以根據(jù)輸入數(shù)據(jù)集的統(tǒng)計(jì)信息以及算子的特性,可以估算出輸出數(shù)據(jù)集的統(tǒng)計(jì)結(jié)果。
本節(jié)以 Filter 為例說明算子對數(shù)據(jù)集的影響。
對于常見的 Column A < value B Filter,可通過如下方式估算輸出中間結(jié)果的統(tǒng)計(jì)信息
若 B < A.min,則無數(shù)據(jù)被選中,輸出結(jié)果為空
若 B > A.max,則全部數(shù)據(jù)被選中,輸出結(jié)果與 A 相同,且統(tǒng)計(jì)信息不變
若 A.min < B < A.max,則被選中的數(shù)據(jù)占比為 (B.value - A.min) / (A.max - A.min),A.min 不變,A.max 更新為 B.value,A.ndv = A.ndv * (B.value - A.min) / (A.max - A.min)
上述估算的前提是,字段 A 數(shù)據(jù)均勻分布。但很多時(shí)候,數(shù)據(jù)分布并不均勻,且當(dāng)數(shù)據(jù)傾斜嚴(yán)重是,上述估算誤差較大。此時(shí),可充分利用 histogram 進(jìn)行更精確的估算
啟用 Historgram 后,F(xiàn)ilter Column A < value B的估算方法為
若 B < A.min,則無數(shù)據(jù)被選中,輸出結(jié)果為空
若 B > A.max,則全部數(shù)據(jù)被選中,輸出結(jié)果與 A 相同,且統(tǒng)計(jì)信息不變
若 A.min < B < A.max,則被選中的數(shù)據(jù)占比為 height(<B) / height(All),A.min 不變,A.max = B.value,A.ndv = ndv(<B)
在上圖中,B.value = 15,A.min = 0,A.max = 32,bin 個(gè)數(shù)為 10。Filter 后 A.ndv = ndv(<B.value) = ndv(<15)。該值可根據(jù) A < 15 的 5 個(gè) bin 的 ndv 通過 HyperLogLog 合并而得,無須重新計(jì)算所有 A < 15 的數(shù)據(jù)。
算子代價(jià)估計(jì)
SQL 中常見的操作有 Selection(由 select 語句表示),F(xiàn)ilter(由 where 語句表示)以及笛卡爾乘積(由 join 語句表示)。其中代價(jià)最高的是 join。
Spark SQL 的 CBO 通過如下方法估算 join 的代價(jià)
<pre style="margin: 0px; padding: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important;">Cost = rows * weight + size * (1 - weight)
Cost = CostCPU * weight + CostIO * (1 - weight)</pre>
其中 rows 即記錄行數(shù)代表了 CPU 代價(jià),size 代表了 IO 代價(jià)。weight 由 *spark.sql.cbo.joinReorder.card.weight *決定,其默認(rèn)值為 0.7。
Build側(cè)選擇
對于兩表Hash Join,一般選擇小表作為build size,構(gòu)建哈希表,另一邊作為 probe side。未開啟 CBO 時(shí),根據(jù)表原始數(shù)據(jù)大小選擇 t2 作為build side
而開啟 CBO 后,基于估計(jì)的代價(jià)選擇 t1 作為 build side。更適合本例
優(yōu)化 Join 類型
Spark SQL 中,Join 可分為 Shuffle based Join 和 BroadcastJoin。Shuffle based Join 需要引入 Shuffle,代價(jià)相對較高。BroadcastJoin 無須 Join,但要求至少有一張表足夠小,能通過 Spark 的 Broadcast 機(jī)制廣播到每個(gè) Executor 中。
在不開啟 CBO 中,Spark SQL 通過 spark.sql.autoBroadcastJoinThreshold 判斷是否啟用 BroadcastJoin。其默認(rèn)值為 10485760 即 10 MB。
并且該判斷基于參與 Join 的表的原始大小。
在下圖示例中,Table 1 大小為 1 TB,Table 2 大小為 20 GB,因此在對二者進(jìn)行 join 時(shí),由于二者都遠(yuǎn)大于自動(dòng) BroatcastJoin 的閾值,因此 Spark SQL 在未開啟 CBO 時(shí)選用 SortMergeJoin 對二者進(jìn)行 Join。
而開啟 CBO 后,由于 Table 1 經(jīng)過 Filter 1 后結(jié)果集大小為 500 GB,Table 2 經(jīng)過 Filter 2 后結(jié)果集大小為 10 MB 低于自動(dòng) BroatcastJoin 閾值,因此 Spark SQL 選用 BroadcastJoin。
優(yōu)化多表 Join 順序
未開啟 CBO 時(shí),Spark SQL 按 SQL 中 join 順序進(jìn)行 Join。極端情況下,整個(gè) Join 可能是 left-deep tree。在下圖所示 TPC-DS Q25 中,多路 Join 存在如下問題,因此耗時(shí) 241 秒。
left-deep tree,因此所有后續(xù) Join 都依賴于前面的 Join 結(jié)果,各 Join 間無法并行進(jìn)行
前面的兩次 Join 輸入輸出數(shù)據(jù)量均非常大,屬于大 Join,執(zhí)行時(shí)間較長
開啟 CBO 后, Spark SQL 將執(zhí)行計(jì)劃優(yōu)化如下
優(yōu)化后的 Join 有如下優(yōu)勢,因此執(zhí)行時(shí)間降至 71 秒
Join 樹不再是 left-deep tree,因此 Join 3 與 Join 4 可并行進(jìn)行,Join 5 與 Join 6 可并行進(jìn)行
最大的 Join 5 輸出數(shù)據(jù)只有兩百萬條結(jié)果,Join 6 有 1.49 億條結(jié)果,Join 7相當(dāng)于小 Join
總結(jié)
上文 Spark SQL 內(nèi)部原理 中介紹的 Optimizer 屬于 RBO,實(shí)現(xiàn)簡單有效。它屬于 LogicalPlan 的優(yōu)化,所有優(yōu)化均基于 LogicalPlan 本身的特點(diǎn),未考慮數(shù)據(jù)本身的特點(diǎn),也未考慮算子本身的代價(jià)
本文介紹的 CBO 考慮了數(shù)據(jù)的統(tǒng)計(jì)特征,從而選擇總代價(jià)最低的物理執(zhí)行計(jì)劃。但物理執(zhí)行計(jì)劃是固定的,一旦選定,不可更改。未考慮運(yùn)行時(shí)信息
下文將介紹 Spark SQL Adapptive Execution,它可根據(jù)運(yùn)行時(shí)信息動(dòng)態(tài)調(diào)整執(zhí)行計(jì)劃從而優(yōu)化執(zhí)行
關(guān)注我的公眾號(hào),后臺(tái)回復(fù)【JAVAPDF】獲取200頁面試題!
5萬人關(guān)注的大數(shù)據(jù)成神之路,不來了解一下嗎?
5萬人關(guān)注的大數(shù)據(jù)成神之路,真的不來了解一下嗎?
5萬人關(guān)注的大數(shù)據(jù)成神之路,確定真的不來了解一下嗎?