Catalyst的工作流程:
-
Unresolved Logical Plan:
- SQL語句首先通過sql parser模塊被分詞, 形成select, where ,join等語句塊, 并將這些語句塊行成語法樹. 此棵樹稱為Unresolved Logical Plan
-
Logical Plan:
- 借助表的元數(shù)據(jù)將Unresolved Logical Plan解析為Logical Plan.
- 例如, 上一步的邏輯執(zhí)行框架有了基本骨架后, 系統(tǒng)并不知道tableA包括什么列, 列的數(shù)據(jù)類型, 表的數(shù)據(jù)格式(text,orc), 表的物理位置, sql中的函數(shù)指的哪個類等等; 這些在信息都保存在標的元數(shù)據(jù)中, 同時也能判斷sql語句的正確性
-
Optimized Logical Plan:
- 通過RBO, 將Logical Plan優(yōu)化為Optimized Logical Plan
- 這步優(yōu)化器是Catalyst最重要的部分(Optimizer類), RBO-指基于規(guī)則的優(yōu)化, 規(guī)則有很多種, 常見的有如下3種:
- 謂詞下推:
比如一個sql:
由于數(shù)據(jù)量的大小會顯著影響join的效率, 因此, 通過謂詞下推把join放在where的后面執(zhí)行. 先用where條件縮小表B的量, 再進行joinselect * from A join B on 條件1 where B.age>10;- 常量累加:
比如一個sql:
select x+1+2;查詢列會被優(yōu)化成x+3, 雖然改動很小, 但是優(yōu)化后每次操作都不會再去執(zhí)行1+2這個動作- 列值裁剪
有的sql語句可能并不會使用到一個表的所有列, 此時只需要用標的部分列參加運算即可, 這尤其在列式存儲的表中大大提高掃描性能(減少網(wǎng)絡(luò)內(nèi)存的數(shù)據(jù)量消耗)
- 謂詞下推:
-
Physical Plan:
- 得到的Optimized Logical Plan并不能被spark所理解, 此時需要轉(zhuǎn)換為Physical Plan
- 比如join算子, spark并不知道使用BroadcastJoin, 還是ShuffleHashJoin,還是SortMergeJoin; 物理執(zhí)行計劃就是在這些具體實現(xiàn)中跳出一個耗時最小的算法實現(xiàn), 這個過程涉及基于代價的優(yōu)化CBO. join通常有兩個選擇題要做
- 一個是上面說的選擇哪種join算法
- 另一個是join的順序選擇
對于星型模型和雪花模型來講, 不同的join順序意味著不同的執(zhí)行效率. 例如
A,B表都很大, 但是C表很小, 則AjoinB顯然需要大量的系統(tǒng)資源完成; 如果使用'A join C join B'的順序執(zhí)行, 因為C很小, 所以A join C會很快得到結(jié)果; 而小的結(jié)果集再去join B, 性能會顯而易見的比前一種方案好. 而這種join順序的選擇, 并沒有一個固定的規(guī)則來完成, 只有知道表的基礎(chǔ)信息(表的大小, 表的記錄總條數(shù)), 才能從中選擇一條代價最小的語法樹來執(zhí)行. 即CBO的核心在于評估處一條給定語法樹的實際執(zhí)行代價A join B join C
CBO的實現(xiàn)思路
-
采集原始表的基本信息
這個操作是CBO最基礎(chǔ)的一項工作,采集的主要信息包括表級別指標和列級別指標,如下所示,estimatedSize和rowCount為表級別信息,basicStats和Histograms為列級別信息,后者粒度更細,對優(yōu)化更加重要。- estimatedSize: 每個LogicalPlan節(jié)點輸出數(shù)據(jù)大?。ń鈮海?/li>
- rowCount: 每個LogicalPlan節(jié)點輸出數(shù)據(jù)總條數(shù)
- basicStats: 基本列信息,包括列類型、Max、Min、number of nulls, number of distinct values, max column length, average column length等
- Histograms: Histograms of columns, i.e., equi-width histogram (for numeric and string types) and equi-height histogram (only for numeric types).
這些指標是一些統(tǒng)計指標, 因此需要單獨執(zhí)行統(tǒng)計, 最好在業(yè)務(wù)低峰期, 對表數(shù)據(jù)有較大的變化的表單獨統(tǒng)計. hive通過analyze命令對表的數(shù)據(jù)信息進行統(tǒng)計
定義核心算子的基數(shù)推導規(guī)則
假如sql中有where條件語句"where cid>N", 如何推導出經(jīng)過這個條件過濾后的中間表的基本統(tǒng)計信息?
第一步的原始表基本信息中, 其中有一項是列值分布的Histograms(直方圖), 可以從直方圖中粗略找到cid大于N的記錄有多少條
<img src="img/cbohistogram,png.png">-
核心算子實際代價計算
通常來講,節(jié)點實際執(zhí)行代價主要從兩個維度來定義:CPU Cost以及IO Cost。為后續(xù)講解方便起見,需要先行定義一些基本參數(shù):- Hr:從HDFS上讀取1byte數(shù)據(jù)所需代價
- Hw:往HDFS上寫入1byte數(shù)據(jù)所需代價
- Tr:數(shù)據(jù)總條數(shù)(the number of tuples in the relation )
- Tsz:數(shù)據(jù)平均大小(Average size of the tuple in the relation )
- CPUc:兩值比較所需CPU資源代價(CPU cost for a comparison in nano seconds )
- NEt:1byte數(shù)據(jù)通過網(wǎng)絡(luò)在集群節(jié)點間傳輸花費代價(the average cost of transferring 1 byte over network in the Hadoop cluster from any node to any node )
- ……
上文說過,每種算子的實際執(zhí)行代價計算方式都不同,挑兩個比較簡單、容易理解的來分析,第一個是Table Scan算子,第二個是Hash Join算子。
-
Table Scan算子
直觀上來講這類算子只有IO Cost,CPU Cost為0。Table Scan Cost = IO Cost = Tr * Tsz * Hr,很簡單,Tr * Tsz表示需要scan的數(shù)據(jù)總大小,再乘以Hr就是所需代價。OK,很直觀,很簡單。 -
Hash Join算子
以Broadcast Hash Join為例(小表構(gòu)建hash桶,大表負責探測),假設(shè)大表分布在n個節(jié)點上,每個節(jié)點的數(shù)據(jù)條數(shù)\平均大小分別為Tr(R1)\Tsz(R1),Tr(R2)\Tsz(R2), … Tr(Rn)\Tsz(Rn),小表數(shù)據(jù)條數(shù)為Tr(Rsmall)\Tsz(Rsmall),那么CPU代價和IO代價分別為:- CPU Cost = 小表構(gòu)建Hash Table代價 + 大表探測代價 = Tr(Rsmall) * CPUc + (Tr(R1) + Tr(R2) + … + Tr(Rn)) * N * CPUc,此處假設(shè)HashTable構(gòu)建所需CPU資源遠遠高于兩值簡單比較代價,為N * CPUc
- IO Cost = 小表scan代價 + 小表廣播代價 + 大表scan代價 = Tr(Rsmall) * Tsz(Rsmall) * Hr + n * Tr(Rsmall) * Tsz(Rsmall) * NEt + (Tr(R1)* Tsz(R1) + … + Tr(Rn) * Tsz(Rn)) * Hr
選擇最優(yōu)執(zhí)行路徑(代價最小的執(zhí)行路徑)
通常使用動態(tài)規(guī)劃, 從各種執(zhí)行路徑中找出代價最小的執(zhí)行路徑