Flink CEP 簡(jiǎn)介及使用

開(kāi)篇

這段時(shí)間較忙,很久沒(méi)有來(lái)記錄一些學(xué)習(xí)到的東西。額,也有可能是變懶了。。。 恰巧最近在看flink相關(guān)的東西,于是就把相關(guān)的東西以及自己的一些理解記錄一下。

這篇文章主要是簡(jiǎn)單介紹一下CEP是干哈的,什么場(chǎng)景下可以用到。接著就來(lái)實(shí)戰(zhàn)一下,用CEP來(lái)實(shí)現(xiàn)一個(gè)需求。最后是對(duì)CEP的一些比較常用的API進(jìn)行介紹。

這里貼一下官網(wǎng)鏈接下面example的地址

Complex event processing for Flink

FlinkCEP 是在flink上面實(shí)現(xiàn)的一個(gè)可以處理復(fù)雜事件的庫(kù)。它可以從無(wú)界事件流中提取你想要的事件或者信息。

FlinkCEP提供了一系列 Pattern API ,我們可以定義自己的 pattern sequences ,用于從原始事件流中匹配和提取事件。

CEP

我們可以這樣理解,輸入的簡(jiǎn)單事件,flink計(jì)算輸出的是提取出來(lái)的復(fù)雜事件。那么就得知,復(fù)雜事件是根據(jù)一些規(guī)則簡(jiǎn)單事件流中計(jì)算而得來(lái)的,這個(gè)規(guī)則就是我們定義的pattern sequences

由此,假如我們需要檢測(cè)某些復(fù)雜事件,無(wú)需寫(xiě)額外的代碼去監(jiān)測(cè)輸入的原始事件,按需定義pattern sequences 即可。這么講一通,確實(shí)比較抽象,接下來(lái)我們看一個(gè)例子,進(jìn)一步體會(huì)體會(huì)CEP。

example

我們購(gòu)物時(shí),通常有一些點(diǎn)擊瀏覽商品和加入收藏的一些行為。現(xiàn)在假設(shè)每發(fā)生上述的行為,都會(huì)作為事件輸入到Flink中。
現(xiàn)在我們定義規(guī)則,對(duì)于某個(gè)用戶(hù),若滿足下面其中一個(gè)條件,就會(huì)被標(biāo)記為潛在客戶(hù),可能會(huì)作為后續(xù)商品的推廣對(duì)象。

  1. 先點(diǎn)擊瀏覽商品,然后將商品加入收藏。
  2. 1分鐘內(nèi)點(diǎn)擊瀏覽了商品3次。

好的,我們開(kāi)始寫(xiě)代碼,首先定義Event

public class Event {
    private String name;
    /** 事件類(lèi)型  0--瀏覽商品   1---收藏商品*/
    private int type;
}

為了本地測(cè)試方便,通過(guò)nc命令打開(kāi)端口向flink模擬輸入事件,flink接收到事件之后進(jìn)行轉(zhuǎn)換

DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999, "\n");

KeyedStream<Event, String> partitionedInput = dataStreamSource.filter(Objects::nonNull)
                .map(s -> {
                    // 輸入的string,逗號(hào)分隔,第一個(gè)字段為用戶(hù)名,第二個(gè)字段為事件類(lèi)型
                    String[] strings = s.split(",");
                    if (strings.length != 2) {
                        return null;
                    }
                    Event event = new Event();
                    event.setName(strings[0]);
                    event.setType(Integer.parseInt(strings[1]));
                    return event;
                }).returns(Event.class)
                .keyBy((KeySelector<Event, String>) Event::getName);

接著我們定義兩個(gè)pattern sequences, patternA 和 patternB代表的規(guī)則代碼中已經(jīng)進(jìn)行注釋。

        // 先點(diǎn)擊瀏覽商品,然后將商品加入收藏
        Pattern<Event, ?> patternA = Pattern.<Event>begin("firstly")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {
                        // 點(diǎn)擊商品
                        return event.getType() == 0;
                    }
                })
                .followedBy("and")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {
                        // 將商品加入收藏
                        return event.getType() == 1;
                    }
                });

        // 1分鐘內(nèi)點(diǎn)擊瀏覽了商品3次。
        Pattern<Event, ?> patternB = Pattern.<Event>begin("start")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {
                        // 瀏覽商品
                        return event.getType() == 0;
                    }
                })
                .timesOrMore(1)
                .within(Time.minutes(3));

定義好pattern之后,我們就可以利用這兩個(gè)pattern對(duì)輸入流進(jìn)行檢測(cè),因?yàn)槲覀冇袃蓚€(gè)pattern sequences, 檢測(cè)完之后可以對(duì)檢測(cè)出的復(fù)雜事件進(jìn)行union。

        // CEP用pattern將輸入的時(shí)間事件流轉(zhuǎn)化為復(fù)雜事件流
        PatternStream<Event> patternStreamA = CEP.pattern(partitionedInput, patternA);
        PatternStream<Event> patternStreamB = CEP.pattern(partitionedInput, patternB);

        DataStream<String> streamA = processPatternStream(patternStreamA, "收藏商品");
        DataStream<String> streamB = processPatternStream(patternStreamB, "連續(xù)瀏覽商品");

        // 最后兩個(gè)復(fù)雜事件流進(jìn)行合并
        streamA.union(streamB)
                .print();

