前言
Elasticsearch不僅是強(qiáng)大的全文搜索引擎,在很多場(chǎng)景下(特別是TiDB、ShardingSphere等框架成熟之前)也被當(dāng)做分布式HTAP數(shù)據(jù)庫使用,在存儲(chǔ)、更新海量數(shù)據(jù)的同時(shí),提供高效的點(diǎn)查和部分聚合查詢能力。StarRocks從3.1版本開始支持Elasticsearch Catalog,極大方便了ES數(shù)據(jù)的聯(lián)邦查詢。本文簡要分析其原理,并提出一個(gè)小問題和對(duì)應(yīng)的臨時(shí)解決方法。
元數(shù)據(jù)獲取階段
當(dāng)用戶創(chuàng)建一個(gè)ES Catalog時(shí),本質(zhì)是創(chuàng)建了ElasticsearchConnector和它對(duì)應(yīng)的ElasticsearchMetadata,后者持有該Catalog的全部配置信息和訪問ES集群的EsRestClient。這點(diǎn)和2.x版本中舊有的ES外表不同,每張ES外表都會(huì)對(duì)應(yīng)一個(gè)EsRestClient,會(huì)導(dǎo)致目標(biāo)ES集群的HTTP連接數(shù)比正常偏多,ES Catalog則基本不存在這個(gè)問題。
每個(gè)ES Catalog只有一個(gè)默認(rèn)數(shù)據(jù)庫default_db,以下則是ES實(shí)例中的索引,在FE中稱為EsTable,相當(dāng)于復(fù)用了原ES外表的實(shí)現(xiàn)(當(dāng)然ES Catalog會(huì)自動(dòng)獲取并推斷字段,無需自己建表)。每個(gè)EsTable對(duì)象都持有一個(gè)EsMetaStateTracker用于同步元數(shù)據(jù),其中又分為3個(gè)階段(phase),按順序分別為:

