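Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.
This section describes the basic transformations, the effective physical partitioning after applying them, and Flink's operator chaining.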
DataStream Transformations
| Transformation | Description |
|---|---|
| Map<br>DataStream → DataStream | Takes one element and produces one element. A map function that doubles the values of the input stream: <pre><code>DataStream<Integer> dataStream = /* ... */; dataStream.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return 2 * value; } });</code></pre> |
| FlatMap<br>DataStream → DataStream | Takes one element and produces zero, one, or more elements. A flatMap function that splits sentences into words: <pre><code>dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { for (String word : value.split(" ")) { out.collect(word); } } });</code></pre> |
| Filter<br>DataStream → DataStream | Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values: <pre><code>dataStream.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value != 0; } });</code></pre> |
| KeyBy<br>DataStream → KeyedStream | Logically partitions a stream into disjoint partitions, each containing the elements with the same key. Internally, this is implemented with hash partitioning. See keys for how to specify keys. This transformation returns a KeyedStream. <pre><code>dataStream.keyBy("someKey"); /* key by field "someKey" */ dataStream.keyBy(0); /* key by the first element of a Tuple */</code></pre> Attention: a type cannot be a key if (1) it is a POJO type that does not override the hashCode() method and relies on the Object.hashCode() implementation, or (2) it is an array of any type. |
| Reduce<br>KeyedStream → DataStream | A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. A reduce function that creates a stream of partial sums: <pre><code>keyedStream.reduce(new ReduceFunction<Integer>() { @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } });</code></pre> |
| Fold<br>KeyedStream → DataStream | A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value. When applied to the sequence (1,2,3,4,5), the fold function below emits "start-1", "start-1-2", "start-1-2-3", ... <pre><code>DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() { @Override public String fold(String current, Integer value) { return current + "-" + value; } });</code></pre> |
| Aggregations<br>KeyedStream → DataStream | Rolling aggregations on a keyed data stream. min returns the minimum value, while minBy returns the element that has the minimum value in this field. max and maxBy differ in the same way. <pre><code>keyedStream.sum(0); keyedStream.sum("key"); keyedStream.min(0); keyedStream.min("key"); keyedStream.max(0); keyedStream.max("key"); keyedStream.minBy(0); keyedStream.minBy("key"); keyedStream.maxBy(0); keyedStream.maxBy("key");</code></pre> |
| Window<br>KeyedStream → WindowedStream | Windows can be defined on already partitioned KeyedStreams. Windows group the data of each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See Windows for a complete description of windows. <pre><code>dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // last 5 seconds of data</code></pre> |
| WindowAll<br>DataStream → AllWindowedStream | Windows can be defined on regular DataStreams. Windows group all stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See Windows for a complete description of windows. Warning: in many cases this is a non-parallel transformation, because the windowAll operator gathers all records in a single task. <pre><code>dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // last 5 seconds of data</code></pre> |
| Window Apply<br>WindowedStream → DataStream<br>AllWindowedStream → DataStream | Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window. Note: if you are using a windowAll transformation, you need to use an AllWindowFunction instead. <pre><code>windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, Window>() { @Override public void apply(Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (Tuple2<String, Integer> t : values) { sum += t.f1; } out.collect(sum); } }); /* applying an AllWindowFunction on a non-keyed window stream */ allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Window>() { @Override public void apply(Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (Tuple2<String, Integer> t : values) { sum += t.f1; } out.collect(sum); } });</code></pre> |
| Window Reduce<br>WindowedStream → DataStream | Applies a functional reduce function to the window and returns the reduced value. <pre><code>windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1); } });</code></pre> |
| Window Fold<br>WindowedStream → DataStream | Applies a functional fold function to the window and returns the folded value. When applied to the sequence (1,2,3,4,5), the example function folds it into the string "start-1-2-3-4-5": <pre><code>windowedStream.fold("start", new FoldFunction<Integer, String>() { @Override public String fold(String current, Integer value) { return current + "-" + value; } });</code></pre> |
| Aggregations on windows<br>WindowedStream → DataStream | Aggregates the contents of a window. min returns the minimum value, while minBy returns the element that has the minimum value in this field. max and maxBy differ in the same way. <pre><code>windowedStream.sum(0); windowedStream.sum("key"); windowedStream.min(0); windowedStream.min("key"); windowedStream.max(0); windowedStream.max("key"); windowedStream.minBy(0); windowedStream.minBy("key"); windowedStream.maxBy(0); windowedStream.maxBy("key");</code></pre> |
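| Union<br>DataStream* → DataStream | Unions two or more data streams into a new stream that contains all the elements from all the streams. Note: if you union a data stream with itself, each element appears twice in the resulting stream. <pre><code>dataStream.union(otherStream1, otherStream2, ...);</code></pre> |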
| Window Join<br>DataStream,DataStream → DataStream | Joins two data streams on a given key and a common window. <pre><code>dataStream.join(otherStream).where(/* key selector */).equalTo(/* key selector */).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply(new JoinFunction() {...});</code></pre> |
| Window CoGroup<br>DataStream,DataStream → DataStream | Cogroups two data streams on a given key and a common window. <pre><code>dataStream.coGroup(otherStream).where(0).equalTo(1).window(TumblingEventTimeWindows.of(Time.seconds(3))).apply(new CoGroupFunction() {...});</code></pre> |
| Connect<br>DataStream,DataStream → ConnectedStreams | "Connects" two data streams while keeping their types. Connect allows the two streams to share state. <pre><code>DataStream<Integer> someStream = /* ... */; DataStream<String> otherStream = /* ... */; ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);</code></pre> |
| CoMap, CoFlatMap<br>ConnectedStreams → DataStream | Similar to map and flatMap, but on a connected data stream. <pre><code>connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() { @Override public Boolean map1(Integer value) { return true; } @Override public Boolean map2(String value) { return false; } }); connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() { @Override public void flatMap1(Integer value, Collector<String> out) { out.collect(value.toString()); } @Override public void flatMap2(String value, Collector<String> out) { for (String word : value.split(" ")) { out.collect(word); } } });</code></pre> |
| Split<br>DataStream → SplitStream | Splits the stream into two or more streams according to some criterion. <pre><code>SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() { @Override public Iterable<String> select(Integer value) { List<String> output = new ArrayList<String>(); if (value % 2 == 0) { output.add("even"); } else { output.add("odd"); } return output; } });</code></pre> |
| Select<br>SplitStream → DataStream | Selects one or more streams from a split stream. <pre><code>SplitStream<Integer> split; DataStream<Integer> even = split.select("even"); DataStream<Integer> odd = split.select("odd"); DataStream<Integer> all = split.select("even", "odd");</code></pre> |
| Iterate<br>DataStream → IterativeStream → DataStream | Creates a "feedback" loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements greater than 0 are sent back to the feedback channel, and the remaining elements are forwarded downstream. See iterations for a complete description. <pre><code>IterativeStream<Long> iteration = initialStream.iterate(); DataStream<Long> iterationBody = iteration.map(/* do something */); DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() { @Override public boolean filter(Long value) throws Exception { return value > 0; } }); iteration.closeWith(feedback); DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() { @Override public boolean filter(Long value) throws Exception { return value <= 0; } });</code></pre> |
| Extract Timestamps<br>DataStream → DataStream | Extracts timestamps from records so that windows using event-time semantics can work with them. See Event Time. <pre><code>stream.assignTimestamps(new TimestampExtractor() {...});</code></pre> |
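
The Reduce, Fold and Aggregations rows in the table above all share one property: they are "rolling". Each incoming element produces exactly one output, which is the updated aggregate for that element's key. The plain-Java sketch below models these semantics without Flink. It is a simplified model, not Flink's implementation. The class `RollingAggregations` and its methods are hypothetical, and elements are plain `(key, value)` pairs. It also assumes that `min` keeps the non-aggregated fields of the first element it saw.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical plain-Java model (no Flink dependency) of rolling, per-key aggregations.
// Every input element produces exactly one output: the updated aggregate for its key.
final class RollingAggregations {

    // Models keyedStream.reduce(value1 + value2): emits the running partial sum of the element's key.
    static List<Integer> rollingSum(String[] keys, int[] values) {
        Map<String, Integer> state = new HashMap<>();
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            // merge() combines the last reduced value of this key with the current element
            out.add(state.merge(keys[i], values[i], Integer::sum));
        }
        return out;
    }

    // Models keyedStream.fold("start", current + "-" + value), with one fold state per key.
    static List<String> rollingFold(String[] keys, int[] values) {
        Map<String, String> state = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            String updated = state.getOrDefault(keys[i], "start") + "-" + values[i];
            state.put(keys[i], updated);
            out.add(updated);
        }
        return out;
    }

    // Models min(1) on {id, value} elements of a single key.
    // Assumption: the non-aggregated field (id) stays that of the first element.
    static List<int[]> rollingMin(int[][] elements) {
        List<int[]> out = new ArrayList<>();
        int[] acc = null;
        for (int[] e : elements) {
            if (acc == null) {
                acc = e.clone();   // the first element seeds the state
            } else if (e[1] < acc[1]) {
                acc[1] = e[1];     // only the aggregated field changes
            }
            out.add(acc.clone());
        }
        return out;
    }

    // Models minBy(1): emits the whole element holding the smallest value seen so far.
    static List<int[]> rollingMinBy(int[][] elements) {
        List<int[]> out = new ArrayList<>();
        int[] best = null;
        for (int[] e : elements) {
            if (best == null || e[1] < best[1]) {
                best = e.clone();
            }
            out.add(best.clone());
        }
        return out;
    }

    static String show(List<int[]> rows) {
        return rows.stream().map(r -> Arrays.toString(r)).collect(Collectors.toList()).toString();
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a", "a", "b"};
        int[] values = {1, 2, 3, 4, 5};
        System.out.println(rollingSum(keys, values));  // [1, 2, 4, 8, 7]
        System.out.println(rollingFold(keys, values)); // [start-1, start-2, start-1-3, start-1-3-4, start-2-5]

        int[][] elements = {{10, 5}, {20, 3}, {30, 4}};
        System.out.println(show(rollingMin(elements)));   // [[10, 5], [10, 3], [10, 3]]
        System.out.println(show(rollingMinBy(elements))); // [[10, 5], [20, 3], [20, 3]]
    }
}
```

The two min variants differ only in which fields they update. `min` changes the aggregated field in place, while `minBy` swaps in the whole winning element.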
|
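The Window and WindowAll rows use `TumblingEventTimeWindows.of(Time.seconds(5))`. A tumbling window assigner maps each event timestamp to exactly one fixed-size, non-overlapping window. The plain-Java sketch below models that assignment and sums the values per window. The class `TumblingWindowSketch` is hypothetical, and the sketch assumes a window offset of 0 and non-negative millisecond timestamps. It also ignores watermarks and triggers, which are what decide when Flink actually emits a window's result.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of tumbling event-time windows (no Flink dependency).
// Assumptions: window offset 0, non-negative timestamps in milliseconds,
// and no watermarks or triggers: every window is aggregated over all of its elements.
final class TumblingWindowSketch {

    // Start of the window [start, start + size) that contains the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Sums values per window, keyed by window start; TreeMap keeps the windows in time order.
    static Map<Long, Integer> sumPerWindow(long[] timestamps, int[] values, long sizeMillis) {
        Map<Long, Integer> sums = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            sums.merge(windowStart(timestamps[i], sizeMillis), values[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5-second windows, analogous to Time.seconds(5)
        long[] timestamps = {1_000L, 4_999L, 5_000L, 12_345L};
        int[] values = {1, 2, 3, 4};
        System.out.println(sumPerWindow(timestamps, values, size)); // {0=3, 5000=3, 10000=4}
    }
}
```

For a keyed stream (`keyBy(0).window(...)`), the same assignment would apply independently per key, giving one such map of windows for each key.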
The following transformations are available on data streams of Tuples:
| Transformation | Description |
|---|---|
| Project<br>DataStream → DataStream | Selects a subset of fields from the tuples. <pre><code>DataStream<Tuple3<Integer, Double, String>> in = /* ... */; DataStream<Tuple2<String, Integer>> out = in.project(2, 0);</code></pre> |
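
The KeyBy transformation in the first table is implemented with hash partitioning. This is why a key type needs a stable, content-based hashCode(). The plain-Java sketch below is a simplified model of how keys can be mapped to parallel subtasks. First, the key's hashCode() is reduced to one of maxParallelism "key groups". Then contiguous ranges of key groups are mapped to subtasks. Several details are assumptions for illustration: the class name, the value 128 for maxParallelism, and the use of Math.floorMod on the raw hashCode(). As far as we know, Flink additionally scrambles the hash code, so the concrete subtask indices it produces will differ.

```java
// Hypothetical, simplified model of hash-based key partitioning (no Flink dependency).
final class KeyPartitionSketch {

    // Reduces a key's hashCode() to a key group in [0, maxParallelism).
    // Simplification: the raw hashCode() is used directly, without further hashing.
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Maps contiguous ranges of key groups onto subtasks in [0, parallelism).
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for illustration
        int parallelism = 4;
        for (String word : new String[] {"flink", "stream", "flink", "window"}) {
            // equal keys always land on the same subtask
            System.out.println(word + " -> subtask " + subtaskFor(word, maxParallelism, parallelism));
        }
    }
}
```

Because placement depends only on hashCode(), a POJO that inherits the identity-based Object.hashCode() would give equal-looking keys different hashes in different JVMs. That is why such types cannot be used as keys.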
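
Physical partitioning
If desired, Flink also gives low-level control over the exact stream partitioning after a transformation, through the following functions.
| Transformation | Description |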
|---|---|
| Custom partitioning<br>DataStream → DataStream | Uses a user-defined Partitioner to select the target task for each element. <pre><code>dataStream.partitionCustom(partitioner, "someKey"); dataStream.partitionCustom(partitioner, 0);</code></pre> |
| Random partitioning<br>DataStream → DataStream | Partitions elements randomly according to a uniform distribution. <pre><code>dataStream.shuffle();</code></pre> |
| Rebalancing (Round-robin partitioning)<br>DataStream → DataStream | Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew. <pre><code>dataStream.rebalance();</code></pre> |
| Rescaling<br>DataStream → DataStream | Partitions elements, round-robin, to a subset of downstream operations. This is useful for pipelines where, for example, each parallel instance of a source fans out to a subset of several mappers to distribute load, without the full rebalance that rebalance() would incur. Depending on other configuration values, such as the number of slots of the TaskManagers, this requires only local data transfers instead of transfers over the network. The subset of downstream operations to which an upstream operation sends elements depends on the parallelism of both the upstream and the downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 6, then one upstream operation distributes elements to three downstream operations, while the other upstream operation distributes to the other three downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 6, then three upstream operations distribute to one downstream operation, while the other three upstream operations distribute to the other downstream operation. If the parallelisms are not multiples of each other, one or more downstream operations will receive input from a different number of upstream operations. [Figure: connection pattern of the rescale example above] <pre><code>dataStream.rescale();</code></pre> |
| Broadcasting<br>DataStream → DataStream | Broadcasts elements to every partition. <pre><code>dataStream.broadcast();</code></pre> |
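
The rescale connection pattern described in the Rescaling row can be checked with a small plain-Java model. The hypothetical `RescaleSketch.targets` below splits subtask index ranges as evenly as integer division allows. When the downstream side is larger, each downstream subtask reads from exactly one upstream subtask. When it is smaller, each downstream subtask reads from a contiguous range of upstream subtasks. This simplified model was chosen to reproduce the 2 → 6 and 6 → 2 examples. Flink computes the actual assignment internally, and the details may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the rescale() connection pattern (no Flink dependency).
final class RescaleSketch {

    // For each upstream subtask, returns the downstream subtask indices it sends elements to.
    static List<List<Integer>> targets(int upstream, int downstream) {
        List<List<Integer>> result = new ArrayList<>();
        for (int u = 0; u < upstream; u++) {
            result.add(new ArrayList<>());
        }
        for (int d = 0; d < downstream; d++) {
            if (upstream <= downstream) {
                // fan-out: each downstream subtask reads from exactly one upstream subtask
                result.get(d * upstream / downstream).add(d);
            } else {
                // fan-in: each downstream subtask reads from a contiguous range of upstream subtasks
                int start = d * upstream / downstream;
                int end = (d + 1) * upstream / downstream;
                for (int u = start; u < end; u++) {
                    result.get(u).add(d);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(targets(2, 6)); // [[0, 1, 2], [3, 4, 5]]
        System.out.println(targets(6, 2)); // [[0], [0], [0], [1], [1], [1]]
        System.out.println(targets(2, 3)); // [[0, 1], [2]]
    }
}
```

The last call shows the non-multiple case. Upstream subtask 0 feeds two downstream subtasks and upstream subtask 1 feeds one. By contrast, rebalance() would connect every upstream subtask to every downstream subtask.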
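
Task chaining and resource groups
Chaining two subsequent transformations means co-locating them in the same thread for better performance. By default, Flink chains operators where possible (e.g., two subsequent map transformations). If desired, the API gives fine-grained control over chaining.
To disable chaining for the whole job, use StreamExecutionEnvironment.disableOperatorChaining(). For more fine-grained control, the following functions are available. These functions can only be used right after a DataStream transformation, because they refer to the previous transformation. For example, you can use someStream.map(...).startNewChain(), but you cannot use someStream.startNewChain().
A resource group is a slot in Flink; see slots. If desired, you can manually isolate operators in separate slots.
| Transformation | Description |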
|---|---|
| Start new chain | Begins a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper. <pre><code>someStream.filter(...).map(...).startNewChain().map(...);</code></pre> |
| Disable chaining | Do not chain the map operator. <pre><code>someStream.map(...).disableChaining();</code></pre> |
| Set slot sharing group | Sets the slot sharing group of an operation. Flink puts operations with the same slot sharing group into the same slot, while keeping operations that do not have that slot sharing group in other slots. This can be used to isolate slots. If all input operations are in the same slot sharing group, the operation inherits that group. The default slot sharing group is named "default"; operations can be put into this group explicitly by calling slotSharingGroup("default"). <pre><code>someStream.filter(...).slotSharingGroup("name");</code></pre> |
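
The inheritance rule in the "Set slot sharing group" row can be expressed as a small resolution function. The plain-Java sketch below is a hypothetical model, not Flink's code. An explicitly set group wins. Otherwise the operation inherits its inputs' group if they all agree. The fallback to "default" when inputs disagree (or when there are no inputs) is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of resolving an operation's slot sharing group (no Flink dependency).
final class SlotSharingSketch {

    static final String DEFAULT_GROUP = "default";

    // An explicitly set group wins; otherwise inherit the inputs' group if they all agree.
    // Assumption: disagreeing inputs, or no inputs at all, fall back to "default".
    static String resolveGroup(String explicitGroup, List<String> inputGroups) {
        if (explicitGroup != null) {
            return explicitGroup;
        }
        String inherited = null;
        for (String group : inputGroups) {
            if (inherited == null) {
                inherited = group;
            } else if (!inherited.equals(group)) {
                return DEFAULT_GROUP;
            }
        }
        return inherited == null ? DEFAULT_GROUP : inherited;
    }

    public static void main(String[] args) {
        String source = resolveGroup(null, Collections.emptyList());   // "default"
        String filter = resolveGroup("name", Arrays.asList(source));   // "name", set explicitly
        String map = resolveGroup(null, Arrays.asList(filter));        // "name", inherited
        String join = resolveGroup(null, Arrays.asList(source, map));  // "default", inputs disagree
        System.out.println(source + " " + filter + " " + map + " " + join); // default name name default
    }
}
```

In this model, `someStream.filter(...).slotSharingGroup("name")` followed by a `map(...)` would place both the filter and the map in group "name".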
