0. Overview
后面將寫幾篇文章介紹一下 OLAP 的大數(shù)據(jù)系統(tǒng)架構(gòu)。這里的 Druid 不是阿里巴巴的連接池,而是用于大數(shù)據(jù)實(shí)時(shí) OLAP 的 Druid。
OLAP 和 OLTP 經(jīng)常被拿到一起來討論。其中 OLAP 的全稱是 On-Line Analytical Processing,OLTP 的全稱是 On-Line Transaction Processing。網(wǎng)上分析對(duì)比這兩種系統(tǒng)的討論很多都是長篇累牘,其實(shí)從系統(tǒng)角度來看 OLAP 和 OLTP 的最大區(qū)別無非是下面幾點(diǎn):
| OLTP | OLAP | |
|---|---|---|
| 查詢數(shù)據(jù) | 少 | 多 |
| 延遲 | 要求延遲低 | 可以容忍較高延遲 |
| 數(shù)據(jù)量 | 小 | 大 |
| 并發(fā)訪問 | 大 | 小 |
OLTP 對(duì)應(yīng)常見的關(guān)系型數(shù)據(jù)庫,比如 MySQL 等。OLAP 又分實(shí)時(shí) OLAP 和離線 OLAP。大數(shù)據(jù)的一些架構(gòu),比如常見 Hive + Hadoop,SparkSQL + HDFS,Kylin 等就是離線 OLAP,而一些監(jiān)控告警系統(tǒng)這種對(duì)實(shí)時(shí)性要求比較高的系統(tǒng)就是實(shí)時(shí) OLAP。
Druid 就屬于實(shí)時(shí) OLAP。我們是從去年的差不多這個(gè)時(shí)間開始使用 Druid 從 0 到 1 搭建了我們的實(shí)時(shí) OLAP 系統(tǒng),這套系統(tǒng)目前在線上運(yùn)行半年,單個(gè) DataSource 攝入的數(shù)據(jù)在百億級(jí)別。在這個(gè)過程中遇到很多問題,也發(fā)現(xiàn)了 Druid 的一些局限性,這篇文章先簡答介紹一下 Druid 的架構(gòu)和一些特性,下一篇文章再介紹一些實(shí)踐,可能不是最佳的。
1. 特性
Druid 很早就進(jìn)入了 Apache 孵化器,但是現(xiàn)在還沒有畢業(yè)。官網(wǎng):https://druid.apache.org,Github: https://github.com/apache/incubator-druid
根據(jù)官方文檔,Druid 的核心特性主要包括:
- 列式存儲(chǔ)。列式存儲(chǔ)的優(yōu)勢(shì)在于查詢的時(shí)候可以只返回指定的列的數(shù)據(jù),其次同一列數(shù)據(jù)往往具有很多共性,這帶來另一個(gè)好處就是存儲(chǔ)的時(shí)候壓縮效果比較好。
- 可擴(kuò)展的分布式架構(gòu)。
- 并行計(jì)算。
- 數(shù)據(jù)攝入支持實(shí)時(shí)和批量。這里的實(shí)時(shí)的意思是輸入攝入即可查。如果大家看過我之前關(guān)于實(shí)時(shí)計(jì)算的文章,應(yīng)該猜到了這就是典型的 lambda 架構(gòu),后面再細(xì)說。
- 運(yùn)維友好。
- 云原生架構(gòu),高容錯(cuò)性。
- 支持索引,便于快速查詢。
- 基于時(shí)間的分區(qū)
- 自動(dòng)聚合。
不知道官方是不是為了刻意湊數(shù),正好十條。其中很多特性其實(shí)應(yīng)該算是 OLAP 系統(tǒng)的共同特性,比如列式存儲(chǔ)等。當(dāng)時(shí)我選型使用 Druid 的時(shí)候,其實(shí)最吸引我的主要是下面三條:
- 實(shí)時(shí)攝取可查詢。換句話說就是數(shù)據(jù)查詢無延遲,這個(gè)在一些對(duì)實(shí)時(shí)性要求比較高的場(chǎng)景下,比如監(jiān)控告警,還是很重要的。
- 自動(dòng)實(shí)時(shí)聚合。
- 高效的索引結(jié)構(gòu)便于查詢。
2. 架構(gòu)
Druid 的架構(gòu)在我看來還是比較復(fù)雜的,包含 6 個(gè)不同的組件。
Coordinator:顧名思義,Coordinator 就是協(xié)調(diào)器,主要負(fù)責(zé) segment 的分發(fā)等。比如我們只保存 30 天的數(shù)據(jù),這個(gè)規(guī)則就是由 Coordinator 來定時(shí)執(zhí)行的。
Overlord:處理數(shù)據(jù)攝入的 task,將 task 提交到 MiddleManager。比如使用 Tranquility 做數(shù)據(jù)攝入的時(shí)候,每個(gè) segment 都會(huì)生成一個(gè)對(duì)應(yīng)的 task。
Broker : 處理外部請(qǐng)求,并對(duì)結(jié)果進(jìn)行匯總。
Router : Router 相當(dāng)于多個(gè) Broker 前面的路由,不是必須的。
-
Historical :Historical 可以理解為將 segment 存儲(chǔ)到本地,相當(dāng)于 cache。相比于 Deep Storage 的,Historical 將 segment 直接存儲(chǔ)到本地磁盤,只有 segment 存儲(chǔ)到本地才能被查詢。其實(shí)這個(gè)地方是有點(diǎn)異于直觀感受的。正常我們可能會(huì)認(rèn)為查詢先查本地,如果本地沒有數(shù)據(jù)才去查 Deep Storage,但是實(shí)際上如果本地沒有相應(yīng)的 segment,則查詢是無法查詢的。
Historical 處理那些 segment 是由 Coordinator 指定的,但是 Historical 并不會(huì)和 Coordinator 直接交互,而是通過 Zookeeper 來解耦。
MiddleManager : MiddleManager 可以認(rèn)為是一個(gè)任務(wù)調(diào)度進(jìn)程,主要用來處理 Overload 提交過來的 task。每個(gè) task 會(huì)以一個(gè) JVM 進(jìn)程啟動(dòng)。
各個(gè)組件之間的交互如下:

