作者:vivo互聯(lián)網(wǎng)技術(shù)-Shuai Guangying
在《探究Presto SQL引擎(1)-巧用Antlr》中,我們介紹了Antlr的基本用法以及如何使用Antlr4實現(xiàn)解析SQL查詢CSV數(shù)據(jù),更加深入理解Presto查詢引擎支持的SQL語法以及實現(xiàn)思路。
本次帶來的是系列文章的第2篇,本文梳理了Join的原理,以及Join算法在Presto中的實現(xiàn)思路。通過理論和實踐的結(jié)合,可以在理解原理的基礎(chǔ)上,更加深入理解Join算法在OLAP場景下的工程落地技巧,比如火山模型,列式存儲,批量處理等思想的應用。
一、背景
在業(yè)務開發(fā)中使用數(shù)據(jù)庫,通常會有規(guī)范不允許過多表的Join。例如阿里巴巴開發(fā)手冊中,有如下的規(guī)定:
【強制】超過三個表禁止Join。需要Join的字段,數(shù)據(jù)類型必須絕對一致;多表關(guān)聯(lián)查詢時,保證被關(guān)聯(lián)的字段需要有索引。說明:即使雙表Join也要注意表索引、SQL性能。
在大數(shù)據(jù)數(shù)倉的建設(shè)中,盡管我們有星型結(jié)構(gòu)和雪花結(jié)構(gòu),但是最終交付業(yè)務使用的大多是寬表。
可以看出業(yè)務使用數(shù)據(jù)庫中的一個矛盾點:我們需要Join來提供靈活的關(guān)聯(lián)操作,但是又要盡量避免多表和大表Join帶來的性能問題。這是為什么呢?
二、Join的基本原理
在數(shù)據(jù)庫中Join提供的語義是非常豐富的。簡單總結(jié)如下:

通常理解Join的實現(xiàn)原理,從Cross Join是最好的切入點,也就是所謂的笛卡爾積。對于集合進行笛卡爾積運算,理解非常簡單,就是窮舉兩個集合中元素所有的組合情況。在數(shù)據(jù)庫中,集合就對應到數(shù)據(jù)表中的所有行(tuples),集合中的元素就對應到單行(tuple)。所以實現(xiàn)Cross Join的算法也就呼之欲出了。
實現(xiàn)的代碼樣例如下:
List<Tuple> r = newArrayList(
new Tuple(newArrayList(1,"a")),
new Tuple(newArrayList(2,"b")));
List<Tuple> s = newArrayList(
new Tuple(newArrayList(3,"c")),
new Tuple(newArrayList(4,"d")));
int cnt =0;
for(Tuple ri:r){
for(Tuple si:s){
Tuple c = new Tuple().merge(ri).merge(si);
System.out.println(++cnt+": "+ c);
}
}
/**
* out:
1: [1, a, 3, c]
2: [1, a, 4, d]
3: [2, b, 3, c]
4: [2, b, 4, d]
*/
可以看出實現(xiàn)邏輯非常簡單,就是兩個For循環(huán)嵌套。
2.1 Nested Loop Join算法
在這個基礎(chǔ)上,實現(xiàn)Inner Join的第一個算法就順其自然了。非常直白的名稱:Nested Loop,實現(xiàn)關(guān)鍵點如下:

