Apache Flink是一個可以在有限流數據流和無限流基礎上進行有狀態(tài)計算的大數據處理框架。Flink從下到上提供了不同層級的API抽象,并為常見的用例提供了專用的開發(fā)庫。
構建流處理應用程序 (Building Blocks for Streaming Applications)
流處理框架如何控制 Stream, State,Time等因素,決定了框架能構建和執(zhí)行什么類型的應用(The types of applications that can be built with and executed by a stream processing framework are defined by how well the framework controls streams, state, and time)。下面我們將逐一描述流處理應用的基礎組件,并闡述Flink處理他們的方法。
Streams (流)
很明顯,(數據) 流是流處理應用一個基本概念。然而一個流可以有不同的特性,這些特性將影響流能夠及應該如何處理數據。Flink是一個具有具有多種處理功能的強大數據處理框架,它可以處理多種類型的流。
- 有限流 和 無限流: 流可以是無限的 或 有限的。即,有固定數據大小或固定數據條數的數據集。Flink 具有處理無限流的強大特性,同時也有專用的算子 (Operators) 高效處理有限流(數據集)。
- 實時流 和 有順序流: 所有的數據都是以流的形式產生的。通常情況下處理數據有2種方式。當流數據產生時就立即處理,或者 將產生的流數據存儲起來,即先儲存到文件系統或存儲對象中,之后再處理。Flink可以處理存儲數據流或實時數據流。
Status (狀態(tài))
每一個非普通(non-trivial)流處理程序都是有狀態(tài)的,也就是說只有那些只針對單個事件進行單獨處理的應用流式應用才不需要關心應用狀態(tài)。換句話說,任何一個具體基本業(yè)務處理邏輯的應用程序都需要記錄 事件狀態(tài) 或中間結果,以便在今后業(yè)務需要時訪問這些狀態(tài)。

在Flink應用中應用狀態(tài)是一等公民。可以從Flink在上下文環(huán)境中提供的對狀態(tài)處理的所有特性就能證明這一點。
- 多種原始基礎狀態(tài): Flink為不同基礎數據結構提供了不同的狀態(tài)類型: 如 原子值,List,Map等數據結構。開發(fā)人員可以根據不同函數對不同狀態(tài)的訪問處理形式來選擇最高效,最適合的狀態(tài)類型。
- 插件式狀態(tài)后端:Flink應用的狀態(tài)被插件式的狀態(tài)后端管理并定期執(zhí)行Checkpoint操作。Flink具有不同類型的狀態(tài)后端,這些狀態(tài)后端把應用的狀態(tài)保存在內存(Memory) 或 RockDB(一種高效的嵌入式磁盤存儲)中。Flink也支持插件式的自定義的狀態(tài)后端。
- 精確一次的狀態(tài)一致性:Flink的Checkpoint機制和故障恢復算法保證了應用失敗后狀態(tài)恢復的數據一致性。因此,Flink故障處理是透明的,不會影響應用程序的正確性。
- 非常大的狀態(tài): 由于Flink的異步和增量的Checkpoint算法,使Flink能夠維持TB級的應用狀態(tài)。
- 應用擴展: Flink通過將狀態(tài)分配給不同規(guī)模Worker來支持有狀態(tài)應用的水平擴展。
Time
時間是流處理應用的另一個重要組成部分。多數的事件流都有內部的時間語義,因為流的每個事件都是在特定的時間點生成的。此外,許多通用的流計算都是基于時間的,例如窗口聚合,sessionization(會話計算), 模式匹配,以及基于時間的Join。流式計算的另一個重要方面是應用如何測量時間,即區(qū)分 事件時間 和 處理時間。
Flink提供了一組豐富的與時間有關的特性。
- 事件時間模式:使用事件時間定義的流處理應用基于事件內的timestamp(時間戳) 計算程序結果。因此不管是處理流式數據還是已經保存記錄下來的數據,基于事件時間的數據處理保證了結果的正確性和一致性。
- 水印支持: 在"事件時間"應用中Flink引入水印(Watermark)利用水印來衡量事件進展。水印是一種權衡結果延遲和完事性的一種靈活機制。
- 遲到數據處理: 在以"事件時間"為基準流處理中使用水印時,可能發(fā)生所有相關事件到達之前計算已經完成的情況。這種事件叫遲到數據。Flink提供了多種選項來處理遲到數據,例如:通過 側輸出 重新路由這些數據并更新之前的處理結果。
- 處理時間模式:除了事件模式外,Flink也支持處理時間語義,它是根據具體處理機器的本地時間來觸發(fā)計算的。處理時間模式適用于一些對數據延遲有嚴格要求但允許損失一定數據精度的應用。
APIs 分層 (Layered APIs)
Flink提供了3層APIs. 每層API 在表達性,簡潔性及不同使用場景之間提供了不同的權衡。

