Flink-1.13.0 Word Count 測試實(shí)例Demo

新建

基于DataSetApi 批處理word count

package com.flinktest.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // Set up the batch (DataSet API) execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the input file line by line.
        DataSource<String> lines = env.readTextFile("input/words.txt");

        // Tokenize every line and emit one (word, 1L) pair per token.
        FlatMapOperator<String, Tuple2<String, Long>> wordPairs =
                lines.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
                    for (String token : line.split(" ")) {
                        collector.collect(Tuple2.of(token, 1L));
                    }
                    // Lambda erases the tuple's generics, so declare the
                    // produced type explicitly for the runtime.
                }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // Group by the word (tuple field 0).
        UnsortedGrouping<Tuple2<String, Long>> grouped = wordPairs.groupBy(0);

        // Sum the per-word counts (tuple field 1) inside each group.
        AggregateOperator<Tuple2<String, Long>> counts = grouped.sum(1);

        // Print the aggregated counts to stdout (triggers execution).
        counts.print();
    }
}

基于 DataStream API 的有界流(批式)word count —— 用流處理 API 讀取有界文件數(shù)據(jù)

package com.flinktest.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BoundedDataStreamWordCount {
    public static void main(String[] args) throws Exception{
        // Set up the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read a bounded stream of lines from the input file.
        DataStreamSource<String> lines = env.readTextFile("./input/words.txt");

        // Tokenize every line and emit one (word, 1L) pair per token.
        SingleOutputStreamOperator<Tuple2<String, Long>> wordPairs =
                lines.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
                    for (String token : line.split(" ")) {
                        collector.collect(Tuple2.of(token, 1L));
                    }
                    // Lambda erases the tuple's generics, so declare the
                    // produced type explicitly for the runtime.
                }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // Partition the stream by the word itself (field f0).
        KeyedStream<Tuple2<String, Long>, String> keyed = wordPairs.keyBy(data -> data.f0);

        // Rolling sum of the counts (tuple field 1) per key.
        SingleOutputStreamOperator<Tuple2<String, Long>> counts = keyed.sum(1);

        // Print incremental results to stdout.
        counts.print();

        // Submit the job; nothing runs until execute() is called.
        env.execute();
    }
}

基于DataStreamApi 流處理word count

 package com.flinktest.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception{
        // 1. Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Extract host & port from the command-line arguments
        // (e.g. --host localhost --port 7777).
        // FIX: the original used params.get("host") / params.getInt("port"),
        // which yields null / unboxes a null Integer and crashes with an
        // opaque NullPointerException when the flags are omitted. Use the
        // default-value overloads instead so the demo runs out of the box.
        ParameterTool params = ParameterTool.fromArgs(args);
        String host = params.get("host", "localhost");
        int port = params.getInt("port", 7777);

        // 2. Read an unbounded text stream from the socket.
        DataStreamSource<String> lineDataStreamSource = env.socketTextStream(host, port);

        // 3. Tokenize every line and emit one (word, 1L) pair per token.
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
            // Lambda erases the tuple's generics, so declare the produced
            // type explicitly for the runtime.
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. Partition the stream by the word itself (field f0).
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);

        // 5. Rolling sum of the counts (tuple field 1) per key.
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);

        // 6. Print incremental results to stdout.
        sum.print();

        // 7. Submit the job; nothing runs until execute() is called.
        env.execute();
    }
}

測試工具:nc -lk <port>(例如 nc -lk 7777,與 --port 參數(shù)一致),啟動后在該終端輸入文本即可作為 socketTextStream 的數(shù)據(jù)源。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容