Maven 依賴
<dependencies>
    <!-- Flink Java API (DataSet / batch programs), version 1.10.1 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>

    <!-- Flink DataStream API (streaming programs); the _2.12 suffix is the Scala binary version -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>
Flink 批處理能力
/**
* 批處理
*/
public class WordCount {
public static void main(String[] args) throws Exception{
//1.創(chuàng)建執(zhí)行環(huán)境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//2.從文件讀取數(shù)據(jù)
String inputPath = "/you/path/hello.txt";
DataSet<String> inputDataSet = env.readTextFile(inputPath);
//3.對數(shù)據(jù)集進行處理 ,拆成 單個單詞,轉(zhuǎn)換成 2元組(word ,1)進行統(tǒng)計
DataSet<Tuple2<String,Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
.groupBy(0) //按照第一位置的word分組
.sum(1); //按照第二個位置上的數(shù)據(jù)求和
resultSet.print(); //輸出
}
//自定義類實現(xiàn)FlatMapFunction接口
public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
//按照空格分詞
String[] words = value.split(" ");
//遍歷所有的 word ,包成而元組輸出
for (String word: words ) {
out.collect(new Tuple2<String,Integer>(word ,1));
}
}
}
}
Flink 流處理能力
/**
 * Streaming word count built on Flink's DataStream API.
 *
 * <p>Reads lines from a socket text stream and continuously prints running
 * {@code (word, count)} totals. The socket address comes from the program arguments
 * {@code --host <host> --port <port>}. When either argument is omitted it defaults to
 * {@code localhost} / {@code 7777}, the values used in the setup instructions.
 *
 * <p>Each printed line is prefixed with the index of the parallel subtask that produced it,
 * e.g. {@code 1> (hello,1)}. The default parallelism follows the local machine's CPU count;
 * it can be overridden with {@code env.setParallelism(n)}.
 */
public class StreamWordCount {
    /** Host used when {@code --host} is not supplied. */
    private static final String DEFAULT_HOST = "localhost";
    /** Port used when {@code --port} is not supplied. */
    private static final int DEFAULT_PORT = 7777;

    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Extract --host / --port from the program arguments. Fall back to defaults rather
        //    than passing a null host or failing outright when --port is missing.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host", DEFAULT_HOST);
        int port = parameterTool.getInt("port", DEFAULT_PORT);

        // 3. Read lines from the socket text stream.
        //    (To read a file instead: env.readTextFile("/path/to/hello.txt").)
        DataStream<String> inputDataStream = env.socketTextStream(host, port);

        // 4. Reuse the batch job's tokenizer. keyBy(0) repartitions the stream by the hash of
        //    field 0 (the word) without computing anything; sum(1) keeps a running total of field 1.
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = inputDataStream
                .flatMap(new WordCount.MyFlatMapper())
                .keyBy(0)
                .sum(1);

        // 5. Print the running counts.
        resultStream.print();

        // 6. A streaming job only starts when execute() is called; name it for the web UI and logs.
        env.execute("StreamWordCount");
    }
}
2. 啟動設置
# IDEA 設置（Run Configuration）
Program arguments: --host localhost --port 7777
- 先啟動 nc 監聽 7777 端口，再運行 StreamWordCount（socketTextStream 會主動連接該端口，若 nc 未啟動作業會連接失敗）：
nc -lk 7777