ProcessFunction是Flink的low-level Stream Api,我們?cè)谑褂肍link的API進(jìn)行作業(yè)時(shí),不外乎關(guān)心以下三種東西:
- events(也就是流數(shù)據(jù),streaming elements)
- state(狀態(tài))
- timer(時(shí)間)
ProccessFunction就是這樣一種API,它提供了所有能訪問以上三種東西的接口。需要注意的是,如果要訪問keyed state,則必須使用keyedstream:
stream.keyBy(...).process(new MyProcessFunction())
ProccessFunction就像是一個(gè)RichMapFunction + Timer的組合體,但功能更強(qiáng)大。比如當(dāng)你用RichMapFunction+Timer時(shí),state的訪問就是一個(gè)問題,你無(wú)法在除map以外的其它方法訪問keyed state。
在我剛剛開始用ProccessFunction的時(shí)候,遇到些問題,這里記錄下。
為什么定時(shí)任務(wù)不觸發(fā)
相信大家剛開始使用ProccessFunction的時(shí)候,應(yīng)該都先看了Flink官網(wǎng)的Demo,不知道大家有沒有運(yùn)行成功,我反正沒有運(yùn)行成功過,定時(shí)任務(wù)也不觸發(fā)。我測(cè)試用到的kafka數(shù)據(jù)非常簡(jiǎn)單就兩字段:userId和viewTime。ProccessFunction里的主要邏輯如下:
public void processElement(UserAction value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
// retrieve the current count
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
}
// update the state's count
current.count++;
// ctx.timestamp()方法返回你的事件時(shí)間
current.lastModified = ctx.timestamp();
// 更新狀態(tài)
state.update(current);
// 一分鐘后調(diào)度計(jì)算(執(zhí)行onTimer方法)
ctx.timerService().registerEventTimeTimer(currentWarter + 60000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
Tuple1<String> key = (Tuple1<String>) ctx.getCurrentKey();
CountWithTimestamp result = state.value();
if (timestamp == result.lastModified + 60000) {
out.collect(new Tuple2<>(key.f0, result.count));
}
}
那為什么沒有觸發(fā)onTimer呢?沒辦法,只能看源代碼了,因應(yīng)之前有閱讀過Flink1.6各個(gè)算子之前的數(shù)據(jù)傳輸?shù)拇a的經(jīng)驗(yàn),直覺告訴我,應(yīng)該在StreamInputProcessor.java類里找原因,不過現(xiàn)在我用的是flink1.10,所以我還得去Flink1.10里找,代碼差不多,也是從StreamInputProcessor里找,只不過在Flink1.10里它變成了一個(gè)接口類,不過沒關(guān)系,大同小異。在StreamTaskNetworkInput.java類中的一個(gè)關(guān)鍵方法processElement:
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
if (recordOrMark.isRecord()){
output.emitRecord(recordOrMark.asRecord());
} else if (recordOrMark.isWatermark()) {
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
} else if (recordOrMark.isLatencyMarker()) {
output.emitLatencyMarker(recordOrMark.asLatencyMarker());
} else if (recordOrMark.isStreamStatus()) {
statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);
} else {
throw new UnsupportedOperationException("Unknown type of StreamElement");
}
}
從上面可以看到,F(xiàn)link在處理元素?cái)?shù)據(jù)的時(shí)候,有4個(gè)分支判斷,即是判斷StreamElement屬于哪種類型:
- StreamRecord - 代表一條流數(shù)據(jù)
- WaterMark - 水位線,本身就是一個(gè)時(shí)間戳,指示元素timestamp小等于watermark的值都已經(jīng)到了,算子通過調(diào)用。
- StreamStatus - 流狀態(tài),包含IDLE和ACTIVE兩種狀態(tài)。
- LatencyMarker - 一個(gè)特殊的mark,用于判斷數(shù)據(jù)的延遲情況。
其中recordOrMark.isWatermark()分支就是用于判斷是否為一個(gè)wartermark, 當(dāng)我們?cè)诔绦蛑薪o元素打上watermark的時(shí)候,程序就會(huì)進(jìn)入這個(gè)判斷,處理新的watermark,當(dāng)新的watermark大于舊的watermark時(shí),會(huì)覆蓋舊的watermark,并且如果有定時(shí)任務(wù),則觸發(fā)onTimer(新的watermark大于等于定時(shí)的timer),如下:
//AbstractStreamOperator.java
public void processWatermark(Watermark mark) throws Exception {
if (timeServiceManager != null) {
timeServiceManager.advanceWatermark(mark);
}
output.emitWatermark(mark);
}
//InternalTimerServiceImpl.java
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
InternalTimer<K, N> timer;
//從隊(duì)列里取出定時(shí)任務(wù),并且判斷時(shí)間是否小于等于當(dāng)前的watermark
while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
}
給程序加上WaterMark
因?yàn)槲襊roccessFunctio里使用的是ctx.timerService().registerEventTimeTimer,基于事件時(shí)間的timer,所以要使ProccessFunction里的定時(shí)任務(wù)能夠觸發(fā),我們還需要如下兩步:
- 把Time Characteristic設(shè)置為EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- 給數(shù)據(jù)加上watermark
stream
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<UserAction>() {
private long currentMaxTimestamp = 0L;
private long maxOutOfOrderness = 10000L;//最大允許的亂序時(shí)間是10s
private Watermark watermark = null;
@Override
public long extractTimestamp(UserAction element, long previousElementTimestamp) {
long timestamp = element.viewTime;
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
}
@Nullable
@Override
public Watermark getCurrentWatermark() {
watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
return watermark;
}
})
.keyBy("userId")
.process(new CountWithTimeoutFunction())
.print();
再次運(yùn)行程序,把斷點(diǎn)放在onTimer()里,依次發(fā)送數(shù)據(jù):
shizc,2020-06-04 16:25:00
shizc,2020-06-04 16:26:00
當(dāng)發(fā)送完第二條數(shù)據(jù)的時(shí)候,斷點(diǎn)走到了onTimer(),但當(dāng)繼續(xù)執(zhí)行時(shí),由于我們程序里有這樣一個(gè)判斷:
// timestamp是觸發(fā)timer時(shí)的那個(gè)點(diǎn),
if (timestamp == result.lastModified + 60000) {
// emit the state on timeout
out.collect(new Tuple2<>(key.f0, result.count));
}
我們發(fā)現(xiàn)程序永遠(yuǎn)也進(jìn)入不了這個(gè)判斷,具體原因是:
- 基于event-time的定時(shí)任務(wù)需要下一個(gè)元素的watermark高于當(dāng)前的定時(shí)任務(wù),比如數(shù)據(jù)當(dāng)前的timestamp是2020-06-04 16:25:00,設(shè)置60s后觸發(fā)onTimer,那么定時(shí)任務(wù)就是2020-06-04 16:26:00,如果想要onTimer觸發(fā),就必須要有一條數(shù)的timestamp是大于等于2020-06-04 16:26:00。
- 當(dāng)滿足上面 1 的條件時(shí),由于以下代碼的原因?qū)е聇imestamp == result.lastModified + 6000永遠(yuǎn)不成立:
......
//
current.lastModified = ctx.timestamp();
// 更新狀態(tài)
state.update(current);
// 一分鐘后調(diào)度計(jì)算(執(zhí)行onTimer方法)
ctx.timerService().registerEventTimeTimer(currentWarter + 60000);
比如:我們第一條數(shù)據(jù)是(shizc,2020-06-04 16:25:00),那么current.lastModified = ctx.timestamp() == 2020-06-04 16:25:00,執(zhí)行registerEventTimeTimer后,我期望在2020-06-04 16:26:00執(zhí)行onTimer,所以我發(fā)送了第二條數(shù)據(jù)(shizc,2020-06-04 16:26:00),斷點(diǎn)走到了onTimer():
此時(shí)onTimer方法里的timestamp=2020-06-04 16:26:00,而result.lastModified=2020-06-04 16:26:00,所以if (timestamp == result.lastModified + 60000),永遠(yuǎn)不會(huì)成立。這里應(yīng)該是官網(wǎng)demo的bug,不應(yīng)該去數(shù)據(jù)里的timestamp,應(yīng)該取的是用前一個(gè)watermark來(lái)注冊(cè)timer。
使用WaterMark注冊(cè)Timer
我們修改下CountWithTimeoutFunction里的processElement代碼,并加上些日志輸出,代碼如下:
@Override
public void processElement(UserAction value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
...
// 取前一個(gè)watermark,基于前一個(gè)watermark的60s后,觸發(fā)onTimer計(jì)算。而不是基于timestamp。
long prevWaterMark = ctx.timerService().currentWatermark();
LOG.info("前一個(gè)watermark: {}({})", prevWaterMark, DateFormatUtils.format(prevWaterMark, "yyyy-MM-dd HH:mm:ss"));
current.lastModified = prevWaterMark;
// write the state back
state.update(current);
ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
CountWithTimestamp result = state.value();
long prevWaterMark = result.lastModified;
LOG.info("onTimer觸發(fā)時(shí)間: {}({}), 前一個(gè)watermark: {}({})", timestamp, DateFormatUtils.format(timestamp, "yyyy-MM-dd HH:mm:ss"), prevWaterMark, DateFormatUtils.format(prevWaterMark, "yyyy-MM-dd HH:mm:ss"));
Tuple1<String> key = (Tuple1<String>) ctx.getCurrentKey();
if (timestamp == result.lastModified + 60000) {
out.collect(new Tuple2<>(key.f0, result.count));
}
}
代碼里關(guān)鍵的部分是long prevWaterMark = ctx.timerService().currentWatermark();,我們應(yīng)該取前一個(gè)watermark,基于前一個(gè)watermark的60s后,觸發(fā)onTimer計(jì)算。而不是基于timestamp。
然后我們依次發(fā)送測(cè)試數(shù)據(jù):
shizc,2020-06-04 16:25:00
shizc,2020-06-04 16:26:00
shizc,2020-06-04 16:28:00
得到的輸出如下:
前一個(gè)watermark: -10000(1970-01-01 07:59:50)
onTimer觸發(fā)時(shí)間: 50000(1970-01-01 08:00:50), 前一個(gè)watermark: -10000(1970-01-01 07:59:50)
(shizc,1)
前一個(gè)watermark: 1591259090000(2020-06-04 16:24:50)
onTimer觸發(fā)時(shí)間: 1591259150000(2020-06-04 16:25:50), 前一個(gè)watermark: 1591259090000(2020-06-04 16:24:50)
(shizc,2)
前一個(gè)watermark: 1591259150000(2020-06-04 16:25:50)
onTimer觸發(fā)時(shí)間: 1591259210000(2020-06-04 16:26:50), 前一個(gè)watermark: 1591259150000(2020-06-04 16:25:50)
(shizc,3)
通過日志可以看出,數(shù)據(jù)已經(jīng)是正常輸出了。
使用ProccessTime注冊(cè)Timer
接著來(lái)看下基于ProccessTimer注冊(cè)Timer和基于EventTime的Timer有什么不同:
public void registerProcessingTimeTimer(N namespace, long time) {
//取出head,
InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
//如果head不為空,則取head的timestamp作為下一個(gè)triggertime
long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
// 查檢當(dāng)前的time是否小于nextTrigger,如果小于則直接把當(dāng)前的時(shí)間設(shè)置定時(shí)任務(wù)。
if (time < nextTriggerTime) {
if (nextTimer != null) {
nextTimer.cancel(false);
}
// 真正設(shè)置定時(shí)任務(wù)
nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
}
}
}
public void registerEventTimeTimer(N namespace, long time) {
//直接加入到隊(duì)列,等待下一個(gè)watermark到達(dá)后,再取出判斷執(zhí)行。
eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
可以看出proccessTime和eventTime的定時(shí)任務(wù)還是有很大差別的,proccessTime是先與隊(duì)首判斷,如果小于隊(duì)首,則直接設(shè)置定時(shí)任務(wù),否則加入隊(duì)列(里面具體沒看,不過應(yīng)該是有排序的);而eventTime的是直接加入到隊(duì)列,等待下一個(gè)watermark的觸發(fā),再取出判斷執(zhí)行。
proccessTimeTimer的執(zhí)行邏輯是從頭開始執(zhí)行的,當(dāng)你注冊(cè)了三個(gè)timer,并不是已經(jīng)都為三個(gè)timer設(shè)置了定時(shí)任務(wù),而是先為時(shí)間最小的那個(gè)設(shè)置了定時(shí)任務(wù),等待時(shí)間觸發(fā)后,再設(shè)置第二個(gè)定時(shí)任務(wù),如下:
private void onProcessingTime(long time) throws Exception {
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;
InternalTimer<K, N> timer;
while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
processingTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onProcessingTime(timer);
}
if (timer != null && nextTimer == null) {
// 真正設(shè)置定時(shí)任務(wù)
nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);
}
}
所以我們不擔(dān)心timer多,而是擔(dān)心timer多而間隔小。所以基于proccessTime的timer我們可以這樣改進(jìn):
long currentProccessTime = ctx.timerService().currentProcessingTime();
if(current.lastModified == 0 || current.lastModified + 60000 <= currentProccessTime) {
current.lastModified = currentProccessTime;
// write the state back
state.update(current);
ctx.timerService().registerProcessingTimeTimer(current.lastModified + 60000);
} else {
state.update(current);
}
每隔一分鐘設(shè)置一次timer,onTimer()里就不需要在有if (timestamp == result.lastModified + 60000)的判斷了,因?yàn)槲覀円呀?jīng)控制了timer的個(gè)數(shù),同樣基于eventTime的timer也可以這樣改進(jìn)。
運(yùn)行代碼,每隔60s輸出數(shù)據(jù):
onTimer觸發(fā)時(shí)間: 1591344468660(2020-06-05 16:07:48), processingTime: 1591344408660(2020-06-05 16:06:48)
(shizc,2)
onTimer觸發(fā)時(shí)間: 1591344499806(2020-06-05 16:08:19), processingTime: 1591344439806(2020-06-05 16:07:19)
(flink,2)
以上就是我使用ProccessFunction遇到的一些問題,希望對(duì)大家有所幫助,若有不正確的地方歡迎糾正 ~~.
新的發(fā)現(xiàn)
就以上的問題,我在flink jira上提了個(gè)issue: FLINK-19167,我把我遇到的問題描述后,F(xiàn)link大神們也都參與了討論:

