flink socketTextStream window nc
測試工程POM.xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
在windows cmd使用如下命令準備連接 9000端口發(fā)送數(shù)據(jù)
nc -lp 9000 -v
flink demo
package com.keke.test0827;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class StreamWordCount2 {
public static void main(String[] args) throws Exception{
// 創(chuàng)建流處理執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);
// env.disableOperatorChaining();
// // 從文件中讀取數(shù)據(jù)
// String inputPath = "hello.txt";
// DataStream<String> inputDataStream = env.readTextFile(inputPath);
// 從socket文本流讀取數(shù)據(jù)
DataStream<String> inputDataStream = env.socketTextStream("127.0.0.1", 9000);
// DataStreamSource<String> inputDataStream = env.socketTextStream("127.0.0.1", 9000);
// 基于數(shù)據(jù)流進行轉(zhuǎn)換計算
DataStream<Tuple2<String, Integer>> resultStream = inputDataStream.flatMap(new MyFlatMapper())
.keyBy(0)
// .sum(1).setParallelism(2).slotSharingGroup("red");
// .sum(1);
resultStream.print();
// 執(zhí)行任務(wù)
env.execute();
}
// 自定義類,實現(xiàn)FlatMapFunction接口
public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
// 按空格分詞
String[] words = value.split(" ");
// 遍歷所有word,包成二元組輸出
for (String word : words) {
out.collect(new Tuple2<>(word, 1));
}
}
}
}
如上例子:
1 .sum(1).setParallelism(2).slotSharingGroup("red");
2 .sum(1);
快速,因為只啟動flink案例后如果連接不到9000端口(沒有先 執(zhí)行cmd nc -lp 9000 -v 命令)程序?qū)⒑芸旖K止 使用 windows cmd中命令 netstat -ano|findstr "9000" 查看 9000端口的啟動情況
如果選擇1 位置代碼, 無結(jié)果顯示。
如果選擇2 位置代碼, 結(jié)果顯示:
TCP 0.0.0.0:9000 0.0.0.0:0 LISTENING 4612