2022-03-22-Flink-43(二)

環(huán)境配置
    <properties>
        <maven.compiler.source>14</maven.compiler.source>
        <maven.compiler.target>14</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
批處理:DataSet API 實現(xiàn)一個word count案例

DataSet API將面臨淘汰,flink流處理才是核心,怎么用流實現(xiàn)批,提交的時候加一個參數(shù):BATCH模式

public class woodcut {

    public static void main(String[] args) throws Exception {
        /*創(chuàng)建執(zhí)行環(huán)境*/
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> source = executionEnvironment.readTextFile("src/main/resources/a.txt");
        FlatMapOperator<String, Tuple2<String, Integer>> flatMapOperator = source.flatMap((String line, Collector<Tuple2<String, Integer>> s) -> {
            String[] split = line.split("\\s+");
            for (String s1 : split) {
                s.collect(Tuple2.of(s1, 1));
            }

        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        flatMapOperator.groupBy(0).sum(1).print();

    }
}
流處理:有界流
public class bindedwordcount {


    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment ex= StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource = ex.readTextFile("src/main/resources/a.txt");
        SingleOutputStreamOperator<Tuple2<String, Integer>> flapWord = streamSource.flatMap((String line, Collector<Tuple2<String, Integer>> s) -> {
            String[] split = line.split("\\s+");
            for (String s1 : split) {
                s.collect(Tuple2.of(s1, 1));
            }


        }).returns(Types.TUPLE(Types.STRING, Types.INT));

       /* flapWord.keyBy(0)*/
        DataStreamSink<Tuple2<String, Integer>> streamSink = flapWord.keyBy(data -> data.f0).sum(1).print();

        ex.execute();

    }
}
流處理:無界流

模擬測試一下,需要安裝 nc,實現(xiàn)端口的監(jiān)聽 netcat 1.11

public class unbindedwordcount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource = environment.socketTextStream("localhost", 6666);

        SingleOutputStreamOperator<Tuple2<String, Integer>>  flapWord =
                streamSource.flatMap((String line, Collector<Tuple2<String, Integer>> s) -> {
            String[] split = line.split("\\s+");
            for (String s1 : split) {
                s.collect(Tuple2.of(s1, 1));
            }


        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        /* flapWord.keyBy(0)*/
        DataStreamSink<Tuple2<String, Integer>> streamSink = flapWord.keyBy(data -> data.f0).sum(1).print();

        environment.execute();
    }

}

從args中獲取參數(shù)
使用flink提供的ParameterTool.fromArgs(args);
注意寫法沒有 : --host localhost --port 6666

public class unbindedwordcount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        ParameterTool fromArgs = ParameterTool.fromArgs(args);
        String host = fromArgs.get("host");
        int port = fromArgs.getInt("port");
        DataStreamSource<String> streamSource = environment.socketTextStream(host, port);

        SingleOutputStreamOperator<Tuple2<String, Integer>>  flapWord =
                streamSource.flatMap((String line, Collector<Tuple2<String, Integer>> s) -> {
            String[] split = line.split("\\s+");
            for (String s1 : split) {
                s.collect(Tuple2.of(s1, 1));
            }


        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        /* flapWord.keyBy(0)*/
        DataStreamSink<Tuple2<String, Integer>> streamSink = flapWord.keyBy(data -> data.f0).sum(1).print();

        environment.execute();
    }

}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容