環(huán)境配置
<!-- Build properties: compile for Java 14; Flink 1.13.0 built against Scala 2.12. -->
<properties>
<maven.compiler.source>14</maven.compiler.source>
<maven.compiler.target>14</maven.compiler.target>
<flink.version>1.13.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- Core Java API (DataSet / common types such as Tuple2). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- DataStream API; artifact is suffixed with the Scala binary version. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Client needed to execute jobs from the IDE / command line. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
批處理:DataSet API 實現(xiàn)一個word count案例
DataSet API 已被軟棄用,Flink 的核心是流處理;要用流處理 API 實現(xiàn)批處理,只需在提交作業(yè)時指定執(zhí)行模式參數(shù),例如:flink run -Dexecution.runtime-mode=BATCH
// NOTE(review): class name was probably meant to be WordCount (UpperCamelCase);
// kept as-is so existing references/submission commands still work.
public class woodcut {
    /**
     * Batch word count using the (soft-deprecated) DataSet API.
     * Reads a text file, splits each line on whitespace, and prints the
     * per-word counts as (word, count) tuples.
     *
     * @param args optional: args[0] overrides the default input file path
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Create the batch execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Allow the input path to be supplied on the command line; fall back
        // to the original hard-coded path for backward compatibility.
        String inputPath = args.length > 0 ? args[0] : "src/main/resources/a.txt";
        DataSource<String> source = env.readTextFile(inputPath);
        FlatMapOperator<String, Tuple2<String, Integer>> words = source.flatMap(
                (String line, Collector<Tuple2<String, Integer>> out) -> {
                    // Split on any run of whitespace and emit (word, 1) for each token.
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                // Lambdas lose generic type information to erasure, so the
                // output tuple type must be declared explicitly.
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // Group by the word (tuple field 0) and sum the counts (field 1).
        words.groupBy(0).sum(1).print();
    }
}
流處理:有界流
public class bindedwordcount {
    /**
     * Bounded-stream word count: reads a finite text file through the
     * DataStream API, keys by word, sums the counts, and prints them.
     *
     * @param args optional: args[0] overrides the default input file path
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Allow the input path to be supplied on the command line; fall back
        // to the original hard-coded path for backward compatibility.
        String inputPath = args.length > 0 ? args[0] : "src/main/resources/a.txt";
        DataStreamSource<String> source = env.readTextFile(inputPath);
        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = source
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    // Split on whitespace and emit (word, 1) for each token.
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                // Declare the tuple type lost to lambda type erasure.
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // Key by the word itself and sum per-word counts; print() attaches a sink.
        counts.keyBy(data -> data.f0).sum(1).print();
        // DataStream programs are lazy: nothing runs until execute() is called.
        env.execute();
    }
}
流處理:無界流
本地模擬測試時需要先安裝 netcat(版本 1.11),用 nc 命令監(jiān)聽指定端口,向程序發(fā)送文本數(shù)據(jù)
public class unbindedwordcount {
    /**
     * Unbounded-stream word count: reads lines from a socket (e.g. fed by
     * netcat on localhost:6666), keys by word, and prints running counts.
     *
     * @param args unused; host/port are hard-coded in this demo
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Unbounded source: blocks until data arrives on the socket.
        DataStreamSource<String> source = env.socketTextStream("localhost", 6666);
        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = source
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    // Split on whitespace and emit (word, 1) for each token.
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                // Declare the tuple type lost to lambda type erasure.
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // Key by the word itself and sum per-word counts; print() attaches a sink.
        counts.keyBy(data -> data.f0).sum(1).print();
        // Start the (never-ending) streaming job.
        env.execute();
    }
}
從args中獲取參數(shù)
使用flink提供的ParameterTool.fromArgs(args);
注意參數(shù)寫法(鍵和值之間用空格分隔,沒有冒號或等號):--host localhost --port 6666
public class unbindedwordcount {
    /**
     * Unbounded-stream word count with the socket host/port taken from the
     * command line via Flink's ParameterTool, e.g.
     * {@code --host localhost --port 6666}.
     *
     * @param args {@code --host H --port P}; both are optional and default
     *             to localhost:6666 so the demo also runs with no arguments
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool params = ParameterTool.fromArgs(args);
        // Defaults keep the program usable (and non-throwing) when the
        // arguments are omitted; explicit --host/--port still win.
        String host = params.get("host", "localhost");
        int port = params.getInt("port", 6666);
        DataStreamSource<String> source = env.socketTextStream(host, port);
        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = source
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    // Split on whitespace and emit (word, 1) for each token.
                    for (String word : line.split("\\s+")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                // Declare the tuple type lost to lambda type erasure.
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // Key by the word itself and sum per-word counts; print() attaches a sink.
        counts.keyBy(data -> data.f0).sum(1).print();
        // Start the (never-ending) streaming job.
        env.execute();
    }
}