Operators

Original link: Apache Flink 1.4 Documentation: Operators

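Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.

This section describes the basic transformations, the effective physical partitioning after applying them, and Flink's operator chaining.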

DataStream Transformations

Transformation Description
Map
DataStream → DataStream
Takes one element and produces one element. A map function that doubles the values of the input stream:
<pre> <code> DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
}); </code></pre>
FlatMap
DataStream → DataStream
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:
<pre> <code> dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});</code> </pre>
Filter
DataStream → DataStream
Evaluates a boolean function for each element and retains the elements for which the function returns true. A filter that filters out zero values:
<pre> <code> dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
}); </code></pre>
KeyBy
DataStream → KeyedStream
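Logically partitions a stream into disjoint partitions. Each partition contains the elements that share a key. Internally, this is implemented with hash partitioning. See keys for how to specify keys. This transformation returns a KeyedStream.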
<pre> <code>dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple</code></pre>
Attention: A type cannot be a key if:
1. it is a POJO type that does not override the hashCode() method and relies on the Object.hashCode() implementation.
2. it is an array of any type.
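The sketch below shows why these two kinds of types are rejected. It is plain Java with no Flink dependency, and it models a simplified hash partitioner. It is not Flink's actual assignment logic, which additionally maps key hashes to key groups. `KeyHashDemo`, `BadKey`, and `GoodKey` are illustrative names. Arrays and POJOs that inherit `Object.hashCode()` hash and compare by identity, so two logically equal keys are not treated as the same key.

```java
import java.util.Arrays;
import java.util.Objects;

// Simplified model of hash partitioning by key; NOT Flink's real algorithm.
class KeyHashDemo {
    // Picks a partition from the key's hashCode(), always in [0, numPartitions).
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Content-based hash of an int array (an array's own hashCode() is identity-based).
    static int contentHash(int[] array) {
        return Arrays.hashCode(array);
    }

    // Relies on Object.equals()/hashCode(): equality is identity-based.
    static final class BadKey {
        final String id;
        BadKey(String id) { this.id = id; }
    }

    // Overrides equals()/hashCode() based on the field value.
    static final class GoodKey {
        final String id;
        GoodKey(String id) { this.id = id; }

        @Override
        public boolean equals(Object o) {
            return o instanceof GoodKey && ((GoodKey) o).id.equals(id);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id);
        }
    }
}
```

Two `GoodKey("user-1")` instances always map to the same partition. Two `BadKey("user-1")` instances are not even equal to each other, which is why such types cannot serve as keys.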
Reduce
KeyedStream → DataStream
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
<pre> <code> keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
}); </code></pre>
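The "rolling" behavior can be modeled in plain Java. The model keeps the last reduced value per key and emits the updated value for every incoming element. This is a single-threaded sketch of the semantics only. `RollingReduceDemo` and `Keyed` are illustrative names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Single-threaded model of a rolling reduce on a keyed stream (not Flink code).
class RollingReduceDemo {
    // One element of the stream: a key and an integer value.
    static final class Keyed {
        final String key;
        final int value;
        Keyed(String key, int value) { this.key = key; this.value = value; }
    }

    // Emits one result per input element. The first element of a key is stored
    // as-is; later elements are combined with that key's last reduced value.
    static List<Integer> rollingReduce(List<Keyed> input, BinaryOperator<Integer> reducer) {
        Map<String, Integer> lastReduced = new HashMap<>();
        List<Integer> emitted = new ArrayList<>();
        for (Keyed element : input) {
            // merge() stores the value if the key is new, else applies reducer(old, new)
            Integer updated = lastReduced.merge(element.key, element.value, reducer);
            emitted.add(updated);
        }
        return emitted;
    }
}
```

With a sum reducer, the keyed input a=1, b=10, a=2, b=20, a=3 emits 1, 10, 3, 30, 6. Each key keeps its own running total.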
Fold
KeyedStream → DataStream
A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.
A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
<pre> <code> DataStream<String> result = keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
}); </code></pre>
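The emitted sequence can be reproduced with a plain-Java model of a rolling fold for a single key. The model starts from the initial value, folds in each element, and emits every intermediate result. `RollingFoldDemo` is an illustrative name, and the code models the semantics only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Model of a rolling fold over the elements of ONE key (not Flink code).
class RollingFoldDemo {
    // Emits the accumulator after each element, starting from the initial value.
    static <T, R> List<R> rollingFold(List<T> input, R initial, BiFunction<R, T, R> folder) {
        List<R> emitted = new ArrayList<>();
        R current = initial;
        for (T value : input) {
            current = folder.apply(current, value);
            emitted.add(current);
        }
        return emitted;
    }
}
```

Applied to (1,2,3,4,5) with `(current, value) -> current + "-" + value`, it returns "start-1" through "start-1-2-3-4-5".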
Aggregations
KeyedStream → DataStream
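Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (the same holds for max and maxBy).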
<pre> <code> keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key"); </code></pre>
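The min/minBy difference is easiest to see on (name, value) pairs of one key. The plain-Java sketch below aggregates on `value`. `MinVsMinByDemo` and `Pair` are illustrative names. The sketch assumes that `min` keeps the non-aggregated fields of the first element it saw. Treat this as a simplification of Flink's behavior, not a documented guarantee.

```java
import java.util.ArrayList;
import java.util.List;

// Rolling min vs. minBy on (name, value) pairs of a single key (not Flink code).
class MinVsMinByDemo {
    static final class Pair {
        final String name;
        final int value;
        Pair(String name, int value) { this.name = name; this.value = value; }
        @Override public String toString() { return "(" + name + "," + value + ")"; }
    }

    // min: only the aggregated field tracks the minimum; in this sketch the
    // other field keeps the value of the first element seen (an assumption).
    static List<String> rollingMin(List<Pair> input) {
        List<String> out = new ArrayList<>();
        Pair acc = null;
        for (Pair p : input) {
            acc = (acc == null) ? p : new Pair(acc.name, Math.min(acc.value, p.value));
            out.add(acc.toString());
        }
        return out;
    }

    // minBy: the whole element holding the minimum value is kept.
    static List<String> rollingMinBy(List<Pair> input) {
        List<String> out = new ArrayList<>();
        Pair acc = null;
        for (Pair p : input) {
            if (acc == null || p.value < acc.value) {
                acc = p;
            }
            out.add(acc.toString());
        }
        return out;
    }
}
```

For the input (a,3), (b,1), (c,2), `rollingMin` emits (a,3), (a,1), (a,1), whereas `rollingMinBy` emits (a,3), (b,1), (b,1).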
Window
KeyedStream → WindowedStream
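Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See Windows for a complete description of windows.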
<pre> <code> dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data </code></pre>
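For a tumbling window, the assignment is pure arithmetic. With size s and no offset, a timestamp t belongs to the window [t - (t mod s), t - (t mod s) + s). The plain-Java sketch below computes this for the 5-second case. `TumblingWindowDemo` is an illustrative name, and the formula covers only the zero-offset case.

```java
// Zero-offset tumbling-window arithmetic on millisecond timestamps (not Flink code).
class TumblingWindowDemo {
    // Inclusive start of the window that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result correct for negative timestamps as well
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Exclusive end of the same window.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }
}
```

An event at 12,345 ms falls into [10,000, 15,000). An event at exactly 5,000 ms opens the window [5,000, 10,000). After keyBy, each key has its own instances of these windows.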
WindowAll
DataStream → AllWindowedStream
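Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See Windows for a complete description of windows.
Warning: In many cases this is a non-parallel transformation. The windowAll operator gathers all records in a single task.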
<pre> <code> dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data</code></pre>
Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.
Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.
<pre> <code> // assumes a stream of Tuple2<String, Integer> keyed by position (key type Tuple) in time windows
windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, TimeWindow>() {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, TimeWindow>() {
    @Override
    public void apply(TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
}); </code></pre>
Window Reduce
WindowedStream → DataStream
Applies a functional reduce function to the window and returns the reduced value.
<pre> <code> windowedStream.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String, Integer>(value1.f0, value1.f1 + value2.f1);
    }
}); </code></pre>
Window Fold
WindowedStream → DataStream
Applies a functional fold function to the window and returns the folded value. When applied on the sequence (1,2,3,4,5), the example function folds the sequence into the string "start-1-2-3-4-5":
<pre> <code> windowedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
}); </code></pre>
Aggregations on windows
WindowedStream → DataStream
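Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (the same holds for max and maxBy).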
<pre> <code> windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key"); </code></pre>
Union
DataStream* → DataStream
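Union of two or more data streams, creating a new stream that contains all the elements from all the streams. Note: If you union a data stream with itself, each element appears twice in the resulting stream.
<pre> <code> dataStream.union(otherStream1, otherStream2, ...); </code></pre>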
Window Join
DataStream,DataStream → DataStream
Joins two data streams on a given key and a common window.
<pre> <code> dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...}); </code></pre>
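Within one window, the join pairs every element of the first stream with every element of the second stream that has the same key. In other words, it behaves like an inner join. The plain-Java sketch below models a single window and encodes each element as a "key:value" string. `WindowJoinDemo` and that encoding are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Inner join of the elements that fell into ONE window (not Flink code).
class WindowJoinDemo {
    // Elements are encoded as "key:value"; returns "leftValue+rightValue" for each
    // left/right pair whose keys are equal.
    static List<String> joinWindow(List<String> left, List<String> right) {
        List<String> joined = new ArrayList<>();
        for (String l : left) {
            String[] lkv = l.split(":", 2);
            for (String r : right) {
                String[] rkv = r.split(":", 2);
                if (lkv[0].equals(rkv[0])) {
                    joined.add(lkv[1] + "+" + rkv[1]);
                }
            }
        }
        return joined;
    }
}
```

For left = k1:a, k1:b, k2:c and right = k1:x, k3:y, the result is a+x, b+x. Keys that appear on only one side (k2 and k3) produce no output.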
Window CoGroup
DataStream,DataStream → DataStream
Cogroups two data streams on a given key and a common window.
<pre> <code> dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...}); </code></pre>
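CoGroup differs from the window join in one respect. The user function is called once per key with all elements from both sides, including keys that appear on only one side. The plain-Java sketch below models one window and encodes elements as "key:value" strings. `WindowCoGroupDemo` and the encoding are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Co-group of the elements that fell into ONE window (not Flink code).
class WindowCoGroupDemo {
    // Groups "key:value" strings by key, preserving input order within a key.
    static Map<String, List<String>> groupByKey(List<String> elements) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String element : elements) {
            String[] kv = element.split(":", 2);
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return groups;
    }

    // One "call" per key seen on either side; a missing side contributes an empty list.
    static Map<String, String> coGroupWindow(List<String> left, List<String> right) {
        Map<String, List<String>> leftGroups = groupByKey(left);
        Map<String, List<String>> rightGroups = groupByKey(right);
        TreeSet<String> keys = new TreeSet<>(leftGroups.keySet());
        keys.addAll(rightGroups.keySet());
        Map<String, String> result = new TreeMap<>();
        for (String key : keys) {
            List<String> l = leftGroups.getOrDefault(key, List.of());
            List<String> r = rightGroups.getOrDefault(key, List.of());
            result.put(key, l + " | " + r);
        }
        return result;
    }
}
```

For left = k1:a, k1:b, k2:c and right = k1:x, k3:y, the function sees k1 with [a, b] and [x], k2 with [c] and [], and k3 with [] and [y].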
Connect
DataStream,DataStream → ConnectedStreams
"Connects" two data streams while retaining their types. Connect allows for shared state between the two streams.
<pre> <code> DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);</code></pre>
CoMap, CoFlatMap
ConnectedStreams → DataStream
Similar to map and flatMap on a connected data stream.
<pre> <code> connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
    @Override
    public void flatMap1(Integer value, Collector<String> out) {
        out.collect(value.toString());
    }

    @Override
    public void flatMap2(String value, Collector<String> out) {
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});</code></pre>
Split
DataStream → SplitStream
Splits the stream into two or more streams according to some criterion.
<pre> <code> SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
}); </code></pre>
Select
SplitStream → DataStream
Selects one or more streams from a split stream.
<pre> <code> SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd"); </code></pre>
Iterate
DataStream → IterativeStream → DataStream
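Creates a "feedback" loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements greater than 0 are sent back to the feedback channel, and the remaining elements are forwarded downstream. See iterations for a complete description.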
<pre> <code> IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map(/* do something */);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
}); </code></pre>
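The feedback loop can be simulated with a queue. Each element passes through the body. Results greater than 0 are queued again, which models the feedback edge passed to `closeWith`, and all other results leave the loop. This plain-Java sketch assumes a hypothetical body `value - 1`, chosen so that the loop terminates. `IterateDemo` is an illustrative name. A real streaming iteration runs continuously rather than until a queue drains.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Queue-based simulation of a streaming iteration (not Flink code).
class IterateDemo {
    static List<Long> run(List<Long> initial, LongUnaryOperator body) {
        Deque<Long> pending = new ArrayDeque<>(initial); // initial input plus feedback
        List<Long> output = new ArrayList<>();
        while (!pending.isEmpty()) {
            long result = body.applyAsLong(pending.poll());
            if (result > 0) {
                pending.add(result);   // feedback edge: back to the iteration head
            } else {
                output.add(result);    // leaves the loop and goes downstream
            }
        }
        return output;
    }
}
```

For the input 3, 1, -5 with the body `v -> v - 1`, the element 3 is fed back twice (3 → 2 → 1) before it leaves as 0. The output is therefore 0, -6, 0.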
Extract Timestamps
DataStream → DataStream
Extracts timestamps from records in order to work with windows that use event time semantics. See Event Time.
<pre> <code> stream.assignTimestamps(new TimeStampExtractor() {...}); </code></pre>

The following transformations are available on data streams of Tuples:

Transformation Description
Project
DataStream → DataStream
Selects a subset of fields from the tuples.
<pre> <code> DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0); </code></pre>

Physical partitioning

Via the following functions, Flink also gives low-level control (if desired) over the exact stream partitioning after a transformation.

Transformation Description
Custom partitioning
DataStream → DataStream
Uses a user-defined Partitioner to select the target task for each element.
<pre> <code> dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0); </code></pre>
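Flink's `Partitioner<K>` interface (`org.apache.flink.api.common.functions.Partitioner`) has a single method, `int partition(K key, int numPartitions)`, which returns the index of the target partition. To stay self-contained, the sketch below declares a stand-in interface with the same shape. `SimplePartitioner`, `FirstLetterPartitioner`, and the first-letter routing rule are hypothetical.

```java
// Stand-in with the same shape as Flink's Partitioner<K>; not the real interface.
interface SimplePartitioner<K> {
    // Returns the target partition index in [0, numPartitions).
    int partition(K key, int numPartitions);
}

// Hypothetical rule: keys that start with the same character share a partition.
class FirstLetterPartitioner implements SimplePartitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        char first = key.isEmpty() ? '\0' : key.charAt(0);
        // char values are non-negative, so the remainder is a valid index
        return first % numPartitions;
    }
}
```

To use this rule in Flink, implement the real `Partitioner<String>` with the same method body. Then pass an instance as the first argument of `partitionCustom`, together with the key field.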
Random partitioning
DataStream → DataStream
Partitions elements randomly according to a uniform distribution.
<pre> <code> dataStream.shuffle(); </code></pre>
Rebalancing (Round-robin partitioning)
DataStream → DataStream
Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.
<pre> <code> dataStream.rebalance(); </code></pre>
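Round-robin distribution from one upstream instance can be modeled by cycling through the downstream channels. This plain-Java sketch starts at channel 0 and only counts how many elements each channel receives. `RoundRobinDemo` is an illustrative name, and Flink's own partitioner may start at a different channel.

```java
// Round-robin assignment from ONE upstream instance (not Flink code).
class RoundRobinDemo {
    // Returns how many of numElements elements each of numChannels channels receives.
    static int[] countsPerChannel(int numElements, int numChannels) {
        int[] counts = new int[numChannels];
        int next = 0;
        for (int i = 0; i < numElements; i++) {
            counts[next]++;
            next = (next + 1) % numChannels; // move on to the next channel
        }
        return counts;
    }
}
```

Ten elements over four channels yield 3, 3, 2, 2, regardless of the elements' keys or values. This is why rebalance() helps when data is skewed.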
Rescaling
DataStream → DataStream
Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. This would require only local data transfers instead of transferring data over network, depending on other configuration values such as the number of slots of TaskManagers.

The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 6, then one upstream operation would distribute elements to three downstream operations while the other upstream operation would distribute to the other three downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 6 then three upstream operations would distribute to one downstream operation while the other three upstream operations would distribute to the other downstream operation.

In cases where the different parallelisms are not multiples of each other, one or several downstream operations will have a differing number of inputs from upstream operations.

Please see this figure for a visualization of the connection pattern in the above example:
[Figure: rescale() connection pattern between upstream and downstream operations]

<pre> <code> dataStream.rescale(); </code></pre>
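The connection pattern described above can be computed with a simple proportional rule. This plain-Java sketch reproduces the 2 → 6 and 6 → 2 examples. `RescaleDemo` is an illustrative name. When the parallelisms are not multiples of each other, Flink's exact assignment may differ from this rule.

```java
import java.util.ArrayList;
import java.util.List;

// Proportional model of rescale() connections (not Flink code).
class RescaleDemo {
    // Returns, for each upstream instance, the downstream instances it sends to.
    static List<List<Integer>> targets(int upstream, int downstream) {
        List<List<Integer>> result = new ArrayList<>();
        for (int u = 0; u < upstream; u++) {
            result.add(new ArrayList<>());
        }
        if (downstream >= upstream) {
            // Fan-out: every downstream instance d reads from exactly one upstream instance.
            for (int d = 0; d < downstream; d++) {
                result.get(d * upstream / downstream).add(d);
            }
        } else {
            // Fan-in: every upstream instance u writes to exactly one downstream instance.
            for (int u = 0; u < upstream; u++) {
                result.get(u).add(u * downstream / upstream);
            }
        }
        return result;
    }
}
```

With 5 upstream and 2 downstream instances, this rule gives downstream instance 0 three inputs and instance 1 two inputs, which illustrates the uneven case.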
Broadcasting
DataStream → DataStream
Broadcasts elements to every partition. <pre> <code> dataStream.broadcast(); </code></pre>

Task chaining and resource groups

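Chaining two subsequent transformations means co-locating them within the same thread for better performance. Flink chains operators by default when possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired.
Use StreamExecutionEnvironment.disableOperatorChaining() to disable chaining in the whole job. For more fine-grained control, the following functions are available. Note that these functions can only be used right after a DataStream transformation, because they refer to the previous transformation. For example, you can use someStream.map(...).startNewChain(), but you cannot use someStream.startNewChain().
A resource group is a slot in Flink; see slots. You can manually isolate operators in separate slots if desired.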

Transformation Description
Start new chain Begin a new chain, starting with this operator. The two mappers will be chained, and the filter will not be chained to the first mapper. <pre> <code> someStream.filter(...).map(...).startNewChain().map(...); </code></pre>
Disable chaining Do not chain the map operator. <pre> <code>someStream.map(...).disableChaining();
</code></pre>
Set slot sharing group Set the slot sharing group of an operation. Flink puts operations with the same slot sharing group into the same slot and keeps operations without that slot sharing group in other slots. This can be used to isolate slots. An operation inherits the slot sharing group of its inputs if all input operations are in the same slot sharing group. The default slot sharing group is named "default"; operations can be put into it explicitly by calling slotSharingGroup("default"). <pre> <code> someStream.filter(...).slotSharingGroup("name");</code></pre>