Foreword
Happy Women's Day to all women~
Quick Q&A
- What are the uses and characteristics of RichFunction in the Flink DataStream API?
- What is the RuntimeContext obtained from a RichFunction used for?
- Does every Function have a corresponding RichFunction implementation?
- Can a RichFunction be passed to every Flink streaming operator?

The first two questions can actually be merged into one. Compared with a plain Function, a RichFunction additionally offers lifecycle management (the open() and close() methods) and access to its runtime context, the RuntimeContext. A RuntimeContext is associated with each parallel instance of the Function (i.e. one sub-task), and through it you can further obtain:
- static runtime information, such as the task name, parallelism, maximum parallelism, the index of the current sub-task, and the current class loader;
- global data structures, namely accumulators, broadcast variables, and the distributed cache;
- handles to various kinds of state, via the familiar get***State(StateDescriptor) methods.
Third question: yes. Fourth question: no.
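To make the lifecycle hooks and RuntimeContext concrete, here is a minimal plain-Java sketch. None of these classes are real Flink types; names like MiniRichMapFunction and MiniRuntimeContext are made up for illustration, and the "framework" driving the lifecycle is reduced to a few direct calls:

```java
// Hypothetical, stripped-down stand-ins for Flink's RichFunction machinery.
public class RichFunctionSketch {
    public interface MiniRuntimeContext {
        int getIndexOfThisSubtask();
        int getNumberOfParallelSubtasks();
    }

    // A "rich" map function: lifecycle hooks plus access to the runtime context.
    public abstract static class MiniRichMapFunction<I, O> {
        private MiniRuntimeContext ctx;
        public void setRuntimeContext(MiniRuntimeContext ctx) { this.ctx = ctx; }
        public MiniRuntimeContext getRuntimeContext() { return ctx; }
        public void open() {}   // called once per parallel instance, before any record
        public void close() {}  // called once per parallel instance, after the last record
        public abstract O map(I value);
    }

    public static String runOnce(String value, int subtaskIndex) {
        MiniRichMapFunction<String, String> fn = new MiniRichMapFunction<>() {
            private String prefix;
            @Override
            public void open() {
                // Typical open() work: build a prefix (or a connection, a cache, ...)
                // from information only available at runtime.
                prefix = "subtask-" + getRuntimeContext().getIndexOfThisSubtask() + ":";
            }
            @Override
            public String map(String v) { return prefix + v; }
        };
        // The framework, not user code, drives the lifecycle:
        fn.setRuntimeContext(new MiniRuntimeContext() {
            public int getIndexOfThisSubtask() { return subtaskIndex; }
            public int getNumberOfParallelSubtasks() { return 4; }
        });
        fn.open();
        String out = fn.map(value);
        fn.close();
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runOnce("hello", 2));  // prints "subtask-2:hello"
    }
}
```

The point of the sketch is the division of responsibility: user code only fills in open(), close(), and map(); the runtime injects the context and decides when each hook fires.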
Scenarios where RichFunction does not apply
A simple windowed aggregation:

dataStream.keyBy(x -> x.getKey())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .reduce(new MyRichReduceFunction<>());
This code compiles, but at runtime it throws an UnsupportedOperationException with the message "ReduceFunction of reduce can not be a RichFunction". Switching to the aggregate() method with a RichAggregateFunction hits the same problem, with the message "This aggregation function cannot be a RichFunction". The corresponding implementations in WindowedStream show that this path is a dead end:
public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> function) {
    if (function instanceof RichFunction) {
        throw new UnsupportedOperationException(
                "ReduceFunction of reduce can not be a RichFunction. "
                        + "Please use reduce(ReduceFunction, WindowFunction) instead.");
    }
    // clean the closure
    function = input.getExecutionEnvironment().clean(function);
    return reduce(function, new PassThroughWindowFunction<>());
}
public <ACC, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, R> function) {
    checkNotNull(function, "function");
    if (function instanceof RichFunction) {
        throw new UnsupportedOperationException(
                "This aggregation function cannot be a RichFunction.");
    }
    TypeInformation<ACC> accumulatorType =
            TypeExtractor.getAggregateFunctionAccumulatorType(
                    function, input.getType(), null, false);
    TypeInformation<R> resultType =
            TypeExtractor.getAggregateFunctionReturnType(
                    function, input.getType(), null, false);
    return aggregate(function, accumulatorType, resultType);
}
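Note that the exception message itself points at the escape hatch: reduce(ReduceFunction, WindowFunction), where the incremental reduce stays a plain function and any rich behavior moves into the window function. The division of labor can be sketched in plain Java (the MiniReduceFunction / MiniWindowFunction types below are illustrative stand-ins, not Flink classes):

```java
import java.util.List;

// Simplified model of a window that pre-aggregates with a plain ReduceFunction
// and only hands the single reduced value to the (possibly rich) WindowFunction.
public class WindowReduceSketch {
    // Stand-in for Flink's ReduceFunction: a pure binary combine step.
    public interface MiniReduceFunction<T> { T reduce(T a, T b); }
    // Stand-in for the WindowFunction applied once when the window fires.
    public interface MiniWindowFunction<T, R> { R apply(T reducedValue); }

    public static <T, R> R fireWindow(List<T> elements,
                                      MiniReduceFunction<T> reduceFn,
                                      MiniWindowFunction<T, R> windowFn) {
        // Incremental aggregation: window state holds exactly one value,
        // updated as each element arrives -- no user-managed lifecycle involved.
        T acc = null;
        for (T e : elements) {
            acc = (acc == null) ? e : reduceFn.reduce(acc, e);
        }
        // Only when the window fires does the window function see the result.
        return windowFn.apply(acc);
    }

    public static void main(String[] args) {
        int sum = fireWindow(List.of(1, 2, 3, 4), (a, b) -> a + b, v -> v);
        System.out.println(sum);  // 10
    }
}
```

Because the reduce step runs on every incoming element and its result lives in window state, Flink keeps it a dumb pure function; the window function fires once per window, which is a safe place for rich, lifecycle-bearing logic.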
Why can't a Rich[Reduce / Aggregate]Function be used?
The answer is not hard: unlike operators such as FlatMap and Filter, Reduce and Aggregate are operators with fixed, built-in state semantics. They do not need the user to manipulate state manually (if the user could interfere, things would most likely go wrong), nor do they need lifecycle management (their lifecycle always begins with the first record and ends with the last).
Taking the Reduce logic as an example (Aggregate works the same way), let's look further at how the corresponding window operator is constructed.
public <R> WindowOperator<K, T, ?, R, W> reduce(
        ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) {
    Preconditions.checkNotNull(reduceFunction, "ReduceFunction cannot be null");
    Preconditions.checkNotNull(function, "WindowFunction cannot be null");
    if (reduceFunction instanceof RichFunction) {
        throw new UnsupportedOperationException(
                "ReduceFunction of apply can not be a RichFunction.");
    }
    if (evictor != null) {
        return buildEvictingWindowOperator(
                new InternalIterableWindowFunction<>(
                        new ReduceApplyWindowFunction<>(reduceFunction, function)));
    } else {
        ReducingStateDescriptor<T> stateDesc =
                new ReducingStateDescriptor<>(
                        WINDOW_STATE_NAME, reduceFunction, inputType.createSerializer(config));
        return buildWindowOperator(
                stateDesc, new InternalSingleValueWindowFunction<>(function));
    }
}
Note that a ReducingStateDescriptor is created here (the ReduceFunction happens to be one of its constructor arguments), ultimately yielding the built-in ReducingState handle. In everyday DataStream API programming, users rarely create ReducingState (or AggregatingState) by hand. Even so, the descriptor constructors contain the same hard check to prevent a RichFunction from being passed in, protecting the determinism of the state.
public ReducingStateDescriptor(
        String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {
    super(name, typeClass, null);
    this.reduceFunction = checkNotNull(reduceFunction);
    if (reduceFunction instanceof RichFunction) {
        throw new UnsupportedOperationException(
                "ReduceFunction of ReducingState can not be a RichFunction.");
    }
}
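The guard is a plain instanceof check against the RichFunction marker, performed at construction time rather than when the state is first used. A self-contained sketch of the same pattern (with made-up Mini* stand-ins for Flink's interfaces):

```java
// Reproduces the ReducingStateDescriptor-style guard in plain Java.
public class RichFunctionGuardSketch {
    // Stand-ins for Flink's marker/functional interfaces.
    public interface MiniFunction {}
    public interface MiniRichFunction extends MiniFunction {}
    public interface MiniReduceFunction<T> extends MiniFunction { T reduce(T a, T b); }

    // Mirrors the constructor check: a rich function would smuggle lifecycle
    // and context access into the state backend's merge step, so reject it early.
    public static <T> MiniReduceFunction<T> validate(MiniReduceFunction<T> fn) {
        if (fn instanceof MiniRichFunction) {
            throw new UnsupportedOperationException(
                    "ReduceFunction of ReducingState can not be a RichFunction.");
        }
        return fn;
    }

    public static void main(String[] args) {
        // A plain reduce function passes the check.
        MiniReduceFunction<Integer> plain = (a, b) -> a + b;
        validate(plain);

        // A "rich" reduce function is rejected, just like in Flink.
        class RichSum implements MiniReduceFunction<Integer>, MiniRichFunction {
            public Integer reduce(Integer a, Integer b) { return a + b; }
        }
        try {
            validate(new RichSum());
        } catch (UnsupportedOperationException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```

Failing fast in the constructor means a misconfigured job dies at graph-building time, before any state backend or checkpoint gets involved.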
That said, Rich[Reduce / Aggregate]Function has never been put to any effective use inside the Flink codebase or its examples, so we can probably conclude it is a leftover from Flink's evolution (laughs).
The End
Good night, good night.