FlinkCEP with EventTime

The earlier Hello FlinkCEP article gave a brief introduction to FlinkCEP; to keep the logic simple, the notion of time was left out. In real business scenarios, however, you usually require that certain events occur within a specific time window. "Time" is a central concept in Flink, and the official documentation has a good introduction. This article focuses on the CEP scenario and explains how late or out-of-order events are matched against pattern conditions. It is based on Flink 1.9.

In CEP the order in which elements are processed matters. To guarantee that elements are processed in the correct order when working in event time, an incoming element is initially put in a buffer where elements are sorted in ascending order based on their timestamp, and when a watermark arrives, all the elements in this buffer with timestamps smaller than that of the watermark are processed. This implies that elements between watermarks are processed in event-time order.
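The buffering behavior described above can be sketched in plain Java (this is an illustrative simulation, not Flink's actual implementation; the class and method names are made up for this example): events are kept sorted by timestamp, and an arriving watermark releases every buffered event whose timestamp is smaller than the watermark, in ascending order.

```java
import java.util.*;

public class EventTimeBuffer {
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();

    // Buffer an event under its event-time timestamp.
    public void add(long timestamp, String event) {
        buffer.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(event);
    }

    // On a watermark, emit all buffered events with timestamp < watermark,
    // in ascending timestamp order.
    public List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Long, List<String>>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> e = it.next();
            if (e.getKey() >= watermark) break; // keep events at/after the watermark
            out.addAll(e.getValue());
            it.remove();
        }
        return out;
    }

    public static void main(String[] args) {
        EventTimeBuffer b = new EventTimeBuffer();
        b.add(30, "e1"); // events arrive out of order
        b.add(24, "e2");
        b.add(28, "e3");
        // Watermark 29 releases the two earlier events, sorted by timestamp.
        System.out.println(b.onWatermark(29)); // [e2, e3]
        System.out.println(b.onWatermark(31)); // [e1]
    }
}
```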

Business Scenario

We add a time constraint to the scenario from Hello FlinkCEP: an alert is triggered only when two transactions occur consecutively within 10 seconds, where the first valid transaction has a volume below 10 and the second valid transaction has a volume above 100.

Implementation

  1. The transaction is modeled as SubEvent.java, extended with an event-time field; see the source code for the remaining parts.
public class SubEvent extends Event {
    private String date;

    public SubEvent(String id, EventType type, double volume, String date) {
        super(id, type, volume);
        this.date = date;
    }
}
  2. The example CEPWithTimeExample.java
public class CEPWithTimeExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "host-10-1-236-139:6667");
        properties.setProperty("group.id", "cepG");
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer010<>("foo", new SimpleStringSchema(), properties));


        DataStream<SubEvent> input = stream.map(new MapFunction<String, SubEvent>() {
            @Override
            public SubEvent map(String value) throws Exception {
                String[] v = value.split(",");
                return new SubEvent(v[0], EventType.valueOf(v[1]), Double.parseDouble(v[2]), v[3]);
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());

        Pattern<SubEvent, ?> pattern = Pattern.<SubEvent>begin("start").where(
                new SimpleCondition<SubEvent>() {
                    @Override
                    public boolean filter(SubEvent subEvent) {
                        System.out.println(subEvent + " from start at " + StringUtilsPlus.stampToDate(System.currentTimeMillis()));
                        return subEvent.getType() == EventType.VALID && subEvent.getVolume() < 10;
                    }
                }
        ).next("end").where(
                new SimpleCondition<SubEvent>() {
                    @Override
                    public boolean filter(SubEvent subEvent) {
                        System.out.println(subEvent + " from end");
                        return subEvent.getType() == EventType.VALID && subEvent.getVolume() > 100;
                    }
                }
        ).within(Time.seconds(10));

        PatternStream<SubEvent> patternStream = CEP.pattern(input, pattern);

        DataStream<Alert> result = patternStream.process(
                new PatternProcessFunction<SubEvent, Alert>() {
                    @Override
                    public void processMatch(
                            Map<String, List<SubEvent>> pattern,
                            Context ctx,
                            Collector<Alert> out) throws Exception {

                        System.out.println(pattern);

                        out.collect(new Alert("111", "CRITICAL"));
                    }
                });

        result.print();

        env.execute("Flink cep example");
    }

    private static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<SubEvent> {

        private final long maxOutOfOrderness = 5000;

        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(SubEvent subEvent, long previousElementTimestamp) {
            System.out.println("SubEvent is " + subEvent);
            long timestamp = StringUtilsPlus.dateToStamp(subEvent.getDate());
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            System.out.println("watermark:" + StringUtilsPlus.stampToDate(String.valueOf(currentMaxTimestamp - maxOutOfOrderness)));
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
}
  3. Notes on the example
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); declares that the job uses event time, and BoundedOutOfOrdernessGenerator allows events to arrive up to 5 seconds late. See the official documentation for an introduction to watermarks.
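The watermark arithmetic used by BoundedOutOfOrdernessGenerator can be shown with a tiny plain-Java illustration (not Flink code): the watermark trails the largest timestamp seen so far by maxOutOfOrderness milliseconds, and anything at or below the watermark counts as late.

```java
public class WatermarkMath {
    static final long MAX_OUT_OF_ORDERNESS = 5_000L; // 5 seconds, as in the example

    // watermark = largest event timestamp seen so far - allowed lateness
    static long watermark(long currentMaxTimestamp) {
        return currentMaxTimestamp - MAX_OUT_OF_ORDERNESS;
    }

    public static void main(String[] args) {
        long maxSeen = 30_000L;       // largest event timestamp so far
        long wm = watermark(maxSeen); // 25_000
        System.out.println("watermark = " + wm);
        // An event with timestamp 24_000 (<= watermark) is considered late.
        System.out.println(24_000L <= wm); // true
        // An event with timestamp 28_000 is still accepted and buffered.
        System.out.println(28_000L > wm);  // true
    }
}
```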

Source Code Walkthrough

  1. First, when event time is used, processing enters the processElement method of CepOperator.java in the Flink source. When the event's timestamp is greater than the last watermark, the event is added to the elementQueueState buffer; events that do not qualify are dropped by default (or routed to a side output if one is configured). The key code is:
long timestamp = element.getTimestamp();
IN value = element.getValue();

// In event-time processing we assume correctness of the watermark.
// Events with timestamp smaller than or equal with the last seen watermark are considered late.
// Late events are put in a dedicated side output, if the user has specified one.

if (timestamp > lastWatermark) {

    // we have an event with a valid timestamp, so
    // we buffer it until we receive the proper watermark.

    saveRegisterWatermarkTimer();

    bufferEvent(value, timestamp); // add the event to the buffer queue

} else if (lateDataOutputTag != null) {
    output.collect(lateDataOutputTag, element);
}
  2. Once qualifying events have been buffered, the onEventTime method of CepOperator.java decides whether to trigger computation. This method is important and breaks down into roughly five steps; the source is:
@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {

    // 1) get the queue of pending elements for the key and the corresponding NFA,
    // 2) process the pending elements in event time order and custom comparator if exists
    //      by feeding them in the NFA
    // 3) advance the time to the current watermark, so that expired patterns are discarded.
    // 4) update the stored state for the key, by only storing the new NFA and MapState iff they
    //      have state to be used later.
    // 5) update the last seen watermark.

    // STEP 1
    PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
    NFAState nfaState = getNFAState();

    // STEP 2
    while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
        long timestamp = sortedTimestamps.poll();
        advanceTime(nfaState, timestamp);
        try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {
            elements.forEachOrdered(
                event -> {
                    try {
                        processEvent(nfaState, event, timestamp);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            );
        }
        elementQueueState.remove(timestamp);
    }

    // STEP 3
    advanceTime(nfaState, timerService.currentWatermark());
    // STEP 4
    updateNFA(nfaState);

    if (!sortedTimestamps.isEmpty() || !partialMatches.isEmpty()) {
        saveRegisterWatermarkTimer();
    }

    // STEP 5
    updateLastSeenWatermark(timerService.currentWatermark());
}

STEP 1: the events in the queue may be out of order, so they are first sorted by event time;

STEP 2: iterate from the smallest timestamp upward; an event is processed only when its timestamp is less than or equal to the current watermark;

The remaining steps update the related state and the watermark.
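STEP 1 and STEP 2 can be sketched in isolation with plain Java (a simplified sketch, not Flink's actual code): sort the buffered timestamps, then release only those at or below the current watermark, leaving the rest buffered for a later watermark.

```java
import java.util.*;

public class OnEventTimeSketch {
    // Returns the timestamps that would be fed to the NFA now, in order.
    public static List<Long> processable(Collection<Long> bufferedTimestamps,
                                         long currentWatermark) {
        // STEP 1: sort the pending timestamps (arrival order may be scrambled)
        PriorityQueue<Long> sorted = new PriorityQueue<>(bufferedTimestamps);
        List<Long> fired = new ArrayList<>();
        // STEP 2: only timestamps <= watermark are processed now;
        // the rest stay buffered until a later watermark advances far enough.
        while (!sorted.isEmpty() && sorted.peek() <= currentWatermark) {
            fired.add(sorted.poll());
        }
        return fired;
    }

    public static void main(String[] args) {
        // Buffered events at 28s, 30s, 35s; watermark at 30s.
        List<Long> fired = processable(Arrays.asList(35_000L, 28_000L, 30_000L), 30_000L);
        System.out.println(fired); // [28000, 30000] -- 35000 stays buffered
    }
}
```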

Test Data

id type volume timestamp
1 VALID 1 2019-10-18 01:00:30
2 VALID 1 2019-10-18 01:00:24
3 VALID 1 2019-10-18 01:00:28
4 VALID 200 2019-10-18 01:00:35
5 VALID 1 2019-10-18 01:00:45

Result Analysis

  1. Event id 1 enters the elementQueueState buffer on arrival; the watermark becomes 2019-10-18 01:00:25;
  2. Event id 2's timestamp is smaller than the last watermark (2019-10-18 01:00:24 < 2019-10-18 01:00:25), so it never enters the elementQueueState buffer; the watermark stays at 2019-10-18 01:00:25;
  3. Event id 3's timestamp is greater than the last watermark, so it enters the elementQueueState buffer; the watermark is still 2019-10-18 01:00:25. The buffer is out of order, so onEventTime sorts it first, yielding event3, event1. Both timestamps are greater than the current watermark 2019-10-18 01:00:25, so no computation is triggered yet;
  4. Event id 4 qualifies and enters the elementQueueState buffer, advancing the watermark to 2019-10-18 01:00:30. After sorting, the buffer holds event3, event1, event4, but event4's timestamp exceeds the current watermark, so only event3 and event1 trigger computation; at this point event1 matches the "start" pattern;
  5. Likewise, event id 5 triggers computation and advances the watermark to 2019-10-18 01:00:40. The buffer now holds event4 and event5; only event4's timestamp is below the current watermark, so it triggers computation, and it also satisfies the "end" pattern. This finally fires one alert: event1 --> event4.
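The step-by-step analysis above can be reproduced with a small plain-Java simulation (not Flink code; the class name and structure are invented for this sketch). Timestamps are seconds after 01:00:00 and maxOutOfOrderness is 5 seconds, as in the example:

```java
import java.util.*;

public class TestDataWalkthrough {
    // For each arriving event, returns the buffered timestamps that the new
    // watermark releases for processing (late events are dropped, not buffered).
    static List<List<Long>> simulate(long[] events, long maxOutOfOrderness) {
        long maxTs = Long.MIN_VALUE;
        long lastWatermark = Long.MIN_VALUE;
        TreeSet<Long> buffer = new TreeSet<>();
        List<List<Long>> firedPerStep = new ArrayList<>();
        for (long ts : events) {
            if (ts > lastWatermark) {
                buffer.add(ts);              // accepted and buffered
            }                                // else: late, dropped (event id 2)
            maxTs = Math.max(maxTs, ts);
            long watermark = maxTs - maxOutOfOrderness; // periodic watermark
            List<Long> fired = new ArrayList<>();
            while (!buffer.isEmpty() && buffer.first() <= watermark) {
                fired.add(buffer.pollFirst()); // processed in timestamp order
            }
            lastWatermark = watermark;
            firedPerStep.add(fired);
        }
        return firedPerStep;
    }

    public static void main(String[] args) {
        // Event-time of ids 1..5, as seconds after 01:00:00.
        long[] events = {30, 24, 28, 35, 45};
        System.out.println(simulate(events, 5));
        // [[], [], [], [28, 30], [35]] -- matching the analysis above:
        // nothing fires until event 4 arrives, which releases events 3 and 1;
        // event 5 then releases event 4, completing the match.
    }
}
```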

Other Notes

The test data above mainly illustrates when computation is triggered; only after computation is triggered does CEP check whether the two events occurred within the specified time. To see this, change maxOutOfOrderness in this example to 20000 and test with the data below: the first group eventually fires an alert, the second does not.
Group 1

1,VALID,1,2019-10-18 01:00:30
2,VALID,300,2019-10-18 01:00:39
3,VALID,300,2019-10-18 03:00:00

Group 2

1,VALID,1,2019-10-18 01:00:30
2,VALID,300,2019-10-18 01:00:41
3,VALID,300,2019-10-18 03:00:00
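The difference between the two groups comes down to the within(Time.seconds(10)) constraint: a partial match times out once the window has fully elapsed, so (to my understanding of Flink's NFA timeout check) the gap between the "start" and "end" events must be strictly less than the window. A quick arithmetic check (plain Java, not Flink code):

```java
public class WithinCheck {
    // Models within(Time.seconds(10)): the gap between the "start" event and
    // the "end" event must be strictly less than the 10-second window
    // (an assumption about the timeout boundary; both test groups are
    // consistent with either < or <=).
    static boolean within10s(long startSec, long endSec) {
        return endSec - startSec < 10;
    }

    public static void main(String[] args) {
        // Group 1: start at 01:00:30, end at 01:00:39 -> 9s gap -> alert fires
        System.out.println(within10s(30, 39)); // true
        // Group 2: start at 01:00:30, end at 01:00:41 -> 11s gap -> no alert
        System.out.println(within10s(30, 41)); // false
    }
}
```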

Summary

Building on the Hello FlinkCEP article, this post used an example and some source-code commentary to demonstrate how events carrying event time trigger computation and pattern matching.