最后Dawid Wysakowicz 大神指出了問題的根源: 這個(gè)demo只有在多個(gè)key(多個(gè)window)的情況下才能正確運(yùn)行,單個(gè)key的情況下,會(huì)存在lastModified的值被覆蓋的情況。舉個(gè)例子:
有3條數(shù)據(jù):(key1, 1), (key1,2),(key1,60001), 當(dāng)window收到第一條數(shù)據(jù)時(shí),注冊(cè)一個(gè)基于eventtime的timer,watermark為1 + 6000, 當(dāng)?shù)谌龡l數(shù)據(jù)進(jìn)來(lái)時(shí),先把lastModified更新為60001,此時(shí)觸發(fā)onTimer, timestamp == lastModified,都同為60001,所以 if 的判斷不成立。
如果把(key1, 60001)的key改成key2, 那么前面的key1的所有timer都會(huì)觸發(fā)。是因?yàn)閗ey1的lastModified不會(huì)被更新,但是watermark大于它所注冊(cè)的wartermark,所以會(huì)觸發(fā)。
timestamp: 60001, lastModified: 1
timestamp: 60002, lastModified: 2
(key1,2)
所以由以上得出的結(jié)論是: 基于eventTime的 每一個(gè)key的timer,它們是同一個(gè)流,同一個(gè)流也就會(huì)受到watermark的增長(zhǎng)而影響到(可以想象成 他們?cè)谕粭l時(shí)間軸上)
Flink的大神們也都說(shuō)了,官網(wǎng)的例子的確有沒有考慮到的地方,只是一個(gè)很簡(jiǎn)單的例子,多個(gè)key沒問題,單個(gè)key不生效。