StarRocks Elasticsearch Catalog原理簡析

前言

Elasticsearch不僅是強(qiáng)大的全文搜索引擎,在很多場(chǎng)景下(特別是TiDB、ShardingSphere等框架成熟之前)也被當(dāng)做分布式HTAP數(shù)據(jù)庫使用,在存儲(chǔ)、更新海量數(shù)據(jù)的同時(shí),提供高效的點(diǎn)查和部分聚合查詢能力。StarRocks從3.1版本開始支持Elasticsearch Catalog,極大方便了ES數(shù)據(jù)的聯(lián)邦查詢。本文簡要分析其原理,并提出一個(gè)小問題和對(duì)應(yīng)的臨時(shí)解決方法。

元數(shù)據(jù)獲取階段

當(dāng)用戶創(chuàng)建一個(gè)ES Catalog時(shí),本質(zhì)是創(chuàng)建了ElasticsearchConnector和它對(duì)應(yīng)的ElasticsearchMetadata,后者持有該Catalog的全部配置信息和訪問ES集群的EsRestClient。這點(diǎn)和2.x版本中舊有的ES外表不同,每張ES外表都會(huì)對(duì)應(yīng)一個(gè)EsRestClient,會(huì)導(dǎo)致目標(biāo)ES集群的HTTP連接數(shù)比正常偏多,ES Catalog則基本不存在這個(gè)問題。

每個(gè)ES Catalog只有一個(gè)默認(rèn)數(shù)據(jù)庫default_db,以下則是ES實(shí)例中的索引,在FE中稱為EsTable,相當(dāng)于復(fù)用了原ES外表的實(shí)現(xiàn)(當(dāng)然ES Catalog會(huì)自動(dòng)獲取并推斷字段,無需自己建表)。每個(gè)EsTable對(duì)象都持有一個(gè)EsMetaStateTracker用于同步元數(shù)據(jù),其中又分為3個(gè)階段(phase),按順序分別為:

  • VersionPhase:通過GET /請(qǐng)求獲取ES集群的版本號(hào);
  • MappingPhase:通過GET /indexName/_mapping請(qǐng)求獲取索引的Mapping信息,同時(shí)解析keyword類型字段(包括text內(nèi)嵌的keyword)和存在doc_values的字段(即允許排序、聚合的字段),并存入上下文;
  • PartitionPhase:通過GET /indexName/_search_shards請(qǐng)求獲取索引的分片信息,再通過GET /_nodes/http請(qǐng)求獲取ES集群數(shù)據(jù)節(jié)點(diǎn)的地址,將分片ID和所在節(jié)點(diǎn)的映射關(guān)系存入EsShardPartitions容器。

FE計(jì)劃階段

ES Catalog查詢對(duì)應(yīng)的物理節(jié)點(diǎn)是EsScanNode,在生成Fragment的過程中除了維護(hù)Catalog的信息外,還會(huì)負(fù)責(zé)計(jì)算ScanRangeLocation,即每個(gè)BE節(jié)點(diǎn)負(fù)責(zé)請(qǐng)求的ES分片的對(duì)應(yīng)關(guān)系,同時(shí)會(huì)盡量做colocate分配,使得BE節(jié)點(diǎn)和請(qǐng)求的ES分片所在節(jié)點(diǎn)是同一個(gè)(當(dāng)然實(shí)際部署中這種情況不多見)。另外執(zhí)行EXPLAIN語句時(shí),會(huì)打印查詢謂詞翻譯出來的ES DSL,如下所示。注意這個(gè)DSL只是示意作用,實(shí)際執(zhí)行時(shí)BE會(huì)重新生成一次。

