引子
表值函數(shù)(table-valued function, TVF),顧名思義就是指返回值是一張表的函數(shù),在Oracle、SQL Server等數(shù)據(jù)庫中屢見不鮮。而在Flink的上一個(gè)穩(wěn)定版本1.13中,社區(qū)通過FLIP-145提出了窗口表值函數(shù)(window TVF)的實(shí)現(xiàn),用于替代舊版的窗口分組(grouped window)語法。
舉個(gè)栗子,在1.13之前,我們需要寫如下的Flink SQL語句來做10秒的滾動(dòng)窗口聚合:
SELECT TUMBLE_START(procTime, INTERVAL '10' SECONDS) AS window_start,merchandiseId,COUNT(1) AS sellCount
FROM rtdw_dwd.kafka_order_done_log
GROUP BY TUMBLE(procTime, INTERVAL '10' SECONDS),merchandiseId;
在1.13版本中,則可以改寫成如下的形式:
SELECT window_start,window_end,merchandiseId,COUNT(1) AS sellCount
FROM TABLE( TUMBLE(TABLE rtdw_dwd.kafka_order_done_log, DESCRIPTOR(procTime), INTERVAL '10' SECONDS) )
GROUP BY window_start,window_end,merchandiseId;
根據(jù)設(shè)計(jì)文檔的描述,窗口表值函數(shù)的思想來自2019年的SIGMOD論文<<One SQL to Rule Them All>>,而表值函數(shù)屬于SQL 2016標(biāo)準(zhǔn)的一部分。Calcite從1.25版本起也開始提供對(duì)滾動(dòng)窗口和滑動(dòng)窗口TVF的支持。除了標(biāo)準(zhǔn)化、易于實(shí)現(xiàn)之外,窗口TVF還支持舊版語法所不具備的一些特性,如Local-Global聚合優(yōu)化、Distinct解熱點(diǎn)優(yōu)化、Top-N支持、GROUPING SETS語法等。
接下來本文簡(jiǎn)單探究一下基于窗口TVF的聚合邏輯,以及對(duì)累積窗口TVF做一點(diǎn)簡(jiǎn)單的改進(jìn)。
SQL定義
窗口TVF函數(shù)的類圖如下所示。

