【工作】Presto Hive讀取代碼閱讀筆記

PS:基于 presto-0.258

整體流程

接收語句

DispatchManager createQueryInternal
    queryPreparer.prepareQuer // preparedQuery [封裝Statement]
        dispatchQueryFactory.createDispatchQuery => DispatchQuery 
            resourceGroupManager.submit(preparedQuery.getStatement(), dq, selectionContext, queryExecutor)

提交成功

InternalResourceGroup run (LocalDispatchQuery)
    InternalResourceGroup startInBackground
        LocalDispatchQuery waitForMinimumWorkers
            LocalDispatchQuery startExecution
                SqlQueryExecution start

開始執(zhí)行

    PlanRoot plan = analyzeQuery();
    planDistribution(plan);
    scheduler.start(); // SqlQueryScheduler

一些細(xì)節(jié)

hive表的元數(shù)據(jù)訪問

元數(shù)據(jù)總體由 HiveMetadata維護(hù),里面包含metastore連接,partitionManager以及一些輔助方法。

獲取表的元數(shù)據(jù)


        StatementAnalyzer visitTable
            TableMetadata tableMetadata = metadata.getTableMetadata(session, tableHandle.get());
                ConnectorMetadata metadata = getMetadata(session, connectorId); -> HiveMetadata
                    解析一些 
                        HiveStorageFormat 
                        properties 
                        partitionedBy 
                        bucketProperty 
                        preferredOrderingColumns 
                        orcBloomFilterColumns
                        orcBloomFilterFfp
                        comment
                    等信息
                封裝到ConnectorTableMetadata

Source Split的切分

        從plan里createStageScheduler
            splitSourceProvider // 這里會出現(xiàn)HiveTableLayoutHandle 描述了表的目錄 分區(qū) 字段 謂詞等 甚至有tableParameters
                HiveSplitSource allAtOnce //返回的是HiveSplitSource實例 封裝了一個AsyncQueue隊列去存儲split
                    HiveSplitSource getNextBatch //這是每一批
                        BackgroundHiveSplitLoader loadSplits //這里觸發(fā)分區(qū) 文件的迭代 和split計算 。。。
                            StoragePartitionLoader loadPartition //這里有個 DirectoryLister 【重點(diǎn)關(guān)注】
                                這里夾雜幾種情況
                                    SymlinkTextInputFormat
                                    shouldUseFileSplitsFromInputFormat(inputFormat))
                                        InputSplit[] splits = inputFormat.getSplits(jobConf, 0); 去拿到split 。。
                                    if (tableBucketInfo.isPresent()) {
                                不同情況解析split的邏輯不一樣
                                正常情況是非bucket普通表
                                是用DirectoryLister去list分區(qū)目錄path 一個文件對應(yīng)一個InternalHiveSplit(也可能被path filter過濾)
                                    Optional<InternalHiveSplit> createInternalHiveSplit(HiveFileInfo fileInfo
                                    這里的邏輯:
                                        1)提取 List<HostAddress> addresses
                                        2)計算分區(qū)這個文件的相對路徑 URI relativePath = partitionInfo.getPath().relativize(path.toUri());
  • 上面返回的只是InternalHiveSplit 還需要在 HiveSplitSource的getNextBatch里變成HiveSplit

  • queues.borrowBatchAsync(bucketNumber xxx 觸發(fā)future list目錄任務(wù) 。。

  • 最后對外輸出的是 HiveSplit【封裝了一大堆東西。。基于maxSplitSize算出來的 即一個文件 可能有多個】

  • 關(guān)于split元數(shù)據(jù)這塊比spark調(diào)度要好很多 因為是流式的 不是靜態(tài)的集合 。。 內(nèi)存需求會少很多。

  • 最主要的是ListenableFuture<?> future = hiveSplitSource.addToQueue(splits.next());

  • 最后輸出的HiveSplit在一個PerBucket + AsyncQueue 組合的復(fù)雜的隊列緩存結(jié)構(gòu)里

節(jié)點(diǎn)選擇 [SOFT Affinity scheduler]

  • 這里實際上是用path的哈希取模所有節(jié)點(diǎn) 得到固定的目標(biāo)節(jié)點(diǎn)映射列表
    (好像忽略了文件實際位置。。但是因為這有緩存 包括文件的 所以可能是綜合考慮 如果是hard的話 是不是可能不均衡 ?)
  • 貌似只適合于存算分離的架構(gòu)。。
  • 如果是存算一體的 建議選HARD Affinity ,即類似spark的preference local node

緩存(Raptorx中的特性)

  • 1)文件 cache 【coordinater上 放內(nèi)存】【done】
            本質(zhì)是guava的Cache<Path, List<HiveFileInfo>> cache類實例 分區(qū)目錄也假設(shè)為不動的。。
            This can only be applied to sealed directories
                見:StoragePartitionLoader.createInternalHiveSplitIterator 
                    boolean cacheable = isUseListDirectoryCache(session);
                    if (partition.isPresent()) {
                        // Use cache only for sealed partitions
                        cacheable &= partition.get().isSealedPartition();
                    }

            文件的list是根據(jù) hdfs 的 remoteIterator 迭代的 。。不像spark 跑了并行任務(wù)去獲取location信息 全部一起緩存 。。
  • 2)tail/footer cache【在節(jié)點(diǎn)上 也是放內(nèi)存】
            注:OrcDataSource這個類和tail/footer沒關(guān)系 只是封裝了流讀取的一些入口 
            這個類是必須要打開至少一次ORC文件的 

            HiveClientModule -> createOrcFileTailSource 里決定了是否啟用緩存 。。
                Cache<OrcDataSourceId, OrcFileTail> cache

            具體來說

            OrcReader里面的兩個主要元數(shù)據(jù) 都來自 orcFileTailSource提供的OrcFileTail // Slice 里保存了 byte[]
                private final Footer footer; // 文件級別的統(tǒng)計 stripe摘要
                private final Metadata metadata; //stripe級的統(tǒng)計 
                
            還有stripe的StripeMetadataSource -> 這個類提供獲取StripeFooter的方法 
                (StripeFooter 包含一堆Stream 即各列數(shù)據(jù)信息 以及索引信息 StripeReader會用 selectRowGroups )
                這里面會判斷是否要緩存isCachedStream 
                return streamKind == BLOOM_FILTER || streamKind == ROW_INDEX; 

            注意:這個方法調(diào)用時是傳入OrcDataSource的 所以能拿到ORC文件流 但是之后就不需要這個流了。seek 等也不需要了。
            OrcFileTail orcFileTail = orcFileTailSource.getOrcFileTail(OrcDataSource orcDataSource)

謂詞裁剪(plan層)

  • 1)分區(qū)裁剪
            SqlQueryExecution analyzeQuery
                logicalPlanner plan
                    IterativeOptimizer【這個類類似于scala里面的模式匹配 不同的規(guī)則去catch其對應(yīng)的語法樹節(jié)點(diǎn)去執(zhí)行邏輯】
                    而所有的規(guī)則都在 PlanOptimizers 去添加 每個匹配邏輯是一個Rule類的實現(xiàn)
                        如PickTableLayout 有一個規(guī)則是pickTableLayoutForPredicate
                            hivePartitionResult = partitionManager.getPartitions(
                                這里如果有謂詞 where 就會把tablescan替換成FilterNode(里邊包含tablescan)
            這樣就完成了查詢計劃的替換

            分區(qū)裁剪過程【這里很抽象 謂詞傳遞 命名很不好理解 。。?!?
  • 2)謂詞表示體系

