Flink(2) 簡單上手

maven 依賴

  <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>

Flink 批處理能力

/**
 * 批處理
 */
public class WordCount {

    public static void main(String[] args)  throws Exception{
        //1.創(chuàng)建執(zhí)行環(huán)境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.從文件讀取數(shù)據(jù)
        String inputPath = "/you/path/hello.txt";
        DataSet<String> inputDataSet = env.readTextFile(inputPath);
        //3.對數(shù)據(jù)集進行處理 ,拆成 單個單詞,轉(zhuǎn)換成 2元組(word ,1)進行統(tǒng)計
        DataSet<Tuple2<String,Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
                .groupBy(0) //按照第一位置的word分組
                .sum(1); //按照第二個位置上的數(shù)據(jù)求和

        resultSet.print(); //輸出

    }

    //自定義類實現(xiàn)FlatMapFunction接口
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            //按照空格分詞
            String[] words = value.split(" ");
            //遍歷所有的 word ,包成而元組輸出
            for (String word: words ) {
                 out.collect(new Tuple2<String,Integer>(word ,1));
            }
        }
    }
    
}

Flink 流處理能力


/**
 * 流處理
 */
public class StreamWordCount {

    public static void main(String[] args) throws Exception{
        //env
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

       //默認的并行度,是按照的當前電腦的cpu
//        env.getParallelism();
//        env.setParallelism(4);
        //前面的數(shù)字 1 ,代表當前我們現(xiàn)在環(huán)境當前并行執(zhí)行的線程的編號
        // 1> (hello,1)

        //2.從文件讀取數(shù)據(jù)
//        String inputPath = "/path/to/hello.txt";
//        DataStreamSource<String> inputDataStream = env.readTextFile(inputPath);

        //用 parameter tool 工具從程序的啟動參數(shù)中提取配置項
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");

        //從隊列讀取數(shù)據(jù)
        //從socket文本流讀取數(shù)據(jù)
        DataStream<String> inputDataStream = env.socketTextStream(host, port);


        //基于數(shù)據(jù)流進行轉(zhuǎn)換計算     // keyBey按照key的hash ,進行重分區(qū),不做計算
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper())
                .keyBy(0)
                .sum(1);

        //打印輸出
        resultStream.print();

        //執(zhí)行任務
        env.execute();

    }
}

2.啟動設置

# idea 設置
program arguments :  --host localhost --port 7777
  1. 啟動nc
nc -lk 7777
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容