其中,θ操作符可以是:=, !=, <, >, ≤, ≥。
相比笛卡爾積的實現(xiàn)思路,也就是添加了一層if條件的判斷用于過濾滿足條件的組合。
對于Nested Loop算法,最關(guān)鍵的點在于它的執(zhí)行效率。假如參與Join的兩張表一張量級為1萬,一張量級為10w,那么進行比較的次數(shù)為1w*10w=10億次。在大數(shù)據(jù)時代,通常一張表數(shù)據(jù)量都是以億為單位,如果使用Nested Loop Join算法,那么Join操作的比較次數(shù)直接就是天文數(shù)字了。所以Nested Loop Join基本上是作為萬不得已的保底方案。Nested Loop這個框架下,常見的優(yōu)化措施如下:
小表驅(qū)動大表,即數(shù)據(jù)量較大的集作為于for循環(huán)的內(nèi)部循環(huán)。
一次處理一個數(shù)據(jù)塊,而不是一條記錄。也就是所謂的Block Nested Loop Join,通過分塊降低IO次數(shù),提升緩存命中率。
值得一提的是Nested Loop Join的思想雖然非常樸素,但是天然的具備分布式、并行的能力。這也是為什么各類NoSQL數(shù)據(jù)庫中依然保留Nested Loop Join實現(xiàn)的重要一點。雖然單機串行執(zhí)行慢,但是可以并行化的話,那就是加機器能解決的問題了。
2.2 Sort Merge Join算法
通過前面的分析可以知道,Nested Loop Join算法的關(guān)鍵問題在于比較次數(shù)過多,算法的復雜度為O(m*n),那么突破口也得朝著這個點。如果集合中的元素是有序的,比較的次數(shù)會大幅度降低,避免很多無意義的比較運算。對于有序的所以Join的第二種實現(xiàn)方式如下所描述:

通過將JOIN操作拆分成Sort和Merge兩個階段實現(xiàn)Join操作的加速。對于Sort階段,是可以提前準備好可以復用的。這樣的思想對于MySQL這類關(guān)系型數(shù)據(jù)庫是非常友好的,這也能解釋阿里巴巴開發(fā)手冊中要求關(guān)聯(lián)的字段必須建立索引,因為索引保證了數(shù)據(jù)有序。該算法時間復雜度為排序開銷O(m_log(m)+n_log(n))+合并開銷O(m+n)。但是通常由于索引保證了數(shù)據(jù)有序,索引其時間復雜度為O(m+n)。
2.3 Hash Join算法
Sort Merge Join的思想在落地中有一定的限制。所謂成也蕭何敗蕭何,對于基于Hadoop的數(shù)倉而言,保證數(shù)據(jù)存儲的有序性這個點對于性能影響過大。在海量數(shù)據(jù)的背景下,維護索引成本是比較大的。而且索引還依賴于使用場景,不可能每個字段都建一個索引。在數(shù)據(jù)表關(guān)聯(lián)的場景是大表關(guān)聯(lián)小表時,比如:用戶表(大表)--當日訂單表(小表);事實表(大表)–維度表(小表),可以通過空間換時間?;叵胍幌?,在基礎(chǔ)的數(shù)據(jù)結(jié)構(gòu)中,tree結(jié)構(gòu)和Hash結(jié)構(gòu)可謂數(shù)據(jù)處理的兩大法寶:一個保證數(shù)據(jù)有序方便實現(xiàn)區(qū)間搜索,一個通過hash函數(shù)實現(xiàn)精準命中點對點查詢效率高。
在這樣的背景下,通過將小表Hash化,實現(xiàn)Join的想法也就不足為奇了。

