flink學(xué)習(xí)之八-keyby&reduce

上文學(xué)習(xí)了簡單的map、flatmap、filter,在這里開始繼續(xù)看keyBy及reduce

keyBy

先看定義,通過keyBy,DataStream→KeyedStream。

邏輯上將流分區(qū)為不相交的分區(qū)。具有相同Keys的所有記錄都分配給同一分區(qū)。在內(nèi)部,keyBy()是使用散列分區(qū)實(shí)現(xiàn)的。指定鍵有不同的方法。

此轉(zhuǎn)換返回KeyedStream,其中包括使用被Keys化狀態(tài)所需的KeyedStream。

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple(數(shù)組)
    

注意 如果出現(xiàn)以下情況,則類型不能成為關(guān)鍵

  1. 它是POJO類型但不覆蓋hashCode()方法并依賴于Object.hashCode()實(shí)現(xiàn)。
  2. 它是任何類型的數(shù)組。

看段代碼:

public class KeyByTestJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以數(shù)組的第一個元素作為key
                .map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1)
                .print();

        env.execute("execute");
    }
}

運(yùn)行后,結(jié)果如下:

3> key:1,value:5
3> key:1,value:7
3> key:1,value:2
4> key:2,value:3
4> key:2,value:4

可以看到,前面的 3> 和 4> 輸出 本身是個分組,而且順序是從先輸出key=1的tuple數(shù)組,再輸出key=2的數(shù)組。

也就是說,keyby類似于sql中的group by,將數(shù)據(jù)進(jìn)行了分組。后面基于keyedSteam的操作,都是組內(nèi)操作。

斷點(diǎn)看了下keyedStream的結(jié)構(gòu):

keyedStream.png

可以看到,包含了keyType、keySelector,以及轉(zhuǎn)換后的PartitionTransformation,也就是已經(jīng)做了分區(qū)了。后續(xù)的所有操作都是按照分區(qū)內(nèi)數(shù)據(jù)來處理的。

reduce

reduce表示將數(shù)據(jù)合并成一個新的數(shù)據(jù),返回單個的結(jié)果值,并且 reduce 操作每處理一個元素總是創(chuàng)建一個新值。而且reduce方法不能直接應(yīng)用于SingleOutputStreamOperator對象,也好理解,因?yàn)檫@個對象是個無限的流,對無限的數(shù)據(jù)做合并,沒有任何意義哈!

所以reduce需要針對分組或者一個window(窗口)來執(zhí)行,也就是分別對應(yīng)于keyBy、window/timeWindow 處理后的數(shù)據(jù),根據(jù)ReduceFunction將元素與上一個reduce后的結(jié)果合并,產(chǎn)出合并之后的結(jié)果。

在上面代碼的基礎(chǔ)上修改:

public class KeyByTestJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以數(shù)組的第一個元素作為key
                .reduce((ReduceFunction<Tuple2<Long, Long>>) (t2, t1) -> new Tuple2<>(t1.f0, t2.f1 + t1.f1)) // value做累加
                .print();

        env.execute("execute");
    }
}
3> (1,5)
3> (1,12)
3> (1,14)
4> (2,3)
4> (2,7)

可以看到,分組后,每次有一個數(shù)組進(jìn)來,都會產(chǎn)生新的數(shù)據(jù),依然是按照分組來輸出的。

如果改下reduce中的實(shí)現(xiàn):

ReduceFunction<Tuple2<Long, Long>>) (t2, t1) -> new Tuple2<>(t1.f0 + t2.f0, t2.f1 + t1.f1)

那么輸出就是:

2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task - Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) switched from RUNNING to FINISHED.
2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214).
2019-01-22 12:04:56.083 [Keyed Reduce -> Sink: Print to Std. Out (2/4)] INFO  org.apache.flink.runtime.taskmanager.Task - Ensuring all FileSystem streams are closed for task Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) [FINISHED]
4> (2,3)
4> (4,7)

...