MySQL [default_db]> EXPLAIN SELECT id,waybillCode,orderTime FROM realtimewaybillmonitor_202409 WHERE yn <= 0 AND orderTime >= hours_sub(now(), 1) AND waybillCode LIKE 'JDX%' AND length(sku) > 3 LIMIT 1000;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Explain String                                                                                                                                              |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                                                                                             |
|  OUTPUT EXPRS:13: id | 130: waybillCode | 71: orderTime                                                                                                     |
|   PARTITION: UNPARTITIONED                                                                                                                                  |
|                                                                                                                                                             |
|   RESULT SINK                                                                                                                                               |
|                                                                                                                                                             |
|   2:EXCHANGE                                                                                                                                                |
|      limit: 1000                                                                                                                                            |
|                                                                                                                                                             |
| PLAN FRAGMENT 1                                                                                                                                             |
|  OUTPUT EXPRS:                                                                                                                                              |
|   PARTITION: RANDOM                                                                                                                                         |
|                                                                                                                                                             |
|   STREAM DATA SINK                                                                                                                                          |
|     EXCHANGE ID: 02                                                                                                                                         |
|     UNPARTITIONED                                                                                                                                           |
|                                                                                                                                                             |
|   1:Project                                                                                                                                                 |
|   |  <slot 13> : 13: id                                                                                                                                     |
|   |  <slot 71> : 71: orderTime                                                                                                                              |
|   |  <slot 130> : 130: waybillCode                                                                                                                          |
|   |  limit: 1000                                                                                                                                            |
|   |                                                                                                                                                         |
|   0:EsScanNode                                                                                                                                              |
|      TABLE: realtimewaybillmonitor_202409                                                                                                                   |
|      PREDICATES: 9: yn <= 0, 71: orderTime >= '2024-09-26 15:46:17', 130: waybillCode LIKE 'JDX%', length(14: sku) > 3                                      |
|      LOCAL_PREDICATES: length(14: sku) > 3                                                                                                                  |
|      REMOTE_PREDICATES: 9: yn <= 0, 71: orderTime >= '2024-09-26 15:46:17', 130: waybillCode LIKE 'JDX%'                                                    |
|      ES_QUERY_DSL: {"bool":{"must":[{"range":{"yn":{"lte":0}}},{"range":{"orderTime":{"gte":"2024-09-26 15:46:17"}}},{"wildcard":{"waybillCode":"JDX*"}}]}} |
|      ES index/type: realtimewaybillmonitor_202409/realtimewaybillmonitor                                                                                    |
|      limit: 1000                                                                                                                                            |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------+

可見上述查詢的前三個(gè)謂詞都可以下推到ES,但是第四個(gè)謂詞無法下推,需要將結(jié)果拉取到SR端再進(jìn)行過濾。

BE執(zhí)行階段

BE接收到前述EsScanNode后,將能夠下推到ES的謂詞封裝為EsPredicate,分為幾種情況:

  • 二元謂詞,且一側(cè)需為字面量,形如yn <= 0orderTime >= hours_sub(now(), 1)(右側(cè)可以做常量折疊)都符合條件;
  • 函數(shù)調(diào)用謂詞,支持esquery()(直接透?jìng)鱀SL的SR內(nèi)置函數(shù))、IS NULLIS NOT NULLLIKE,其他的均無法下推。即如果把上述示例的waybillCode LIKE 'JDX%'改成starts_with(waybillCode, 'JDX') = 1,這個(gè)條件就不能下推了;
  • INNOT IN謂詞,對(duì)應(yīng)terms query,簡單直接;
  • 復(fù)合的AND謂詞,實(shí)際上是對(duì)以上三種情況的組合做分解。

下推到ES的謂詞會(huì)從謂詞列表中刪除。接下來每個(gè)BE會(huì)分別創(chuàng)建ESScanReader以掃描ES數(shù)據(jù),這里需要注意,如果不是所有謂詞都下推到了ES(即謂詞列表中還有剩余),那么為了保證結(jié)果準(zhǔn)確,原始查詢中的LIMIT子句也不能下推。

上一節(jié)中的查詢實(shí)際生成的DSL JSON如下所示。如果無法命中doc_values,則會(huì)改用source查詢。

{
    "query": {
        "bool": {
            "filter": [{
                "bool": {
                    "should": [{
                        "range": {
                            "yn": {
                                "lte": "0"
                            }
                        }
                    }]
                }
            }, {
                "bool": {
                    "should": [{
                        "range": {
                            "orderTime": {
                                "gte": "1727336859000"
                            }
                        }
                    }]
                }
            }, {
                "bool": {
                    "should": [{
                        "wildcard": {
                            "waybillCode": "JDX*"
                        }
                    }]
                }
            }]
        }
    },
    "stored_fields": "_none_",
    "docvalue_fields": ["waybillCode", "orderTime", "yn", "id", "sku"],
    "sort": ["_doc"],
    "size": 4096
}

正式執(zhí)行查詢時(shí),又分為兩種情況。

  • LIMIT子句下推到了ES,那么BE會(huì)認(rèn)為這是一個(gè)"exactly-once"的查詢(代碼中如此),可以類比流式處理引擎中exactly-once的含義,即“只查詢一次就可以了”。此時(shí)組裝的搜索請(qǐng)求URL形如{target}/{index}/{type}/_search?terminate_after={limit}&preference=_shards:{shards}&{filter_path}。
  • 若沒有LIMIT子句下推到ES,則需要執(zhí)行Scroll查詢,分頁獲取結(jié)果。此時(shí)組裝的搜索請(qǐng)求URL形如{target}/{index}/{type}/_search?scroll={keep_alive}&preference=_shards:{shards}&{filter_path}。Scroll上下文的TTL由BE參數(shù)es_scroll_keepalive設(shè)定,默認(rèn)是5m