完整的代碼可以從這里取 代碼地址

運(yùn)行如下

// 輸入
Jack,0
Jack,1
//輸出
> Jack 成為潛在客戶(hù) ,收藏商品

// 輸入
Andy,0
Andy,0
Andy,0
// 輸出
> Andy 成為潛在客戶(hù) ,連續(xù)瀏覽商品

Pattern API 解釋

在上面這個(gè)example里面,我們使用了一些比較常用的Pattern API,下面我們來(lái)對(duì)這些API進(jìn)行認(rèn)識(shí)。

conditions

對(duì)于每個(gè)pattern需要為其設(shè)置條件(condition)來(lái)判斷到來(lái)的簡(jiǎn)單事件是否被接受,上面的example中,就用到了pattern.where()來(lái)進(jìn)行條件判斷。除此之外還提供了, pattern.or()pattern.until()兩個(gè)方法。
pattern.where()是給當(dāng)前的pattern定義一個(gè)條件。
pattern.or()是給當(dāng)前的pattern定義一個(gè)or條件。
pattern. until()是給當(dāng)前的pattern定義一個(gè)結(jié)束循環(huán)的條件。

除了為pattern指定條件,還可以設(shè)置它的循環(huán)次數(shù),下面代碼摘自官網(wǎng),每句代碼的作用是顯而易見(jiàn)的,就不翻譯了。

// expecting 4 occurrences
start.times(4);

// expecting 0 or 4 occurrences
start.times(4).optional();

// expecting 2, 3 or 4 occurrences
start.times(2, 4);

// expecting 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).greedy();

// expecting 0, 2, 3 or 4 occurrences
start.times(2, 4).optional();

// expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
start.times(2, 4).optional().greedy();

// expecting 1 or more occurrences
start.oneOrMore();

// expecting 1 or more occurrences and repeating as many as possible
start.oneOrMore().greedy();

// expecting 0 or more occurrences
start.oneOrMore().optional();

// expecting 0 or more occurrences and repeating as many as possible
start.oneOrMore().optional().greedy();

// expecting 2 or more occurrences
start.timesOrMore(2);

// expecting 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).greedy();

// expecting 0, 2 or more occurrences
start.timesOrMore(2).optional()

// expecting 0, 2 or more occurrences and repeating as many as possible
start.timesOrMore(2).optional().greedy();

Combining Patterns

我們定義多個(gè)pattern之后,需要將他們組合起來(lái),如上面我們的example中就:先瀏覽商品,再點(diǎn)擊收藏,我們回憶一下,是這樣寫(xiě)的,兩個(gè)patternfollowedBy進(jìn)行連接。

        // 先點(diǎn)擊瀏覽商品,然后將商品加入收藏
        Pattern<Event, ?> patternA = Pattern.<Event>begin("firstly")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {
                        // 點(diǎn)擊商品
                        return event.getType() == 0;
                    }
                })
                .followedBy("and")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {
                        // 將商品加入收藏
                        return event.getType() == 1;
                    }
                });

CEP中pattern連接有3中模式,
Strict Contiguity:要求一個(gè)event之后必須緊跟下一個(gè)符合條件的event,中間不允許有其他事件。
Relaxed Contiguity:和上一種不同的是,該模式允許中間有其他無(wú)關(guān)的event,會(huì)對(duì)他們進(jìn)行忽略。
Non-Deterministic Relaxed Contiguity:非確定性寬松連續(xù)性,可以對(duì)已經(jīng)匹配的事件就行忽略,對(duì)接下來(lái)的事件繼續(xù)匹配。

上面三種模式分別對(duì)應(yīng)的表達(dá)式是:

next()
followedBy()
followedByAny()

對(duì)于next()followedBy()比較好理解,那我們來(lái)看個(gè)關(guān)于followedByAny()栗子。
如下 pattern sequence 和input,b+表示一個(gè)或者多個(gè)b:

// pattern sequence
a (followedByAny) b+ (followedByAny)  c
// input 
 "a", "b1", "d1", "b2", "d2", "b3" "c"

那么,CEP匹配得到的結(jié)構(gòu)有:{a b1 c}, {a b1 b2 c}, {a b1 b3 c}, {a b1 b2 b3 c}, {a b2 c}, {a b2 b3 c}, {a b3 c}。可以看到, {a b1 b3 c}忽略了已經(jīng)匹配的事件b2,{a b2 c}{a b2 b3 c}都忽略了已經(jīng)匹配的事件b1,就是時(shí)所謂的Non-Deterministic Relaxed Contiguity。

總結(jié)

Flink CEP的簡(jiǎn)單介紹和使用就到這里,通過(guò)本文可以對(duì)fink cep有一個(gè)簡(jiǎn)單的認(rèn)識(shí)。對(duì)于復(fù)雜事件的提取,我們只關(guān)注于pattern sequence的編寫(xiě),極大簡(jiǎn)化了程序的復(fù)雜性,提高了可維護(hù)性。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容