2019-01-22 12:04:56.118 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (2/4) (7117b0831e59cae2201e6f7097356214) switched from RUNNING to FINISHED.
2019-01-22 12:04:56.122 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (1/4) (0fdc49eb18050efa3acec361978f3e93) switched from RUNNING to FINISHED.
2019-01-22 12:04:56.125 [flink-akka.actor.default-dispatcher-4] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Keyed Reduce -> Sink: Print to Std. Out (4/4) (1607b502ab2791f2f567c61da214bd82) switched from RUNNING to FINISHED.
3> (1,5)
3> (2,12)
3> (3,14)

可以看到輸出結(jié)果,一方面是是key-reduce的狀態(tài),從RUNNING遷移到FINISHED;另一方面是按組輸出了最終的reduce值。

聚合

KeyedStream→DataStream

在被Keys化數(shù)據(jù)流上滾動聚合。min和minBy之間的差異是min返回最小值,而minBy返回該字段中具有最小值的數(shù)據(jù)元(max和maxBy類似)。

---TODO 這里存疑,因?yàn)榉祷氐臄?shù)據(jù)始終是數(shù)據(jù)源,難道是我寫錯了什么?SingleOutputStreamOperator<Tuple2>改成SingleOutputStreamOperator<Long> 也是一樣的結(jié)果,等待后續(xù)繼續(xù)驗(yàn)證。

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

繼續(xù)在上面代碼的基礎(chǔ)上做實(shí)驗(yàn):

sum
public class KeyByTestJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        KeyedStream keyedStream =  env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以數(shù)組的第一個元素作為key
                ;

        SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.sum(0);
        sumStream.addSink(new PrintSinkFunction<>());

        env.execute("execute");
    }

對第一個元素(位置0)做sum,結(jié)果如下:

3> (1,5)
3> (2,5)
3> (3,5)
...
4> (2,3)
2019-01-22 21:27:07.401 [flink-akka.actor.default-dispatcher-3] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Source: Collection Source (1/1) (f3368fedb9805b1e59f4443252a2fb2b) switched from RUNNING to FINISHED.
4> (4,3)

可以看到,對第一個數(shù)據(jù)(也就是key)做了累加,然后value以第一個進(jìn)來的數(shù)據(jù)為準(zhǔn)。

如過改成keyedStream.sum(1); 也就是針對第二個元素求和,得到的結(jié)果如下:

4> (2,3)
4> (2,7)
...
3> (1,5)
3> (1,12)
2019-01-23 10:50:47.498 [flink-akka.actor.default-dispatcher-5] INFO  o.a.flink.runtime.executiongraph.ExecutionGraph - Source: Collection Source (1/1) (df09751c6722a5942b058a1300ae9fb3) switched from RUNNING to FINISHED.
3> (1,14)
min
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.min(1);

得到的輸出結(jié)果是:

3> (1,5)  -- 第一組 第一個數(shù)據(jù)到的結(jié)果
3> (1,5)  -- 第一組 第二個數(shù)據(jù)到的結(jié)果
4> (2,3)  -- 第二組 第一個數(shù)據(jù)到的結(jié)果
4> (2,3)  -- 第二組 第二個數(shù)據(jù)到的結(jié)果
3> (1,2)  -- 第一組 第三個數(shù)據(jù)到的結(jié)果

這里順序有點(diǎn)亂,不過沒問題,數(shù)據(jù)按照順序一個一個的過來,然后計(jì)算當(dāng)前數(shù)據(jù)過來時有最小value的數(shù)據(jù)。

minBy
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.minBy(1);
3> (1,5)
3> (1,5)
4> (2,3)
3> (1,2)
4> (2,3)

類似的,只是組間打印的順序有區(qū)別而已。

max
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.max(1);
3> (1,5)
4> (2,3)
3> (1,7)
4> (2,4)
3> (1,7)

按照順序,取最大的數(shù)據(jù)

maxBy
SingleOutputStreamOperator<Tuple2> sumStream = keyedStream.maxBy(1);

3> (1,5)
4> (2,3)
3> (1,7)
4> (2,4)
3> (1,7)

有一點(diǎn)要牢記,數(shù)據(jù)是一直流過來的,這些聚合方法都是在每次收到新的數(shù)據(jù)之后,重新計(jì)算/比較得出來的結(jié)果,而不是只有一個最終結(jié)果。

PS:有人評論說哪兒來的f0、f1,只能這里貼個圖了...


image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容