Flink流處理常見(jiàn)API

1. map

DataStream mapStram = dataStream.map(new?MapFunction<String, Integer>() {

????public Integer map(String value)?throws Exception {

????????return value.length();

????}

});


2. flatMap

DataStream?flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {

????public void flatMap(String value, Collector out) throws Exception {?

????????String[] fields = value.split(",");

????????for (String field: fields) {

????????????out.collect(field);

????????}

????}

});


3. filter

DataStream filterStream = dataStream.filter(new FilterFunction()<String>?{

????public boolean filter(String value) throws Exception {

????????return value == 1;

????}

});


4. keyBy

DataStream -> KeyedStream. 邏輯上將相同key拆分到同一分區(qū)


5. Rolling Aggregation

對(duì)KeyedStream的每一個(gè)支流做滾動(dòng)聚合。sum(), min(), max(), minBy(), maxBy()


6. reduce

KeyedStream ->?DataStream.?合并當(dāng)前的元素和上次聚合的結(jié)果,產(chǎn)生一個(gè)新的值,返回的流中包含每一次聚合的結(jié)果

// 分組

KeyedStream keyedStream= dataStream.keyBy("id");

// reduce 聚合,取最小的溫度值,并輸出當(dāng)前的時(shí)間戳

DataStream reduceStream = keyedStream.reduce(new ReduceFunction()<SensorReading> {?

? ??@Override

? ??public SensorReading reduce(SensorReading value1,SensorReading value2)?throws Exception {

? ??????return new SensorReading(value1.getId(), value2.getTimestamp(), Math.min(value1.getTemperature(), value2.getTemperature()));

????}

});


7. Split和Select

DataStream -> split -> SplitStream -> select -> DataStream.

SplitStream?splitStream = dataStream.split(new OutputSelector()<SensorReading> {

????@Override

????public Iterable select(SensorReading value) {

????????return (value.getTemperature()>30) ? Collections.singletonList("high") : Collections.singletonList("low");

????}

});

DataStream?highTempStream = splitStream.select("high");

DataStream?lowTempStream = splitStream.select("low");

DataStream allTempStream = splitStream.select("high","low");


8. Connect

DataStream,DataStream ->?ConnectedStreams. 可以連接兩個(gè)不同類(lèi)型的數(shù)據(jù)流,兩個(gè)流被connect之后,只是被放到了同一個(gè)流里,仍然保持各自的數(shù)據(jù)類(lèi)型


9. CoMap和CoFlatMap

ConnectedStreams -> DataStream.?

// 合流 connect

DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {

????@Override

????public Tuple2 map(SensorReading value) throws Exception {

????????return new Tuple2<>(value.getId(), value.getTemperature());

????}

});

ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);

DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String,Double>, SensorReading,?Object>() {

????@Override

????public Object map1(Tuple2 value) throws Exception{

????????return new Tuple3<>(value.f0, value.f1,"warning");

????}

????@Override

????public Object map2(SensorReading value) throws Exception {

????????return new Tuple2<>(value.getId(),"healthy");

????}

});


10. Union

連接數(shù)據(jù)類(lèi)型一樣的多條流

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容