根據(jù)線條,上圖主要關(guān)注三個(gè)部分:
Queries: Routers 將請(qǐng)求路由到 Broker,Broker 向 MiddleManager 和 Historical 進(jìn)行數(shù)據(jù)查詢。這里 MiddleManager 主要負(fù)責(zé)查詢正在進(jìn)行攝入的數(shù)據(jù)查詢,比如現(xiàn)在正在攝入 12:00 ~ 13:00 的數(shù)據(jù),那么我們要查詢就去查詢 MiddleManager,MiddleManager 再將請(qǐng)求分發(fā)到具體的 peon,也就是 task 的運(yùn)行實(shí)體上。而歷史數(shù)據(jù)的查詢是通過 Historical 查詢的,然后數(shù)據(jù)返回到 Broker 進(jìn)行匯總。這里需要注意的時(shí)候數(shù)據(jù)查詢并不會(huì)落到 Deep Storage 上去,也就是查詢的數(shù)據(jù)一定是 cache 到本地磁盤的。很多人一個(gè)直觀理解查詢的過程應(yīng)該是先查詢 Historical,Historical 沒有這部分?jǐn)?shù)據(jù)則去查 Deep Storage。Druid 并不是這么設(shè)計(jì)的。
Data/Segment: 這里包括兩個(gè)部分,MiddleManager 的 task 在結(jié)束的時(shí)候會(huì)將數(shù)據(jù)寫入到 Deep Storage,這個(gè)過程一般稱作 Segment Handoff。然后 Historical 定期的去下載 Deep Storage 中的 segment 數(shù)據(jù)到本地。
-
Metadata: Druid 的元數(shù)據(jù)主要存儲(chǔ)到兩個(gè)部分,一個(gè)是 Metadata Storage,這個(gè)一般是 MySQL 等關(guān)系型數(shù)據(jù)庫;另一個(gè)是 Zookeeper。下圖是 Druid 在 Zookeeper 中的 znode。zk 的作用主要是用來給各個(gè)組件進(jìn)行解耦。
zk.png
3. 數(shù)據(jù)存儲(chǔ)
Druid 的數(shù)據(jù)存儲(chǔ)單位是 segment,segment 按時(shí)間粒度(可以通過參數(shù) segmentGranularity 指定)劃分。每個(gè) segment 會(huì)被存儲(chǔ)到 Deep Storage 和 Historical 進(jìn)程所在的節(jié)點(diǎn)上,當(dāng)然 segment 可以是有多個(gè)備份的,這樣查詢的時(shí)候就可以實(shí)現(xiàn)并行查詢,并不是為了高可用,高可用通過 Deep Storage 保證。
Druid 的數(shù)據(jù)格式如下:

