The Catalyst optimizer is a core component of Spark SQL: it takes a query and converts it into an execution plan. It works in four phases:
- Analysis
- Logical optimization
- Physical planning
- Code generation
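Example:
The M&Ms example
The two snippets below, one in Python and one in SQL, express the same query and end up with the same execution plan. So whichever language you use, your query and computation go through the same process.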
# In Python
# mnm_df is the DataFrame read from the M&M dataset CSV
from pyspark.sql.functions import count

count_mnm_df = (mnm_df
    .select("State", "Color", "Count")
    .groupBy("State", "Color")
    .agg(count("Count").alias("Total"))
    .orderBy("Total", ascending=False))
-- In SQL
SELECT State, Color, count(Count) AS Total
FROM MNM_TABLE_NAME
GROUP BY State, Color
ORDER BY Total DESC
Calling count_mnm_df.explain(True) shows the plan produced at each stage for the Python query. (We will look at this in more depth in a later post on debugging.)
count_mnm_df.explain(True)
== Parsed Logical Plan ==
'Sort ['Total DESC NULLS LAST], true
+- Aggregate [State#10, Color#11], [State#10, Color#11, count(Count#12) AS...]
   +- Project [State#10, Color#11, Count#12]
      +- Relation[State#10,Color#11,Count#12] csv
== Analyzed Logical Plan ==
State: string, Color: string, Total: bigint
Sort [Total#24L DESC NULLS LAST], true
+- Aggregate [State#10, Color#11], [State#10, Color#11, count(Count#12) AS...]
   +- Project [State#10, Color#11, Count#12]
      +- Relation[State#10,Color#11,Count#12] csv
== Optimized Logical Plan ==
Sort [Total#24L DESC NULLS LAST], true
+- Aggregate [State#10, Color#11], [State#10, Color#11, count(Count#12) AS...]
   +- Relation[State#10,Color#11,Count#12] csv
== Physical Plan ==
*(3) Sort [Total#24L DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(Total#24L DESC NULLS LAST, 200)
   +- *(2) HashAggregate(keys=[State#10, Color#11], functions=[count(Count#12)], output=[State#10, Color#11, Total#24L])
      +- Exchange hashpartitioning(State#10, Color#11, 200)
         +- *(1) HashAggregate(keys=[State#10, Color#11], functions=[partial_count(Count#12)], output=[State#10, Color#11, count#29L])
            +- *(1) FileScan csv [State#10,Color#11,Count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/jules/gits/LearningSpark2.0/chapter2/py/src/... dataset.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<State:string,Color:string,Count:int>

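Phase 1: Analysis
For an SQL or DataFrame query, the Spark SQL engine first generates an abstract syntax tree (AST). In this phase, every column and table name is resolved by consulting an internal Catalog, a programmatic interface to Spark SQL that holds the names of columns, tables and databases, along with data types, functions and so on. Once all of these names have been resolved, the query moves on to the next phase. In the output above, this is the difference between the Parsed and the Analyzed Logical Plan: the unresolved 'Total becomes Total#24L, and the result schema (State: string, Color: string, Total: bigint) is known.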
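The resolution step can be pictured with a toy analyzer in plain Python. This is a minimal sketch, not Spark's implementation: the catalog contents, the table name `mnm_table`, the starting expression ID and the names `Attr`, `analyze` and `AnalysisError` are all hypothetical. It only shows the idea of turning bare names into catalog-backed, typed attributes such as State#10.

```python
from dataclasses import dataclass

# Hypothetical catalog: table name -> ordered (column, type) pairs.
CATALOG = {
    "mnm_table": [("State", "string"), ("Color", "string"), ("Count", "int")],
}


class AnalysisError(Exception):
    """Raised when a name cannot be resolved against the catalog."""


@dataclass(frozen=True)
class Attr:
    """A resolved column: name, data type and a unique expression ID."""
    name: str
    dtype: str
    expr_id: int

    def __str__(self):
        return f"{self.name}#{self.expr_id}"


def analyze(table, column_names, catalog=CATALOG, start_id=10):
    """Resolve the unresolved column names of a query on `table` to typed attributes."""
    if table not in catalog:
        raise AnalysisError(f"Table or view not found: {table}")
    # Give every column of the table a unique ID, in the spirit of State#10.
    attrs = {
        name: Attr(name, dtype, start_id + i)
        for i, (name, dtype) in enumerate(catalog[table])
    }
    resolved = []
    for name in column_names:
        if name not in attrs:
            raise AnalysisError(f"cannot resolve '{name}' among {list(attrs)}")
        resolved.append(attrs[name])
    return resolved


if __name__ == "__main__":
    print([str(a) for a in analyze("mnm_table", ["State", "Color", "Count"])])
```

Run as a script, this prints ['State#10', 'Color#11', 'Count#12']. A misspelled name such as "Colour" raises AnalysisError instead, which mirrors how Spark reports an AnalysisException during this phase, before any data is read.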
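Phase 2: Logical Optimization
As the figure above shows, logical optimization has two internal stages. First, applying standard rule-based optimizations, the Catalyst optimizer builds a set of candidate plans; then its cost-based optimizer (CBO) assigns a cost to each plan. These plans are laid out as operator trees. In the output above, for example, the Optimized Logical Plan no longer contains the Project node, because that node selected exactly the columns the relation already provides.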
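The two stages can be sketched with a toy optimizer in plain Python. Everything here is hypothetical and far simpler than Catalyst: the three node types, the two rewrite rules, the way candidates are generated and the row-count cost model are invented for illustration. The rule `remove_noop_project` mirrors what happened to the Project node in the explain output above.

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class Relation:
    columns: Tuple[str, ...]
    rows: int  # hypothetical row-count statistic


@dataclass(frozen=True)
class Project:
    columns: Tuple[str, ...]
    child: object


@dataclass(frozen=True)
class Filter:
    predicate: str
    selectivity: float  # hypothetical fraction of rows that pass
    child: object


def output(plan):
    """Columns produced by a plan node."""
    if isinstance(plan, (Relation, Project)):
        return plan.columns
    return output(plan.child)


def rows(plan):
    """Estimated number of rows a node emits."""
    if isinstance(plan, Relation):
        return plan.rows
    if isinstance(plan, Filter):
        return rows(plan.child) * plan.selectivity
    return rows(plan.child)


def cost(plan):
    """Toy cost model: total rows read or processed by every operator."""
    if isinstance(plan, Relation):
        return plan.rows
    return rows(plan.child) + cost(plan.child)


def transform(plan, rule):
    """Apply `rule` bottom-up to every node of the tree."""
    if not isinstance(plan, Relation):
        plan = replace(plan, child=transform(plan.child, rule))
    return rule(plan)


def remove_noop_project(plan):
    # A Project that keeps exactly its child's columns does nothing.
    if isinstance(plan, Project) and plan.columns == output(plan.child):
        return plan.child
    return plan


def push_filter_below_project(plan):
    # Toy rule: assumes the predicate only uses columns the Project keeps.
    if isinstance(plan, Filter) and isinstance(plan.child, Project):
        proj = plan.child
        return Project(proj.columns, replace(plan, child=proj.child))
    return plan


RULES = [remove_noop_project, push_filter_below_project]


def candidate_plans(plan, rules=RULES):
    """Rule-based stage: the plan, each rule alone, and all rules in sequence."""
    candidates = {plan}
    combined = plan
    for rule in rules:
        candidates.add(transform(plan, rule))
        combined = transform(combined, rule)
    candidates.add(combined)
    return candidates


def choose_plan(plan, rules=RULES):
    """Cost-based stage: keep the cheapest candidate."""
    return min(candidate_plans(plan, rules), key=cost)
```

For scan = Relation(("State", "Color", "Count"), 1000) with a Filter of selectivity 0.1 on top of a no-op Project, the three candidates cost 3000, 2100 and 2000 under this model, and choose_plan returns the Filter placed directly over the scan.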
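Phase 3: Physical Planning
In this phase, Spark SQL generates an optimal physical plan for the selected logical plan, using physical operators that match those available in the Spark execution engine. In the Physical Plan above, for instance, the single logical Aggregate is planned as a partial HashAggregate (partial_count) within each partition, an Exchange hashpartitioning that shuffles the partial results by (State, Color) into 200 partitions, and a final HashAggregate (count) that merges them.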
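The split between the partial and the final HashAggregate can be mimicked in plain Python. This is a toy sketch, not Spark's physical operators: the function names, the sample rows and the default of 4 shuffle partitions (the plan above uses 200) are illustrative assumptions. It shows why the split is valid: counting inside each partition, shuffling by key and then summing the partial counts gives the same answer as one global count(Count), while only one small record per key and partition crosses the shuffle.

```python
from collections import Counter


def partial_count(partition, keys, column):
    """Partial HashAggregate: count non-null `column` values per key within one partition."""
    counts = Counter()
    for row in partition:
        key = tuple(row[k] for k in keys)
        # count(col) skips NULLs, but the group itself still exists (with 0).
        counts[key] += 0 if row[column] is None else 1
    return counts


def hash_exchange(partials, num_partitions):
    """Exchange hashpartitioning: route each (key, partial count) pair by the key's hash."""
    shuffled = [[] for _ in range(num_partitions)]
    for partial in partials:
        for key, n in partial.items():
            shuffled[hash(key) % num_partitions].append((key, n))
    return shuffled


def final_count(partition):
    """Final HashAggregate: add up the partial counts that arrived for each key."""
    totals = Counter()
    for key, n in partition:
        totals[key] += n
    return totals


def two_phase_count(partitions, keys, column, num_partitions=4):
    """Partial aggregate -> shuffle by key -> final aggregate."""
    partials = [partial_count(p, keys, column) for p in partitions]
    shuffled = hash_exchange(partials, num_partitions)
    result = {}
    for part in shuffled:
        # Each key lands in exactly one shuffle partition, so updates never collide.
        result.update(final_count(part))
    return result
```

On two hypothetical partitions holding (CA, Red, 3), (TX, Blue, 5) and (CA, Red, 7), (CA, Blue, None), two_phase_count(partitions, ["State", "Color"], "Count") returns {("CA", "Red"): 2, ("TX", "Blue"): 1, ("CA", "Blue"): 0}; the last group is 0 because count(Count) ignores NULL values.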
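Phase 4: Code Generation
The final phase generates efficient Java bytecode to run on each machine. Because Spark SQL can operate on data sets loaded in memory, Spark can use state-of-the-art compiler techniques for code generation to speed up execution. In other words, it acts as a compiler. In the Physical Plan above, the *(1), *(2) and *(3) prefixes mark which operators were fused into each whole-stage code generation unit.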
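The idea of generating specialized code instead of interpreting a generic plan can be sketched in Python. This is only an analogy under stated assumptions: Spark emits Java source for whole stages and compiles it to JVM bytecode, whereas this toy emits Python source and builds it with compile() and exec(); the names `interpret` and `generate_pipeline` are hypothetical. The generated loop fuses a filter and a projection, similar in spirit to operators sharing one *(n) stage in the plan above, and hard-codes the column names and the literal.

```python
def interpret(rows, filter_column, filter_value, columns):
    """Operator at a time: a generic filter pass, then a generic projection pass."""
    filtered = [r for r in rows if r[filter_column] == filter_value]
    return [tuple(r[c] for c in columns) for r in filtered]


def generate_pipeline(filter_column, filter_value, columns):
    """Emit source for one fused filter + project loop, compile it, return (function, source)."""
    projected = ", ".join(f"row[{c!r}]" for c in columns)
    # repr() turns the column names and the literal into valid Python literals
    # (for simple str/int values), so the query details are baked into the code.
    source = (
        "def pipeline(rows):\n"
        "    out = []\n"
        "    for row in rows:\n"
        f"        if row[{filter_column!r}] == {filter_value!r}:\n"
        f"            out.append(({projected},))\n"
        "    return out\n"
    )
    namespace = {}
    exec(compile(source, "<generated>", "exec"), namespace)
    return namespace["pipeline"], source
```

For rows from CA and TX, generate_pipeline("State", "CA", ["Color", "Count"]) yields a function that returns the same (Color, Count) tuples as interpret, but in a single loop with the query's constants written directly into the code.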
Reference
Learning Spark: Lightning-Fast Data Analytics, 2nd Edition, by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee