A Brief Look at the Implementation of Flink SQL Window TVF (Window Table-Valued Function) Aggregation

Introduction

A table-valued function (TVF), as the name suggests, is a function that returns a table; TVFs are commonplace in databases such as Oracle and SQL Server. In Flink's previous stable release, 1.13, the community introduced window table-valued functions (window TVFs) via FLIP-145, replacing the legacy grouped window syntax.

For example, before 1.13 we had to write the following Flink SQL to perform a 10-second tumbling window aggregation:

SELECT TUMBLE_START(procTime, INTERVAL '10' SECOND) AS window_start, merchandiseId, COUNT(1) AS sellCount
FROM rtdw_dwd.kafka_order_done_log
GROUP BY TUMBLE(procTime, INTERVAL '10' SECOND), merchandiseId;

In 1.13, this can be rewritten as:

SELECT window_start, window_end, merchandiseId, COUNT(1) AS sellCount
FROM TABLE( TUMBLE(TABLE rtdw_dwd.kafka_order_done_log, DESCRIPTOR(procTime), INTERVAL '10' SECOND) )
GROUP BY window_start, window_end, merchandiseId;

According to the design document, the idea of window TVFs comes from the 2019 SIGMOD paper "One SQL to Rule Them All", and table-valued functions themselves are part of the SQL:2016 standard. Calcite has also supported tumbling and hopping window TVFs since version 1.25. Besides being standardized and easy to implement, window TVFs enable features the legacy syntax lacks, such as the local-global aggregation optimization, the distinct-key skew (hot-key) optimization, Top-N support, and the GROUPING SETS syntax.

The rest of this post briefly explores the aggregation logic built on window TVFs, and then makes a small improvement to the cumulative window TVF.

SQL Definition

The class diagram of the window TVF functions is shown below.

On top of Calcite's native SqlWindowTableFunction, Flink SQL adds three columns indicating window time: window_start, window_end, and window_time. The main job of SqlWindowTableFunction and its concrete subclasses is to validate the TVF's operands (via the inner abstract class AbstractOperandMetadata and the corresponding subclass OperandMetadataImpl). We will not dwell on this here; it comes up again below in the code that improves the cumulative window TVF.

Physical Plan

If you are unfamiliar with Calcite fundamentals and the execution flow of Flink SQL, please first read the previous article, "From Calcite to Tampering with Flink SQL".

Currently a window TVF cannot be used on its own; it must be paired with window aggregation or Top-N. Taking the aggregation above as an example, its execution plan is as follows.

EXPLAIN
SELECT window_start, window_end, merchandiseId, COUNT(1) AS sellCount
FROM TABLE( TUMBLE(TABLE rtdw_dwd.kafka_order_done_log, DESCRIPTOR(procTime), INTERVAL '10' SECOND) )
GROUP BY window_start, window_end, merchandiseId;

== Abstract Syntax Tree ==
LogicalAggregate(group=[{0, 1, 2}], sellCount=[COUNT()])
+- LogicalProject(window_start=[$48], window_end=[$49], merchandiseId=[$10])
   +- LogicalTableFunctionScan(invocation=[TUMBLE($47, DESCRIPTOR($47), 10000:INTERVAL SECOND)], rowType=[RecordType(BIGINT ts, /* ...... */, TIMESTAMP_LTZ(3) *PROCTIME* procTime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
      +- LogicalProject(ts=[$0], /* ...... */, procTime=[PROCTIME()])
         +- LogicalTableScan(table=[[hive, rtdw_dwd, kafka_order_done_log]])

== Optimized Physical Plan ==
Calc(select=[window_start, window_end, merchandiseId, sellCount])
+- WindowAggregate(groupBy=[merchandiseId], window=[TUMBLE(time_col=[procTime], size=[10 s])], select=[merchandiseId, COUNT(*) AS sellCount, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[merchandiseId]])
      +- Calc(select=[merchandiseId, PROCTIME() AS procTime])
         +- TableSourceScan(table=[[hive, rtdw_dwd, kafka_order_done_log]], fields=[ts, /* ...... */])

== Optimized Execution Plan ==
Calc(select=[window_start, window_end, merchandiseId, sellCount])
+- WindowAggregate(groupBy=[merchandiseId], window=[TUMBLE(time_col=[procTime], size=[10 s])], select=[merchandiseId, COUNT(*) AS sellCount, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[merchandiseId]])
      +- Calc(select=[merchandiseId, PROCTIME() AS procTime])
         +- TableSourceScan(table=[[hive, rtdw_dwd, kafka_order_done_log]], fields=[ts, /* ...... */])

In the Flink SQL rule set, the rules relevant to the query above fire in the following order:

  • ConverterRule: StreamPhysicalWindowTableFunctionRule
    This rule converts the logical node that invokes the window TVF (i.e., the LogicalTableFunctionScan node calling SqlWindowTableFunction) into the physical node StreamPhysicalWindowTableFunction.
  • ConverterRule: StreamPhysicalWindowAggregateRule
    This rule converts a logical aggregation node FlinkLogicalAggregate containing the window_start and window_end fields into the physical window aggregation node StreamPhysicalWindowAggregate plus a StreamPhysicalCalc projection above it. When there are additional grouping fields, it also generates a StreamPhysicalExchange node according to FlinkRelDistribution#hash.
  • RelOptRule: PullUpWindowTableFunctionIntoWindowAggregateRule
    As its name suggests, this rule tidies up the RelNodes produced by the two rules above, eliminating the physical node that represents the window TVF and pulling its semantics up into the aggregation node, yielding the final physical plan.

Finally, the StreamPhysicalWindowAggregate node is translated into a StreamExecWindowAggregate node, which enters the execution phase.

Sliced Windows and Execution

A long time ago I wrote an article, "Flink Sliding Window Internals and the Performance Problem of Fine-Grained Sliding Windows", which pointed out that sliding windows with an overly fine slide cause state and timers to balloon dangerously, and that they should be replaced with tumbling windows plus an online store plus aggregation-on-read. The community clearly took this into account when designing window TVF aggregation: they proposed the concept of sliced windows and built on it a windowing mechanism distinct from DataStream API windowing.

As shown by the cumulative window in the figure below, the portion between every two adjacent vertical dashed lines is a slice.

The essence of slicing is to reduce sliding/cumulative windows to tumbling windows while reusing intermediate results as much as possible, relieving state pressure. Naturally, the local-global aggregation and distinct-key skew optimizations mentioned earlier then apply seamlessly.
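To make the slice arithmetic concrete, here is a minimal standalone sketch of how an element's slice end, and the start of its enclosing cumulative window, can be computed. The class and method names are hypothetical; the logic is a simplification of what CumulativeSliceAssigner does, ignoring window offsets and time-zone shifting.

```java
// Hypothetical, simplified slice assignment for a cumulative window.
// A cumulative window of maxSize emits partial results every `step`;
// each element belongs to exactly one slice, identified by its end timestamp.
public class CumulativeSliceDemo {

    // A slice behaves like a tumbling window of size `step`:
    // the element at `timestamp` falls into the slice ending at the next step boundary.
    static long assignSliceEnd(long timestamp, long step) {
        return timestamp - (timestamp % step) + step;
    }

    // The enclosing cumulative window starts at the nearest maxSize boundary
    // at or before the slice (slice ends are exclusive, hence the -1).
    static long getWindowStart(long sliceEnd, long maxSize) {
        return (sliceEnd - 1) - ((sliceEnd - 1) % maxSize);
    }

    public static void main(String[] args) {
        long step = 10_000L;    // emit every 10 s
        long maxSize = 60_000L; // cumulative window covers 1 min
        long ts = 23_456L;      // an element's timestamp

        long sliceEnd = assignSliceEnd(ts, step);
        System.out.println(sliceEnd);                          // 30000
        System.out.println(getWindowStart(sliceEnd, maxSize)); // 0
    }
}
```

With a 10-second step, the element at 23 456 ms lands in the slice (20 000, 30 000], inside the cumulative window starting at 0.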

So how are slices assigned? Through the SliceAssigner hierarchy, whose class diagram is shown below.

Note that `CumulativeSliceAssigner` has an extra `isIncremental()` method; this is one step of the improvement described below.

As you can see, for a tumbling window one window is exactly one slice, whereas for sliding/cumulative windows one window may contain multiple slices and one slice may belong to multiple windows. Windows that share slices must therefore take special care with slice expiration and merging. Take CumulativeSliceAssigner, which handles cumulative windows, as an example; the corresponding logic is as follows.

@Override
public Iterable<Long> expiredSlices(long windowEnd) {
    long windowStart = getWindowStart(windowEnd);
    long firstSliceEnd = windowStart + step;
    long lastSliceEnd = windowStart + maxSize;
    if (windowEnd == firstSliceEnd) {
        // we share state in the first slice, skip cleanup for the first slice
        reuseExpiredList.clear();
    } else if (windowEnd == lastSliceEnd) {
        // when this is the last slice,
        // we need to cleanup the shared state (i.e. first slice) and the current slice
        reuseExpiredList.reset(windowEnd, firstSliceEnd);
    } else {
        // clean up current slice
        reuseExpiredList.reset(windowEnd);
    }
    return reuseExpiredList;
}

@Override
public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {
    long windowStart = getWindowStart(sliceEnd);
    long firstSliceEnd = windowStart + step;
    if (sliceEnd == firstSliceEnd) {
        // if this is the first slice, there is nothing to merge
        reuseToBeMergedList.clear();
    } else {
        // otherwise, merge the current slice state into the first slice state
        reuseToBeMergedList.reset(sliceEnd);
    }
    callback.merge(firstSliceEnd, reuseToBeMergedList);
}

As you can see, a cumulative window's intermediate results are merged into its first slice. While the window is still open, every slice other than the first expires once it has fired.
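The slice lifecycle can be traced with a small standalone re-implementation of the expiredSlices() decision above (a hypothetical helper for illustration, not Flink code):

```java
import java.util.List;

// Which slices may be discarded at each firing of a cumulative window?
// Mirrors the three branches of CumulativeSliceAssigner#expiredSlices().
public class ExpiredSlicesDemo {

    static List<Long> expiredSlices(long windowEnd, long windowStart, long step, long maxSize) {
        long firstSliceEnd = windowStart + step;
        long lastSliceEnd = windowStart + maxSize;
        if (windowEnd == firstSliceEnd) {
            // the first slice carries the shared (merged) state, so it is kept
            return List.of();
        } else if (windowEnd == lastSliceEnd) {
            // the cumulative window is complete: drop the current slice and the shared one
            return List.of(windowEnd, firstSliceEnd);
        } else {
            // an intermediate firing: only the current slice can be discarded
            return List.of(windowEnd);
        }
    }

    public static void main(String[] args) {
        long step = 10_000L, maxSize = 30_000L, windowStart = 0L;
        // firings at each slice end of a cumulative window [0, 30s) with a 10s step
        System.out.println(expiredSlices(10_000L, windowStart, step, maxSize)); // []
        System.out.println(expiredSlices(20_000L, windowStart, step, maxSize)); // [20000]
        System.out.println(expiredSlices(30_000L, windowStart, step, maxSize)); // [30000, 10000]
    }
}
```

Only the final firing cleans up the shared first slice, which is exactly when the whole cumulative window closes.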

The operator that actually processes sliced windows is SlicingWindowOperator, which is a thin wrapper around SlicingWindowProcessor. The SlicingWindowProcessor hierarchy is shown below.

The three key components of SlicingWindowProcessor are:

  • WindowBuffer: a window-data buffer allocated in managed memory, which avoids frequent state access before a window actually fires;
  • WindowValueState: the window state, with schema [key, window_end, accumulator]. The window end timestamp serves as the namespace of the window state;
  • NamespaceAggsHandleFunction: the aggregate function body produced by the code generator AggsHandlerCodeGenerator. Note that it is not an AggregateFunction, although it roughly follows that contract.
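The role of the WindowBuffer can be illustrated with a toy sketch (hypothetical classes, not Flink's implementation): per-record updates accumulate in heap memory, and only a flush writes them to the state backend, turning one state access per record into one per (key, slice).

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the WindowBuffer idea: buffer counts per (key, sliceEnd)
// in memory, and fold them into "state" only on flush.
public class WindowBufferDemo {
    record KeySlice(String key, long sliceEnd) {}

    final Map<KeySlice, Long> buffer = new HashMap<>(); // in-memory, cheap to update
    final Map<KeySlice, Long> state = new HashMap<>();  // stands in for keyed state (e.g. RocksDB)

    void addElement(String key, long sliceEnd) {
        buffer.merge(new KeySlice(key, sliceEnd), 1L, Long::sum);
    }

    // called on watermark advance or memory pressure
    void flush() {
        buffer.forEach((ks, cnt) -> state.merge(ks, cnt, Long::sum));
        buffer.clear();
    }

    public static void main(String[] args) {
        WindowBufferDemo buf = new WindowBufferDemo();
        buf.addElement("merch-1", 10_000L);
        buf.addElement("merch-1", 10_000L);
        buf.addElement("merch-2", 10_000L);
        buf.flush(); // one state write per (key, slice) instead of one per record
        System.out.println(buf.state.get(new KeySlice("merch-1", 10_000L))); // 2
    }
}
```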

Whenever a record arrives, AbstractWindowAggProcessor#processElement() is invoked; it is fairly easy to follow.

@Override
public boolean processElement(RowData key, RowData element) throws Exception {
    long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService);
    if (!isEventTime) {
        // always register processing time for every element when processing time mode
        windowTimerService.registerProcessingTimeWindowTimer(sliceEnd);
    }
    if (isEventTime && isWindowFired(sliceEnd, currentProgress, shiftTimeZone)) {
        // the assigned slice has been triggered, which means current element is late,
        // but maybe not need to drop
        long lastWindowEnd = sliceAssigner.getLastWindowEnd(sliceEnd);
        if (isWindowFired(lastWindowEnd, currentProgress, shiftTimeZone)) {
            // the last window has been triggered, so the element can be dropped now
            return true;
        } else {
            windowBuffer.addElement(key, sliceStateMergeTarget(sliceEnd), element);
            // we need to register a timer for the next unfired window,
            // because this may be the first time we see elements under the key
            long unfiredFirstWindow = sliceEnd;
            while (isWindowFired(unfiredFirstWindow, currentProgress, shiftTimeZone)) {
                unfiredFirstWindow += windowInterval;
            }
            windowTimerService.registerEventTimeWindowTimer(unfiredFirstWindow);
            return false;
        }
    } else {
        // the assigned slice hasn't been triggered, accumulate into the assigned slice
        windowBuffer.addElement(key, sliceEnd, element);
        return false;
    }
}

When slices need to be merged, the existing state is first fetched from WindowValueState, then the slices are iterated and NamespaceAggsHandleFunction#merge() is called to merge them, and finally the state is updated.

@Override
public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
    // get base accumulator
    final RowData acc;
    if (mergeResult == null) {
        // null means the merged is not on state, create a new acc
        acc = aggregator.createAccumulators();
    } else {
        RowData stateAcc = windowState.value(mergeResult);
        if (stateAcc == null) {
            acc = aggregator.createAccumulators();
        } else {
            acc = stateAcc;
        }
    }
    // set base accumulator
    aggregator.setAccumulators(mergeResult, acc);
    // merge slice accumulators
    for (Long slice : toBeMerged) {
        RowData sliceAcc = windowState.value(slice);
        if (sliceAcc != null) {
            aggregator.merge(slice, sliceAcc);
        }
    }
    // set merged acc into state if the merged acc is on state
    if (mergeResult != null) {
        windowState.update(mergeResult, aggregator.getAccumulators());
    }
}
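The merge flow above can be modeled with a toy COUNT accumulator (hypothetical sketch, not Flink's API): the shared first slice's accumulator serves as the base, the to-be-merged slices are folded into it, and the result is written back under the base namespace.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the merge() flow with a COUNT accumulator:
// windowState maps a namespace (slice end) to an accumulator (a count).
public class SliceMergeDemo {
    final Map<Long, Long> windowState = new HashMap<>();

    long merge(Long mergeResult, Iterable<Long> toBeMerged) {
        // base accumulator: the shared slice's state, or a fresh one if absent
        long acc = (mergeResult == null) ? 0L : windowState.getOrDefault(mergeResult, 0L);
        // fold each slice's accumulator into the base
        for (Long slice : toBeMerged) {
            acc += windowState.getOrDefault(slice, 0L);
        }
        // write the merged accumulator back only if the base lives in state
        if (mergeResult != null) {
            windowState.put(mergeResult, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        SliceMergeDemo d = new SliceMergeDemo();
        d.windowState.put(10_000L, 3L); // shared first slice: 3 events so far
        d.windowState.put(20_000L, 2L); // current slice: 2 new events
        long merged = d.merge(10_000L, java.util.List.of(20_000L));
        System.out.println(merged); // 5
    }
}
```

After the merge, the first slice's state holds the cumulative count, matching the behavior described above where intermediate results pile up in the first slice.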

To inspect the generated (codegen) aggregate function code, add the following to your log4j.properties file:

logger.codegen.name = org.apache.flink.table.runtime.generated
logger.codegen.level = DEBUG

A Small Improvement

At my company we have many Flink jobs that aggregate at day granularity but emit at second granularity. In the DataStream API era these were mostly implemented with ContinuousProcessingTimeTrigger; before 1.13, the SQL versions required the table.exec.emit.early-fire family of options. Once we officially adopted 1.13, the cumulative window fit this class of requirements perfectly. However, some of these jobs have a large key space and, late in the day, frequently flush large amounts of data to the downstream Redis, causing unnecessary pressure. I therefore made a small, slightly invasive change to the cumulative window TVF: a boolean parameter INCREMENTAL controls whether to emit only the aggregation results that changed between slices. The change is simple:

  • Modify the signature of the SqlCumulateTableFunction function, along with the companion window parameter classes such as CumulativeWindowSpec;
  • Modify SliceSharedWindowAggProcessor#fireWindow() as follows.
@Override
public void fireWindow(Long windowEnd) throws Exception {
    sliceSharedAssigner.mergeSlices(windowEnd, this);
    // we have set accumulator in the merge() method
    RowData aggResult = aggregator.getValue(windowEnd);
    if (!isWindowEmpty()) {
        if (sliceSharedAssigner instanceof CumulativeSliceAssigner
                && ((CumulativeSliceAssigner) sliceSharedAssigner).isIncremental()) {
            RowData stateValue = windowState.value(windowEnd);
            if (stateValue == null || !stateValue.equals(aggResult)) {
                collect(aggResult);
            }
        } else {
            collect(aggResult);
        }
    }
    // we should register next window timer here,
    // because slices are shared, maybe no elements arrived for the next slices
    // ......
}

See this commit for details. Of course, this approach adds state-access overhead; I will stress-test it later to observe the performance impact and adjust as needed.

The End

Good night, everyone.
