小白的新手學習筆記,請大佬輕噴
本文歸檔于
GitHub,歡迎大家批評指正
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
以上為 Flink 的官方定義,其核心表達為:Apache Flink 是一個分布式處理引擎框架,用于無邊界和有邊界的流式狀態(tài)計算,且有部署及性能等諸多優(yōu)勢。
Flink 的數(shù)據(jù)處理方法基于流式處理架構,是一種真正的流處理、流計算框架,其中的很多概念及思想模式對于大數(shù)據(jù)處理方法具有啟發(fā)意義。Flink 官網(wǎng)對于 Stream, State, Time 等組件做了詳細的解釋和說明。在下文中完成大數(shù)據(jù)版 Hello,World 的編寫與運行,同時繼續(xù)理解官方文檔。
Demo
本實例采用 JDK11 環(huán)境搭建。
Maven 依賴
首先,創(chuàng)建一個 Maven 項目,并添加必要的依賴。
<properties>
<flink.version>1.10.0</flink.version>
<scala.version>2.12</scala.version>
<log.version>2.0.0-alpha1</log.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<!--IDEA 運行時需要注釋-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
<!--IDEA 運行時需要注釋-->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${log.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${log.version}</version>
</dependency>
</dependencies>
官方教程中指出,所有這些依賴項的作用域都應該設置為 provided,這意味著需要這些依賴進行編譯,但不應將它們打包到項目生成的應用程序 jar 文件中– 因為這些依賴項是 Flink 的核心依賴,在應用啟動前已經(jīng)是可用的狀態(tài)了。
但是,將這些依賴的作用域設置為 provided 時運行程序會出現(xiàn) java.lang.NoClassDefFoundError 錯誤。所以,在使用 IDEA 運行程序時,將作用域更改為 compile。
批處理示例搭建
本次實例搭建采用官方的 WordCount 示例,進行少許的改動,代碼如下。
public class Main {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.fromElements(
"Who's there?",
"I think I hear them. Stand, ho! Who's there?");
DataSet<Tuple2<String, Integer>> counts =
// 把每一行文本切割成二元組,每個二元組為: (word,1)
text.flatMap(new Tokenizer())
// 根據(jù)二元組的第“0”位分組,然后對第“1”位求和
.groupBy(0)
.sum(1);
counts.print();
}
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// 統(tǒng)一大小寫并把每一行切割為單詞
String[] tokens = value.toLowerCase().split("\\W+");
// 消費二元組
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
}
使用 IDEA 運行程序,在解決上述的依賴編譯問題之后,程序可以正常運行,并輸出如下結果。
(hear,1)
(ho,1)
(s,2)
(i,2)
(stand,1)
(who,2)
(them,1)
(there,2)
(think,1)
異常警告處理
雖然如上文所言,程序正常運行且輸出正確結果,但是同時也會有 log4j 的警告輸出。
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ExecutionEnvironment).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
該錯誤由于沒有配置日志的輸出源導致,可以通過添加 log4j 的輸出源相關配置解決,添加配置文件 log4j.properties。
log4j.rootLogger=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.conversionPattern=%5p [%t] (%F:%L) - %m%n
日志輸出源的問題得到解決,但是由于 JDK9 之后禁止從類路徑上的代碼進行非法的反射訪問,會出現(xiàn)如下警告信息。
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.core.memory.MemoryUtils (file:org/apache/flink/flink-core/1.10.0/flink-core-1.10.0.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.flink.core.memory.MemoryUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
根據(jù)警告信息提示,通過設置啟動參數(shù) --illegal-access=warn 啟動進一步的警告信息,獲得詳細警告信息如下所示。
WARNING: Illegal reflective access by org.apache.flink.core.memory.MemoryUtils (file:org/apache/flink/flink-core/1.10.0/flink-core-1.10.0.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Illegal reflective access by org.apache.flink.core.memory.HybridMemorySegment (file:org/apache/flink/flink-core/1.10.0/flink-core-1.10.0.jar) to field java.nio.Buffer.address
WARNING: Illegal reflective access by WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:org/apache/flink/flink-runtime_2.12/1.10.0/flink-runtime_2.12-1.10.0.jar) to method java.nio.DirectByteBuffer.cleaner()
由警告信息可看出來,主要原因是 java.nio 的訪問或反射權限的問題,通過添加 --add-opens java.base/java.nio=ALL-UNNAMED 啟動參數(shù)來“打開”這個包的訪問權限,從而解決該警告問題。
最終,系統(tǒng)最終文件層次結構如下所示。
hello-flink\
│ pom.xml
│
└─src
├─main
│ ├─java
│ │ Main.java
│ │
│ └─resources
│ log4j.properties
│
└─test
└─java
流處理示例搭建
本次實例搭建采用官方 API 中的 WordCount Streaming 版示例,進行少許的改動,代碼如下。
public class Main {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream("127.0.0.1", 9000);
DataStream<Tuple2<String, Integer>> dataStream = text.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (s, collector) -> {
String[] tokens = s.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<>(token, 1));
}
}
}).keyBy(0).timeWindow(Time.seconds(5)).sum(1);
dataStream.print();
env.execute("Java WordCount from SocketTextStream Example");
}
}
注意:
-
如果不啟動本地 Flink 集群,則需要先啟動 netcat:nc -l -p 9000,否則會出現(xiàn)如下錯誤:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(Main.java:54)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface. at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:451) at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:178) at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:321) at Main.main(Main.java:62) Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information. at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:350) at org.apache.flink.api.java.typeutils.TypeExtractionUtils.extractTypeFromLambda(TypeExtractionUtils.java:176) at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:571) at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:196) at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:634) at Main.main(Main.java:54) 如果 JDK 版本為 9 以上,則啟動參數(shù)中添加
--add-opens java.base/java.lang=ALL-UNNAMED,原因同批處理。
Stream
Stream 可以通過任何形式、任何類型的數(shù)據(jù)形成,例如:系統(tǒng)日志、交易記錄等。它是流處理系統(tǒng)最核心要素,也是 Flink 框架的最基本組件。正如官方定義中所說的,它可以用于無邊界的流式計算也可以用于有邊界的流式計算。流的邊界及其處理方式成為學習理解 Flink 框架使用及其思想模式的重要內(nèi)容。
流的界限
流可以分為有界流與無界流。