而且即使一張表在單機環(huán)境生成Hash內(nèi)存消耗過大,還可以利用Hash將數(shù)據(jù)進行切分,實現(xiàn)分布式能力。所以,在Presto中Join算法通常會選擇Hash Join,該算法的時間復雜度為O(m+n)。
通過相關(guān)資料的學習,可以發(fā)現(xiàn)Join算法的實現(xiàn)原理還是相當簡單的,排序和Hash是數(shù)據(jù)結(jié)構(gòu)最為基礎(chǔ)的內(nèi)容。了解了Join的基本思想,如何落地實踐出來呢?畢竟talk is cheap。在項目中實現(xiàn)Join之前,需要一些鋪墊知識。通常來說核心算法是皇冠上的明珠,但是僅有明珠是不夠的還需要皇冠作為底座。
三、Join工程化前置條件
3.1 SQL處理架構(gòu)-火山模型
在將Join算法落地前,需要先了解一下數(shù)據(jù)庫處理數(shù)據(jù)的基本架構(gòu)。在理解架構(gòu)的基礎(chǔ)上,才能將Join算法放置到合適的位置。在前面系列文章中探討了基于antlr實現(xiàn)SQL語句的解析??梢园l(fā)現(xiàn)SQL語法支持的操作類型非常豐富:查詢表(TableScan),過濾數(shù)據(jù)(Filter),排序(Order),限制(Limit),字段進行運算(Project), 聚合(Group),關(guān)聯(lián)(Join)等。為了實現(xiàn)上述的能力,需要一個具備并行化能力且可擴展的架構(gòu)。
1994年Goetz Graefe在論文《Volcano-An Extensible and Parallel Query Evaluation System》提出了一個架構(gòu)設(shè)計思想,這就是大名鼎鼎的火山模型,也稱為迭代模型?;鹕侥P推鋵嵃宋募到y(tǒng)和查詢處理兩個部分,這里我們重點關(guān)注查詢處理的設(shè)計思想。架構(gòu)圖如下:

簡單解讀一下:
職責分離:將不同操作獨立成一個的Operator,Operator采用open-next-close的迭代器模式。
例如對于SQL 。
SELECT Id, Name, Age, (Age - 30) * 50 AS Bonus
FROM People
WHERE Age > 30
對應到Scan, Select, Project三個Operator,數(shù)據(jù)交互通過next()函數(shù)實現(xiàn)。上述的理論在Presto中可以對應起來,例如Presto中幾個常用的Operator, 基本上是見名知意:

動態(tài)組裝:Operator基于SQL語句的解析實現(xiàn)動態(tài)組裝,多個Operator形成一個管道(pipeline)。
例如:print和predicate兩個operator形成一個管道:
在火山模型的基礎(chǔ)上,Presto吸收了數(shù)據(jù)庫領(lǐng)域的其他思想,對基礎(chǔ)的火山模型進行了優(yōu)化改造,主要體現(xiàn)在如下幾點:
- Operator數(shù)據(jù)處理優(yōu)化成一次一個Page,而不是一次行(也稱為tuple)。
- Page的存儲采用列式結(jié)構(gòu)。即相同的列封裝到一個Block中。
批量處理結(jié)合列式存儲奠定了向量化計算的基礎(chǔ)。這也是數(shù)據(jù)庫領(lǐng)域的優(yōu)化方向。
3.2 批量處理和列式存儲
在研讀Presto源碼時,幾乎到處都可以看到Page/Block的身影。所以理解Page/Block背后的思想是理解Presto實現(xiàn)機制的基礎(chǔ)。有相關(guān)書籍和文檔講解Page/Block的概念,但是由于這些概念是跟其他概念混在一起呈現(xiàn),導致一時間不容易理解。
筆者認為Type-Block-Page三者放在一起,更容易理解。我們使用數(shù)據(jù)庫,通常需要定義表,字段名稱,字段類型。在傳統(tǒng)的DBMS中,通常是按行存儲數(shù)據(jù),通常結(jié)構(gòu)如下:

但是通常OLAP場景不需要讀取所有的字段,基于這樣的場景,就衍生出來了列式存儲。就是我們看到的如下結(jié)構(gòu):