Flink SQL在Calcite原生的SqlWindowTableFunction的基礎(chǔ)上加了指示窗口時(shí)間的三列,即window_start、window_end和window_time。SqlWindowTableFunction及其各個(gè)實(shí)現(xiàn)類的主要工作是校驗(yàn)TVF的操作數(shù)是否合法(通過內(nèi)部抽象類AbstractOperandMetadata和對(duì)應(yīng)的子類OperandMetadataImpl)。這一部分不再贅述,在下文改進(jìn)累積窗口TVF的代碼中會(huì)涉及到。
物理計(jì)劃
如果看官對(duì)Calcite基礎(chǔ)概念和Flink SQL的執(zhí)行流程不了解,請(qǐng)務(wù)必先參考上一篇文章<<From Calcite to Tampering with Flink SQL>>。
目前窗口TVF不能單獨(dú)使用,需要配合窗口聚合或Top-N一起使用。以上文中的聚合為例,觀察其執(zhí)行計(jì)劃如下。
EXPLAIN
SELECT window_start,window_end,merchandiseId,COUNT(1) AS sellCount
FROM TABLE( TUMBLE(TABLE rtdw_dwd.kafka_order_done_log, DESCRIPTOR(procTime), INTERVAL '10' SECONDS) )
GROUP BY window_start,window_end,merchandiseId;
== Abstract Syntax Tree ==
LogicalAggregate(group=[{0, 1, 2}], sellCount=[COUNT()])
+- LogicalProject(window_start=[$48], window_end=[$49], merchandiseId=[$10])
+- LogicalTableFunctionScan(invocation=[TUMBLE($47, DESCRIPTOR($47), 10000:INTERVAL SECOND)], rowType=[RecordType(BIGINT ts, /* ...... */, TIMESTAMP_LTZ(3) *PROCTIME* procTime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+- LogicalProject(ts=[$0], /* ...... */, procTime=[PROCTIME()])
+- LogicalTableScan(table=[[hive, rtdw_dwd, kafka_order_done_log]])
== Optimized Physical Plan ==
Calc(select=[window_start, window_end, merchandiseId, sellCount])
+- WindowAggregate(groupBy=[merchandiseId], window=[TUMBLE(time_col=[procTime], size=[10 s])], select=[merchandiseId, COUNT(*) AS sellCount, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[merchandiseId]])
+- Calc(select=[merchandiseId, PROCTIME() AS procTime])
+- TableSourceScan(table=[[hive, rtdw_dwd, kafka_order_done_log]], fields=[ts, /* ...... */])
== Optimized Execution Plan ==
Calc(select=[window_start, window_end, merchandiseId, sellCount])
+- WindowAggregate(groupBy=[merchandiseId], window=[TUMBLE(time_col=[procTime], size=[10 s])], select=[merchandiseId, COUNT(*) AS sellCount, start('w$) AS window_start, end('w$) AS window_end])
+- Exchange(distribution=[hash[merchandiseId]])
+- Calc(select=[merchandiseId, PROCTIME() AS procTime])
+- TableSourceScan(table=[[hive, rtdw_dwd, kafka_order_done_log]], fields=[ts, /* ...... */])
在Flink SQL規(guī)則集中,與如上查詢相關(guān)的規(guī)則按順序依次是:
-
ConverterRule:
StreamPhysicalWindowTableFunctionRule
該規(guī)則將調(diào)用窗口TVF的邏輯節(jié)點(diǎn)(即調(diào)用SqlWindowTableFunction的LogicalTableFunctionScan節(jié)點(diǎn))轉(zhuǎn)化為物理節(jié)點(diǎn)(StreamPhysicalWindowTableFunction)。 -
ConverterRule:
StreamPhysicalWindowAggregateRule
該規(guī)則將含有window_start、window_end字段的邏輯聚合節(jié)點(diǎn)FlinkLogicalAggregate轉(zhuǎn)化為物理的窗口聚合節(jié)點(diǎn)StreamPhysicalWindowAggregate以及其上的投影StreamPhysicalCalc。在有其他分組字段的情況下,還會(huì)根據(jù)FlinkRelDistribution#hash生成StreamPhysicalExchange節(jié)點(diǎn)。 -
RelOptRule:
PullUpWindowTableFunctionIntoWindowAggregateRule
顧名思義,該規(guī)則將上面兩個(gè)規(guī)則產(chǎn)生的RelNode進(jìn)行整理,消除代表窗口TVF的物理節(jié)點(diǎn),并將它的語義上拉至聚合節(jié)點(diǎn)中,形成最終的物理計(jì)劃。
然后,StreamPhysicalWindowAggregate節(jié)點(diǎn)翻譯成StreamExecWindowAggregate節(jié)點(diǎn),進(jìn)入執(zhí)行階段。
切片化窗口與執(zhí)行
筆者在很久之前曾寫過一篇《Flink滑動(dòng)窗口原理與細(xì)粒度滑動(dòng)窗口的性能問題》,其中提到粒度太碎的滑動(dòng)窗口會(huì)使得狀態(tài)和Timer膨脹,比較危險(xiǎn),應(yīng)該用滾動(dòng)窗口+在線存儲(chǔ)+讀時(shí)聚合的方法代替。社區(qū)在設(shè)計(jì)窗口TVF聚合時(shí)顯然考慮到了這點(diǎn),提出了切片化窗口(sliced window)的概念,并以此為基礎(chǔ)設(shè)計(jì)了一套與DataStream API Windowing不同的窗口機(jī)制。
如下圖的累積窗口所示,每?jī)蓷l縱向虛線之間的部分就是一個(gè)切片(slice)。

切片的本質(zhì)就是將滑動(dòng)/累積窗口化為滾動(dòng)窗口,并盡可能地復(fù)用中間計(jì)算結(jié)果,降低狀態(tài)壓力。自然地,前文所述的Local-Global聚合優(yōu)化、Distinct解熱點(diǎn)優(yōu)化就都可以無縫應(yīng)用了。
那么,切片是如何分配的呢?答案是通過SliceAssigner體系,其類圖如下。

可見,對(duì)于滾動(dòng)窗口而言,一個(gè)窗口就是一個(gè)切片;而對(duì)滑動(dòng)/累積窗口而言,一個(gè)窗口可能包含多個(gè)切片,一個(gè)切片也可能位于多個(gè)窗口中。所以共享切片的窗口要特別注意切片的過期與合并。以負(fù)責(zé)累積窗口的CumulativeSliceAssigner為例,對(duì)應(yīng)的邏輯如下。
@Override
public Iterable<Long> expiredSlices(long windowEnd) {
long windowStart = getWindowStart(windowEnd);
long firstSliceEnd = windowStart + step;
long lastSliceEnd = windowStart + maxSize;
if (windowEnd == firstSliceEnd) {
// we share state in the first slice, skip cleanup for the first slice
reuseExpiredList.clear();
} else if (windowEnd == lastSliceEnd) {
// when this is the last slice,
// we need to cleanup the shared state (i.e. first slice) and the current slice
reuseExpiredList.reset(windowEnd, firstSliceEnd);
} else {
// clean up current slice
reuseExpiredList.reset(windowEnd);
}
return reuseExpiredList;
}
@Override
public void mergeSlices(long sliceEnd, MergeCallback callback) throws Exception {
long windowStart = getWindowStart(sliceEnd);
long firstSliceEnd = windowStart + step;
if (sliceEnd == firstSliceEnd) {
// if this is the first slice, there is nothing to merge
reuseToBeMergedList.clear();
} else {
// otherwise, merge the current slice state into the first slice state
reuseToBeMergedList.reset(sliceEnd);
}
callback.merge(firstSliceEnd, reuseToBeMergedList);
}
可見,累積窗口的中間結(jié)果會(huì)被合并到第一個(gè)切片中。窗口未結(jié)束時(shí),除了第一個(gè)切片之外的其他切片觸發(fā)后都會(huì)過期。
實(shí)際處理切片化窗口的算子名為SlicingWindowOperator,它實(shí)際上是SlicingWindowProcessor的簡(jiǎn)單封裝。SlicingWindowProcessor的體系如下。

SlicingWindowProcessor的三個(gè)重要組成部分分別是:
-
WindowBuffer:在托管內(nèi)存區(qū)域分配的窗口數(shù)據(jù)緩存,避免在窗口未實(shí)際觸發(fā)時(shí)高頻訪問狀態(tài); -
WindowValueState:窗口的狀態(tài),其schema為[key, window_end, accumulator]。窗口結(jié)束時(shí)間作為窗口狀態(tài)的命名空間(namespace); -
NamespaceAggsHandleFunction:通過代碼生成器AggsHandlerCodeGenerator生成的聚合函數(shù)體。注意它并不是一個(gè)AggregateFunction,但是大致遵循其規(guī)范。
每當(dāng)一條數(shù)據(jù)到來時(shí),調(diào)用AbstractWindowAggProcessor#processElement()方法,比較容易理解了。
@Override
public boolean processElement(RowData key, RowData element) throws Exception {
long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService);
if (!isEventTime) {
// always register processing time for every element when processing time mode
windowTimerService.registerProcessingTimeWindowTimer(sliceEnd);
}
if (isEventTime && isWindowFired(sliceEnd, currentProgress, shiftTimeZone)) {
// the assigned slice has been triggered, which means current element is late,
// but maybe not need to drop
long lastWindowEnd = sliceAssigner.getLastWindowEnd(sliceEnd);
if (isWindowFired(lastWindowEnd, currentProgress, shiftTimeZone)) {
// the last window has been triggered, so the element can be dropped now
return true;
} else {
windowBuffer.addElement(key, sliceStateMergeTarget(sliceEnd), element);
// we need to register a timer for the next unfired window,
// because this may the first time we see elements under the key
long unfiredFirstWindow = sliceEnd;
while (isWindowFired(unfiredFirstWindow, currentProgress, shiftTimeZone)) {
unfiredFirstWindow += windowInterval;
}
windowTimerService.registerEventTimeWindowTimer(unfiredFirstWindow);
return false;
}
} else {
// the assigned slice hasn't been triggered, accumulate into the assigned slice
windowBuffer.addElement(key, sliceEnd, element);
return false;
}
}
而當(dāng)切片需要被合并時(shí),先從WindowValueState中取出已有的狀態(tài),再遍歷切片,并調(diào)用NamespaceAggsHandleFunction#merge()方法進(jìn)行合并,最后更新狀態(tài)。
@Override
public void merge(@Nullable Long mergeResult, Iterable<Long> toBeMerged) throws Exception {
// get base accumulator
final RowData acc;
if (mergeResult == null) {
// null means the merged is not on state, create a new acc
acc = aggregator.createAccumulators();
} else {
RowData stateAcc = windowState.value(mergeResult);
if (stateAcc == null) {
acc = aggregator.createAccumulators();
} else {
acc = stateAcc;
}
}
// set base accumulator
aggregator.setAccumulators(mergeResult, acc);
// merge slice accumulators
for (Long slice : toBeMerged) {
RowData sliceAcc = windowState.value(slice);
if (sliceAcc != null) {
aggregator.merge(slice, sliceAcc);
}
}
// set merged acc into state if the merged acc is on state
if (mergeResult != null) {
windowState.update(mergeResult, aggregator.getAccumulators());
}
}
看官若要觀察codegen出來的聚合函數(shù)的代碼,可在log4j.properties文件中加上:
logger.codegen.name = org.apache.flink.table.runtime.generated
logger.codegen.level = DEBUG
一點(diǎn)改進(jìn)
我司有很多天級(jí)聚合+秒級(jí)觸發(fā)的Flink作業(yè),在DataStream API時(shí)代多由ContinuousProcessingTimeTrigger實(shí)現(xiàn),1.13版本之前的SQL則需要添加table.exec.emit.early-fire系列參數(shù)。正式采用1.13版本后,累積窗口(cumulate window)完美契合此類需求。但是,有些作業(yè)的key規(guī)模比較大,在一天的晚些時(shí)候會(huì)頻繁向下游Redis刷入大量數(shù)據(jù),造成不必要的壓力。因此,筆者對(duì)累積窗口TVF做了略有侵入的小改動(dòng),通過一個(gè)布爾參數(shù)INCREMENTAL可控制只輸出切片之間發(fā)生變化的聚合結(jié)果。操作很簡(jiǎn)單:
- 修改
SqlCumulateTableFunction函數(shù)的簽名,以及配套的窗口參數(shù)類CumulativeWindowSpec等; - 修改
SliceSharedWindowAggProcess#fireWindow()方法,如下。
@Override
public void fireWindow(Long windowEnd) throws Exception {
sliceSharedAssigner.mergeSlices(windowEnd, this);
// we have set accumulator in the merge() method
RowData aggResult = aggregator.getValue(windowEnd);
if (!isWindowEmpty()) {
if (sliceSharedAssigner instanceof CumulativeSliceAssigner
&& ((CumulativeSliceAssigner) sliceSharedAssigner).isIncremental()) {
RowData stateValue = windowState.value(windowEnd);
if (stateValue == null || !stateValue.equals(aggResult)) {
collect(aggResult);
}
} else {
collect(aggResult);
}
}
// we should register next window timer here,
// because slices are shared, maybe no elements arrived for the next slices
// ......
}
具體可參見這個(gè)commit。當(dāng)然,此方案會(huì)帶來訪問狀態(tài)的overhead,后續(xù)會(huì)做極限壓測(cè)以觀察性能,并做適當(dāng)修改。
The End
民那晚安晚安。