Flink雙流算子

1. Connect

DataStream有兩個(gè)connect方法:

public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream)
public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream)

前者是與普通DataStream連接,多用于把兩個(gè)流合并;后者是與一個(gè)BroadcastStream連接,多用于受管理數(shù)據(jù)流的控制

ConnectedStream

該流主要包含兩類操作:

  1. flatMap
public <R> SingleOutputStreamOperator<R> flatMap( CoFlatMapFunction<IN1, IN2, R> coFlatMapper)

CoFlatMapFunction包含兩個(gè)方法,flatMap1和flatMap2,分別用于flatmap兩個(gè)輸入流的元素,兩者互不相關(guān)。如果要使得兩個(gè)流的數(shù)據(jù)發(fā)生關(guān)系,則要對(duì)ConnectedStream作keyby,然后使用Keyed status來(lái)暫存和處理數(shù)據(jù)

  1. process
public <R> SingleOutputStreamOperator<R> process(CoProcessFunction<IN1, IN2, R> coProcessFunction)

與CoFlatMapFunction類似,CoProcessFunction包含兩個(gè)方法,processElement1和processElement2,分別用于process兩個(gè)輸入流的元素,兩者互不相關(guān)。如果要使得兩個(gè)流的數(shù)據(jù)發(fā)生關(guān)系,則要對(duì)ConnectedStream作keyby,然后使用Keyed status來(lái)暫存和處理數(shù)據(jù)

此外還提供了TimerService,可以在元素來(lái)臨時(shí)注冊(cè)定時(shí)器,由watermark來(lái)觸發(fā)事件。

BroadcastConnectedStream

該流只包含process操作:

  1. process
public <OUT> SingleOutputStreamOperator<OUT> process(final BroadcastProcessFunction<IN1, IN2, OUT> function)

與CoProcessFunction類似,BroadcastProcessFunction包含兩個(gè)方法,processElement和processBroadcastElement,分別用于process數(shù)據(jù)流的元素和廣播流的元素,兩者互不相關(guān)。如果要使得兩個(gè)流的數(shù)據(jù)發(fā)生關(guān)系,則要使用BroadcastState

2. Join

DataStream的Join操作:

dataStream1.join(dataStream2)
.where(keySelector1)
.equalTo(keySelector2)
.window(win)
.apply(new JoinFunction<T1, T2, T> / FlatJoinFunction<T1, T2, T> function)

也就是兩個(gè)流的數(shù)據(jù)必須:從數(shù)據(jù)中提取key,只有相同key的元素才能join;并且必須是在窗口中的元素才能join,且連接的方式是類似“Inner Join(內(nèi)連接)”,也就是必須兩個(gè)流都有這個(gè)key的元素才能join,且兩個(gè)流中相同key的元素會(huì)兩兩執(zhí)行function的操作。

  1. JoinFunction

該類包含一個(gè)方法OUT join(IN1 first, IN2 second) ,表示每?jī)蓚€(gè)流中的元素生成一個(gè)out元素

  1. FlatJoinFunction

該類包含一個(gè)方法void join (IN1 first, IN2 second, Collector<OUT> out) ,表示每?jī)蓚€(gè)流中的元素可生成多個(gè)out元素

3. CoGroup

DataStream的CoGroup操作:
dataStream1.coGroup(dataStream2)
.where(keySelector1)
.equalTo(keySelector2)
.window(win)
.apply(new CoGroupFunction<T1, T2, T> function)

與join十分類似,也就是兩個(gè)流的數(shù)據(jù)必須:從數(shù)據(jù)中提取key,只有相同key的元素才能coGroup;并且必須是在窗口中的元素才能coGroup。但不同的是coGroup后的數(shù)據(jù)并不是兩兩作用function,而是將同一個(gè)window內(nèi)兩個(gè)流中所有相同key的數(shù)據(jù)都放到一起處理:

  1. CoGroupFunction

該類包含一個(gè)方法void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) ,first、second分別表示window內(nèi)兩個(gè)流中具有相同key的元素,注意如果某個(gè)流在window內(nèi)沒(méi)有該key的元素,則可能為空。因此可以使用此方法實(shí)現(xiàn)“Left Join(左連接)”、“Right Join(右連接)”、“Full Join(全連接)”等語(yǔ)義,當(dāng)然也能夠兼容實(shí)現(xiàn)Join中實(shí)現(xiàn)的“Inner Join(內(nèi)連接)”功能,只是沒(méi)必要這么復(fù)雜。CoGroup比Join功能更為強(qiáng)大

4. Interval Join

DataStream的Interval Join操作:
dataStream1.keyBy(keySelector1)
.intervalJoin(dataStream2.keyBy(keySelector2))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String> function)
Interval Join

與普通Join不同的是,并不是在window內(nèi)的元素join,而是dataStream1中元素與其前后若干時(shí)間區(qū)間的dataStream2的元素join

  1. ProcessJoinFunction

該類包含一個(gè)方法void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) ,與FlatJoinFunction功能類似,只是多了ctx可以用于查詢left、right、out的時(shí)間戳,并且可以產(chǎn)生side output

總結(jié):

  • 相同點(diǎn):
    都是將兩個(gè)流合成一個(gè)流

  • 不同點(diǎn):

Connect:兩個(gè)流的數(shù)據(jù)不一定要發(fā)生關(guān)系,可以各自單獨(dú)處理、單獨(dú)輸出,但是輸出的結(jié)果會(huì)合并成一個(gè)流。也可以使一個(gè)流不輸出數(shù)據(jù),而僅僅作為另一個(gè)流的控制狀態(tài)。如果要使兩個(gè)流的數(shù)據(jù)發(fā)生關(guān)系,則必須使用state

Join/CoGroup:兩個(gè)流之間的數(shù)據(jù)會(huì)發(fā)生某種關(guān)聯(lián),需要對(duì)兩個(gè)流的數(shù)據(jù)抽取key,并且加窗,將窗內(nèi)的同key數(shù)據(jù)進(jìn)行處理,共同生成新的流元素

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容