即每個字段對應一個Block, 多個Block的切面才是一條記錄,也就是所謂的行,在一些論文中稱為tuple。通過對比可以清楚看出Presto中,Page就是典型了列式存儲的實現(xiàn)。所以在Presto中,每個Type必然會關(guān)聯(lián)到一種Block。例如:bigint類型就對應著LongArrayBlockBuilder,varchar類型對應著VariableWidthBlock。
理解了原理,操作Page/Block就變得非常簡單了,簡單的demo代碼如下:
import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.VarcharType;
import com.google.common.collect.Lists;
import io.airlift.slice.Slice;
import java.util.List;
import static io.airlift.slice.Slices.utf8Slice;
/**
* PageBlockDemo
*
* @version 1.0
* @since 2021/6/22 19:26
*/
public class PageBlockDemo {
private static Page buildPage(List<Type> types,List<Object[]> dataSet){
PageBuilder pageBuilder = new PageBuilder(types);
// 封裝成Page
for(Object[] row:dataSet){
// 完成一行
pageBuilder.declarePosition();
for (int column = 0; column < types.size(); column++) {
BlockBuilder out = pageBuilder.getBlockBuilder(column);
Object colVal = row[column];
if(colVal == null){
out.appendNull();
}else{
Type type = types.get(column);
Class<?> javaType = type.getJavaType();
if(javaType == long.class){
type.writeLong(out,(long)colVal);
}else if(javaType == Slice.class){
type.writeSlice(out, utf8Slice((String)colVal));
}else{
throw new UnsupportedOperationException("not implemented");
}
}
}
}
// 生成Page
Page page = pageBuilder.build();
pageBuilder.reset();
return page;
}
private static void readColumn(List<Type> types,Page page){
// 從Page中讀取列
for(int column=0;column<types.size();column++){
Block block = page.getBlock(column);
Type type = types.get(column);
Class<?> javaType = type.getJavaType();
System.out.print("column["+type.getDisplayName()+"]>>");
List<Object> colList = Lists.newArrayList();
for(int pos=0;pos<block.getPositionCount();pos++){
if(javaType == long.class){
colList.add(block.getLong(pos));
}else if(javaType == Slice.class){
colList.add(block.getSlice(pos,0,block.getSliceLength(pos)).toStringUtf8());
}else{
throw new UnsupportedOperationException("not implemented");
}
}
System.out.println(colList);
}
}
public static void main(String[] args) {
/**
* 假設(shè)有兩個字段,一個字段類型為int, 一個字段類型為varchar
*/
List<Type> types = Lists.newArrayList(BigintType.BIGINT, VarcharType.VARCHAR);
// 按行存儲
List<Object[]> dataSet = Lists.newArrayList(
new Object[]{1L,"aa"},
new Object[]{2L,"ba"},
new Object[]{3L,"cc"},
new Object[]{4L,"dd"});
Page page = buildPage(types, dataSet);
readColumn(types,page);
}
}
// 運行結(jié)果:
//column[bigint]>>[1, 2, 3, 4]
//column[varchar]>>[aa, ba, cc, dd]
將數(shù)據(jù)封裝成Page在各個Operator中流轉(zhuǎn),一方面避免了對象的序列化和反序列化成本,另一方面相比tuple的方式降低了函數(shù)調(diào)用的開銷。這跟集裝箱運貨降低運輸成本的思想是類似的。
四、Join算法的工程實踐
理解了Join的核心算法和基礎(chǔ)架構(gòu),結(jié)合前文中對antlr實現(xiàn)SQL表達式的解析以及實現(xiàn)where條件過濾,我們已經(jīng)具備了實現(xiàn)Join的基礎(chǔ)條件。接下來簡單講述一下Join算法的落地流程。首先在語法層面需要支持Join的語法,由于本文目的在于研究算法實現(xiàn)流程,而不在于實現(xiàn)完整的Join功能,因此我們暫且先考慮支持兩張表單字段的等值Join語法。
首先在語法上需要支持Join, 基于antlr語法的定義關(guān)鍵點如下:
querySpecification
: SELECT selectItem (',' selectItem)*
(FROM relation (',' relation)*)?
(WHERE where=booleanExpression)?
;
selectItem
: expression #selectSingle
;
relation
: left=relation
(
joinType JOIN rightRelation=relation joinCriteria
) #joinRelation
| sampledRelation #relationDefault
;
joinType
: INNER?
;
joinCriteria
: ON booleanExpression
;
上述的語法定義將Join的關(guān)鍵要素拆解得非常清晰:Join的左表, Join的類型,Join關(guān)鍵詞, Join的右表, Join的關(guān)聯(lián)條件。例如,通常我們最簡單的Join語句用例如下(借用presto的tpch數(shù)據(jù)源):
select t2.custkey, t2.phone, t1.orderkey from orders t1 inner join customer t2 on t1.custkey=t2.custkey limit 10;
對應著語法和SQL語句用例,可以看到在將Join算法落地,還需要考慮如下細節(jié)點:
檢測SQL語句,確保SQL語句符合語法要求。
梳理表的別名和字段的對應關(guān)系,確保查詢的字段和表能夠?qū)饋恚琂oin條件的字段類型能夠匹配。
Join算法的選取,是HashJoin還是NestedLoopJoin還是SortMergeJoin?
哪個表是build表,哪個表是probe表?
Join條件的判斷如何實現(xiàn)?
整個查詢涉及到Operator如何組裝,以實現(xiàn)最終結(jié)果的輸出?
我們回顧一下SQL執(zhí)行的關(guān)鍵流程:

