- Window Join
  - Tumbling Window Join
  - Sliding Window Join
  - Session Window Join
- Interval Join
- CoGroup
Window Join and CoGroup
- Window Join joins two streams based on time windows.
- Compared with the plain join operation, CoGroup provides a more general way to process the elements of two streams that fall into the same window with matching keys; join actually reuses CoGroup's implementation. They are used as follows:
//join
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
//coGroup
stream.coGroup(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<CoGroupFunction>)
The difference between them is roughly visible from the definitions of the JoinFunction and CoGroupFunction interfaces:
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
OUT join(IN1 first, IN2 second) throws Exception;
}
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
As the signatures show, JoinFunction is invoked for each individual pair of key-matched elements from the two streams, while CoGroupFunction receives all elements with the same key from both streams at once. JoinFunction therefore behaves like an INNER JOIN, whereas CoGroupFunction can implement not only an INNER JOIN but also an OUTER JOIN.
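The difference is easy to demonstrate outside of Flink. Below is a minimal, self-contained Java sketch (the interface and class names are made up for illustration; they are not Flink's) showing why the pair-at-a-time shape can only express an INNER JOIN, while the group-at-a-time shape can also emit unmatched rows, i.e. a LEFT OUTER JOIN:

```java
import java.util.ArrayList;
import java.util.List;

public class JoinSemanticsDemo {
    // Mimics JoinFunction: invoked once per matched pair -> can only express INNER JOIN.
    interface PairJoin<A, B, O> { O join(A a, B b); }

    // Mimics CoGroupFunction: sees ALL same-key elements of both sides at once,
    // so it can also emit unmatched rows -> OUTER JOIN becomes possible.
    interface GroupJoin<A, B, O> { void coGroup(Iterable<A> first, Iterable<B> second, List<O> out); }

    // INNER JOIN in the JoinFunction style: unmatched rows simply produce nothing.
    static List<String> innerJoin(List<String> left, List<Integer> right) {
        PairJoin<String, Integer, String> f = (a, b) -> a + ":" + b;
        List<String> out = new ArrayList<>();
        for (String a : left) for (Integer b : right) out.add(f.join(a, b));
        return out;
    }

    // LEFT OUTER JOIN in the CoGroupFunction style (per-key groups assumed).
    static List<String> leftOuterJoin(List<String> left, List<Integer> right) {
        List<String> out = new ArrayList<>();
        GroupJoin<String, Integer, String> f = (first, second, o) -> {
            for (String a : first) {
                boolean matched = false;
                for (Integer b : second) { o.add(a + ":" + b); matched = true; }
                if (!matched) o.add(a + ":null"); // an unmatched left row still produces output
            }
        };
        f.coGroup(left, right, out);
        return out;
    }
}
```

With an empty right side, the inner join emits nothing while the outer join keeps the left row, which is exactly the extra power CoGroupFunction's group-level view provides.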
Window Join comes in three variants: Tumbling Window Join, Sliding Window Join, and Session Window Join.
All window joins share the same implementation mechanism: incoming records are buffered in window state, and when the window fires, the join is executed.
public class JoinedStreams<T1, T2> {
public static class WithWindow<T1, T2, KEY, W extends Window> {
public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
//clean the closure
function = input1.getExecutionEnvironment().clean(function);
// the join is translated into a coGroup
coGroupedWindowedStream = input1.coGroup(input2)
.where(keySelector1)
.equalTo(keySelector2)
.window(windowAssigner)
.trigger(trigger)
.evictor(evictor)
.allowedLateness(allowedLateness);
// the JoinFunction is wrapped in a CoGroupFunction
return coGroupedWindowedStream
.apply(new JoinCoGroupFunction<>(function), resultType);
}
}
/**
* CoGroup function that does a nested-loop join to get the join result.
*/
private static class JoinCoGroupFunction<T1, T2, T>
extends WrappingFunction<JoinFunction<T1, T2, T>>
implements CoGroupFunction<T1, T2, T> {
private static final long serialVersionUID = 1L;
public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
super(wrappedFunction);
}
@Override
public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
for (T1 val1: first) {
for (T2 val2: second) {
// emit every matched pair of elements
out.collect(wrappedFunction.join(val1, val2));
}
}
}
}
}
So how does CoGroup operate on two streams? Flink applies a transformation that merges the two streams into a single one: after the transformation, every record in the unified stream carries a tag recording whether it came from the left or the right stream, so the window operation works exactly as it does on a single stream. When the window fires, the elements inside it are split back into a left group and a right group according to the tag and handed to the CoGroupFunction.
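The tagging scheme can be sketched in a few lines of plain Java. The class below is a stripped-down, hypothetical stand-in for Flink's CoGroupedStreams.TaggedUnion, together with the regrouping step that the window function performs when the window fires:

```java
import java.util.ArrayList;
import java.util.List;

public class TaggedUnionDemo {
    // Each record in the unioned stream remembers which input it came from.
    static final class Tagged<A, B> {
        final A one; final B two;
        private Tagged(A one, B two) { this.one = one; this.two = two; }
        static <A, B> Tagged<A, B> one(A v) { return new Tagged<>(v, null); }
        static <A, B> Tagged<A, B> two(B v) { return new Tagged<>(null, v); }
        boolean isOne() { return one != null; }
    }

    // What the window function does on firing: split the unioned window
    // contents back into the two original groups by tag.
    static <A, B> List<List<Object>> split(List<Tagged<A, B>> window) {
        List<Object> ones = new ArrayList<>();
        List<Object> twos = new ArrayList<>();
        for (Tagged<A, B> t : window) {
            if (t.isOne()) ones.add(t.one); else twos.add(t.two);
        }
        return List.of(ones, twos);
    }
}
```

This mirrors the structure of the real code shown below: Input1Tagger/Input2Tagger do the tagging, and CoGroupWindowFunction does the splitting.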
public class CoGroupedStreams<T1, T2> {
public static class WithWindow<T1, T2, KEY, W extends Window> {
public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
//clean the closure
function = input1.getExecutionEnvironment().clean(function);
UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);
DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
.map(new Input1Tagger<T1, T2>())
.setParallelism(input1.getParallelism())
.returns(unionType); // left stream
DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
.map(new Input2Tagger<T1, T2>())
.setParallelism(input2.getParallelism())
.returns(unionType); // right stream
// union the two into a single data stream
DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
windowedStream =
new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
.window(windowAssigner);
if (trigger != null) {
windowedStream.trigger(trigger);
}
if (evictor != null) {
windowedStream.evictor(evictor);
}
if (allowedLateness != null) {
windowedStream.allowedLateness(allowedLateness);
}
return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
}
}
// wraps the CoGroupFunction in a WindowFunction
private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
extends WrappingFunction<CoGroupFunction<T1, T2, T>>
implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {
public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
super(userFunction);
}
@Override
public void apply(KEY key,
W window,
Iterable<TaggedUnion<T1, T2>> values,
Collector<T> out) throws Exception {
List<T1> oneValues = new ArrayList<>();
List<T2> twoValues = new ArrayList<>();
// split all elements in the window back into a left group and a right group by tag
for (TaggedUnion<T1, T2> val: values) {
if (val.isOne()) {
oneValues.add(val.getOne());
} else {
twoValues.add(val.getTwo());
}
}
// invoke the CoGroupFunction
wrappedFunction.coGroup(oneValues, twoValues, out);
}
}
}
Connected Streams
Window Join makes it easy to join two data streams, but some use cases need something other than a join; ConnectedStreams provides a more general way to operate on two streams.
ConnectedStreams is used together with CoProcessFunction or KeyedCoProcessFunction; KeyedCoProcessFunction requires that both connected streams are KeyedStreams with the same key type.
ConnectedStreams combined with CoProcessFunction produces a CoProcessOperator, which at runtime is scheduled as a TwoInputStreamTask; as the name suggests, this task handles two inputs. Let's take a brief look at the CoProcessOperator implementation:
public class CoProcessOperator<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, CoProcessFunction<IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT> {
@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
collector.setTimestamp(element);
context.element = element;
userFunction.processElement1(element.getValue(), context, collector);
context.element = null;
}
@Override
public void processElement2(StreamRecord<IN2> element) throws Exception {
collector.setTimestamp(element);
context.element = element;
userFunction.processElement2(element.getValue(), context, collector);
context.element = null;
}
}
CoProcessOperator distinguishes the two streams internally and dispatches their records to userFunction.processElement1() and userFunction.processElement2() respectively. KeyedCoProcessOperator works in a similar way.
By sharing state internally, many complex operations can be implemented across two streams. Next we introduce another two-stream join that Flink builds on top of Connected Streams: the Interval Join.
Interval Join
An Interval Join joins elements of two keyed streams whose timestamps lie close to each other: an element from the first stream is matched with every element of the second stream whose timestamp falls in [leftTs + lowerBound, leftTs + upperBound].
By default both bounds are inclusive, but they can be made exclusive with .lowerBoundExclusive() and .upperBoundExclusive().
stream
.keyBy(<KeySelector>)
.intervalJoin(otherStream.keyBy(<KeySelector>))
.between(<Time>,<Time>)
.process(<ProcessJoinFunction>)
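The inclusive/exclusive semantics of the bounds can be made concrete with a minimal sketch (a hypothetical helper class, not Flink code) of the check performed for each candidate pair:

```java
public class IntervalBoundsDemo {
    // The right element's timestamp must lie in [leftTs + lowerBound, leftTs + upperBound];
    // either bound may optionally be excluded.
    static boolean matches(long leftTs, long rightTs,
                           long lowerBound, long upperBound,
                           boolean lowerInclusive, boolean upperInclusive) {
        long lo = leftTs + lowerBound;
        long hi = leftTs + upperBound;
        boolean aboveLo = lowerInclusive ? rightTs >= lo : rightTs > lo;
        boolean belowHi = upperInclusive ? rightTs <= hi : rightTs < hi;
        return aboveLo && belowHi;
    }
}
```

For example, with bounds [-2, 1] relative to a left element at time 10, a right element at time 8 matches when the lower bound is inclusive but not when it is exclusive.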
Interval Join is built on top of ConnectedStreams:
public class KeyedStream<T, KEY> extends DataStream<T> {
public static class IntervalJoined<IN1, IN2, KEY> {
public <OUT> SingleOutputStreamOperator<OUT> process(
ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
TypeInformation<OUT> outputType) {
Preconditions.checkNotNull(processJoinFunction);
Preconditions.checkNotNull(outputType);
final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
new IntervalJoinOperator<>(
lowerBound,
upperBound,
lowerBoundInclusive,
upperBoundInclusive,
left.getType().createSerializer(left.getExecutionConfig()),
right.getType().createSerializer(right.getExecutionConfig()),
cleanedUdf
);
return left
.connect(right)
.keyBy(keySelector1, keySelector2)
.transform("Interval Join", outputType, operator);
}
}
}
IntervalJoinOperator keeps two MapStates to buffer the records arriving on each stream; the MapState key is the record's timestamp. When a new record arrives on one stream, the operator looks up the other stream's state for records whose timestamps fall within the matching range and joins them. Each record also registers a timer that removes it from state once the watermark passes the end of its validity interval.
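This buffer-and-match scheme can be simulated with plain Java maps. The sketch below (hypothetical names; real Flink uses keyed MapState and event-time timers) mirrors processElement for both inputs, including the inverted bounds used on the right side:

```java
import java.util.*;

public class BufferMatchDemo {
    // TreeMap keeps buckets ordered by timestamp, making output order deterministic.
    final NavigableMap<Long, List<String>> leftBuffer = new TreeMap<>();
    final NavigableMap<Long, List<String>> rightBuffer = new TreeMap<>();
    final long lowerBound, upperBound;

    BufferMatchDemo(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    // Mirrors processElement1: buffer the record, then scan the other buffer
    // for timestamps in [ts + lowerBound, ts + upperBound].
    List<String> onLeft(String value, long ts) {
        leftBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        List<String> joined = new ArrayList<>();
        for (Map.Entry<Long, List<String>> bucket : rightBuffer.entrySet()) {
            long otherTs = bucket.getKey();
            if (otherTs < ts + lowerBound || otherTs > ts + upperBound) continue;
            for (String other : bucket.getValue()) joined.add(value + "+" + other);
        }
        return joined;
    }

    // Mirrors processElement2: note the inverted bounds [-upperBound, -lowerBound],
    // i.e. the left timestamp must lie in [ts - upperBound, ts - lowerBound].
    List<String> onRight(String value, long ts) {
        rightBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        List<String> joined = new ArrayList<>();
        for (Map.Entry<Long, List<String>> bucket : leftBuffer.entrySet()) {
            long otherTs = bucket.getKey();
            if (otherTs < ts - upperBound || otherTs > ts - lowerBound) continue;
            for (String other : bucket.getValue()) joined.add(other + "+" + value);
        }
        return joined;
    }
}
```

With bounds [-2, 1], a left record at time 10 matches right records at times 8 through 11; a right record at time 12 falls outside that range and produces nothing against it.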
public class IntervalJoinOperator<K, T1, T2, OUT>
extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {
// state buffer for the left stream
private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
// state buffer for the right stream
private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;
@Override
public void processElement1(StreamRecord<T1> record) throws Exception {
// process a left-stream element; the last argument of processElement marks whether the element comes from the left stream
processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
}
@Override
public void processElement2(StreamRecord<T2> record) throws Exception {
// process a right-stream element; note the inverted bounds
processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
}
private <THIS, OTHER> void processElement(
final StreamRecord<THIS> record,
final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
final long relativeLowerBound,
final long relativeUpperBound,
final boolean isLeft) throws Exception {
final THIS ourValue = record.getValue();
// the record's event-time timestamp
final long ourTimestamp = record.getTimestamp();
if (ourTimestamp == Long.MIN_VALUE) {
throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
"interval stream joins need to have timestamps meaningful timestamps.");
}
// drop the record if its event time is already behind the watermark
if (isLate(ourTimestamp)) {
return;
}
// add the record to our buffer; the MapState key is the record's timestamp
addToBuffer(ourBuffer, ourValue, ourTimestamp);
// look for matching records by iterating over the other stream's MapState
for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
final long timestamp = bucket.getKey();
// skip buckets whose timestamp lies outside
// [ourTimestamp + relativeLowerBound, ourTimestamp + relativeUpperBound]
if (timestamp < ourTimestamp + relativeLowerBound ||
timestamp > ourTimestamp + relativeUpperBound) {
continue;
}
// emit every record in the matching bucket downstream
for (BufferEntry<OTHER> entry: bucket.getValue()) {
if (isLeft) {
collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
} else {
collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
}
}
}
// register a state-cleanup timer that fires once the watermark passes cleanupTime
long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
if (isLeft) {
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
} else {
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
}
}
}
// callback invoked when an event-time timer fires
@Override
public void onEventTime(InternalTimer<K, String> timer) throws Exception {
long timerTimestamp = timer.getTimestamp();
String namespace = timer.getNamespace();
logger.trace("onEventTime @ {}", timerTimestamp);
// the namespace tells us whether the timer belongs to the left or the right buffer
// note that the cleanup logic differs for the two sides, because left and right records arrive in a fixed relative order
switch (namespace) {
case CLEANUP_NAMESPACE_LEFT: {
// a left record arrives first; it is cleaned up upperBound after its timestamp
long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
logger.trace("Removing from left buffer @ {}", timestamp);
leftBuffer.remove(timestamp);
break;
}
case CLEANUP_NAMESPACE_RIGHT: {
// a right record is the later arrival and needs no extra wait; it can be cleaned up once the watermark passes its timestamp
long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
logger.trace("Removing from right buffer @ {}", timestamp);
rightBuffer.remove(timestamp);
break;
}
default:
throw new RuntimeException("Invalid namespace " + namespace);
}
}
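The cleanup arithmetic above can be checked in isolation: the timestamp recomputed in onEventTime must round-trip back to the timestamp under which the record was buffered in processElement. A small sketch (plain Java, hypothetical names) of both sides:

```java
public class CleanupTimeDemo {
    // processElement, left side: when to fire the cleanup timer.
    static long leftCleanupTime(long ts, long upperBound) {
        return (upperBound > 0L) ? ts + upperBound : ts;
    }

    // onEventTime, left side: which buffer entry to remove.
    static long leftRemovedTimestamp(long timerTs, long upperBound) {
        return (upperBound <= 0L) ? timerTs : timerTs - upperBound;
    }

    // processElement, right side: the right stream is registered with
    // relativeUpperBound = -lowerBound.
    static long rightCleanupTime(long ts, long lowerBound) {
        long relUpper = -lowerBound;
        return (relUpper > 0L) ? ts + relUpper : ts;
    }

    // onEventTime, right side: which buffer entry to remove.
    static long rightRemovedTimestamp(long timerTs, long lowerBound) {
        return (lowerBound <= 0L) ? timerTs + lowerBound : timerTs;
    }
}
```

For any combination of signs in the bounds, composing cleanup-time registration with the removal computation returns the original buffering timestamp, which is what makes the remove() calls in onEventTime hit the right MapState entries.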