實戰(zhàn)案例介紹
本案例將實現(xiàn)一個“實時熱門商品”的需求,我們可以將“實時熱門商品”翻譯成程序員更好理解的需求:每隔5分鐘輸出最近一小時內(nèi)點擊量最多的前 N 個商品。

將這個需求進行分解我們大概要做這么幾件事情:
抽取出業(yè)務(wù)時間戳,告訴 Flink 框架基于業(yè)務(wù)時間做窗口
過濾出點擊行為數(shù)據(jù)
按一小時的窗口大小,每5分鐘統(tǒng)計一次,做滑動窗口聚合(Sliding Window)
按每個窗口聚合,輸出每個窗口中點擊量前N名的商品
數(shù)據(jù)準備
這里我們準備了一份淘寶用戶行為數(shù)據(jù)集(來自阿里云天池公開數(shù)據(jù)集)。本數(shù)據(jù)集包含了淘寶上某一天隨機一百萬用戶的所有行為(包括點擊、購買、加購、收藏)。數(shù)據(jù)集的組織形式和MovieLens-20M類似,即數(shù)據(jù)集的每一行表示一條用戶行為,由用戶ID、商品ID、商品類目ID、行為類型和時間戳組成,并以逗號分隔。關(guān)于數(shù)據(jù)集中每一列的詳細描述如下:
| 列名稱 | 說明 |
|---|---|
| 用戶ID | 整數(shù)類型,加密后的用戶ID |
| 商品ID | 整數(shù)類型,加密后的商品ID |
| 商品類目ID | 整數(shù)類型,加密后的商品所屬類目ID |
| 行為類型 | 字符串,枚舉類型,包括('pv', 'buy', 'cart', 'fav') |
| 時間戳 | 行為發(fā)生的時間戳,單位秒 |
你可以通過下面的命令下載數(shù)據(jù)集到項目的 resources 目錄下:
$ cd my-flink-project/src/main/resources
$ curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv
這里是否使用 curl 命令下載數(shù)據(jù)并不重要,你也可以使用 wget 命令或者直接訪問鏈接下載數(shù)據(jù)。關(guān)鍵是,將數(shù)據(jù)文件保存到項目的 resources 目錄下,方便應(yīng)用程序訪問。
編寫程序
在 src/main/java/myflink 下創(chuàng)建 HotItems.java 文件:
package myflink;
public class HotItems {
public static void main(String[] args) throws Exception {
}
}
與上文一樣,我們會一步步往里面填充代碼。第一步仍然是創(chuàng)建一個 StreamExecutionEnvironment,我們把它添加到 main 函數(shù)中。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 為了打印到控制臺的結(jié)果不亂序,我們配置全局的并發(fā)為1,這里改變并發(fā)對結(jié)果正確性沒有影響
env.setParallelism(1);
創(chuàng)建模擬數(shù)據(jù)源
在數(shù)據(jù)準備章節(jié),我們已經(jīng)將測試的數(shù)據(jù)集下載到本地了。由于是一個csv文件,我們將使用 CsvInputFormat 創(chuàng)建模擬數(shù)據(jù)源。
注:雖然一個流式應(yīng)用應(yīng)該是一個一直運行著的程序,需要消費一個無限數(shù)據(jù)源。但是在本案例教程中,為了省去構(gòu)建真實數(shù)據(jù)源的繁瑣,我們使用了文件來模擬真實數(shù)據(jù)源,這并不影響下文要介紹的知識點。這也是一種本地驗證 Flink 應(yīng)用程序正確性的常用方式。
我們先創(chuàng)建一個 UserBehavior 的 POJO 類(所有成員變量聲明成public便是POJO類),強類型化后能方便后續(xù)的處理。
/** 用戶行為數(shù)據(jù)結(jié)構(gòu) **/
public static class UserBehavior {
public long userId; // 用戶ID
public long itemId; // 商品ID
public int categoryId; // 商品類目ID
public String behavior; // 用戶行為, 包括("pv", "buy", "cart", "fav")
public long timestamp; // 行為發(fā)生的時間戳,單位秒
}
接下來我們就可以創(chuàng)建一個 PojoCsvInputFormat 了, 這是一個讀取 csv 文件并將每一行轉(zhuǎn)成指定 POJO 類型(在我們案例中是 UserBehavior)的輸入器。
// UserBehavior.csv 的本地文件路徑
URL fileUrl = HotItems2.class.getClassLoader().getResource("UserBehavior.csv");
Path filePath = Path.fromLocalFile(new File(fileUrl.toURI()));
// 抽取 UserBehavior 的 TypeInformation,是一個 PojoTypeInfo
PojoTypeInfo pojoType = (PojoTypeInfo) TypeExtractor.createTypeInfo(UserBehavior.class);
// 由于 Java 反射抽取出的字段順序是不確定的,需要顯式指定下文件中字段的順序
String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"};
// 創(chuàng)建 PojoCsvInputFormat
PojoCsvInputFormat csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);
下一步我們用 PojoCsvInputFormat 創(chuàng)建輸入源。
DataStream dataSource = env.createInput(csvInput, pojoType);
這就創(chuàng)建了一個 UserBehavior 類型的 DataStream。
EventTime 與 Watermark
當我們說“統(tǒng)計過去一小時內(nèi)點擊量”,這里的“一小時”是指什么呢? 在 Flink 中它可以是指 ProcessingTime ,也可以是 EventTime,由用戶決定。
ProcessingTime:事件被處理的時間。也就是由機器的系統(tǒng)時間來決定。
EventTime:事件發(fā)生的時間。一般就是數(shù)據(jù)本身攜帶的時間。
在本案例中,我們需要統(tǒng)計業(yè)務(wù)時間上的每小時的點擊量,所以要基于 EventTime 來處理。那么如果讓 Flink 按照我們想要的業(yè)務(wù)時間來處理呢?這里主要有兩件事情要做。
第一件是告訴 Flink 我們現(xiàn)在按照 EventTime 模式進行處理,F(xiàn)link 默認使用 ProcessingTime 處理,所以我們要顯式設(shè)置下。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
第二件事情是指定如何獲得業(yè)務(wù)時間,以及生成 Watermark。Watermark 是用來追蹤業(yè)務(wù)事件的概念,可以理解成 EventTime 世界中的時鐘,用來指示當前處理到什么時刻的數(shù)據(jù)了。由于我們的數(shù)據(jù)源的數(shù)據(jù)已經(jīng)經(jīng)過整理,沒有亂序,即事件的時間戳是單調(diào)遞增的,所以可以將每條數(shù)據(jù)的業(yè)務(wù)時間就當做 Watermark。這里我們用 AscendingTimestampExtractor 來實現(xiàn)時間戳的抽取和 Watermark 的生成。
注:真實業(yè)務(wù)場景一般都是存在亂序的,所以一般使用
BoundedOutOfOrdernessTimestampExtractor。
DataStream timedData = dataSource
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {
@Override
public long extractAscendingTimestamp(UserBehavior userBehavior) {
// 原始數(shù)據(jù)單位秒,將其轉(zhuǎn)成毫秒
return userBehavior.timestamp * 1000;
}
});
這樣我們就得到了一個帶有時間標記的數(shù)據(jù)流了,后面就能做一些窗口的操作。
過濾出點擊事件
在開始窗口操作之前,先回顧下需求“每隔5分鐘輸出過去一小時內(nèi)點擊量最多的前 N 個商品”。由于原始數(shù)據(jù)中存在點擊、加購、購買、收藏各種行為的數(shù)據(jù),但是我們只需要統(tǒng)計點擊量,所以先使用 FilterFunction 將點擊行為數(shù)據(jù)過濾出來。
DataStream pvData = timedData
.filter(new FilterFunction() {
@Override
public boolean filter(UserBehavior userBehavior) throws Exception {
// 過濾出只有點擊的數(shù)據(jù)
return userBehavior.behavior.equals("pv");
}
});
窗口統(tǒng)計點擊量
由于要每隔5分鐘統(tǒng)計一次最近一小時每個商品的點擊量,所以窗口大小是一小時,每隔5分鐘滑動一次。即分別要統(tǒng)計 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)... 等窗口的商品點擊量。是一個常見的滑動窗口需求(Sliding Window)。
DataStream windowedData = pvData
.keyBy("itemId")
.timeWindow(Time.minutes(60), Time.minutes(5))
.aggregate(new CountAgg(), new WindowResultFunction());
我們使用.keyBy("itemId")對商品進行分組,使用.timeWindow(Time size, Time slide)對每個商品做滑動窗口(1小時窗口,5分鐘滑動一次)。然后我們使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉數(shù)據(jù),減少 state 的存儲壓力。較之.apply(WindowFunction wf)會將窗口中的數(shù)據(jù)都存儲下來,最后一起計算要高效地多。aggregate()方法的第一個參數(shù)用于
這里的CountAgg實現(xiàn)了AggregateFunction接口,功能是統(tǒng)計窗口中的條數(shù),即遇到一條數(shù)據(jù)就加一。
/** COUNT 統(tǒng)計的聚合函數(shù)實現(xiàn),每出現(xiàn)一條記錄加一 */
public static class CountAgg implements AggregateFunction {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(UserBehavior userBehavior, Long acc) {
return acc + 1;
}
@Override
public Long getResult(Long acc) {
return acc;
}
@Override
public Long merge(Long acc1, Long acc2) {
return acc1 + acc2;
}
}
.aggregate(AggregateFunction af, WindowFunction wf) 的第二個參數(shù)WindowFunction將每個 key每個窗口聚合后的結(jié)果帶上其他信息進行輸出。我們這里實現(xiàn)的WindowResultFunction將主鍵商品ID,窗口,點擊量封裝成了ItemViewCount進行輸出。
/** 用于輸出窗口的結(jié)果 */
public static class WindowResultFunction implements WindowFunction {
@Override
public void apply(
Tuple key, // 窗口的主鍵,即 itemId
TimeWindow window, // 窗口
Iterable aggregateResult, // 聚合函數(shù)的結(jié)果,即 count 值
Collector collector // 輸出類型為 ItemViewCount
) throws Exception {
Long itemId = ((Tuple1) key).f0;
Long count = aggregateResult.iterator().next();
collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));
}
}
/** 商品點擊量(窗口操作的輸出類型) */
public static class ItemViewCount {
public long itemId; // 商品ID
public long windowEnd; // 窗口結(jié)束時間戳
public long viewCount; // 商品的點擊量
public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
ItemViewCount result = new ItemViewCount();
result.itemId = itemId;
result.windowEnd = windowEnd;
result.viewCount = viewCount;
return result;
}
}
現(xiàn)在我們得到了每個商品在每個窗口的點擊量的數(shù)據(jù)流。
TopN 計算最熱門商品
為了統(tǒng)計每個窗口下最熱門的商品,我們需要再次按窗口進行分組,這里根據(jù)ItemViewCount中的windowEnd進行keyBy()操作。然后使用 ProcessFunction 實現(xiàn)一個自定義的 TopN 函數(shù) TopNHotItems 來計算點擊量排名前3名的商品,并將排名結(jié)果格式化成字符串,便于后續(xù)輸出。
DataStream topItems = windowedData
.keyBy("windowEnd")
.process(new TopNHotItems(3)); // 求點擊量前3名的商品
ProcessFunction 是 Flink 提供的一個 low-level API,用于實現(xiàn)更高級的功能。它主要提供了定時器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我們將利用 timer 來判斷何時收齊了某個 window 下所有商品的點擊量數(shù)據(jù)。由于 Watermark 的進度是全局的,
在 processElement 方法中,每當收到一條數(shù)據(jù)(ItemViewCount),我們就注冊一個 windowEnd+1 的定時器(Flink 框架會自動忽略同一時間的重復注冊)。windowEnd+1 的定時器被觸發(fā)時,意味著收到了windowEnd+1的 Watermark,即收齊了該windowEnd下的所有商品窗口統(tǒng)計值。我們在 onTimer() 中處理將收集的所有商品及點擊量進行排序,選出 TopN,并將排名信息格式化成字符串后進行輸出。
這里我們還使用了 ListState<ItemViewCount> 來存儲收到的每條 ItemViewCount 消息,保證在發(fā)生故障時,狀態(tài)數(shù)據(jù)的不丟失和一致性。ListState 是 Flink 提供的類似 Java List 接口的 State API,它集成了框架的 checkpoint 機制,自動做到了 exactly-once 的語義保證。
/** 求某個窗口中前 N 名的熱門點擊商品,key 為窗口時間戳,輸出為 TopN 的結(jié)果字符串 */
public static class TopNHotItems extends KeyedProcessFunction {
private final int topSize;
public TopNHotItems(int topSize) {
this.topSize = topSize;
}
// 用于存儲商品與點擊數(shù)的狀態(tài),待收齊同一個窗口的數(shù)據(jù)后,再觸發(fā) TopN 計算
private ListState itemState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 狀態(tài)的注冊
ListStateDescriptor itemsStateDesc = new ListStateDescriptor<>(
"itemState-state",
ItemViewCount.class);
itemState = getRuntimeContext().getListState(itemsStateDesc);
}
@Override
public void processElement(
ItemViewCount input,
Context context,
Collector collector) throws Exception {
// 每條數(shù)據(jù)都保存到狀態(tài)中
itemState.add(input);
// 注冊 windowEnd+1 的 EventTime Timer, 當觸發(fā)時,說明收齊了屬于windowEnd窗口的所有商品數(shù)據(jù)
context.timerService().registerEventTimeTimer(input.windowEnd + 1);
}
@Override
public void onTimer(
long timestamp, OnTimerContext ctx, Collector out) throws Exception {
// 獲取收到的所有商品點擊量
List allItems = new ArrayList<>();
for (ItemViewCount item : itemState.get()) {
allItems.add(item);
}
// 提前清除狀態(tài)中的數(shù)據(jù),釋放空間
itemState.clear();
// 按照點擊量從大到小排序
allItems.sort(new Comparator() {
@Override
public int compare(ItemViewCount o1, ItemViewCount o2) {
return (int) (o2.viewCount - o1.viewCount);
}
});
// 將排名信息格式化成 String, 便于打印
StringBuilder result = new StringBuilder();
result.append("====================================\n");
result.append("時間: ").append(new Timestamp(timestamp-1)).append("\n");
for (int i=0;i
打印輸出
最后一步我們將結(jié)果打印輸出到控制臺,并調(diào)用env.execute執(zhí)行任務(wù)。
topItems.print();
env.execute("Hot Items Job");
運行程序
直接運行 main 函數(shù),就能看到不斷輸出的每個時間點的熱門商品ID。
本文通過實現(xiàn)一個“實時熱門商品”的案例,學習和實踐了 Flink 的多個核心概念和 API 用法。包括 EventTime、Watermark 的使用,State 的使用,Window API 的使用,以及 TopN 的實現(xiàn)。希望本文能加深大家對 Flink 的理解,幫助大家解決實戰(zhàn)上遇到的問題。