分成三個(gè)部分:
- Timestamp:時(shí)間戳信息
- Dimension:維度信息
- Metrics: 一般是數(shù)值型
Druid 會(huì)自動(dòng)對(duì)數(shù)據(jù)進(jìn)行 Rollup,也就是聚合。如果時(shí)間粒度是一小時(shí),那么在這一個(gè)小時(shí)內(nèi)維度相同的數(shù)據(jù)會(huì)被合并為一條,Timestamp 都變成整點(diǎn),metrics 會(huì)根據(jù)聚合函數(shù)進(jìn)行聚合,比如 sum, max, min 等,注意是沒有平均 avg 的。Timestamp 和 Metrics 直接壓縮存儲(chǔ)即可,比較簡單。下面重點(diǎn)說一下維度的存儲(chǔ)。
Druid 的一大亮點(diǎn)就是支持多維度實(shí)時(shí)聚合查詢,簡單來說就是 filter 和 group。而實(shí)現(xiàn)這個(gè)特性的關(guān)鍵技術(shù)主要兩點(diǎn):bitmap + 倒排。
首先,Druid 會(huì)將維度值編碼映射成數(shù)字 ID,類似數(shù)據(jù)倉庫中的維度表,主要是為了存儲(chǔ)節(jié)省空間。比如上面圖中的 Page 維度:Justin Bieber 被編碼成 0,Ke$ha 被編碼成 1。對(duì)于 Username 維度:Boxer -> 0,Reach -> 1,Helz -> 0,Xeno -> 1。
然后 Page 這列數(shù)據(jù)就會(huì)被存儲(chǔ)為 [0,0,1,1]。
最后是位圖,用來表示對(duì)于某個(gè)維度的某個(gè)值,有哪些列包含了這個(gè)值,比如:
- Justin Bieber: [1,1,0,0]
- Boxer: [1,0,0,0]
那么 filter 查詢 Page='Justin Bieber' and Username='Boxer',直接將 1100 和 1000 做位運(yùn)算 and 即可。group 也是類似。
上面的位圖,其實(shí)也是一種倒排,常規(guī)的倒排后面的 list 中直接包含的是 Document ID,這里直接表示成位圖,其實(shí)是異曲同工。
4. 數(shù)據(jù)攝入
前面簡單提到 Druid 的數(shù)據(jù)攝入支持實(shí)時(shí)流模式和批模式,也就是典型的 Lambda 架構(gòu)。Lambda 架構(gòu)簡單來說就兩點(diǎn):
- 通過實(shí)時(shí)處理保證實(shí)時(shí)性
- 通過批處理保證數(shù)據(jù)完整性和準(zhǔn)確性
如果看過我之前的關(guān)于 Google DataFlow 的文章,當(dāng)時(shí)作者就大肆批評(píng)了 lambda 架構(gòu),然后在 Google 內(nèi)部是通過 MillWheel 支持 exactly-once 語義來避免了 lambda 架構(gòu)。在 druid 中的數(shù)據(jù)攝入官方支持了多種方式,關(guān)于各種方式的對(duì)比可以用如下一個(gè)圖來概括。