重要
這里要解釋一個較Domain的類。。實際上就是表示某個值的范圍(離散值,范圍,無窮等)
以及其服務(wù)類:TupleDomain 。。是限定了字段 + 值范圍的組合
(PS:這命名實在讓人別扭。)

            參考 TestTupleDomainFilter 
            還搞了個緩存去防止多次解析 。。
            TupleDomainFilterCache -> Converting Domain into TupleDomainFilter is expensive, hence, we use a cache keyed on Domain

            傳遞到下游的時候 是TupleDomain<Subfield> domainPredicate 
            這里面Subfield是一個可以多層表達(dá)的字段表示
            TupleDomain 是一個泛型Map 大概就是<字段 值范圍>的一個模式。

            Constraint<ColumnHandle> 
                // 這又是另一個表示條件的類 。。里面封裝了 TupleDomain<T> summary; 
                // 和另一個 Optional<Predicate<Map<T, NullableValue>>> predicate 這個是Java Function接口里面的Predicate 
                // 有幾個主要方法 and/or/test -> 得到返回值是Boolean抽象 。

            這里面涉及到的泛型有
                ColumnHandle -> 一個空接口 這是presto spi 定義的 各個connector可能有不同實現(xiàn) 
                Map<Column, Domain> effectivePredicate -> 這個Column就是Hive元數(shù)據(jù)里Table下的列,獲取分區(qū)列表時候用到
                HiveColumnHandle -> hive的實現(xiàn) 
                HivePartition -> Map<ColumnHandle, NullableValue> getKeys() //表示field -> value 

讀split邏輯

        具體的task讀的是 hiveSplit 

        弄清楚split切分邏輯【】

        worker上的調(diào)用鏈:
        PrioritizedSplitRunner process
            DriverSplitRunner processFor
                Driver processInternal
                    xxOperator getOutput -> 觸發(fā)計算
                        HivePageSourceProvider createHivePageSource
                            OrcBatchPageSourceFactory createOrcPageSource
                                之后就是ORC的解析 OrcReader -> OrcRecordReader 去讀取到presto的page相關(guān)邏輯了。

是否緩存文件footer元數(shù)據(jù) 不只是開啟了cache配置 還需要選擇的split節(jié)點(diǎn)在期望節(jié)點(diǎn)里 才會去緩存 。即 和nodeSelector策略有關(guān)。而且這個緩存 是以每個文件粒度調(diào)度的 。(包含在hiveSplit里面)

梳理stage/task/driver/split的并發(fā)關(guān)系

  • Query 根據(jù)SQL語句生成查詢執(zhí)行計劃,進(jìn)而生成可以執(zhí)行的查詢(Query),一個查詢執(zhí)行由Stage、Task、Driver、Split、Operator和DataSource組成
  • Stage 執(zhí)行查詢階段 Stage之間是樹狀的結(jié)構(gòu) ,RootStage 將結(jié)果返回給coordinator ,SourceStage接收coordinator數(shù)據(jù) 其他stage都有上下游 stage分為四種 single(root)、Fixed、source、coordinator_only(DML or DDL)
  • Exchange 兩個stage數(shù)據(jù)的交換通過Exchange 兩種Exchange ;Output Buffer (生產(chǎn)數(shù)據(jù)的stage通過此傳給下游stage)Exchange Client (下游消費(fèi));如果stage 是source 直接通過connector 讀數(shù)據(jù)
  • 一個Task包含一或多個Driver,是作用于一個Split的一系列Operator集合。一個Driver用于處理一個Split產(chǎn)生相應(yīng)輸出,輸出由Task收集并傳遞給下游Stage中的Task

核心問題
1)task個數(shù)
正常就是1個stage節(jié)點(diǎn)個數(shù)個,而presto會盡可能使用資源。每個stage每個節(jié)點(diǎn)都有一個task。(當(dāng)然是非root stage)
2)driver個數(shù)
其實就是split個數(shù)
3)split個數(shù)(根據(jù)stage的類型不同而不同)

    single(root)-> 1個
    coordinator only -> 元數(shù)據(jù)操作 也是一個
    如果是source的stage -> 由connector的splitmanager決定
    一個文件最少一個split
    remainingInitialSplits 有個參數(shù)影響了maxSplitBytes // 如果計算次數(shù)少于remainingInitialSplits 會采用 maxInitialSplitSize
        否則用配置的maxSplitSize去滾動每個文件生成HiveSplit
                (最后2個split會平衡 避免過小的split 導(dǎo)致時間不太均衡...)
      hive.max-split-size
      hive.max-initial-splits(默認(rèn)200 不調(diào)節(jié)也行。。需要調(diào)節(jié) maxInitialSplitSize 如果不設(shè)置就是默認(rèn) maxSplitSize/2 )
      hive.max-initial-split-size

    如果是中間stage -> hash_partition_count 這個session 參數(shù)?還是 task.concurrency ?

