Flink 使用介紹相關(guān)文檔目錄
前言
概括來說,watermark用于基于event time的流計算系統(tǒng)數(shù)據(jù)流可能發(fā)生亂序的情況。對于event time數(shù)據(jù)流,接收到數(shù)據(jù)的時間和上游產(chǎn)生數(shù)據(jù)的時間是不相關(guān)的,因此可能會出現(xiàn)產(chǎn)生時間較早的數(shù)據(jù)由于網(wǎng)絡(luò)抖動等原因到達(dá)Flink系統(tǒng)較晚的情形。Watermark用于應(yīng)對數(shù)據(jù)亂序的情況。Watermark是數(shù)據(jù)流中的一種特殊數(shù)據(jù),由Flink內(nèi)部周期(可自定義)產(chǎn)生。下游接收到watermark的時候,會認(rèn)為timestamp在watermark之前的數(shù)據(jù)已經(jīng)到齊。針對這些數(shù)據(jù)的運(yùn)算過程可以開始。Watermark之后的數(shù)據(jù)沒有到齊,需要在緩存的同時,等待后續(xù)數(shù)據(jù)的到來。Watermark的生成策略可以實現(xiàn)數(shù)據(jù)亂序的兼容。例如將watermark發(fā)送的時間設(shè)置為當(dāng)前接收到的數(shù)據(jù)的最大timestamp(記為t)減去5s。這樣下游認(rèn)為t-5s之前的數(shù)據(jù)已經(jīng)到齊。t-5s之后的數(shù)據(jù)先緩存起來等待。從而實現(xiàn)容忍“亂序的程度”不超過5s的情形。
更為詳細(xì)的Flink watermark講解參見以下文章:
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/concepts/time/
接下來講解如何配置使用watermark生成策略(WatermarkStrategy)。
配置watermark自動發(fā)送周期
Flink默認(rèn)的watermark自動發(fā)送周期為200ms。Flink支持全局方式和局部方式配置自動發(fā)送周期。
全局配置方式:修改flink-conf.yaml文件,增加或修改
pipeline.auto-watermark-interval(對應(yīng)PipelineOptions.AUTO_WATERMARK_INTERVAL)配置項。
作業(yè)局部修改方式:調(diào)用ExecutionConfig.setAutoWatermarkInterval(...)方法。
env.getConfig().setAutoWatermarkInterval(100);
WatermarkStrategy使用
forMonotonousTimestamps
生成單調(diào)遞增的watermark。數(shù)據(jù)流元素到來的時候不發(fā)送watermark,僅在到達(dá)自動發(fā)送周期的時候才發(fā)送。
這里遞增的意思并非僅僅是watermark時間戳數(shù)值的嚴(yán)格遞增,每次發(fā)送的watermark都是最近接收到的元素攜帶的timestamp。(從元素提取出攜帶的timestamp過程由TimestampAssigner負(fù)責(zé),后面分析)。一種例外情況是如果遇到遲到的數(shù)據(jù)(watermark比前一個元素?。?,這個元素的watermark會被MonotonousTimestamps排除不做記錄,可以保證向下游發(fā)送的watermark是遞增的。
// 使用env創(chuàng)建數(shù)據(jù)源
source = ...
source.assignTimestampsAndWatermarks(WatermarkStrategy.<元素類型>forMonotonousTimestamps();
forBoundedOutOfOrderness
上面的單調(diào)遞增方式無法解決元素亂序的問題。這里的BoundedOutOfOrderness是專門為數(shù)據(jù)存在亂序這種場景考慮的。使用時候需要指定一個參數(shù),即最大可容忍的數(shù)據(jù)遲到時間。如果亂序數(shù)據(jù)遲到超過這個時間限制,該數(shù)據(jù)將被忽略。當(dāng)然還可以配置為旁路輸出,參見Flink 使用之?dāng)?shù)據(jù)分流。
BoundedOutOfOrderness實現(xiàn)并不復(fù)雜,基本和上面單調(diào)遞增的方式一致。區(qū)別是在周期發(fā)送watermark的時候,發(fā)送的watermark需要減去最大可容忍的數(shù)據(jù)遲到時間。從而實現(xiàn)了數(shù)據(jù)計算的觸發(fā)時刻向后拖延,在拖延的時間段內(nèi)“等待”亂序數(shù)據(jù)到來。
使用方法如下:
source.assignTimestampsAndWatermarks(WatermarkStrategy.<元素類型>forBoundedOutOfOrderness(Duration.ofSeconds(30));
forGenerator
前面兩種watermark generator能夠滿足絕大多數(shù)使用場景。如果仍不能滿足要求,F(xiàn)link提供了創(chuàng)建自定義watermark generator的方式。
這里以Integer類型的數(shù)據(jù)源為例,說明自定義generator的寫法。
.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new WatermarkGeneratorSupplier<Integer>() {
@Override
public WatermarkGenerator<Integer> createWatermarkGenerator(Context context) {
return new WatermarkGenerator<Integer>() {
@Override
public void onEvent(Integer integer, long eventTimestamp, WatermarkOutput watermarkOutput) {
}
@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
}
};
}
}));
在自定義WatermarkGenerator的時候按需實現(xiàn)兩個方法:
- onEvent:接收到元素的時候觸發(fā)。參數(shù)分別為輸入的元素,元素提取出來的timestamp(event time)和控制輸出watermark的對象(后面解釋)。
- onPeriodicEmit:到達(dá)自動發(fā)送watermark周期的時候觸發(fā)。參數(shù)只有一個,和前面的相同。
WatermarkOutput用來像下游發(fā)送watermark,或者控制數(shù)據(jù)輸出。它有三個方法:
- emitWatermark: 發(fā)送watermark到下游。
- markIdle: 標(biāo)記output為空閑狀態(tài)。
- markActive: 標(biāo)記output為活動狀態(tài)。如果output在空閑狀態(tài)發(fā)送了watermark,也會自動標(biāo)記為活動狀態(tài)。
Flink中一個計算步驟可能有多個上游(雙數(shù)據(jù)流或更多),計算步驟會考慮到所有上游的watermark。設(shè)想如果一個流一直不產(chǎn)生watermark,需要等待這個流的數(shù)據(jù)呢還是這個流目前就沒有數(shù)據(jù)可以忽略?Flink不好判斷。為了解決這個問題引入了idle(空閑)機(jī)制。如果一個數(shù)據(jù)源標(biāo)記了空閑狀態(tài),下游計算的時候不會考慮這個數(shù)據(jù)源的watermark。未能正確處理數(shù)據(jù)源的idle狀態(tài)會導(dǎo)致Flink整個計算過程的阻塞。務(wù)必要注意這一點(diǎn)。
noWatermarks
不生成任何watermark。
source.assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks());
withTimestampAssigner
該方法用來配置如何從元素中抽取出watermark。例如到來的數(shù)據(jù)包含數(shù)據(jù)生成時的timestamp,格式為Tuple2<String, Long>類型,值為("Hello", 1690437024)。我們可以獲取元素的第二個字段作為Flink內(nèi)部的timestamp使用。
withTimestampAssigner使用方法如下所示:
source.assignTimestampsAndWatermarks(WatermarkStrategy.<元素類型>forMonotonousTimestamps().withTimestampAssigner((element, timestamp) -> {
// element: 到來的元素
// timestamp:上游為元素指定的timestamp,通常為數(shù)據(jù)源產(chǎn)生的timestamp
// 需要編寫自己的抽取邏輯
// 返回抽取出的timestamp
}));
withIdleness
上面forGenerator章節(jié)提到了idle問題。大家可能會問:有沒有一種常見的策略可以自動標(biāo)記idle狀態(tài)?比如數(shù)據(jù)流持續(xù)一段時間沒有數(shù)據(jù)到來的時候自動標(biāo)記為idle狀態(tài)。withIdleness正好是這種策略。它對watermark generator做了包裝。用戶在其中不需要再去編寫何時標(biāo)記idle的邏輯。
withIdleness的用法如下所示:
.assignTimestampsAndWatermarks(WatermarkStrategy.<元素類型>forBoundedOutOfOrderness(Duration.ofSeconds(30)).withIdleness(Duration.ofSeconds(5)));
WatermarkStrategy源代碼分析
forBoundedOutOfOrderness
static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}
上面創(chuàng)建了BoundedOutOfOrdernessWatermarks。繼續(xù)查看它的代碼。分析內(nèi)容在注釋中。
@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
/** The maximum timestamp encountered so far. */
// 暫存最大的timestamp
// 發(fā)送的timestamp一定是遞增(或者大小不變)的
private long maxTimestamp;
/** The maximum out-of-orderness that this watermark generator assumes. */
// 最大可容忍的數(shù)據(jù)遲到的時間范圍(亂序程度)
private final long outOfOrdernessMillis;
/**
* Creates a new watermark generator with the given out-of-orderness bound.
*
* @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
*/
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
// start so that our lowest watermark would be Long.MIN_VALUE.
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}
// ------------------------------------------------------------------------
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
// 接收到元素的時候,更新maxTimestamp
// 如果接收到遲到的元素(eventTimestamp比maxTimestamp?。?,忽略不更新,確保timestamp遞增
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 在發(fā)送watermark時間周期到來的時候,發(fā)送watermark
// 發(fā)送的watermark需要減去outOfOrdernessMillis
// 含義是讓下游認(rèn)為maxTimestamp - outOfOrdernessMillis - 1之前的數(shù)據(jù)已經(jīng)到齊
// 只有認(rèn)為到齊的數(shù)據(jù)參會參與計算,未到齊的數(shù)據(jù)會緩存等待
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}
forMonotonousTimestamps
static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
return (ctx) -> new AscendingTimestampsWatermarks<>();
}
這里創(chuàng)建了AscendingTimestampsWatermarks。它繼承了BoundedOutOfOrdernessWatermarks,代碼如下所示。
@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
/** Creates a new watermark generator with for ascending timestamps. */
public AscendingTimestampsWatermarks() {
super(Duration.ofMillis(0));
}
}
看到構(gòu)造方法很容易明白,AscendingTimestampsWatermarks是一種不容忍任何數(shù)據(jù)遲到的BoundedOutOfOrdernessWatermarks。
withIdleness
default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
checkNotNull(idleTimeout, "idleTimeout");
checkArgument(
!(idleTimeout.isZero() || idleTimeout.isNegative()),
"idleTimeout must be greater than zero");
return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
}
繼續(xù)查看WatermarkStrategyWithIdleness的createWatermarkGenerator方法:
@Override
public WatermarkGenerator<T> createWatermarkGenerator(
WatermarkGeneratorSupplier.Context context) {
return new WatermarksWithIdleness<>(
baseStrategy.createWatermarkGenerator(context), idlenessTimeout);
}
創(chuàng)建出的watermark generator為WatermarksWithIdleness。該類使用了裝飾器模式。在不改變原有watermark generator的基礎(chǔ)之上增加了標(biāo)記idle的能力。它有三個成員變量。
// 包裝的watermark生成器
private final WatermarkGenerator<T> watermarks;
// idle定時器,用來判斷是否idle
private final IdlenessTimer idlenessTimer;
// 狀態(tài)標(biāo)記,目前是否處于idle狀態(tài)
private boolean isIdleNow = false;
繼續(xù)分析onEvent和onPeriodicEmit方法:
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
// 調(diào)用被包裝watermark generator的onEvent方法
watermarks.onEvent(event, eventTimestamp, output);
// 告知idlenessTimer有活動發(fā)生
idlenessTimer.activity();
// 標(biāo)記空閑狀態(tài)為false
isIdleNow = false;
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
if (idlenessTimer.checkIfIdle()) {
// 檢查空閑狀態(tài),如果為空閑
if (!isIdleNow) {
// 如果當(dāng)前狀態(tài)不是空閑
// 說明剛從活動狀態(tài)變?yōu)榭臻e狀態(tài)
// 標(biāo)記為idle狀態(tài)
output.markIdle();
// 記錄空閑狀態(tài)為true
isIdleNow = true;
}
} else {
// 如果不是idle狀態(tài),調(diào)用包裝的watermark generator的onPeriodicEmit方法
watermarks.onPeriodicEmit(output);
}
}
最后的問題就是IdlenessTimer是怎么判斷是否idle的。我們繼續(xù)分析它的構(gòu)造函數(shù),activity方法和checkIfIdle方法。
IdlenessTimer(Clock clock, Duration idleTimeout) {
// 獲取時鐘
this.clock = clock;
long idleNanos;
// 將idle超時時間轉(zhuǎn)換為納秒保存
try {
idleNanos = idleTimeout.toNanos();
} catch (ArithmeticException ignored) {
// long integer overflow
idleNanos = Long.MAX_VALUE;
}
this.maxIdleTimeNanos = idleNanos;
}
public void activity() {
// 內(nèi)部有個計數(shù)器,只要有活動,該計數(shù)器自增1
// counter是long類型,即便是自增溢出了,也不會影響
counter++;
}
public boolean checkIfIdle() {
if (counter != lastCounter) {
// lastCounter為最近一次counter計數(shù)
// 如果不等,說明期間有活動
// 這里不寫if (counter > lastCounter)的原因是兼容counter溢出的情況
// activity since the last check. we reset the timer
// 更新lastCounter,重設(shè)計時器
lastCounter = counter;
startOfInactivityNanos = 0L;
return false;
} else // timer started but has not yet reached idle timeout
if (startOfInactivityNanos == 0L) {
// first time that we see no activity since the last periodic probe
// begin the timer
// 首次發(fā)現(xiàn)counter沒有更新,即沒有活動,啟用計時器
startOfInactivityNanos = clock.relativeTimeNanos();
return false;
} else {
// 如果當(dāng)前時間和計時器時間差超過maxIdleTimeNanos,說明處于空閑狀態(tài)
return clock.relativeTimeNanos() - startOfInactivityNanos > maxIdleTimeNanos;
}
}
本博客為作者原創(chuàng),歡迎大家參與討論和批評指正。如需轉(zhuǎn)載請注明出處。