正如上文所提到的系統(tǒng)日志和交易記錄,這兩種流可以理解為無界流,因為這種數(shù)據(jù)只定義了開始,而沒有定義結束時間。它會隨著時間的推移,無休止地產(chǎn)生數(shù)據(jù),逐漸膨脹,且不會有盡頭。因此,無界流必須被持續(xù)地處理,即當事件發(fā)生或者數(shù)據(jù)產(chǎn)生時就立刻進行計算。
有界流則不同,它有始有終。例如本次完成的 WrodCount 的文檔版次數(shù)統(tǒng)計,就是一種有界流處理。再比如說傳統(tǒng)的數(shù)據(jù)分析系統(tǒng) OLAP,通過對歷史數(shù)據(jù)的多維展示達到數(shù)據(jù)分析的目的。有界流有一個重要的特點,就是可以對流或者數(shù)據(jù)進行精確地度量,比如 1000 詞的文檔,或者 1TB 的歷史數(shù)據(jù)。因此,有界流可以在某個時間節(jié)點精準的開始和結束。
數(shù)據(jù)處理方式
數(shù)據(jù)流分為有界和無界,同時也會對應不同的數(shù)據(jù)處理方式:批處理 和 流處理。
在 Flink 的架構觀中,一切都是流。所以在流式處理中,輸入數(shù)據(jù)來一個處理一個,并流式輸出處理結果,如下圖所示。

Flink 的批處理依然是基于流式的處理。簡而言之,就是把輸入數(shù)據(jù)切割為一段段有界流,然后經(jīng)過數(shù)據(jù)引擎的處理之后流式地輸出多段有界流的結果。

為了加深對 Flink 流式的架構觀,進行 Spark 批處理思想的對比,一種完全不同的架構觀念。
在 Spark 的架構觀,一切都是“批”或者“批次”。會在存儲一定量的批數(shù)據(jù)之后進行統(tǒng)一的數(shù)據(jù)計算處理,也就是所謂的來一批處理一批,并批量輸出結果。

相比于 Flink 的流式處理,Spark Streaming 可以稱為準流式處理,也就是微批處理。就是將批次做的足夠小,即一個為一批,可以做到來一個處理一批。

State
只有在每一個單獨的事件上進行轉(zhuǎn)換操作的應用才不需要狀態(tài),換言之,每一個具有一定復雜度的流處理應用都是有狀態(tài)的。任何運行基本業(yè)務邏輯的流處理應用都需要在一定時間內(nèi)存儲所接收的事件或中間結果,以供后續(xù)的某個時間點(例如收到下一個事件或者經(jīng)過一段特定時間)進行訪問并進行后續(xù)處理。
如批處理實例中的 flatMap 操作將文本切割成二元組,并作為狀態(tài)存在內(nèi)存中,以提供后續(xù)分組求和時使用。

Time
時間是流處理應用另一個重要的組成部分。因為事件總是在特定時間點發(fā)生,所以大多數(shù)的事件流都擁有事件本身所固有的時間語義。進一步而言,許多常見的流計算都基于時間語義,例如窗口聚合、會話計算、模式檢測和基于時間的 join。流處理的一個重要方面是應用程序如何衡量時間,即區(qū)分事件時間(event-time)和處理時間(processing-time)。