關(guān)于上圖中的 ”Can handle late data“ 做一下簡單說明,我們上面在數(shù)據(jù)存儲(chǔ)一節(jié)有說到 Druid 的底層存儲(chǔ)使用了 segment 結(jié)構(gòu)。舉個(gè)例子,如果時(shí)間粒度是 1 個(gè)小時(shí),那么 12:00 ~ 13:00 的數(shù)據(jù)就會(huì)存儲(chǔ)到一個(gè) segment 里面。但是這里有一個(gè)小問題需要考慮一下,就是這個(gè) segment 的數(shù)據(jù)什么時(shí)候 ready 我怎么知道呢?這個(gè)在流處理中一種常規(guī)的做法是 watermark,簡單來說就設(shè)置一個(gè)可以接受的時(shí)間延遲,比如 5 分鐘,那么 12:00 ~ 13:00 會(huì)一直接受數(shù)據(jù)直到 13:05,然后之后這個(gè) segment 就會(huì)被 handoff 掉,12:00 ~ 13:00 之間的數(shù)據(jù)就不再接受了。這個(gè)過程就叫做 ”handle late data“。然后我們發(fā)現(xiàn)上圖中 Tranquility 是不支持 late data 處理的,這個(gè)是需要特別注意的。
從上圖我們可以看到 Native batch 和 Hadoop 都對(duì)應(yīng)了 Lambda 架構(gòu)中的批處理,而 Tranquility 則對(duì)應(yīng)了 Lambda 架構(gòu)中實(shí)時(shí)處理,是一種 push 的方式。然后這里還有一種方式叫 Kafka Indexing Service,這種方式通過 pull 的方式來攝取數(shù)據(jù),我們也可以看到通過 Kafka Indexing Service 這一種服務(wù)其實(shí)就可以完成數(shù)據(jù)攝取并滿足所有需求,不然就要通過兩種方式聯(lián)合使用。但是使用 Kafka Indexing Service 的最大問題就是和 Kafka 強(qiáng)耦合。
因?yàn)槲覀兊臉I(yè)務(wù)是在阿里云公有云上,然后所有數(shù)據(jù)采集都使用了阿里云的日志服務(wù)(SLS)來處理的,所以這里我們并不能使用 Kafka index。這里我們使用的方式是 Tranquility + Hadoop 的方式來進(jìn)行數(shù)據(jù)攝取。單個(gè) DataSource 的數(shù)據(jù)攝入量達(dá)到百億級(jí)別。
5. 查詢
5.1 Natvie
Druid 最開始的時(shí)候是不支持 SQL 查詢的,原生查詢是通過查詢 Broker 提供的 http server 來實(shí)現(xiàn)的,如下:
curl -X POST '<queryable_host>:<port>/druid/v2/?pretty' -H 'Content-Type:application/json' -H 'Accept:application/json' -d @<query_json_file>
下面是一個(gè)簡單的 json 查詢示例。
{
"queryType": "timeseries",
"dataSource": "sample_datasource",
"intervals": [ "2012-01-01T00:00:00.000/2012-01-03T00:00:00.000" ],
"granularity": "day",
"aggregations": [
{ "type": "longSum", "name": "sample_name1", "fieldName": "sample_fieldName1" },
{ "type": "doubleSum", "name": "sample_name2", "fieldName": "sample_fieldName2" }
],
"context": {
"grandTotal": true
}
}
同時(shí)社區(qū)也提供了很多種語言的 client 用來做 Druid 的查詢,比如我們使用的 Java 的 client zapr/druidry ,關(guān)于更多語言的 client,可以參考這里 client libraries 。
Druid 的查詢類型有下面幾種:
-
聚合查詢(Aggregation Queries):
- Timeseries : 可以簡單理解為性能更好的 select。
- TopN : TopN 相當(dāng)于 GroupBy 加 Ordering,相同的查詢我們正常也可以通過 GroupBy 查詢來實(shí)現(xiàn),但是 TopN 的性能更好。TopN 的底層實(shí)現(xiàn)也是比較直觀的,將并行查詢的每個(gè)查詢的結(jié)果的 TopK 結(jié)果返回給 Broker,由 Broker 進(jìn)行聚合匯總。注意這里返回的結(jié)果是 K 條記錄,而不是 N 條記錄,K 默認(rèn)值為 max(1000, threshold) 決定(threshold 由用戶指定,就相當(dāng)于 TopN 中的 N)。
- GroupBy : GroupBy。
-
元數(shù)據(jù)查詢(Metadata Queries)
Druid 的元數(shù)據(jù)一般是存儲(chǔ)到 MySQL 中,包含一些 dataSource,segment 的元信息。
metadata.png
Druid 提供的元數(shù)據(jù)查詢有下面三種
Time Boundary: 用來查詢查詢指定模式的數(shù)據(jù)第一次出現(xiàn)和最近一次出現(xiàn)的時(shí)間。
Segment Metadata:返回 segment 的元信息,包括維度信息等。
Datasource Metadata:返回 dataSource 的元信息。
-
搜索查詢(Search Queries)
范圍查詢 (Scan):scan 的結(jié)果是以流模式返回的,也就是 client 真正讀取的時(shí)候才會(huì)占用內(nèi)存。
Select: 官方已經(jīng)不建議使用 Select 查詢。這里就不在介紹了。
Druid 的底層存儲(chǔ)由于是使用時(shí)間來做分片的,所以查詢的時(shí)候一定需要帶上時(shí)間區(qū)間。
我在上面說過一次 Druid 的 Rollup 不支持 average,也就是平均值,那么如果我查詢的時(shí)候要查詢平均值應(yīng)該怎么做呢?(其實(shí)查詢平均值是一個(gè)非常常見的需求,關(guān)于為了 Druid 的 Rollup 不支持 average,歡迎留言討論。)
答案是 postaggregate,druid 在查詢的時(shí)候可以定義聚合操作,是查詢的時(shí)候直接計(jì)算的。同時(shí) druid 還提供了針對(duì)聚合后的值的聚合操作,叫做 postaggregate。一個(gè)簡單的查詢 json 文件示例。
{
"queryType": "timeseries",
"dataSource": "sample_datasource",
"granularity": "day",
"descending": "true",
"filter": {
"type": "and",
"fields": [
{ "type": "selector", "dimension": "sample_dimension1", "value": "sample_value1" },
{ "type": "or",
"fields": [
{ "type": "selector", "dimension": "sample_dimension2", "value": "sample_value2" },
{ "type": "selector", "dimension": "sample_dimension3", "value": "sample_value3" }
]
}
]
},
"aggregations": [
{ "type": "longSum", "name": "sample_name1", "fieldName": "sample_fieldName1" },
{ "type": "doubleSum", "name": "sample_name2", "fieldName": "sample_fieldName2" }
],
"postAggregations": [
{ "type": "arithmetic",
"name": "sample_divide",
"fn": "/",
"fields": [
{ "type": "fieldAccess", "name": "postAgg__sample_name1", "fieldName": "sample_name1" },
{ "type": "fieldAccess", "name": "postAgg__sample_name2", "fieldName": "sample_name2" }
]
}
],
"intervals": [ "2012-01-01T00:00:00.000/2012-01-03T00:00:00.000" ]
}
5.2 SQL
SQL 在大數(shù)據(jù)系統(tǒng),尤其是 OLAP 中的重要性是不言而喻的。所以早期看到 Druid 不支持 SQL 查詢,我是非常詫異的,后面果不其然,Druid 還是推出了 SQL 查詢。這一層構(gòu)建與 Native 請(qǐng)求之上,也就是說 SQL 會(huì)被解釋成 Native 的查詢,然后去請(qǐng)求 Broker。
Druid SQL 解析基于 Apache Calcite,說起 Apache Calcite 是一個(gè)業(yè)界使用非常廣泛的 SQL 語法解析模塊,如果沒有記錯(cuò), Hive 使用的好像也是它。
Druid SQL 值得一提的是提供了非常多的 function,包括數(shù)值計(jì)算,字符串操作,時(shí)間操作等。舉個(gè)例子,其中一個(gè)字符串操作函數(shù)叫做 REGEXP_EXTRACT(expr, pattern, [index]) 對(duì) expr 做正則匹配,并提取特定的字段。使用這個(gè)函數(shù)可以做非常多的事情。但是 function 有的時(shí)候?qū)τ?SQL 的執(zhí)行計(jì)劃優(yōu)化并不是非常友好,不知道這里 Druid 團(tuán)隊(duì)是如何權(quán)衡的。
6. 其他
其他有一些值得討論的話題列在這里。
6.1 明細(xì)查詢
由于 Druid 會(huì)對(duì)存儲(chǔ)的數(shù)據(jù)做 Rollup,正常情況下是不能存儲(chǔ)明細(xì)的。但是如果是你一定需要明細(xì)的話,有個(gè)辦法就是將所有所有的列,包括 metric,都設(shè)置成 dimension,同時(shí)將聚合粒度設(shè)置到可以接受的粒度,比如秒。
6.2 高基數(shù)
這里的高基數(shù)指的是 Druid 的 Dimension 的值可能會(huì)有非常多的值,這樣引入一個(gè)問題就是存儲(chǔ)的時(shí)候會(huì)消耗比較大的空間,同時(shí)對(duì)于 CPU 的占用也會(huì)有一定程度的影響。
7. 總結(jié)
整體來看,Druid 算是一個(gè)優(yōu)秀的實(shí)時(shí) OLAP 系統(tǒng),雖然有一些地方設(shè)計(jì)的并不是盡善盡美,但是瑕不掩瑜。這篇文章簡單介紹一些 Druid 的整體情況,希望可以給使用 Druid 的同學(xué)做一些參考。下一篇文章將會(huì)介紹一下我們過去一年基于 Druid 的實(shí)踐情況以及一些踩過的坑。
最后有一個(gè)問題我非常好奇,橫向?qū)Ρ?Apache 基金會(huì)的項(xiàng)目,Druid 在很多方面都是可圈可點(diǎn)的,但是為什么現(xiàn)在還沒有從 Apache 畢業(yè),實(shí)在是令人困惑。

