1. map
DataStream<Integer> mapStream = dataStream.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) throws Exception {
        // Emit the length of each input string
        return value.length();
    }
});
2. flatMap
DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // Split each comma-separated line and emit every field as its own record
        String[] fields = value.split(",");
        for (String field : fields) {
            out.collect(field);
        }
    }
});
3. filter
DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        // Keep only records equal to "1"; compare Strings with equals(), not ==
        return "1".equals(value);
    }
});
4. keyBy
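DataStream -> KeyedStream. Logically partitions the stream so that all records with the same key go to the same partition (internally this is done by hashing the key).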
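To make the "same key, same partition" guarantee concrete, here is a simplified plain-Java model, not Flink code. It assumes records are CSV lines whose first field is the key, and it uses `hashCode() mod parallelism` as a stand-in; Flink's real assignment first hashes the key into key groups (bounded by the max parallelism) and then maps key groups to subtasks. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified plain-Java model of keyBy: NOT Flink's real key-group assignment.
final class KeyByModel {
    // Maps a key to one of `parallelism` partitions; floorMod keeps the index non-negative
    static int partitionOf(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Assumes each record is "key,value..."; records sharing a key always land in the same list
    static Map<Integer, List<String>> partition(List<String> records, int parallelism) {
        Map<Integer, List<String>> partitions = new LinkedHashMap<>();
        for (String record : records) {
            String key = record.split(",")[0];
            partitions.computeIfAbsent(partitionOf(key, parallelism), p -> new ArrayList<>()).add(record);
        }
        return partitions;
    }
}
```

Because the partition depends only on the key, two readings from `sensor_1` end up together no matter how they are interleaved with other sensors; different keys may still share a partition.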
5. Rolling Aggregation
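Applies a rolling aggregation to each keyed sub-stream of a KeyedStream: sum(), min(), max(), minBy(), maxBy(). min() returns the minimum value of the given field, while minBy() returns the whole element that holds that minimum; max() and maxBy() differ in the same way.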
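The min/minBy difference is easiest to see on a few readings. The sketch below is a plain-Java model, not the Flink API: `Reading` is a hypothetical stand-in for the `SensorReading` POJO used later, and in this model `min` keeps the other fields from the key's first record (Flink only documents that `min` tracks the field value, not which record the other fields come from).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the SensorReading POJO used later in these notes.
final class Reading {
    final String id;
    final long ts;
    final double temp;

    Reading(String id, long ts, double temp) {
        this.id = id;
        this.ts = ts;
        this.temp = temp;
    }
}

// Plain-Java model of rolling min("temp") vs minBy("temp") on a stream keyed by id.
final class RollingAggModel {
    // min: only the temp field tracks the minimum; other fields stay from the key's first record
    static List<Reading> rollingMin(List<Reading> input) {
        Map<String, Reading> state = new HashMap<>();
        List<Reading> out = new ArrayList<>();
        for (Reading r : input) {
            Reading prev = state.get(r.id);
            Reading next = (prev == null) ? r : new Reading(prev.id, prev.ts, Math.min(prev.temp, r.temp));
            state.put(r.id, next);
            out.add(next); // one output per input record: a rolling aggregate
        }
        return out;
    }

    // minBy: the whole record holding the minimum temp is kept (ties keep the earlier record)
    static List<Reading> rollingMinBy(List<Reading> input) {
        Map<String, Reading> state = new HashMap<>();
        List<Reading> out = new ArrayList<>();
        for (Reading r : input) {
            Reading prev = state.get(r.id);
            Reading next = (prev == null || r.temp < prev.temp) ? r : prev;
            state.put(r.id, next);
            out.add(next);
        }
        return out;
    }
}
```

For readings (s1, ts=1, 35), (s1, ts=2, 30), (s1, ts=3, 32), both variants report 30 as the minimum, but `min` still carries ts=1 while `minBy` carries ts=2, the timestamp of the record that actually had 30.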
6. reduce
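KeyedStream -> DataStream. Combines the current element with the result of the previous aggregation to produce a new value; the returned stream contains the result of every aggregation step, not just the final one.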
// Group by the "id" field
KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
// reduce: keep the minimum temperature so far, together with the latest timestamp
DataStream<SensorReading> reduceStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
    @Override
    public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
        // value1 = previous aggregate, value2 = newly arrived element
        return new SensorReading(value1.getId(), value2.getTimestamp(), Math.min(value1.getTemperature(), value2.getTemperature()));
    }
});
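The "every aggregation step is emitted" behaviour can be modelled in plain Java. This is a sketch, not Flink's implementation: a `HashMap` stands in for Flink's per-key state, and `rollingReduce` is a hypothetical helper. The first element of each key passes through unchanged; every later element is combined with the previous result, just as `value1`/`value2` are in the `ReduceFunction` above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Plain-Java model of KeyedStream.reduce: per key, fold each new element into the previous result.
final class RollingReduceModel {
    static <T, K> List<T> rollingReduce(List<T> input, Function<T, K> keyOf, BinaryOperator<T> reducer) {
        Map<K, T> lastResult = new HashMap<>(); // stands in for Flink's per-key state
        List<T> out = new ArrayList<>();
        for (T value : input) {
            K key = keyOf.apply(value);
            T prev = lastResult.get(key);
            // The first element of a key is emitted as-is; later ones become reducer(prev, value)
            T next = (prev == null) ? value : reducer.apply(prev, value);
            lastResult.put(key, next);
            out.add(next); // every intermediate aggregate is emitted, not only the final one
        }
        return out;
    }
}
```

For brevity, a usage example can key integers by parity and sum them: `rollingReduce(List.of(1, 2, 3, 4), x -> x % 2, Integer::sum)` yields `[1, 2, 4, 6]`, one result per input element.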
7. Split and Select
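DataStream -> split -> SplitStream -> select -> DataStream. split() tags each element with one or more output names, and select() picks out the elements carrying the given names. (split/select was deprecated and removed in Flink 1.12; side outputs replace it.)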
SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
    @Override
    public Iterable<String> select(SensorReading value) {
        // Tag readings above 30 degrees as "high", everything else as "low"
        return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
    }
});
DataStream<SensorReading> highTempStream = splitStream.select("high");
DataStream<SensorReading> lowTempStream = splitStream.select("low");
DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
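To show what `select` returns for one or several names, here is a plain-Java model (not the Flink API; `SplitSelectModel` is hypothetical). The selector plays the role of `OutputSelector.select`, returning the names for each element, and `select(...)` keeps every element tagged with at least one requested name, in input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of split/select: the selector tags each element with output names,
// and select(...) keeps the elements carrying any of the requested names.
final class SplitSelectModel {
    static <T> List<T> select(List<T> input, Function<T, List<String>> selector, String... names) {
        List<String> wanted = Arrays.asList(names);
        List<T> out = new ArrayList<>();
        for (T value : input) {
            for (String tag : selector.apply(value)) {
                if (wanted.contains(tag)) {
                    out.add(value); // added once, even if several of its tags are requested
                    break;
                }
            }
        }
        return out;
    }
}
```

With temperatures 35, 20, 31 and the "> 30 is high" rule, selecting "high" gives 35 and 31, "low" gives 20, and selecting both names returns every element, which is why `allTempStream` above is equivalent to the original stream.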
8. Connect
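DataStream, DataStream -> ConnectedStreams. Connects two streams whose element types may differ. After connect(), the two streams are only placed inside one stream; internally each keeps its own data and type, and the two remain independent of each other.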
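A plain-Java model can make "one stream, two types" concrete. The sketch below is not Flink's `ConnectedStreams`; `ConnectedModel` and `coMap` are hypothetical names. The two inputs are held side by side with their own type parameters, and only a function pair that maps both sides to a common result type `R` merges them. Flink interleaves the two inputs in arrival order, so the "first input, then second input" order here is just one possible outcome.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of ConnectedStreams<A, B>: the two inputs stay separate and keep their own types.
final class ConnectedModel<A, B> {
    private final List<A> first;
    private final List<B> second;

    ConnectedModel(List<A> first, List<B> second) {
        this.first = first;
        this.second = second;
    }

    // Like a CoMapFunction: map1 sees only the first input, map2 only the second; both produce R
    <R> List<R> coMap(Function<A, R> map1, Function<B, R> map2) {
        List<R> out = new ArrayList<>();
        for (A a : first) out.add(map1.apply(a));
        for (B b : second) out.add(map2.apply(b));
        return out;
    }
}
```

For example, connecting a stream of `Double` temperatures with a stream of `String` sensor ids and mapping both sides to `String` produces a single `List<String>`, while the connected object itself still knows which element came from which side.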
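9. CoMap and CoFlatMap
ConnectedStreams -> DataStream. Works like map/flatMap, but applies a separate function to each of the two streams inside the ConnectedStreams (map1/map2 for CoMap, flatMap1/flatMap2 for CoFlatMap).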
// Merge streams with connect: first turn the high-temperature stream into (id, temperature) tuples
DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
    @Override
    public Tuple2<String, Double> map(SensorReading value) throws Exception {
        return new Tuple2<>(value.getId(), value.getTemperature());
    }
});
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);
DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
    @Override
    public Object map1(Tuple2<String, Double> value) throws Exception {
        // Elements from the first (warning) stream
        return new Tuple3<>(value.f0, value.f1, "warning");
    }
    @Override
    public Object map2(SensorReading value) throws Exception {
        // Elements from the second (low-temperature) stream
        return new Tuple2<>(value.getId(), "healthy");
    }
});
10. Union
DataStream -> DataStream. Unions two or more streams that share the same data type.
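In Flink this is `DataStream.union(DataStream<T>... streams)`, e.g. `highTempStream.union(lowTempStream, allTempStream)`. The plain-Java model below (hypothetical `UnionModel`, not the Flink API) shows the two differences from connect: the generic parameter forces every input to have the same element type `T`, and any number of inputs can be merged. Flink does not guarantee the order of elements across the inputs, so the concatenation here is only one possible interleaving.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of DataStream.union: all inputs must share the element type T.
final class UnionModel {
    @SafeVarargs
    static <T> List<T> union(List<T> first, List<T>... others) {
        List<T> out = new ArrayList<>(first);
        for (List<T> other : others) {
            out.addAll(other); // concatenation is only one possible interleaving
        }
        return out;
    }
}
```

Trying to pass a `List<Integer>` alongside `List<String>` inputs fails to compile, mirroring why streams of different types must go through connect instead.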