2021-01-14-Flink-19 (Flink Transform, Part 1)

Question: how do the input and output types match up one-to-one in map?

1.Map implementation

Implementation 1:

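import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamMap;

public class MapTransform {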
    /**
     * Created with IntelliJ IDEA.
     * Description:
     * User:
     * Date: 2021-01-14
     * Time: 22:38
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
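        // transform() cannot infer the operator's output type, so it is passed explicitly as TypeInformation.
        // StreamMap wraps the MapFunction in a one-input operator; DataStream.map() does essentially the same internally.
        SingleOutputStreamOperator<Integer> map = source.transform("map", TypeInformation.of(Integer.class), new StreamMap<>(new MapFunction<String, Integer>() {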
            @Override
            public Integer map(String s) throws Exception {
                return Integer.parseInt(s);
            }
        }));
        map.print();
        environment.execute("job");
    }
}

Implementation 2:

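import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class MapTransform {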
    /**
     * Created with IntelliJ IDEA.
     * Description:
     * User:
     * Date: 2021-01-14
     * Time: 22:38
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<Integer> source = environment.fromElements(1, 2, 3, 4);
        SingleOutputStreamOperator<Integer> map= source.transform("MyMap", TypeInformation.of(Integer.class), new Mymap());
        map.print();
        environment.execute("job");
    }

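    // AbstractStreamOperator<OUT> fixes the output type; OneInputStreamOperator<IN, OUT> declares the input and output types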
    public static class Mymap extends AbstractStreamOperator<Integer>
            implements OneInputStreamOperator<Integer, Integer> {
        @Override
        public void processElement(StreamRecord<Integer> element) throws Exception {
            Integer value = element.getValue();
            // Reuse the incoming record (keeping its timestamp) and swap in the doubled value
            element.replace(value * 2);
            output.collect(element);
        }
    }
}

2.FlatMap implementation

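import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class Flatmap {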
    /**
     * Created with IntelliJ IDEA.
     * Description:
     * User:
     * Date: 2021-01-14
     * Time: 23:31
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<String> flatmap = source.transform("flatmap", TypeInformation.of(String.class), new MyFlatmap());
        flatmap.print();
        environment.execute("job");
    }

    public static class MyFlatmap extends AbstractStreamOperator<String>
            implements OneInputStreamOperator<String, String> {

        @Override
        public void processElement(StreamRecord<String> element) throws Exception {
            String value = element.getValue();
            String[] split = value.split(",");
            // One input record may produce zero or more output records, which is what makes this a flatMap
            for (String s : split) {
                if (!"error".equals(s)) {
                    output.collect(element.replace(s));
                }
            }
        }
    }
}
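The per-record logic of MyFlatmap does not depend on Flink at all, so one option is to pull it into a plain static method that can be unit-tested without starting an environment. This is a minimal sketch of that idea; the class name `SplitSkipErrors` is hypothetical and not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: the same split-and-filter logic as MyFlatmap, without any Flink types.
public class SplitSkipErrors {

    // Splits a comma-separated line and drops every token equal to "error",
    // mirroring the loop inside MyFlatmap.processElement.
    public static List<String> split(String line) {
        List<String> out = new ArrayList<>();
        for (String token : line.split(",")) {
            if (!"error".equals(token)) {
                out.add(token);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(split("flink,error,spark")); // prints [flink, spark]
    }
}
```

Inside the operator, the loop would then become `for (String s : SplitSkipErrors.split(element.getValue())) output.collect(element.replace(s));`, keeping the Flink-specific part as thin as possible.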

3.Filter

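import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class Fliter {
    /**
     * Created with IntelliJ IDEA.
     * Description:
     * User:
     * Date: 2021-01-15
     * Time: 10:26
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<Integer> source = environment.fromElements(1, 2, 3, 4, 5, 6, 7);
        SingleOutputStreamOperator<Integer> fliter = source.transform("fliter", TypeInformation.of(Integer.class), new Fliter1());
        fliter.print();
        environment.execute("job");
    }

    static class Fliter1 extends AbstractStreamOperator<Integer>
            implements OneInputStreamOperator<Integer, Integer> {
        @Override
        public void processElement(StreamRecord<Integer> element) throws Exception {
            // Keep odd numbers only; records that fail the check are simply not emitted
            if (element.getValue() % 2 != 0) {
                output.collect(element);
            }
        }
    }
}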

4.keyBy

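In Flink development, when you define a JavaBean to hold data, the member variables are usually declared public, which makes assignment and reflection easier. If you define a constructor with parameters, you must also define a no-argument constructor; otherwise exceptions can occur at runtime (for example, keying by a field name requires the class to be a valid Flink POJO). It is also convenient to define a static of method for creating instances. Flink's own Tuple classes are implemented the same way, as their source code shows.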

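import java.util.Objects;

public class CountBean {
    public String word;
    public Integer count;

    // Flink needs a public no-argument constructor to instantiate the POJO
    public CountBean() {}

    public CountBean(String word, Integer count) {
        this.word = word;
        this.count = count;
    }

    public static CountBean of(String word, Integer count) {
        return new CountBean(word, count);
    }

    @Override
    public String toString() {
        return "CountBean{" + "word='" + word + '\'' + ", count=" + count + '}';
    }

    // Only word takes part in equals/hashCode, so the two stay consistent
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CountBean)) return false;
        return Objects.equals(word, ((CountBean) o).word);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(word);
    }
}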
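To see what "public fields plus a public no-argument constructor" buys a framework, here is a simplified, hypothetical reflection check (`PojoCheck` and `looksLikePojo` are illustrative names). Flink's real rules are broader than this: fields may also be private with getters and setters, and a type that is not a POJO falls back to generic (Kryo) serialization instead of always failing. This only checks the subset described in the paragraph above.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical, simplified POJO check; this is not Flink's TypeExtractor.
public class PojoCheck {

    // Nested beans must be static so they can be created without an outer instance
    public static class GoodBean {
        public String word;
        public Integer count;
        public GoodBean() {}
        public GoodBean(String word, Integer count) { this.word = word; this.count = count; }
    }

    public static class NoDefaultCtorBean {
        public String word;
        public NoDefaultCtorBean(String word) { this.word = word; }
    }

    // True if the class is public, has a public no-arg constructor,
    // and every non-static field is public.
    public static boolean looksLikePojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        try {
            clazz.getConstructor(); // looks up the public no-arg constructor only
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : clazz.getDeclaredFields()) {
            int m = f.getModifiers();
            if (!Modifier.isStatic(m) && !Modifier.isPublic(m)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(looksLikePojo(GoodBean.class));          // true
        System.out.println(looksLikePojo(NoDefaultCtorBean.class)); // false
        // Reflective creation and assignment, the kind of thing a framework does for you
        GoodBean b = GoodBean.class.getConstructor().newInstance();
        GoodBean.class.getField("word").set(b, "flink");
        System.out.println(b.word); // flink
    }
}
```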
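// lines is assumed to be a DataStream<String>, e.g. environment.socketTextStream("localhost", 8888)
// Split each line and wrap every word together with 1 in a CountBean instance
DataStream<CountBean> wordAndOne = lines.flatMap(
        new FlatMapFunction<String, CountBean>() {
            @Override
            public void flatMap(String line, Collector<CountBean> out) throws Exception {
                String[] words = line.split("\\W+");
                for (String word : words) {
                    // Put each split word into a bean
                    out.collect(CountBean.of(word, 1));
                }
            }
        }
);
// Group by the bean field named word (field-name keys require a valid POJO).
// keyBy(String) is deprecated in newer Flink releases; keyBy(b -> b.word) is the KeySelector alternative.
KeyedStream<CountBean, Tuple> keyed = wordAndOne.keyBy("word");
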
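import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Keyby {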
    /**
     * Created with IntelliJ IDEA.
     * Description:
     * User:
     * Date: 2021-01-15
     * Time: 21:27
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = environment.fromElements("flink,spark,flume", "scala,java,spark");
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] split = s.split(",");
                for (String s1 : split) {
                    collector.collect(Tuple2.of(s1, 1));
                }
            }
        });

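        // Key by field 0 (the word). Field 1 is always 1, so keyBy(1) would put every record under the same key
        KeyedStream<Tuple2<String, Integer>, Tuple> stream = flatMap.keyBy(0);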
        stream.print();
        environment.execute("job");
    }
}
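keyBy sends every record with the same key to the same parallel subtask. The sketch below imitates that routing in plain Java with `hashCode()` modulo the parallelism. This is a deliberate simplification (Flink actually hashes keys into key groups and then assigns key groups to subtasks), and `KeyRouting`/`routeByKey` are hypothetical names. It also shows why keying the word-count tuples by field 1, the constant 1, would collapse everything onto a single key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical, simplified model of keyBy routing: partition = key.hashCode() mod parallelism.
// Flink's real assignment goes through key groups; only the "equal keys meet" property is modeled.
public class KeyRouting {

    // Groups records by the partition their key maps to; equal keys always map to the same partition.
    public static <T, K> Map<Integer, List<T>> routeByKey(List<T> records, Function<T, K> keySelector, int parallelism) {
        Map<Integer, List<T>> partitions = new TreeMap<>();
        for (T record : records) {
            // floorMod keeps the index non-negative even when hashCode() is negative
            int partition = Math.floorMod(keySelector.apply(record).hashCode(), parallelism);
            partitions.computeIfAbsent(partition, p -> new ArrayList<>()).add(record);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String w : "flink,spark,flume,scala,java,spark".split(",")) {
            pairs.add(Map.entry(w, 1));
        }
        // Keyed by the word: both "spark" pairs end up in the same partition
        System.out.println(routeByKey(pairs, e -> e.getKey(), 4));
        // Keyed by the constant 1: every pair ends up in a single partition
        System.out.println(routeByKey(pairs, e -> e.getValue(), 4).size()); // prints 1
    }
}
```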

5.Max

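max/maxBy and min/minBy take the running maximum or minimum within each key. The difference between min and minBy: min returns the grouping field together with the minimum of the compared field; if the records have other fields, their values always come from the first record received for that key. minBy instead returns the complete record that holds the minimum value. max and maxBy differ in the same way.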
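The min/minBy difference described above can be modeled in a few lines of plain Java. This is a hypothetical illustration of the semantics (`MinVsMinBy`, `Row`, `rollingMin`, `rollingMinBy` are made-up names), not Flink's aggregator code: rows are (key, other, value), keyed by the first field and compared on the third, and each input emits the current result for its key, just like the rolling aggregations on a KeyedStream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of rolling min vs minBy on (key, other, value) rows.
public class MinVsMinBy {

    public static final class Row {
        public final String key;
        public final String other;
        public final int value;

        public Row(String key, String other, int value) {
            this.key = key;
            this.other = other;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + key + "," + other + "," + value + ")";
        }
    }

    // min: keep the first row seen for the key and only overwrite the compared field.
    public static List<Row> rollingMin(List<Row> input) {
        Map<String, Row> state = new HashMap<>();
        List<Row> emitted = new ArrayList<>();
        for (Row r : input) {
            Row acc = state.get(r.key);
            Row next = (acc == null) ? r : new Row(acc.key, acc.other, Math.min(acc.value, r.value));
            state.put(r.key, next);
            emitted.add(next);
        }
        return emitted;
    }

    // minBy: keep the whole row that holds the smallest value (ties keep the earlier row here).
    public static List<Row> rollingMinBy(List<Row> input) {
        Map<String, Row> state = new HashMap<>();
        List<Row> emitted = new ArrayList<>();
        for (Row r : input) {
            Row acc = state.get(r.key);
            Row next = (acc == null || r.value < acc.value) ? r : acc;
            state.put(r.key, next);
            emitted.add(next);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Row> input = List.of(
                new Row("cn", "beijing", 5),
                new Row("cn", "shanghai", 3),
                new Row("cn", "shenzhen", 4));
        System.out.println(rollingMin(input));   // [(cn,beijing,5), (cn,beijing,3), (cn,beijing,3)]
        System.out.println(rollingMinBy(input)); // [(cn,beijing,5), (cn,shanghai,3), (cn,shanghai,3)]
    }
}
```

Note how min reports "beijing" next to the value 3 even though 3 came from the "shanghai" row; that mismatch is exactly why minBy exists.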

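import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Max {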
    /**
     * Created with IntelliJ IDEA.
     * Description:
     * User:
     * Date: 2021-01-16
     * Time: 11:18
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] split = s.split(" ");
                return Tuple2.of(split[0], Integer.parseInt(split[1]));
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> keyBy = map.keyBy(x -> x.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> max = keyBy.max(1);
        max.print();
        environment.execute("job");
    }
}

6.Min/MinBy

Both operators compute a rolling minimum, and only a KeyedStream can call min and minBy. For Tuple data you pass the index (a number) of the field to compare; for a custom POJO you pass the name of the field to aggregate.

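import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Minby {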
    /**
     * Created with IntelliJ IDEA.
     * Description:
     * User:
     * Date: 2021-01-16
     * Time: 14:26
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> map = source.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(String s) throws Exception {
                String[] split = s.split(",");
                String city = split[0];
                String country = split[1];
                int number = Integer.parseInt(split[2]);
                return Tuple3.of(city, country, number);
            }
        });
        KeyedStream<Tuple3<String, String, Integer>, Tuple> keyBy = map.keyBy("f0");
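        // min(2): f1 keeps the value from the first record seen for each key
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> min = keyBy.min(2);
        // minBy(2): returns the whole record that holds the minimum of f2
        //SingleOutputStreamOperator<Tuple3<String, String, Integer>> minBy = keyBy.minBy(2);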
        min.print();
        environment.execute("job");
    }
}

7.Reduce

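import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Reduce {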
    /**
     * Created with IntelliJ IDEA.
     * Description:
     * User:
     * Date: 2021-01-16
     * Time: 15:01
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] split = s.split(",");
                return Tuple2.of(split[0], Integer.parseInt(split[1]));
            }
        });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyBy = map.keyBy(0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = keyBy.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> stringIntegerTuple2, Tuple2<String, Integer> t1) throws Exception {
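                // stringIntegerTuple2 is the accumulated value so far (initially the first record for this key); t1 is the new record
                stringIntegerTuple2.f1 += t1.f1;
                return stringIntegerTuple2;
                // Alternative that does not mutate the accumulator:
                // return Tuple2.of(stringIntegerTuple2.f0, stringIntegerTuple2.f1 + t1.f1);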
            }
        });
        reduce.print();
        environment.execute("job");
    }
}
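A keyed reduce is a rolling aggregation: the first record for a key is emitted unchanged, and every later record is combined with the stored value and the new result is emitted. The plain-Java model below sketches that behaviour for the "word,count" input used in the Reduce example. It is a hypothetical illustration (`RollingReduce`/`rollingSum` are made-up names) that keeps per-key state in a HashMap instead of Flink's keyed state, and it trims the count before parsing, which the original does not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a keyed rolling reduce that sums counts per word.
public class RollingReduce {

    // Emits the running (word,sum) after every input line of the form "word,count".
    public static List<String> rollingSum(List<String> lines) {
        Map<String, Integer> state = new HashMap<>(); // stands in for per-key state
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            String[] split = line.split(",");
            String word = split[0];
            int count = Integer.parseInt(split[1].trim());
            // merge() stores count for a new key, otherwise adds it to the stored sum
            int sum = state.merge(word, count, Integer::sum);
            emitted.add("(" + word + "," + sum + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(rollingSum(List.of("spark,1", "flink,2", "spark,3")));
        // prints [(spark,1), (flink,2), (spark,4)]
    }
}
```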