接下來ESScanReader每次請(qǐng)求上述URL獲取一批數(shù)據(jù),調(diào)用超時(shí)由BE參數(shù)es_http_timeout_ms設(shè)定,默認(rèn)是5000(即5秒),在網(wǎng)絡(luò)環(huán)境欠佳時(shí),應(yīng)適當(dāng)調(diào)大。獲取到的數(shù)據(jù)經(jīng)過JSON解析,獲取到doc_values或者_source,逐行填充到Chunk中(沒有值的則填充默認(rèn)值)。這里實(shí)際上可以優(yōu)化為按列填充,代碼中也有相應(yīng)的TODO標(biāo)記。

ES數(shù)組類型的問題

ES沒有顯式的數(shù)組類型,當(dāng)某字段插入了多個(gè)值時(shí),它會(huì)自然地變?yōu)閿?shù)組類型,但在索引Mapping中無法直接區(qū)分該字段是否為數(shù)組。在我們的歷史ES集群中,有大量ES索引含有實(shí)際為數(shù)組的字段,使用SR ES Catalog查詢時(shí)則會(huì)拋出異?;蛑环祷氐谝粋€(gè)值,影響體驗(yàn)。這里提出一個(gè)不優(yōu)雅的臨時(shí)解決方案,在Catalog參數(shù)中增加array_fields配置項(xiàng),讓用戶創(chuàng)建ES Catalog時(shí)手動(dòng)指定數(shù)組字段。

// Fields that should be treated as arrays when building Elasticsearch external table.                      
// Since Elasticsearch makes no distinction between scalar and array types, we should manually specify them.
// The format is: `field1,index2:field2...`                                                                 
// which means `field1` in all indices and `field2` in `index2` are arrays.                                 
@Config(key = KEY_ARRAY_FIELDS,                                                                             
        desc = "Fields that should be treated as arrays when building Elasticsearch external table. " +     
                "The format is: `field1,index2:field2,...`.",                                               
        defaultValue = "")                                                                                  
private String arrayFields;                                                                                 

然后在ElasticsearchMetadata中獲取并緩存每個(gè)索引中的數(shù)組字段名。

private Map<String, Set<String>> indicesWithArrayFields;                                                     
                                                                                                             
public ElasticsearchMetadata(EsRestClient esRestClient, Map<String, String> properties, String catalogName) {
    this.esRestClient = esRestClient;                                                                        
    this.properties = properties;                                                                            
    this.catalogName = catalogName;                                                                          
                                                                                                             
    this.indicesWithArrayFields = Arrays.stream(StringUtils.split(properties.get(KEY_ARRAY_FIELDS), ","))     
            .map(s -> StringUtils.split(s, ":"))                                                              
            .filter(kv -> kv.length <= 2)                                                                    
            .collect(                                                                                        
                    Collectors.toMap(                                                                        
                            kv -> kv.length == 2 ? kv[0] : "",                                               
                            kv -> new HashSet<>(Collections.singletonList(kv.length == 2 ? kv[1] : kv[0])),  
                            (v1, v2) -> {                                                                    
                                v1.addAll(v2);                                                               
                                return v1;                                                                   
                            }                                                                                
                    )                                                                                        
            );                                                                                               
}                                                                                                            

構(gòu)建EsTable時(shí),會(huì)調(diào)用EsUtil.convertColumnSchema()方法創(chuàng)建ES表的Schema,將對(duì)應(yīng)索引的arrayFields參數(shù)傳遞給它,并將數(shù)組字段重新用ArrayType包裝起來即可。

public static List<Column> convertColumnSchema(EsRestClient client, String index, Set<String> arrayFields)
        throws AnalysisException {                                                                        
    List<Column> columns = new ArrayList<>();                                                             
    String mappings = client.getMapping(index);                                                           
    JSONObject properties = parseProperties(index, mappings);                                             
    if (null == properties) {                                                                             
        return columns;                                                                                   
    }                                                                                                     
    for (String columnName : properties.keySet()) {                                                       
        JSONObject columnAttr = (JSONObject) properties.get(columnName);                                  
        // default set json.                                                                              
        Type type = Type.JSON;                                                                            
        if (columnAttr.has("type")) {                                                                     
            type = convertType(columnAttr.get("type").toString());                                        
            if (arrayFields.contains(columnName)) {                                                       
                type = new ArrayType(type);                                                               
            }                                                                                             
        }                                                                                                 
        Column column = new Column(columnName, type, true);                                               
        columns.add(column);                                                                              
    }                                                                                                     
    return columns;                                                                                       
}                                                                                                         

The End

大家晚安。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容