1. Connect
DataStream有兩個(gè)connect方法:
public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream)
public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream)
前者是與普通DataStream連接,多用于把兩個(gè)流合并;后者是與一個(gè)BroadcastStream連接,多用于受管理數(shù)據(jù)流的控制
ConnectedStream
該流主要包含兩類操作:
- flatMap
public <R> SingleOutputStreamOperator<R> flatMap( CoFlatMapFunction<IN1, IN2, R> coFlatMapper)
CoFlatMapFunction包含兩個(gè)方法,flatMap1和flatMap2,分別用于flatmap兩個(gè)輸入流的元素,兩者互不相關(guān)。如果要使得兩個(gè)流的數(shù)據(jù)發(fā)生關(guān)系,則要對(duì)ConnectedStream作keyby,然后使用Keyed status來(lái)暫存和處理數(shù)據(jù)
- process
public <R> SingleOutputStreamOperator<R> process(CoProcessFunction<IN1, IN2, R> coProcessFunction)
與CoFlatMapFunction類似,CoProcessFunction包含兩個(gè)方法,processElement1和processElement2,分別用于process兩個(gè)輸入流的元素,兩者互不相關(guān)。如果要使得兩個(gè)流的數(shù)據(jù)發(fā)生關(guān)系,則要對(duì)ConnectedStream作keyby,然后使用Keyed status來(lái)暫存和處理數(shù)據(jù)
此外還提供了TimerService,可以在元素來(lái)臨時(shí)注冊(cè)定時(shí)器,由watermark來(lái)觸發(fā)事件。
BroadcastConnectedStream
該流只包含process操作:
- process
public <OUT> SingleOutputStreamOperator<OUT> process(final BroadcastProcessFunction<IN1, IN2, OUT> function)
與CoProcessFunction類似,BroadcastProcessFunction包含兩個(gè)方法,processElement和processBroadcastElement,分別用于process數(shù)據(jù)流的元素和廣播流的元素,兩者互不相關(guān)。如果要使得兩個(gè)流的數(shù)據(jù)發(fā)生關(guān)系,則要使用BroadcastState
2. Join
DataStream的Join操作:
dataStream1.join(dataStream2)
.where(keySelector1)
.equalTo(keySelector2)
.window(win)
.apply(new JoinFunction<T1, T2, T> / FlatJoinFunction<T1, T2, T> function)
也就是兩個(gè)流的數(shù)據(jù)必須:從數(shù)據(jù)中提取key,只有相同key的元素才能join;并且必須是在窗口中的元素才能join,且連接的方式是類似“Inner Join(內(nèi)連接)”,也就是必須兩個(gè)流都有這個(gè)key的元素才能join,且兩個(gè)流中相同key的元素會(huì)兩兩執(zhí)行function的操作。
- JoinFunction
該類包含一個(gè)方法OUT join(IN1 first, IN2 second) ,表示每?jī)蓚€(gè)流中的元素生成一個(gè)out元素
- FlatJoinFunction
該類包含一個(gè)方法void join (IN1 first, IN2 second, Collector<OUT> out) ,表示每?jī)蓚€(gè)流中的元素可生成多個(gè)out元素
3. CoGroup
DataStream的CoGroup操作:
dataStream1.coGroup(dataStream2)
.where(keySelector1)
.equalTo(keySelector2)
.window(win)
.apply(new CoGroupFunction<T1, T2, T> function)
與join十分類似,也就是兩個(gè)流的數(shù)據(jù)必須:從數(shù)據(jù)中提取key,只有相同key的元素才能coGroup;并且必須是在窗口中的元素才能coGroup。但不同的是coGroup后的數(shù)據(jù)并不是兩兩作用function,而是將同一個(gè)window內(nèi)兩個(gè)流中所有相同key的數(shù)據(jù)都放到一起處理:
- CoGroupFunction
該類包含一個(gè)方法void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) ,first、second分別表示window內(nèi)兩個(gè)流中具有相同key的元素,注意如果某個(gè)流在window內(nèi)沒(méi)有該key的元素,則可能為空。因此可以使用此方法實(shí)現(xiàn)“Left Join(左連接)”、“Right Join(右連接)”、“Full Join(全連接)”等語(yǔ)義,當(dāng)然也能夠兼容實(shí)現(xiàn)Join中實(shí)現(xiàn)的“Inner Join(內(nèi)連接)”功能,只是沒(méi)必要這么復(fù)雜。CoGroup比Join功能更為強(qiáng)大
4. Interval Join
DataStream的Interval Join操作:
dataStream1.keyBy(keySelector1)
.intervalJoin(dataStream2.keyBy(keySelector2))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String> function)

與普通Join不同的是,并不是在window內(nèi)的元素join,而是dataStream1中元素與其前后若干時(shí)間區(qū)間的dataStream2的元素join
- ProcessJoinFunction
該類包含一個(gè)方法void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) ,與FlatJoinFunction功能類似,只是多了ctx可以用于查詢left、right、out的時(shí)間戳,并且可以產(chǎn)生side output
總結(jié):
相同點(diǎn):
都是將兩個(gè)流合成一個(gè)流不同點(diǎn):
Connect:兩個(gè)流的數(shù)據(jù)不一定要發(fā)生關(guān)系,可以各自單獨(dú)處理、單獨(dú)輸出,但是輸出的結(jié)果會(huì)合并成一個(gè)流。也可以使一個(gè)流不輸出數(shù)據(jù),而僅僅作為另一個(gè)流的控制狀態(tài)。如果要使兩個(gè)流的數(shù)據(jù)發(fā)生關(guān)系,則必須使用state
Join/CoGroup:兩個(gè)流之間的數(shù)據(jù)會(huì)發(fā)生某種關(guān)聯(lián),需要對(duì)兩個(gè)流的數(shù)據(jù)抽取key,并且加窗,將窗內(nèi)的同key數(shù)據(jù)進(jìn)行處理,共同生成新的流元素