Flink實戰(zhàn)雙流join之interval Join

Apache Flink.png

通俗易懂篇:

前面學習的Window Join必須要在一個Window中進行JOIN,那如果沒有Window如何處理呢?

interval join也是使用相同的key來join兩個流(流A、流B),并且流B中的元素中的時間戳,和流A元素的時間戳,有一個時間間隔。也就是:流B的元素的時間戳 ≥ 流A的元素時間戳 + 下界,且流B的元素的時間戳 ≤ 流A的元素時間戳 + 上界。

我們來看Flink官方的一張圖。

image

我們看到,流A的每一個元素,都會和流B的一定時間范圍的元素進行JOIN。
其中,上界和下界可以是負數(shù),也可以是整數(shù)。Interval join目前只支持INNER JOIN。將連接后的元素傳遞給ProcessJoinFunction時,時間戳變?yōu)閮蓚€元素中最大的那個時間戳。

注意:
Interval Join只支持事件時間。

package com.istudy.work;

import com.istudy.bean.FactOrderItem;
import com.istudy.bean.Goods;
import com.istudy.bean.OrderItem;
import com.istudy.streamsource.GoodsSource;
import com.istudy.streamsource.OrderItemSource;
import com.istudy.watermark.GoodsWatermark;
import com.istudy.watermark.OrderItemWatermark;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;

/**
 * @projectname: HaiStream
 * @description:
 * @author: Mr.Zhang
 * @create: 2021-03-14 14:35
 **/
public class IntervalJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 構建商品數(shù)據(jù)流
        SingleOutputStreamOperator<Goods> goodsDS = env.addSource(new GoodsSource(), TypeInformation.of(Goods.class))
                .assignTimestampsAndWatermarks(new GoodsWatermark() {
                });
        // 構建訂單明細數(shù)據(jù)流
        SingleOutputStreamOperator<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class))
                .assignTimestampsAndWatermarks(new OrderItemWatermark());

        // 進行關聯(lián)查詢
        //todo 1、這里我們通過keyBy將兩個流join到一起
        SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(item -> item.getGoodsId())
                //todo 2、interval join需要設置流A去關聯(lián)哪個時間范圍的流B中的元素。
                .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId()))
                //todo 此處,我設置的下界為-1、上界為0,
                .between(Time.seconds(-1), Time.seconds(0))
                //todo  且上界是一個開區(qū)間。表達的意思就是流A中某個元素的時間,對應上一秒的流B中的元素。
                .upperBoundExclusive()
                //todo process中將兩個key一樣的元素,關聯(lián)在一起,并加載到一個新的FactOrderItem對象中
                .process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {
                    @Override
                    public void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception {
                        FactOrderItem factOrderItem = new FactOrderItem();
                        factOrderItem.setGoodsId(right.getGoodsId());
                        factOrderItem.setGoodsName(right.getGoodsName());
                        factOrderItem.setCount(new BigDecimal(left.getCount()));
                        factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));

                        out.collect(factOrderItem);
                    }
                });

        factOrderItemDS.print();

        env.execute("Interval JOIN");
    }
}
image.png

運行結果:


image.png

深挖原理篇:

join() 和 coGroup() 都是基于窗口做關聯(lián)的。但是在某些情況下,兩條流的數(shù)據(jù)步調未必一致。例如,訂單流的數(shù)據(jù)有可能在點擊流的購買動作發(fā)生之后很久才被寫入,如果用窗口來圈定,很容易 join 不上。所以 Flink 又提供了"Interval join"的語義,按照指定字段以及右流相對左流偏移的時間區(qū)間進行關聯(lián)。interval join 也是 inner join,雖然不需要開窗,但是需要用戶指定偏移區(qū)間的上下界,并且只支持事件時間。

按照指定字段以及右流相對左流偏移的時間區(qū)間進行關聯(lián),即:
right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]

image.png

interval join 也是 inner join,雖然不需要開窗,但是需要用戶指定偏移區(qū)間的上下界,并且只支持事件時間。
示例代碼如下。注意在運行之前,需要分別在兩個流上應用 assignTimestampsAndWatermarks() 方法獲取事件時間戳和水印。

clickRecordStream
  .keyBy(record -> record.getMerchandiseId())
  .intervalJoin(orderRecordStream.keyBy(record -> record.getMerchandiseId()))
  .between(Time.seconds(-30), Time.seconds(30))
  .process(new ProcessJoinFunction<AnalyticsAccessLogRecord, OrderDoneLogRecord, String>() {
    @Override
    public void processElement(AnalyticsAccessLogRecord accessRecord, OrderDoneLogRecord orderRecord, Context context, Collector<String> collector) throws Exception {
      collector.collect(StringUtils.join(Arrays.asList(
        accessRecord.getMerchandiseId(),
        orderRecord.getPrice(),
        orderRecord.getCouponMoney(),
        orderRecord.getRebateAmount()
      ), '\t'));
    }
  })
  .print().setParallelism(1);

由上可見,interval join 與 window join 不同,是兩個 KeyedStream 之上的操作,并且需要調用 between() 方法指定偏移區(qū)間的上下界。如果想令上下界是開區(qū)間,可以調用 upperBoundExclusive()/lowerBoundExclusive() 方法。

interval join 的實現(xiàn)原理
以下是 KeyedStream.process(ProcessJoinFunction) 方法調用的重載方法的邏輯。

public <OUT> SingleOutputStreamOperator<OUT> process(
        ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
        TypeInformation<OUT> outputType) {
    Preconditions.checkNotNull(processJoinFunction);
    Preconditions.checkNotNull(outputType);
    final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
    final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
        new IntervalJoinOperator<>(
            lowerBound,
            upperBound,
            lowerBoundInclusive,
            upperBoundInclusive,
            left.getType().createSerializer(left.getExecutionConfig()),
            right.getType().createSerializer(right.getExecutionConfig()),
            cleanedUdf
        );
    return left
        .connect(right)
        .keyBy(keySelector1, keySelector2)
        .transform("Interval Join", outputType, operator);
}
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容