Hello FlinkCEP

關(guān)于FlinkCEP的相關(guān)概念和說明網(wǎng)上已經(jīng)有很多介紹的文章,這里就不再贅述,本文主要通過一個簡單的場景作為FlinkCEP的入門快速上手,并通過樣例初步了解一下Combining Patterns中的事件之間的鄰接模式:

FlinkCEP supports the following forms of contiguity between events:

  1. Strict Contiguity: Expects all matching events to appear strictly one after the other, without any non-matching events in-between.
  2. Relaxed Contiguity: Ignores non-matching events appearing in-between the matching ones.
  3. Non-Deterministic Relaxed Contiguity: Further relaxes contiguity, allowing additional matches that ignore some matching events.

To apply them between consecutive patterns, you can use:

  1. next(), for strict,
  2. followedBy(), for relaxed, and
  3. followedByAny(), for non-deterministic relaxed contiguity.

業(yè)務(wù)場景

對于異常交易產(chǎn)生告警,例如交易分為有效和無效兩種,如果先產(chǎn)生了一筆有效交易額小于10,然后產(chǎn)生了一筆有效交易額大于100,就要觸發(fā)告警。這里為了簡化邏輯并沒有考慮兩筆交易的時間間隔。

業(yè)務(wù)分析

對于這個業(yè)務(wù)場景,主要問題在于這兩筆交易的連續(xù)性,也就是說有三種情況:

  1. 兩筆交易一定是連續(xù)的,且中間無任何的交易產(chǎn)生,也就是兩個條件之間用next()連接
  2. 兩筆交易可以不連續(xù),中間可以有其他的交易,但是最終第二個條件只會匹配上一次成功匹配之后的事件,即會拋棄匹配成功的事件,也就是兩個條件之間用followedBy()連接
  3. 兩筆交易可以不連續(xù),中間可以有其他的交易,并且最終第二個條件會匹配所有滿足第一個條件的交易,也就是兩個條件之間用followedByAny()連接

第一點很好理解,對于第二,第三點會在稍后程序中做詳細的解釋。

業(yè)務(wù)實現(xiàn)

  1. 交易抽象為Event.java,其他部分請參見源碼
public class Event {
    private EventType type; //事件類型,即有效,無效
    private double volume; //交易額
    private String id; //交易流水號
}
  1. 樣例CEPExample.java
public class CEPExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "host-10-1-236-139:6667");
        properties.setProperty("group.id", "cepG");
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer010<>("foo", new SimpleStringSchema(), properties));

        DataStream<Event> input = stream.map(new MapFunction<String, Event>() {
            @Override
            public Event map(String value) throws Exception {
                String[] v = value.split(",");
                return new Event(v[0], EventType.valueOf(v[1]), Double.parseDouble(v[2]));
            }
        });

        Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
                new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) {
                        System.out.println(event + " from start");
                        return event.getType() == EventType.VALID && event.getVolume() < 10;
                    }
                }
        ).next("end").where(
                new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) {
                        System.out.println(event + " from end");
                        return event.getType() == EventType.VALID && event.getVolume() > 100;
                    }
                }
        );

        PatternStream<Event> patternStream = CEP.pattern(input, pattern);

        DataStream<Alert> result = patternStream.process(
                new PatternProcessFunction<Event, Alert>() {
                    @Override
                    public void processMatch(
                            Map<String, List<Event>> pattern,
                            Context ctx,
                            Collector<Alert> out) throws Exception {

                        System.out.println(pattern);

                        out.collect(new Alert("111", "CRITICAL"));
                    }
                });

        result.print();

        env.execute("Flink cep example");
    }
}

測試數(shù)據(jù)

id eventType volume
1 VALID 2
2 VALID 200
3 VALID 3
4 INVALID 1
5 VALID 1
6 VALID 300
7 VALID 600

結(jié)果分析

  1. 如果使用的是next("end"),只會觸發(fā)2次告警,分別為
    next("end")

這就是因為next必須要滿足兩個連續(xù)的事件都符合條件。

  1. 如果使用的是followedBy("end"),會觸發(fā)3次告警,分別為
    followedBy("end")

可以看到滿足條件的event中間可以有不滿足的事件產(chǎn)生。

  1. 如果使用的是followedByAny("end"),會觸發(fā)7次告警,分別為
    followedByAny("end")

followedByAny("end")followedBy("end")主要的區(qū)別就是所有滿足條件的兩個事件都會觸發(fā)告警,即便前一個條件已經(jīng)生效過。

總結(jié)

本文實現(xiàn)了一個簡單的CEP場景,并分析了兩個事件常見的鄰接模式,目前只是初步的一個了解,后續(xù)會根據(jù)遇到的實際場景再介紹相關(guān)的使用方法和原理。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容