1. Problems with Spark Streaming
High processing latency
Imperfect support for state
Inflexible support for windows
No support for event time
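
To make the event-time point concrete, here is a minimal plain-Java sketch of assigning events to tumbling windows by the timestamp each event carries, rather than by when it arrives. It is not Spark or Flink code; the class name `EventTimeWindowSketch` and its methods are hypothetical, and it assumes non-negative millisecond timestamps.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy illustration of event-time windowing; not Spark or Flink API. */
final class EventTimeWindowSketch {

    /** Start of the tumbling window of the given size that contains the timestamp. */
    static long windowStart(long eventTimeMillis, long windowSizeMillis) {
        return eventTimeMillis - (eventTimeMillis % windowSizeMillis);
    }

    /** Counts events per window, keyed by window start, using each event's own timestamp. */
    static Map<Long, Integer> countPerWindow(List<Long> eventTimesMillis, long windowSizeMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : eventTimesMillis) {
            counts.merge(windowStart(ts, windowSizeMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Events arrive out of order: 12s, 3s, 7s, 11s.
        List<Long> arrivals = Arrays.asList(12_000L, 3_000L, 7_000L, 11_000L);
        // With 10-second windows, two events land in [0s, 10s) and two in [10s, 20s).
        System.out.println(countPerWindow(arrivals, 10_000L)); // prints {0=2, 10000=2}
    }
}
```

Because the window is derived from the event's own timestamp, the late-arriving 3s and 7s events still count toward the first window. A system that windows only by arrival time would place them wherever they happened to show up.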

2. Flink architecture

[Figure: Flink architecture diagram]
JobManager:
Also called the Master. It coordinates distributed execution: it schedules tasks, coordinates checkpoints, and coordinates recovery after failures. A running Flink cluster has at least one master. In high-availability mode there are several masters, one of which is the leader while the others are on standby.
TaskManager:
Also called the Worker. It executes the tasks of a dataflow, buffers data, and exchanges data streams with other workers. A running Flink cluster has at least one TaskManager. The JobManager and TaskManagers can run directly on physical machines or on a resource scheduling framework such as YARN. Each TaskManager connects to the JobManager over the network and reports its availability via RPC so that it can be assigned tasks.
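
The registration and assignment flow described above can be sketched as a toy model in plain Java. This is only an illustration of the idea (workers announce free capacity, the master hands out tasks); it does not use Flink's real classes or RPC protocol, and the names `ToyJobManager`, `register`, and `schedule` are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of worker registration and task assignment; not Flink's actual implementation. */
final class ToyJobManager {
    // Free slots per registered worker, kept in registration order.
    private final Map<String, Integer> freeSlots = new LinkedHashMap<>();

    /** A worker reports its availability (hypothetical stand-in for the RPC registration). */
    void register(String taskManagerId, int slots) {
        freeSlots.put(taskManagerId, slots);
    }

    /** Assigns each task to the first registered worker that still has a free slot. */
    Map<String, String> schedule(List<String> tasks) {
        Map<String, String> assignment = new LinkedHashMap<>();
        for (String task : tasks) {
            String chosen = null;
            for (Map.Entry<String, Integer> entry : freeSlots.entrySet()) {
                if (entry.getValue() > 0) {
                    chosen = entry.getKey();
                    break;
                }
            }
            if (chosen == null) {
                throw new IllegalStateException("No free slot for task " + task);
            }
            freeSlots.merge(chosen, -1, Integer::sum); // occupy one slot on the chosen worker
            assignment.put(task, chosen);
        }
        return assignment;
    }

    public static void main(String[] args) {
        ToyJobManager jobManager = new ToyJobManager();
        jobManager.register("tm-1", 2);
        jobManager.register("tm-2", 1);
        // prints {source=tm-1, flatMap=tm-1, sum=tm-2}
        System.out.println(jobManager.schedule(Arrays.asList("source", "flatMap", "sum")));
    }
}
```

The real scheduler is considerably more involved (slot sharing, parallel subtasks, failover), so treat this purely as a mental model of who talks to whom.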
3. Spark Streaming vs. Flink

[Figure: comparison of Spark Streaming and Flink]
4. WordCount example
pom file:
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.12.0</flink.version>
    <target.java.version>1.8</target.java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <maven.compiler.source>${target.java.version}</maven.compiler.source>
    <maven.compiler.target>${target.java.version}</maven.compiler.target>
    <log4j.version>2.12.1</log4j.version>
</properties>
<repositories>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
<dependencies>
    <!-- Apache Flink dependencies -->
    <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
    </dependency>
    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    -->
    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Java Compiler -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>${target.java.version}</source>
                <target>${target.java.version}</target>
            </configuration>
        </plugin>
        <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
        <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <!-- Run shade goal on package phase -->
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>org.apache.logging.log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <!-- Do not copy the signatures in the META-INF folder.
                                Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>cn._51doit.flink.StreamingJob</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Code:
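import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Streaming word count: reads lines from a socket and prints a running count per word.
 * Date: 2021-01-11
 */
public class Test1 {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read lines of text from a socket
        DataStreamSource<String> source = environment.socketTextStream("localhost", 8888);
        // Split each line into words
        SingleOutputStreamOperator<String> words = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                for (String word : line.split(" ")) {
                    collector.collect(word);
                }
            }
        });
        // Pair each word with an initial count of 1
        SingleOutputStreamOperator<Tuple2<String, Integer>> pairs = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });
        // Group by word (field f0 of the tuple)
        KeyedStream<Tuple2<String, Integer>, String> keyed = pairs.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> pair) throws Exception {
                return pair.f0;
            }
        });
        // Sum the counts (tuple field at index 1) per word
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyed.sum(1);
        sum.print();
        // Nothing runs until execute() is called
        environment.execute("job");
    }
}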
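
To see what the keyBy followed by sum(1) computes without starting a Flink cluster, here is a minimal plain-Java sketch of the same logic: one running count per word, with an updated record emitted for every incoming word. It is an approximation for illustration only. The class `RunningWordCountSketch` and its `processLine` method are hypothetical, and state here lives in an ordinary `HashMap` rather than in Flink's managed keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java approximation of the streaming word count; does not use Flink. */
final class RunningWordCountSketch {
    // One running count per key (word), playing the role of keyed state.
    private final Map<String, Integer> countsByWord = new HashMap<>();

    /** Processes one input line and returns one updated "(word,count)" record per word. */
    List<String> processLine(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.split(" ")) {
            int updated = countsByWord.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        RunningWordCountSketch wordCount = new RunningWordCountSketch();
        System.out.println(wordCount.processLine("hello flink hello")); // [(hello,1), (flink,1), (hello,2)]
        System.out.println(wordCount.processLine("flink"));             // [(flink,2)]
    }
}
```

Note that the count for a word is emitted every time that word arrives, not once at the end. The stream is unbounded, so there is no "end" at which to emit a final total.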
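5. Two ways to submit a job
Option 1: from the command line
bin/flink run -m linux03:8081 -p 4 -c org.apache.flink.streaming.examples.socket.SocketWindowWordCount <jar file> --hostname node-1.51doit.cn --port 8888
Parameters:
-m specifies the JobManager address. The port after the hostname is the JobManager's REST port, not its RPC port (the RPC port is 6123)
-p specifies the parallelism
-c specifies the fully qualified name of the class that contains the main method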
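
Everything after the jar file on the command line (here --hostname and --port) is passed to the program's main method as ordinary arguments. Below is a minimal plain-Java sketch of reading such "--key value" pairs. The class `ProgramArgsSketch` and its `parse` method are hypothetical, and the default values are only illustrative. In a real Flink job the ParameterTool utility is the usual choice for this.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified parser for "--key value" program arguments; a stand-in, not Flink's parser. */
final class ProgramArgsSketch {

    /** Parses alternating "--key" and "value" tokens into a map. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but got: " + args[i]);
            }
            if (i + 1 >= args.length) {
                throw new IllegalArgumentException("Missing value for: " + args[i]);
            }
            params.put(args[i].substring(2), args[i + 1]);
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> params = parse(args);
        // Fall back to illustrative defaults when an argument is absent.
        String hostname = params.getOrDefault("hostname", "localhost");
        int port = Integer.parseInt(params.getOrDefault("port", "8888"));
        System.out.println("Would connect to " + hostname + ":" + port);
    }
}
```

Reading the host and port from arguments like this, instead of hard-coding them, lets the same jar be submitted against different socket sources.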
Option 2: submit through the web UI