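1. Configuring the time characteristic
- ProcessingTime: operators determine the current time of the stream from the system clock of the machine they run on. There is no need to wait for watermarks to advance event time.
- EventTime: operators determine the current time from information carried in the data itself. An event-time window fires only once a watermark declares that all timestamps within that interval have been received.
- IngestionTime: every incoming record is given the processing time of the source operator as its event-time timestamp, and watermarks are generated automatically.
The default is processing time (in Flink releases before 1.12; from 1.12 on, event time is the default and this setter is deprecated). To select another time characteristic, use: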
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
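1.1 Assigning timestamps and generating watermarks
- A watermark carries a timestamp that is used to trigger timers (for example, window firing). It usually lags slightly behind the timestamp of the current event and tells the system that, at this point, only data up to that earlier timestamp should be treated as complete and processed.
Periodic watermark assigner
- By default a watermark is emitted every 200 ms.
- At the configured interval, Flink calls getCurrentWatermark(). If the method returns a non-null value whose timestamp is greater than that of the previous watermark, the operator emits a new watermark.
- BoundedOutOfOrdernessTimestampExtractor can be used in place of the code below.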
env.getConfig().setAutoWatermarkInterval(100);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
    String[] fields = line.split(",");
    return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
})
// assign timestamps and watermarks for out-of-order data
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SensorReading>() {
    // maximum out-of-orderness tolerated: 3 seconds
    private final long maxOutBoundary = 3 * 1000L;
    // start high enough that subtracting the bound cannot underflow
    private long currentMaxTimestamp = Long.MIN_VALUE + maxOutBoundary;
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutBoundary);
    }
    @Override
    public long extractTimestamp(SensorReading sensorReading, long l) {
        // timestamp of the current record (seconds converted to milliseconds)
        long currentTs = sensorReading.getTimestamp() * 1000L;
        // update the largest timestamp seen so far
        currentMaxTimestamp = Math.max(currentMaxTimestamp, currentTs);
        // return the record's timestamp
        return currentTs;
    }
});
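The rule described above (track the largest timestamp, subtract a fixed bound, and emit only when the result advances) can be sketched in plain Java without any Flink dependency. The class and method names below are hypothetical and stand in for the assigner and the periodic call; this is an illustration of the rule, not Flink's implementation.

```java
/** Plain-Java stand-in for a periodic, bounded-out-of-orderness watermark assigner (not Flink API). */
final class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start high enough that subtracting the bound cannot underflow.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Called for every record: remember the largest timestamp seen so far. */
    void onRecord(long timestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestampMs);
    }

    /** Called on each periodic tick: returns a new watermark, or null if it did not advance. */
    Long onPeriodicTick() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMs;
        if (candidate > lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
            return candidate;
        }
        return null;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(3000L);
        wm.onRecord(10_000L);
        System.out.println(wm.onPeriodicTick()); // 7000
        wm.onRecord(8_000L);                     // out-of-order record, max stays at 10000
        System.out.println(wm.onPeriodicTick()); // null (watermark did not advance)
        wm.onRecord(12_000L);
        System.out.println(wm.onPeriodicTick()); // 9000
    }
}
```

An out-of-order record never moves the watermark backwards, because only the maximum timestamp feeds into it.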
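Punctuated watermark assigner
- The interface's checkAndGetNextWatermark() method is called immediately after extractTimestamp() for every event and decides whether a new watermark should be generated. If it returns a non-null watermark that is greater than the previous one, the operator emits that new watermark.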
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
  override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
    element.getCreationTime
  }
  override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
    if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
  }
}
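1.2 Watermarks, latency, and completeness
- Watermarks can be used to balance latency against the completeness of results.
- They control how long to wait for data to arrive before a computation is executed.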
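To make the trade-off concrete, the sketch below replays one fixed arrival order under different out-of-orderness bounds and counts how many records are already at or behind the watermark when they arrive. It is a simplified model in plain Java: the watermark is assumed to be updated after every record rather than periodically, and the class name, method name, and sample timestamps are all made up for illustration.

```java
/** Simplified model: a larger bound waits longer (more latency) but leaves fewer records behind the watermark. */
final class WatermarkTradeoffSketch {

    /** Counts records whose timestamp is at or behind the watermark at the moment they arrive. */
    static int countBehindWatermark(long[] arrivalOrderTimestamps, long boundMs) {
        long maxSeen = Long.MIN_VALUE + boundMs; // avoids underflow when subtracting the bound
        int behind = 0;
        for (long ts : arrivalOrderTimestamps) {
            long watermark = maxSeen - boundMs;  // watermark before this record is processed
            if (ts <= watermark) {
                behind++;
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return behind;
    }

    public static void main(String[] args) {
        long[] arrivals = {1000, 5000, 2000, 6000, 4500, 9000, 5500};
        System.out.println(countBehindWatermark(arrivals, 0L));    // 3
        System.out.println(countBehindWatermark(arrivals, 2000L)); // 2
        System.out.println(countBehindWatermark(arrivals, 4000L)); // 0
    }
}
```

With a bound of 0 the results are available soonest but three records miss the watermark; with a bound of 4 seconds nothing is missed, at the cost of every window waiting 4 seconds longer.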
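2. Process functions
- Process functions can access a record's timestamp and the current watermark, and they support registering timers that fire at a specific time in the future.
2.1 The process function interface
- Taking KeyedProcessFunction as an example: processElement is called for every incoming record, and its Context gives access to the watermark and similar information. The first parameter is the input record and the third is the output collector.
- Timers can be registered inside processElement with registerProcessingTimeTimer; what happens when a timer fires is written in onTimer.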
dataStream.keyBy("id")
    .process(new MyProcess())
    .print();
public static class MyProcess extends KeyedProcessFunction<Tuple, SensorReading, Integer> {
    // timestamp of the most recently registered timer, kept so that it can be deleted later
    ValueState<Long> tsTimerState;
    @Override
    public void open(Configuration parameters) throws Exception {
        tsTimerState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("ts-timer", Long.class));
    }
    @Override
    public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
        out.collect(value.getId().length());
        // context
        // Timestamp of the element currently being processed or timestamp of a firing timer.
        ctx.timestamp();
        // Get key of the element being processed.
        ctx.getCurrentKey();
        // ctx.output();
        ctx.timerService().currentProcessingTime();
        ctx.timerService().currentWatermark();
        // register a timer that fires 5 seconds from now in processing time
        long timerTs = ctx.timerService().currentProcessingTime() + 5000L;
        ctx.timerService().registerProcessingTimeTimer(timerTs);
        // store the registered timestamp so that exactly this timer can be deleted later
        tsTimerState.update(timerTs);
        // ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
        // delete the timer registered for the given timestamp
        // ctx.timerService().deleteProcessingTimeTimer(tsTimerState.value());
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
        System.out.println(timestamp + " timer fired");
        ctx.getCurrentKey();
        // ctx.output();
        ctx.timeDomain();
    }
    @Override
    public void close() throws Exception {
        // note: keyed state is scoped per key and close() is not called for a particular key,
        // so this clear() may not do what is intended
        tsTimerState.clear();
    }
}
2.2 TimerService and timers
- The TimerService available from the Context and OnTimerContext objects provides the following methods:
- currentProcessingTime()
- currentWatermark()
- registerProcessingTimeTimer(long timestamp)
- registerEventTimeTimer(long timestamp)
- deleteProcessingTimeTimer(long timestamp)
- deleteEventTimeTimer(long timestamp)
- When a timer fires, the onTimer() callback is invoked.
- Calls to processElement() and onTimer() are synchronized, so the two never run concurrently.
- Each key can have multiple timers, but only one per timestamp.
- By default, timer timestamps are kept in a priority queue.
- All timers are written to checkpoints together with the other state.
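The two properties "one timer per key and timestamp" and "timers fire in timestamp order" can be illustrated with a small plain-Java model. This is only a sketch: the class and method names are hypothetical, a sorted map stands in for the priority queue, and nothing here reflects how Flink stores timers internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Toy timer queue: de-duplicates (key, timestamp) pairs and fires them in timestamp order. */
final class TimerQueueSketch {
    // timestamp -> keys that have a timer at that timestamp, sorted by timestamp
    private final TreeMap<Long, TreeSet<String>> keysByTimestamp = new TreeMap<>();

    /** Registering the same (key, timestamp) twice still leaves a single timer. */
    void registerTimer(String key, long timestamp) {
        keysByTimestamp.computeIfAbsent(timestamp, t -> new TreeSet<>()).add(key);
    }

    void deleteTimer(String key, long timestamp) {
        TreeSet<String> keys = keysByTimestamp.get(timestamp);
        if (keys != null) {
            keys.remove(key);
            if (keys.isEmpty()) {
                keysByTimestamp.remove(timestamp);
            }
        }
    }

    /** Fires every timer whose timestamp is <= time and returns them as "key@timestamp". */
    List<String> advanceTo(long time) {
        List<String> fired = new ArrayList<>();
        while (!keysByTimestamp.isEmpty() && keysByTimestamp.firstKey() <= time) {
            Map.Entry<Long, TreeSet<String>> entry = keysByTimestamp.pollFirstEntry();
            for (String key : entry.getValue()) {
                fired.add(key + "@" + entry.getKey());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerQueueSketch timers = new TimerQueueSketch();
        timers.registerTimer("sensor_1", 1000L);
        timers.registerTimer("sensor_1", 1000L); // duplicate, ignored
        timers.registerTimer("sensor_1", 2000L);
        timers.registerTimer("sensor_2", 1500L);
        System.out.println(timers.advanceTo(1500L)); // [sensor_1@1000, sensor_2@1500]
        System.out.println(timers.advanceTo(3000L)); // [sensor_1@2000]
    }
}
```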
2.3 Side outputs
- A process function can emit more than one stream.
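// side-output tag for the low-temperature stream (assumed definition; not shown in the original notes)
OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>("low-temp") {};
SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
    @Override
    public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
        // readings above 30 degrees go to the main (high-temperature) stream;
        // all others go to the low-temperature side output
        if (value.getTemperature() > 30) {
            out.collect(value);
        } else {
            ctx.output(lowTempTag, value);
        }
    }
});
highTempStream.getSideOutput(lowTempTag).print("low-temp");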
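3. Window operators
- Time-based window assigners assign each element to one or more windows according to the element's event-time timestamp or the current processing time. Every time window has a start timestamp and an end timestamp.
- All built-in window assigners provide a default trigger, which fires the window computation as soon as time passes the window's end time.
- The time interval is closed on the left and open on the right: [start, end).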
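As a small worked example of the half-open interval, the sketch below computes the tumbling window that a timestamp falls into. It is plain Java with hypothetical names, and it assumes non-negative timestamps and no window offset.

```java
/** Computes tumbling-window bounds [start, end) for a timestamp (non-negative, no offset). */
final class TumblingWindowSketch {

    /** Start of the window that contains timestampMs: round down to a multiple of the size. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Exclusive end of the window that contains timestampMs. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    /** Membership test for the half-open interval [start, end). */
    static boolean contains(long start, long end, long timestampMs) {
        return start <= timestampMs && timestampMs < end;
    }

    public static void main(String[] args) {
        long size = 15_000L;
        System.out.println(windowStart(14_999L, size)); // 0
        System.out.println(windowStart(15_000L, size)); // 15000
        System.out.println(contains(0L, 15_000L, 15_000L)); // false: the end is exclusive
    }
}
```

A record stamped exactly at a window's end therefore belongs to the next window, never to both.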
3.1 Tumbling windows
Tumbling windows do not overlap, sliding windows may overlap, and session windows are delimited by gaps during which no records arrive. Tumbling windows are the most commonly used. https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_622-timewindow
Calling .window(...) here is enough to create a window with the default settings; the window type is TimeWindow.
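// .window(...) returns a WindowedStream; an aggregation or process call must follow to get a DataStream
WindowedStream<SensorReading, Tuple, TimeWindow> windowedStream = dataStream.keyBy("id")
    .window(TumblingProcessingTimeWindows.of(Time.seconds(15)));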
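To show why sliding windows overlap while tumbling windows do not, the sketch below lists every window start that contains a given timestamp. It is a plain-Java illustration with hypothetical names, assuming non-negative timestamps and no offset; a tumbling window is simply the case where the slide equals the size.

```java
import java.util.ArrayList;
import java.util.List;

/** Lists the sliding windows [start, start + size) that contain a timestamp. */
final class SlidingWindowSketch {

    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // latest window that starts at or before the timestamp
        long lastStart = timestampMs - (timestampMs % slideMs);
        // walk backwards while the window still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // size 10 s, slide 5 s: the record falls into two overlapping windows
        System.out.println(windowStarts(12_000L, 10_000L, 5_000L));  // [10000, 5000]
        // slide == size behaves like a tumbling window: exactly one window
        System.out.println(windowStarts(12_000L, 10_000L, 10_000L)); // [10000]
    }
}
```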
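3.2 Functions applied on windows
AggregateFunction
- Applied incrementally to the elements of a window; the state is a single value (the accumulator).
- The first type parameter is the input type, the second is the accumulator type used to hold intermediate values, and the third is the output type.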
DataStream<Integer> resultStream = dataStream.keyBy("id")
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
        // create a fresh accumulator
        @Override
        public Integer createAccumulator() {
            return 0;
        }
        // add each record on top of the previous accumulator value
        @Override
        public Integer add(SensorReading value, Integer accumulator) {
            return accumulator + 1;
        }
        // return the result
        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }
        // merge partial results (rarely needed for time windows; session windows may need merging)
        @Override
        public Integer merge(Integer a, Integer b) {
            return a + b;
        }
    });
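The count above uses the same type for accumulator and output. As a sketch of a case where the three type parameters differ, the plain-Java example below computes an average with a (sum, count) accumulator. The SimpleAggregate interface is a hypothetical stand-in that only mirrors the four method names shown above; it is not Flink's AggregateFunction.

```java
/** Incremental average with a (sum, count) accumulator; no window contents are buffered. */
final class IncrementalAverageSketch {

    /** Hypothetical stand-in mirroring the four methods of an aggregate function. */
    interface SimpleAggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** IN = Double (a temperature), ACC = double[]{sum, count}, OUT = Double (the average). */
    static final class Average implements SimpleAggregate<Double, double[], Double> {
        @Override
        public double[] createAccumulator() {
            return new double[] {0.0, 0.0};
        }
        @Override
        public double[] add(Double value, double[] accumulator) {
            accumulator[0] += value; // running sum
            accumulator[1] += 1;     // running count
            return accumulator;
        }
        @Override
        public Double getResult(double[] accumulator) {
            // an empty accumulator yields 0.0 here; that choice is arbitrary
            return accumulator[1] == 0 ? 0.0 : accumulator[0] / accumulator[1];
        }
        @Override
        public double[] merge(double[] a, double[] b) {
            return new double[] {a[0] + b[0], a[1] + b[1]};
        }
    }

    /** Feeds the values one by one, the way a window would as records arrive. */
    static double averageOf(double... values) {
        Average aggregate = new Average();
        double[] accumulator = aggregate.createAccumulator();
        for (double value : values) {
            accumulator = aggregate.add(value, accumulator);
        }
        return aggregate.getResult(accumulator);
    }

    public static void main(String[] args) {
        System.out.println(averageOf(30.0, 32.0, 34.0)); // 32.0
    }
}
```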
ProcessWindowFunction
- This is the process function for windows; it can access context information such as the window itself and the current watermark.
SingleOutputStreamOperator<SensorReading> minTempStream = keyStream
    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SensorReading>() {
        // maximum out-of-orderness tolerated: 4 seconds
        private final long maxOutBoundary = 4 * 1000L;
        // start high enough that subtracting the bound cannot underflow
        private long currentMaxTimestamp = Long.MIN_VALUE + maxOutBoundary;
        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp - maxOutBoundary);
        }
        @Override
        public long extractTimestamp(SensorReading sensorReading, long l) {
            // timestamp of the current record (seconds converted to milliseconds)
            long currentTs = sensorReading.getTimestamp() * 1000L;
            // update the largest timestamp seen so far
            currentMaxTimestamp = Math.max(currentMaxTimestamp, currentTs);
            // return the record's timestamp
            return currentTs;
        }
    })
    .keyBy(SensorReading::getId)
    .window(TumblingEventTimeWindows.of(Time.seconds(2)))
    .process(new ProcessWindowFunction<SensorReading, SensorReading, String, TimeWindow>() {
        @Override
        public void process(String s, Context context, Iterable<SensorReading> iterable, Collector<SensorReading> collector) throws Exception {
            // the context exposes the current watermark (and the window itself)
            System.out.println(context.currentWatermark());
        }
    });
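3.3 Custom window operators
- This means defining the window yourself instead of using the ready-made window functions.
- The main components are the assigner, the trigger, and the evictor.

Flow of the window components
Assigner (WindowAssigner)
- Returns zero or more window objects for each element.
- Provides a default trigger.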
// custom tumbling event-time windows; the 30-second windowSize is an assumed value for illustration
new WindowAssigner<Object, TimeWindow>() {
    private final long windowSize = 30 * 1000L;
    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        // round the timestamp down to a multiple of the window size
        long startTime = timestamp - (timestamp % windowSize);
        long endTime = startTime + windowSize;
        return Collections.singletonList(new TimeWindow(startTime, endTime));
    }
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }
    @Override
    public boolean isEventTime() {
        // consistent with the event-time default trigger above
        return true;
    }
}
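Trigger
- Defines when a window is evaluated and its result emitted.
- The default trigger fires when the processing time or the watermark passes the timestamp of the window's end boundary.
- Every trigger invocation produces a TriggerResult, which is one of: CONTINUE (do nothing), FIRE (emit the result), PURGE (discard the window contents and remove the window itself together with its metadata), or FIRE_AND_PURGE.
- A trigger can also use its context to register timers (timers, not triggers: the same mechanism as onTimer); when they fire, the trigger's onEventTime() or onProcessingTime() is called.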
SingleOutputStreamOperator<PageViewCount> uvStream = dataStream
    .filter(data -> "pv".equals(data.getBehavior()))
    .timeWindowAll(Time.hours(1))
    .trigger(new MyTrigger())
    .process(new UvCountResultWithBloomFliter());
public static class MyTrigger extends Trigger<UserBehavior, TimeWindow> {
    @Override
    public TriggerResult onElement(UserBehavior element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        // on every incoming record, fire the window computation and clear the window right away
        return TriggerResult.FIRE_AND_PURGE;
    }
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    }
}
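The difference between FIRE and FIRE_AND_PURGE is easiest to see on a small buffer. The sketch below is a plain-Java model with hypothetical names: a made-up count trigger fires every few elements, and each firing emits the sum of whatever the window buffer currently holds. It only illustrates the four result values and does not use Flink's Trigger API.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of trigger results applied to a window buffer. */
final class TriggerResultSketch {
    enum Result { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

    /** Fires every `every` elements; each firing emits the sum of the buffered elements. */
    static List<Integer> run(int[] elements, int every, boolean purgeOnFire) {
        List<Integer> buffer = new ArrayList<>();  // window contents
        List<Integer> emitted = new ArrayList<>(); // one result per firing
        int sinceLastFire = 0;
        for (int element : elements) {
            buffer.add(element);
            sinceLastFire++;
            Result result = Result.CONTINUE;
            if (sinceLastFire == every) {
                result = purgeOnFire ? Result.FIRE_AND_PURGE : Result.FIRE;
                sinceLastFire = 0;
            }
            if (result == Result.FIRE || result == Result.FIRE_AND_PURGE) {
                int sum = 0;
                for (int buffered : buffer) {
                    sum += buffered;
                }
                emitted.add(sum);
            }
            if (result == Result.PURGE || result == Result.FIRE_AND_PURGE) {
                buffer.clear(); // later firings no longer see these elements
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] elements = {1, 2, 3, 4};
        System.out.println(run(elements, 2, false)); // [3, 10]  FIRE keeps the contents
        System.out.println(run(elements, 2, true));  // [3, 7]   FIRE_AND_PURGE starts over
    }
}
```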
Evictor
- Used to remove elements from a window before or after the window computation runs.
new Evictor<SensorReading, TimeWindow>() {
    @Override
    public void evictBefore(Iterable<TimestampedValue<SensorReading>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
    }
    @Override
    public void evictAfter(Iterable<TimestampedValue<SensorReading>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
    }
}
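The two methods above are left empty. As a sketch of what an evictBefore body might do, the plain-Java example below keeps only the most recent elements of a window by removing older ones through an iterator. The class name, method signature, and the "keep the last N" policy are assumptions for illustration and do not use Flink's Evictor API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Toy count-based eviction: drop the oldest elements until at most maxCount remain. */
final class CountEvictorSketch {

    static <T> void evictBefore(List<T> windowElements, int maxCount) {
        int toEvict = windowElements.size() - maxCount;
        Iterator<T> iterator = windowElements.iterator();
        // elements are assumed to be in arrival order, so the front holds the oldest ones
        while (toEvict > 0 && iterator.hasNext()) {
            iterator.next();
            iterator.remove();
            toEvict--;
        }
    }

    public static void main(String[] args) {
        List<Integer> window = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
        evictBefore(window, 3);
        System.out.println(window); // [3, 4, 5]
    }
}
```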
4. Time-based joins of two streams
- Interval join: joins events from two streams that have the same key and whose timestamps are no further apart than a specified interval.
orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            // emit the matched pair as "left,right"
            out.collect(left + "," + right);
        }
    });
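The between(-2 ms, +1 ms) bounds above translate into a simple condition on the two timestamps: a right-hand record joins a left-hand record when its timestamp lies in [left + lowerBound, left + upperBound], with both bounds inclusive by default. The plain-Java sketch below applies that condition to two small timestamp arrays; the names and sample data are hypothetical, and all records are assumed to share the same key.

```java
import java.util.ArrayList;
import java.util.List;

/** Interval-join condition applied to plain timestamps (same key assumed for all records). */
final class IntervalJoinSketch {

    /** True when rightTs is in [leftTs + lowerBound, leftTs + upperBound], bounds inclusive. */
    static boolean withinInterval(long leftTs, long rightTs, long lowerBound, long upperBound) {
        return leftTs + lowerBound <= rightTs && rightTs <= leftTs + upperBound;
    }

    /** Returns every matching pair as "left,right". */
    static List<String> join(long[] leftTimestamps, long[] rightTimestamps, long lowerBound, long upperBound) {
        List<String> pairs = new ArrayList<>();
        for (long left : leftTimestamps) {
            for (long right : rightTimestamps) {
                if (withinInterval(left, right, lowerBound, upperBound)) {
                    pairs.add(left + "," + right);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        long[] orange = {2, 5};
        long[] green = {0, 1, 6};
        System.out.println(join(orange, green, -2, 1)); // [2,0, 2,1, 5,6]
    }
}
```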
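- Window join
input1.join(input2)
    .where(<KeySelector>)      // key of the left stream
    .equalTo(<KeySelector>)    // key of the right stream
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)     // the JoinFunction that performs the actual join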
5. Handling late data
5.1 Collecting late data with a side output
OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
};
// event-time windowed aggregation: minimum temperature within each 15-second window
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
    .timeWindow(Time.seconds(15))
    .sideOutputLateData(outputTag)
    .minBy("temperature");
minTempStream.print("minTemp");
minTempStream.getSideOutput(outputTag).print("late");
env.execute();
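5.2 Updating results based on late data
- allowedLateness: after the watermark passes the window's end time, the window's data is still retained for the duration specified here.
- The computation can fire several times: once when the watermark arrives, and again each time a late record arrives.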
SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
    .timeWindow(Time.seconds(15))
    .allowedLateness(Time.minutes(1))
    .sideOutputLateData(outputTag)
    .minBy("temperature");
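How a record is treated depends on where the watermark stands relative to the window when the record arrives. The plain-Java sketch below models the three cases for an event-time window [start, end): the window has not fired yet, the window has fired but is still retained because of allowedLateness, or the window has already been cleaned up and the record goes to the side output. The names are hypothetical and the thresholds reflect my understanding of the semantics (a window's last timestamp is end - 1); treat it as a model rather than a specification.

```java
/** Classifies a record for an event-time window based on the watermark at arrival. */
final class AllowedLatenessSketch {
    enum Outcome { ON_TIME, LATE_FIRING, SIDE_OUTPUT }

    static Outcome classify(long windowEnd, long allowedLatenessMs, long currentWatermark) {
        long maxTimestamp = windowEnd - 1; // last timestamp that belongs to [start, end)
        if (currentWatermark < maxTimestamp) {
            return Outcome.ON_TIME;        // window has not fired yet; record is simply added
        }
        if (currentWatermark < maxTimestamp + allowedLatenessMs) {
            return Outcome.LATE_FIRING;    // window is still kept; result is recomputed and re-emitted
        }
        return Outcome.SIDE_OUTPUT;        // window state is gone; record is late data
    }

    public static void main(String[] args) {
        long windowEnd = 15_000L;      // window [0, 15000)
        long lateness = 60_000L;       // allowedLateness of 1 minute
        System.out.println(classify(windowEnd, lateness, 10_000L)); // ON_TIME
        System.out.println(classify(windowEnd, lateness, 20_000L)); // LATE_FIRING
        System.out.println(classify(windowEnd, lateness, 80_000L)); // SIDE_OUTPUT
    }
}
```

With allowedLateness set to zero the middle case disappears, which matches the behavior in 5.1 where every late record goes straight to the side output.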