舉例說明:對與讀取hive表來說,1G的數(shù)據(jù),設(shè)置 hive.max-split-size = 64MB,hive.max-initial-split-size= 64MB,最后才會得到期望的1G/64MB個source split

線程并發(fā)模型

  • task.max-worker-threads // worker啟動的線程池的大小,即工作線程個數(shù)
  • task.concurrency // set session task_concurrency=1; 這個影響 agg/join 的并發(fā)
  • task.min-drivers // 默認(rèn)是 task.max-worker-threads x2 ,worker最少在執(zhí)行的split數(shù),如果有足夠資源和任務(wù)
  • task.min-drivers-per-task // task最少并行執(zhí)行的split數(shù)
  • initial_splits_per_node // 。。(應(yīng)該是調(diào)度時候)
在taskExecutor的enqueueSplits里
        for (SplitRunner taskSplit : taskSplits) {
            xxx
            scheduleTaskIfNecessary(taskHandle); //按task級別調(diào)度 會用到 task.min-drivers-per-task 即可并發(fā)運(yùn)行的split 

            addNewEntrants(); 
            //在資源變動( 如task remove/split finish/等時候 去嘗試去調(diào)度更多split 【這里比較模糊。?!坑玫?task.min-drivers 參數(shù) )
            //比如 task.min-drivers-per-task 是4 task.min-drivers是10 則相當(dāng)于進(jìn)行了2次調(diào)度 。。
        }

    在Presto中有一個配置query.execution-policy,它有兩個選項,一個是all-at-once,另一個是 phased // set session execution_policy='phased'; 

    線程和并發(fā)模型:
        SqlTaskExecutionFactory -> SqlTaskExecution
        Coordinator分發(fā)Task到對應(yīng)Worker,通過HttpClient發(fā)送給節(jié)點(diǎn)上TaskResource提供的RESTful接口
        Worker啟動一個SqlTaskExecution對象或者更新對應(yīng)對象需要處理的Split
            這里能看到每個split其實對應(yīng)一個driverSplitRunner(這個類里面有DriverSplitRunnerFactory)
                    // Enqueue driver runners with split lifecycle for this plan node and driver life cycle combination.
                    ImmutableList.Builder<DriverSplitRunner> runners = ImmutableList.builder();
                    for (ScheduledSplit scheduledSplit : pendingSplits.removeAllSplits()) {
                        // create a new driver for the split
                        runners.add(partitionedDriverRunnerFactory.createDriverRunner(scheduledSplit, lifespan));
                    }
                    enqueueDriverSplitRunner(false, runners.build());

                    在DriverSplitRunner的Process方法里
                    this.driver = driverSplitRunnerFactory.createDriver(driverContext, partitionedSplit);

        TaskExecutor 封裝了TaskRunner(執(zhí)行split的地方 PrioritizedSplitRunner(實現(xiàn)類是DriverSplitRunner))
        TaskExecutor 里具體執(zhí)行任務(wù)是是一個線程池
                config.getMaxWorkerThreads(), // 這個是啟動的固定線程池 。。不同SQL不同task都在里面執(zhí)行 。。線程池大小是固定的:task.max-worker-threads
                config.getMinDrivers(),// 這個默認(rèn)是上面 x 2 不知有什么用?
                config.getMinDriversPerTask(), // ?
                config.getMaxDriversPerTask(),
        PrioritizedSplitRunner實現(xiàn)了時間片機(jī)制(固定1秒去執(zhí)行split 挑選優(yōu)先級)
        這種調(diào)度是不是犧牲了部分性能 換取迭代 優(yōu)先級 多租戶 多任務(wù)管理 結(jié)果快速反饋機(jī)制。。。
        
        PrioritizedSplitRunner里實際運(yùn)行的是Driver,封裝的一堆Operatior 如表Scan/filter/limit/taskoutPut 作用在split上


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容