我們簡要介紹一下每層API,討論他們的應用,并展示一段樣例代碼。
The ProcessFunctions
ProcessFunctions 是Flink提供的最具有表達力的函數接口。Flink的ProcessFunctions函數接口用來處理1或2個流中獨立的事件,或處理聚合窗口中的事件。ProcessFuntions函數提供了對時間和狀態(tài)的細粒度控制。一個ProcessFunctions可以隨意修改它自身的狀態(tài),注冊時間定時器,時間定時器在將來的某個時刻觸發(fā)函數回調。因此,許多基于事件狀態(tài)驅動的應用,ProcessFunctions根據要求實現 基于單個事件的、復雜的業(yè)務處理。
下面展示了KeyedStream流中使用KeyedProcessFunction函數匹配符合START和END事件的例子。當START事件到達函數時,函數將事件的timestamp(時間戳)記錄在state(狀態(tài))中,并啟動一個4小時后觸發(fā)的定時器。如果一個END事件在定時器觸發(fā)之前到達,函數計算START事件和END事件之間的時間差值,清除狀態(tài),并返回時間差值。否則觸發(fā)定時器并清除狀態(tài)值。
/**
* Matches keyed START and END events and computes the difference between
* both elements' timestamps. The first String field is the key attribute,
* the second String attribute marks START and END events.
*/
public static class StartEndDuration
extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, Long>> {
private ValueState<Long> startTime;
@Override
public void open(Configuration conf) {
// obtain state handle
startTime = getRuntimeContext()
.getState(new ValueStateDescriptor<Long>("startTime", Long.class));
}
/** Called for each processed event. */
@Override
public void processElement(
Tuple2<String, String> in,
Context ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
switch (in.f1) {
case "START":
// set the start time if we receive a start event.
startTime.update(ctx.timestamp());
// register a timer in four hours from the start event.
ctx.timerService()
.registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000);
break;
case "END":
// emit the duration between start and end event
Long sTime = startTime.value();
if (sTime != null) {
out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime));
// clear the state
startTime.clear();
}
default:
// do nothing
}
}
/** Called when a timer fires. */
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) {
// Timeout interval exceeded. Cleaning up the state.
startTime.clear();
}
}
示例說明了KeyedProcessFunction的表達力,但高亮部分也說明他是一個冗長的接口。
The DataStream API
DataStream API 為許多長見的流處理操作提供語義支持,例如 窗口操作,事件轉換,通過查詢外部存儲來豐富事件等。DataStream API支持Java和Scala,同時DataStream是基于函數的,例如 map(), reduce(),aggregate()。可以實現接口來定義函數,也可以是Java或Scala支持的lambda函數。
下面例子展示了如何定義一個clickstream,并統計每個會話的點擊次數。
// a stream of website clicks
DataStream<Click> clicks = ...
DataStream<Tuple2<String, Long>> result = clicks
// project clicks to userId and add a 1 for counting
.map(
// define function by implementing the MapFunction interface.
new MapFunction<Click, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Click click) {
return Tuple2.of(click.userId, 1L);
}
})
// key by userId (field 0)
.keyBy(0)
// define session window with 30 minute gap
.window(EventTimeSessionWindows.withGap(Time.minutes(30L)))
// count clicks per session. Define function as lambda function.
.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
SQL & Table API
Flink提供了兩種關系型API,Table API 和 SQL。兩個API為批處理和流處理提供了統一的API,也就是說,查詢在無限的數據流,或有限的記錄流上以相同的語義執(zhí)行,并產生相同的結果。Table API 和 SQL 利用 Apache Calcite來解析,驗證,執(zhí)行查詢優(yōu)化。Table API 和 SQL 可以與 DataStream 和 DataSet API無縫集成,并支持用戶自下定義 標量函數,聚合函數,和 Table函數。
Flink關系型API的目的是要簡化數據分析,數據管道,ETL應用的定義。
下面的示例展示了使用SQL查詢一個基于Session窗口的點擊流,并統計每個Session的點擊次數。本例和DataStream API示例中的例子相同。
SELECT userId, COUNT(*)
FROM clicks
GROUP BY SESSION(clicktime, INTERVAL '30' MINUTE), userId
Libraries
Flink為常用的數據處理用例提供了若干類庫。 這些類庫通常嵌入在一個API中,而不是完全獨立的。這使得類庫API可以從Flink提供其他特征API中獲益,并與其他類庫集成。
復雜事件處理(Complex Event Processing (CEP)): 模式檢測是事件流處理中一個常見用例。Flink的CEP庫提供了一個API來指定事件模式(考慮正則表達式或狀態(tài)機制)。Flink CEP 庫與 Flink DataStream API集成,這樣可以在DataStream上執(zhí)行模式計算。CEP 庫的應用包括網絡入侵檢測,業(yè)務處理監(jiān)控 和 欺詐檢測。
DataSet API: 略。
Gelly: 略。