
通俗易懂篇:
前面學習的Window Join必須要在一個Window中進行JOIN,那如果沒有Window如何處理呢?
interval join也是使用相同的key來join兩個流(流A、流B),并且流B中的元素中的時間戳,和流A元素的時間戳,有一個時間間隔。也就是:流B的元素的時間戳 ≥ 流A的元素時間戳 + 下界,且流B的元素的時間戳 ≤ 流A的元素時間戳 + 上界。
我們來看Flink官方的一張圖。

我們看到,流A的每一個元素,都會和流B的一定時間范圍的元素進行JOIN。
其中,上界和下界可以是負數(shù),也可以是整數(shù)。Interval join目前只支持INNER JOIN。將連接后的元素傳遞給ProcessJoinFunction時,時間戳變?yōu)閮蓚€元素中最大的那個時間戳。
注意:
Interval Join只支持事件時間。
package com.istudy.work;
import com.istudy.bean.FactOrderItem;
import com.istudy.bean.Goods;
import com.istudy.bean.OrderItem;
import com.istudy.streamsource.GoodsSource;
import com.istudy.streamsource.OrderItemSource;
import com.istudy.watermark.GoodsWatermark;
import com.istudy.watermark.OrderItemWatermark;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
/**
* @projectname: HaiStream
* @description:
* @author: Mr.Zhang
* @create: 2021-03-14 14:35
**/
public class IntervalJoin {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 構建商品數(shù)據(jù)流
SingleOutputStreamOperator<Goods> goodsDS = env.addSource(new GoodsSource(), TypeInformation.of(Goods.class))
.assignTimestampsAndWatermarks(new GoodsWatermark() {
});
// 構建訂單明細數(shù)據(jù)流
SingleOutputStreamOperator<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class))
.assignTimestampsAndWatermarks(new OrderItemWatermark());
// 進行關聯(lián)查詢
//todo 1、這里我們通過keyBy將兩個流join到一起
SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(item -> item.getGoodsId())
//todo 2、interval join需要設置流A去關聯(lián)哪個時間范圍的流B中的元素。
.intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId()))
//todo 此處,我設置的下界為-1、上界為0,
.between(Time.seconds(-1), Time.seconds(0))
//todo 且上界是一個開區(qū)間。表達的意思就是流A中某個元素的時間,對應上一秒的流B中的元素。
.upperBoundExclusive()
//todo process中將兩個key一樣的元素,關聯(lián)在一起,并加載到一個新的FactOrderItem對象中
.process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {
@Override
public void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception {
FactOrderItem factOrderItem = new FactOrderItem();
factOrderItem.setGoodsId(right.getGoodsId());
factOrderItem.setGoodsName(right.getGoodsName());
factOrderItem.setCount(new BigDecimal(left.getCount()));
factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));
out.collect(factOrderItem);
}
});
factOrderItemDS.print();
env.execute("Interval JOIN");
}
}

運行結果:

深挖原理篇:
join() 和 coGroup() 都是基于窗口做關聯(lián)的。但是在某些情況下,兩條流的數(shù)據(jù)步調未必一致。例如,訂單流的數(shù)據(jù)有可能在點擊流的購買動作發(fā)生之后很久才被寫入,如果用窗口來圈定,很容易 join 不上。所以 Flink 又提供了"Interval join"的語義,按照指定字段以及右流相對左流偏移的時間區(qū)間進行關聯(lián)。interval join 也是 inner join,雖然不需要開窗,但是需要用戶指定偏移區(qū)間的上下界,并且只支持事件時間。
按照指定字段以及右流相對左流偏移的時間區(qū)間進行關聯(lián),即:
right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]

interval join 也是 inner join,雖然不需要開窗,但是需要用戶指定偏移區(qū)間的上下界,并且只支持事件時間。
示例代碼如下。注意在運行之前,需要分別在兩個流上應用 assignTimestampsAndWatermarks() 方法獲取事件時間戳和水印。
clickRecordStream
.keyBy(record -> record.getMerchandiseId())
.intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
.between(Time.seconds(-30), Time.seconds(30))
.process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
@Override
public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
collector.collect(StringUtils.join(Arrays.asList(
accessRecord.getMerchandiseId(),
orderRecord.getPrice(),
orderRecord.getCouponMoney(),
orderRecord.getRebateAmount()
), '\t'));
}
})
.print().setParallelism(1);
由上可見,interval join 與 window join 不同,是兩個 KeyedStream 之上的操作,并且需要調用 between() 方法指定偏移區(qū)間的上下界。如果想令上下界是開區(qū)間,可以調用 upperBoundExclusive()/lowerBoundExclusive() 方法。
interval join 的實現(xiàn)原理
以下是 KeyedStream.process(ProcessJoinFunction) 方法調用的重載方法的邏輯。
public <OUT> SingleOutputStreamOperator<OUT> process(
ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
TypeInformation<OUT> outputType) {
Preconditions.checkNotNull(processJoinFunction);
Preconditions.checkNotNull(outputType);
final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
new IntervalJoinOperator<>(
lowerBound,
upperBound,
lowerBoundInclusive,
upperBoundInclusive,
left.getType().createSerializer(left.getExecutionConfig()),
right.getType().createSerializer(right.getExecutionConfig()),
cleanedUdf
);
return left
.connect(right)
.keyBy(keySelector1, keySelector2)
.transform("Interval Join", outputType, operator);
}