基于上面的流程,問題其實已經(jīng)有了答案。
Parser:借助antlr的能力即可實現(xiàn)SQL語法的檢測。
Binding:基于SQL語句生成AST,利用元數(shù)據(jù)檢測字段和表的映射關(guān)系以及Join條件的字段類型。
Planner:基于AST生成查詢計劃。
Executor:基于查詢計劃生成對應的Operator并執(zhí)行。
以NestedLoop Join算法為例,了解一下Presto的實現(xiàn)思路。對于NestedLoopJoin Join算法的落地,在Presto中其實是拆解為兩個階段:組合階段和過濾階段。在實現(xiàn)JoinOperator時,只需負責兩個表數(shù)據(jù)的笛卡爾積組合即可。核心代碼如下:
// NestedLoopPageBuilder中實現(xiàn)兩個Page計算笛卡爾積的處理邏輯,這里RunLengthEncodedBlock用于一個元素復制,典型地笛卡爾積計算中需要將一列元素從1行復制成多行。
@Override
public Page next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
if (noColumnShortcutResult >= 0) {
rowIndex = maxRowIndex;
return new Page(noColumnShortcutResult);
}
rowIndex++;
// Create an array of blocks for all columns in both pages.
Block[] blocks = new Block[numberOfProbeColumns + numberOfBuildColumns];
// Make sure we always put the probe data on the left and build data on the right.
int indexForRleBlocks = buildPageLarger ? 0 : numberOfProbeColumns;
int indexForPageBlocks = buildPageLarger ? numberOfProbeColumns : 0;
// For the page with less rows, create RLE blocks and add them to the blocks array
for (int i = 0; i < smallPage.getChannelCount(); i++) {
Block block = smallPage.getBlock(i).getSingleValueBlock(rowIndex);
blocks[indexForRleBlocks] = new RunLengthEncodedBlock(block, largePage.getPositionCount());
indexForRleBlocks++;
}
// Put the page with more rows in the blocks array
for (int i = 0; i < largePage.getChannelCount(); i++) {
blocks[indexForPageBlocks + i] = largePage.getBlock(i);
}
return new Page(largePage.getPositionCount(), blocks);
}
五、小結(jié)
本文簡單梳理了Join的基本算法以及在Presto中實現(xiàn)的基本框架,并以NestedLoop Join算法為例,演示了在Presto中的實現(xiàn)核心點??梢钥闯鱿啾仍嫉乃惴枋?,Presto的工程落地是截然不同: 不僅支持了所有的Join語義,而且實現(xiàn)了分布式能力。這其中有架構(gòu)層面的思考,也有性能層面的思考,非常值得探索跟研究。就Join算法,可以探索的點還有很多,比如多表Join的順序選取,大表跟小表Join的算法優(yōu)化,Semi Join的算法優(yōu)化,Join算法數(shù)據(jù)傾斜的問題等等,可謂路漫漫其修遠兮,將在后續(xù)系列文章中繼續(xù)分析探索。