- VersionPhase:通過
GET /請(qǐng)求獲取ES集群的版本號(hào); - MappingPhase:通過
GET /indexName/_mapping請(qǐng)求獲取索引的Mapping信息,同時(shí)解析keyword類型字段(包括text內(nèi)嵌的keyword)和存在doc_values的字段(即允許排序、聚合的字段),并存入上下文; - PartitionPhase:通過
GET /indexName/_search_shards請(qǐng)求獲取索引的分片信息,再通過GET /_nodes/http請(qǐng)求獲取ES集群數(shù)據(jù)節(jié)點(diǎn)的地址,將分片ID和所在節(jié)點(diǎn)的映射關(guān)系存入EsShardPartitions容器。
FE計(jì)劃階段
ES Catalog查詢對(duì)應(yīng)的物理節(jié)點(diǎn)是EsScanNode,在生成Fragment的過程中除了維護(hù)Catalog的信息外,還會(huì)負(fù)責(zé)計(jì)算ScanRangeLocation,即每個(gè)BE節(jié)點(diǎn)負(fù)責(zé)請(qǐng)求的ES分片的對(duì)應(yīng)關(guān)系,同時(shí)會(huì)盡量做colocate分配,使得BE節(jié)點(diǎn)和請(qǐng)求的ES分片所在節(jié)點(diǎn)是同一個(gè)(當(dāng)然實(shí)際部署中這種情況不多見)。另外執(zhí)行EXPLAIN語句時(shí),會(huì)打印查詢謂詞翻譯出來的ES DSL,如下所示。注意這個(gè)DSL只是示意作用,實(shí)際執(zhí)行時(shí)BE會(huì)重新生成一次。
MySQL [default_db]> EXPLAIN SELECT id,waybillCode,orderTime FROM realtimewaybillmonitor_202409 WHERE yn <= 0 AND orderTime >= hours_sub(now(), 1) AND waybillCode LIKE 'JDX%' AND length(sku) > 3 LIMIT 1000;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Explain String |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:13: id | 130: waybillCode | 71: orderTime |
| PARTITION: UNPARTITIONED |
| |
| RESULT SINK |
| |
| 2:EXCHANGE |
| limit: 1000 |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 02 |
| UNPARTITIONED |
| |
| 1:Project |
| | <slot 13> : 13: id |
| | <slot 71> : 71: orderTime |
| | <slot 130> : 130: waybillCode |
| | limit: 1000 |
| | |
| 0:EsScanNode |
| TABLE: realtimewaybillmonitor_202409 |
| PREDICATES: 9: yn <= 0, 71: orderTime >= '2024-09-26 15:46:17', 130: waybillCode LIKE 'JDX%', length(14: sku) > 3 |
| LOCAL_PREDICATES: length(14: sku) > 3 |
| REMOTE_PREDICATES: 9: yn <= 0, 71: orderTime >= '2024-09-26 15:46:17', 130: waybillCode LIKE 'JDX%' |
| ES_QUERY_DSL: {"bool":{"must":[{"range":{"yn":{"lte":0}}},{"range":{"orderTime":{"gte":"2024-09-26 15:46:17"}}},{"wildcard":{"waybillCode":"JDX*"}}]}} |
| ES index/type: realtimewaybillmonitor_202409/realtimewaybillmonitor |
| limit: 1000 |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
可見上述查詢的前三個(gè)謂詞都可以下推到ES,但是第四個(gè)謂詞無法下推,需要將結(jié)果拉取到SR端再進(jìn)行過濾。
BE執(zhí)行階段
BE接收到前述EsScanNode后,將能夠下推到ES的謂詞封裝為EsPredicate,分為幾種情況:
- 二元謂詞,且一側(cè)需為字面量,形如
yn <= 0和orderTime >= hours_sub(now(), 1)(右側(cè)可以做常量折疊)都符合條件; - 函數(shù)調(diào)用謂詞,支持
esquery()(直接透?jìng)鱀SL的SR內(nèi)置函數(shù))、IS NULL、IS NOT NULL和LIKE,其他的均無法下推。即如果把上述示例的waybillCode LIKE 'JDX%'改成starts_with(waybillCode, 'JDX') = 1,這個(gè)條件就不能下推了; -
IN和NOT IN謂詞,對(duì)應(yīng)terms query,簡單直接; - 復(fù)合的
AND謂詞,實(shí)際上是對(duì)以上三種情況的組合做分解。
下推到ES的謂詞會(huì)從謂詞列表中刪除。接下來每個(gè)BE會(huì)分別創(chuàng)建ESScanReader以掃描ES數(shù)據(jù),這里需要注意,如果不是所有謂詞都下推到了ES(即謂詞列表中還有剩余),那么為了保證結(jié)果準(zhǔn)確,原始查詢中的LIMIT子句也不能下推。
上一節(jié)中的查詢實(shí)際生成的DSL JSON如下所示。如果無法命中doc_values,則會(huì)改用source查詢。
{
"query": {
"bool": {
"filter": [{
"bool": {
"should": [{
"range": {
"yn": {
"lte": "0"
}
}
}]
}
}, {
"bool": {
"should": [{
"range": {
"orderTime": {
"gte": "1727336859000"
}
}
}]
}
}, {
"bool": {
"should": [{
"wildcard": {
"waybillCode": "JDX*"
}
}]
}
}]
}
},
"stored_fields": "_none_",
"docvalue_fields": ["waybillCode", "orderTime", "yn", "id", "sku"],
"sort": ["_doc"],
"size": 4096
}
正式執(zhí)行查詢時(shí),又分為兩種情況。
- 若
LIMIT子句下推到了ES,那么BE會(huì)認(rèn)為這是一個(gè)"exactly-once"的查詢(代碼中如此),可以類比流式處理引擎中exactly-once的含義,即“只查詢一次就可以了”。此時(shí)組裝的搜索請(qǐng)求URL形如{target}/{index}/{type}/_search?terminate_after={limit}&preference=_shards:{shards}&{filter_path}。 - 若沒有
LIMIT子句下推到ES,則需要執(zhí)行Scroll查詢,分頁獲取結(jié)果。此時(shí)組裝的搜索請(qǐng)求URL形如{target}/{index}/{type}/_search?scroll={keep_alive}&preference=_shards:{shards}&{filter_path}。Scroll上下文的TTL由BE參數(shù)es_scroll_keepalive設(shè)定,默認(rèn)是5m。
接下來ESScanReader每次請(qǐng)求上述URL獲取一批數(shù)據(jù),調(diào)用超時(shí)由BE參數(shù)es_http_timeout_ms設(shè)定,默認(rèn)是5000(即5秒),在網(wǎng)絡(luò)環(huán)境欠佳時(shí),應(yīng)適當(dāng)調(diào)大。獲取到的數(shù)據(jù)經(jīng)過JSON解析,獲取到doc_values或者_source,逐行填充到Chunk中(沒有值的則填充默認(rèn)值)。這里實(shí)際上可以優(yōu)化為按列填充,代碼中也有相應(yīng)的TODO標(biāo)記。
ES數(shù)組類型的問題
ES沒有顯式的數(shù)組類型,當(dāng)某字段插入了多個(gè)值時(shí),它會(huì)自然地變?yōu)閿?shù)組類型,但在索引Mapping中無法直接區(qū)分該字段是否為數(shù)組。在我們的歷史ES集群中,有大量ES索引含有實(shí)際為數(shù)組的字段,使用SR ES Catalog查詢時(shí)則會(huì)拋出異?;蛑环祷氐谝粋€(gè)值,影響體驗(yàn)。這里提出一個(gè)不優(yōu)雅的臨時(shí)解決方案,在Catalog參數(shù)中增加array_fields配置項(xiàng),讓用戶創(chuàng)建ES Catalog時(shí)手動(dòng)指定數(shù)組字段。
// Fields that should be treated as arrays when building Elasticsearch external table.
// Since Elasticsearch makes no distinction between scalar and array types, we should manually specify them.
// The format is: `field1,index2:field2...`
// which means `field1` in all indices and `field2` in `index2` are arrays.
@Config(key = KEY_ARRAY_FIELDS,
desc = "Fields that should be treated as arrays when building Elasticsearch external table. " +
"The format is: `field1,index2:field2,...`.",
defaultValue = "")
private String arrayFields;
然后在ElasticsearchMetadata中獲取并緩存每個(gè)索引中的數(shù)組字段名。
private Map<String, Set<String>> indicesWithArrayFields;
public ElasticsearchMetadata(EsRestClient esRestClient, Map<String, String> properties, String catalogName) {
this.esRestClient = esRestClient;
this.properties = properties;
this.catalogName = catalogName;
this.indicesWithArrayFields = Arrays.stream(StringUtils.split(properties.get(KEY_ARRAY_FIELDS), ","))
.map(s -> StringUtils.split(s, ":"))
.filter(kv -> kv.length <= 2)
.collect(
Collectors.toMap(
kv -> kv.length == 2 ? kv[0] : "",
kv -> new HashSet<>(Collections.singletonList(kv.length == 2 ? kv[1] : kv[0])),
(v1, v2) -> {
v1.addAll(v2);
return v1;
}
)
);
}
構(gòu)建EsTable時(shí),會(huì)調(diào)用EsUtil.convertColumnSchema()方法創(chuàng)建ES表的Schema,將對(duì)應(yīng)索引的arrayFields參數(shù)傳遞給它,并將數(shù)組字段重新用ArrayType包裝起來即可。
public static List<Column> convertColumnSchema(EsRestClient client, String index, Set<String> arrayFields)
throws AnalysisException {
List<Column> columns = new ArrayList<>();
String mappings = client.getMapping(index);
JSONObject properties = parseProperties(index, mappings);
if (null == properties) {
return columns;
}
for (String columnName : properties.keySet()) {
JSONObject columnAttr = (JSONObject) properties.get(columnName);
// default set json.
Type type = Type.JSON;
if (columnAttr.has("type")) {
type = convertType(columnAttr.get("type").toString());
if (arrayFields.contains(columnName)) {
type = new ArrayType(type);
}
}
Column column = new Column(columnName, type, true);
columns.add(column);
}
return columns;
}
The End
大家晚安。