新建
基于 DataSet API 的批處理 word count
package com.flinktest.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * Batch word count built on the DataSet API.
 *
 * <p>Reads {@code input/words.txt}, splits every line on single spaces and prints one
 * {@code (word, count)} tuple per distinct word.
 */
public class BatchWordCount {
    /**
     * Builds the word-count pipeline and prints its result.
     *
     * @param args command-line arguments; not used
     * @throws Exception propagated from the Flink calls below
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the input file as a data set of lines.
        DataSource<String> lineDataSource = env.readTextFile("input/words.txt");

        // 3. Tokenize each line and emit a (word, 1) tuple per word.
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    for (String word : line.split(" ")) {
                        // split(" ") yields empty tokens for blank lines and for leading or
                        // repeated spaces; those are not words, so they must not be counted.
                        if (!word.isEmpty()) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                })
                // The lambda's generic output type is erased, so declare it explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. Group by the word (tuple field 0).
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);

        // 5. Sum the counts (tuple field 1) within each group.
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);

        // 6. Print the result; no separate env.execute() call follows in this program.
        sum.print();
    }
}
基于 DataStream API 的批處理 word count
package com.flinktest.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Word count over a bounded input (a text file) built on the DataStream API.
 *
 * <p>Reads {@code ./input/words.txt}, splits every line on single spaces, keys the resulting
 * {@code (word, 1)} tuples by word and prints the running sum per word.
 */
public class BoundedDataStreamWordCount {
    /**
     * Builds the streaming word-count pipeline over the file and executes it.
     *
     * @param args command-line arguments; not used
     * @throws Exception propagated from the Flink calls below
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the file as a stream of lines.
        DataStreamSource<String> lineDataStreamSource = env.readTextFile("./input/words.txt");

        // 3. Tokenize each line and emit a (word, 1) tuple per word.
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    for (String word : line.split(" ")) {
                        // split(" ") yields empty tokens for blank lines and for leading or
                        // repeated spaces; those are not words, so they must not be counted.
                        if (!word.isEmpty()) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                })
                // The lambda's generic output type is erased, so declare it explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. Key the stream by the word (tuple field f0).
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream =
                wordAndOneTuple.keyBy(data -> data.f0);

        // 5. Sum the counts (tuple field 1) per key.
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);

        // 6. Print the result.
        sum.print();

        // 7. Start execution of the job.
        env.execute();
    }
}
基于 DataStream API 的流處理 word count
package com.flinktest.wc;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Word count over an unbounded socket text stream built on the DataStream API.
 *
 * <p>Connects to the host and port given on the command line, splits every received line on
 * single spaces, keys the resulting {@code (word, 1)} tuples by word and prints the running
 * sum per word.
 */
public class StreamWordCount {
    /**
     * Builds the streaming word-count pipeline over a socket source and executes it.
     *
     * @param args command-line arguments parsed by {@link ParameterTool}; must supply the
     *             {@code host} and {@code port} keys
     * @throws Exception propagated from the Flink calls below
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the stream execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Extract host and port from the program arguments. getRequired fails right here with
        // a message naming the missing key, instead of handing a null host to the socket source.
        ParameterTool params = ParameterTool.fromArgs(args);
        String host = params.getRequired("host");
        int port = params.getInt("port");

        // 2. Read the text stream from the socket.
        DataStreamSource<String> lineDataStreamSource = env.socketTextStream(host, port);

        // 3. Tokenize each line and emit a (word, 1) tuple per word.
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    for (String word : line.split(" ")) {
                        // split(" ") yields empty tokens for blank lines and for leading or
                        // repeated spaces; those are not words, so they must not be counted.
                        if (!word.isEmpty()) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                })
                // The lambda's generic output type is erased, so declare it explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. Key the stream by the word (tuple field f0).
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyedStream =
                wordAndOneTuple.keyBy(data -> data.f0);

        // 5. Sum the counts (tuple field 1) per key.
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);

        // 6. Print the result.
        sum.print();

        // 7. Start execution of the job.
        env.execute();
    }
}
測試工具:nc -lk <port>(<port> 需與啟動 StreamWordCount 時傳入的 --port 參數一致)