首先,附上 Github 鏈接
LakeSoul:https://github.com/meta-soul/LakeSoul,可搜索公眾號元靈數(shù)智,在底部菜單了解我們 - 用戶交流獲取官方技術(shù)交流群二維碼,進群與業(yè)內(nèi)大佬進行技術(shù)交流。
在之前的公眾號文章《重磅!開源湖倉平臺 LakeSoul 設(shè)計理念詳解》中,我們介紹了 LakeSoul 開源流批一體表存儲框架的設(shè)計理念和部分實現(xiàn)原理。LakeSoul 設(shè)計的初衷,是為了解決在流批一體的業(yè)務(wù)場景下,傳統(tǒng)的 Hive 數(shù)倉難以解決的各類問題,包括 Upsert 更新、Merge on Read、并發(fā)寫等。今天我們以一個典型的應(yīng)用場景:構(gòu)建實時機器學習樣本庫來展示 LakeSoul 的核心功能。
一、業(yè)務(wù)需求背景
1.1 在線推薦系統(tǒng)
在互聯(lián)網(wǎng)、金融等行業(yè),很多的業(yè)務(wù)場景都可以歸納為一個在線個性化推薦系統(tǒng),包括搜索、廣告、推薦、風控等。例如,在電商業(yè)務(wù)中,通過搭建個性化推薦系統(tǒng),可以實現(xiàn)千人千面的猜你喜歡推薦,提升用戶的點擊率、購買率等;在廣告業(yè)務(wù)中,個性化推薦是實現(xiàn)精準定向,提升 ROI 的核心系統(tǒng);在金融風控領(lǐng)域,需要實現(xiàn)對用戶償還能力、逾期可能性的實時預(yù)測,為每個用戶提供個性化的信貸額度、還貸周期等。
可以看到,推薦系統(tǒng)在各個行業(yè)領(lǐng)域都有著廣泛應(yīng)用。搭建一個工業(yè)級在線推薦系統(tǒng),需要很多的環(huán)節(jié)和系統(tǒng)相互銜接,有比較大的開發(fā)工作量。元靈數(shù)智平臺研發(fā)的 MetaSpore 框架提供了一站式的推薦系統(tǒng)開發(fā)解決方案,詳細介紹可以看我們之前的公眾號文章《重磅!基于新一代 MetaSpore 平臺快速搭建工業(yè)級推薦系統(tǒng)》。
本文著重介紹如何構(gòu)建一個實時的樣本庫,從而實現(xiàn) “用戶反饋 -- 模型迭代” 的完整閉環(huán),讓推薦系統(tǒng)能夠自我學習迭代更新,快速捕捉用戶的興趣變化。
1.2 什么是推薦系統(tǒng)機器學習樣本庫
在推薦系統(tǒng)中,核心部分是一個個性化排序的算法模型。模型訓練首先要構(gòu)造樣本,通過各類特征和用戶行為反饋標簽,去學習每一個用戶的偏好。樣本庫往往包括幾個部分:
用戶特征(User Feature):包括用戶的基本屬性,用戶的歷史行為和最近的實時行為等。其中用戶基本屬性特征可能是來自于在線的實時請求,也可能來自離線的 DMP 挖掘出的行為標簽。用戶的歷史行為、實時行為一般包括用戶歷史上有過反饋行為的事件,以及相關(guān)的一些聚合統(tǒng)計指標;
物品特征(Item Feature):物品是要給用戶推薦的對象,可以是商品、新聞、廣告等。特征一般包括物品的各類屬性,包括離散的屬性和統(tǒng)計值連續(xù)屬性等;
用戶反饋:即算法模型中的標簽(Label)。標簽是各類用戶的反饋行為,例如展示、點擊、轉(zhuǎn)化等。算法模型需要通過特征和標簽的關(guān)系,來學習建模用戶的偏好。
1.3 機器學習樣本庫構(gòu)建的挑戰(zhàn)
在構(gòu)建機器學習樣本庫時,往往會遇到這樣幾類問題和挑戰(zhàn):
實時性要求。業(yè)內(nèi)主流推薦系統(tǒng)的模型學習已經(jīng)往在線、實時化發(fā)展。模型更新越及時,對用戶興趣變化的捕捉就越快,從而能夠給出更加精準的推薦結(jié)果,提升業(yè)務(wù)效果。這就需要樣本庫能夠支撐很高的寫入吞吐的能力。
多流更新。在線上通過模型進行排序計算后,會有大量的在線特征,需要實時回流供進一步模型訓練使用。而用戶反饋也同樣需要回流到樣本庫,通常用戶反饋會有多個實時流。在這種情況下,會有多個實時流,同時并發(fā)寫入更新樣本庫的不同列。傳統(tǒng)的 Hive 數(shù)倉一般是無法支持實時更新,需要通過全量 Join 的方式來實現(xiàn),而在 Join 窗口較大的情況下,運行效率很低,還會帶來大量的數(shù)據(jù)重復(fù)冗余。使用 Flink 的窗口 Join,也同樣存在狀態(tài)數(shù)據(jù)龐大,運行維護成本較高的問題。
并行實驗。在實際業(yè)務(wù)開發(fā)中,算法工程師往往需要同時進行多組模型并行實驗,以對比效果。不同的模型可能需要不同的特征以及標簽列,這些列會通過不同的方式更新,例如一部分特征來自于離線的批量作業(yè)計算生成,這些批量數(shù)據(jù)也需要插入到特征庫中。
特征回溯。在算法業(yè)務(wù)開發(fā)中,有的時候需要增加特征,而模型需要重新回溯。這就要求將新特征批量更新到歷史的數(shù)據(jù)中。在 Hive 中也很難高效地實現(xiàn)。
可以看到,在推薦系統(tǒng)算法場景,構(gòu)建一個實時的樣本庫,存在比較多的挑戰(zhàn)。而這些挑戰(zhàn)的主要問題是 Hive 數(shù)倉的功能、性能較弱,對流批一體、增量更新、并發(fā)寫入等場景不能很好的支持。之前字節(jié)跳動等公司分享過基于 Hudi 的方案來進行流批一體構(gòu)建推薦系統(tǒng)樣本《字節(jié)跳動基于 Apache Hudi 構(gòu)建實時數(shù)據(jù)湖平臺實踐》,然而 Hudi 在實際使用中仍然存在并發(fā)更新等問題。
由數(shù)元靈開發(fā)并開源的流批一體表存儲框架,可以很好地解決以上這幾類問題。下面我們詳細介紹如何使用 LakeSoul 來構(gòu)建一個工業(yè)級推薦系統(tǒng)的樣本庫。
二、構(gòu)建實時機器學習樣本庫
LakeSoul 是一個為流批一體場景設(shè)計的表存儲框架,具有如下幾個關(guān)鍵特性:
行列級別更新(Upsert)
支持 Merge on Read,在讀時進行數(shù)據(jù)合并,提高寫的吞吐
支持對象存儲,不需要文件語義
并發(fā)寫入,可以支持多個流、批作業(yè)對同一分區(qū)進行更新
分布式元數(shù)據(jù)管理,提高元數(shù)據(jù)的可擴展性
Schema 演進,可以對表的列進行增刪
通過 LakeSoul 來搭建機器學習樣本庫的總體設(shè)計是,使用 Upsert 代替 Join,將多組特征、標簽分別通過流、批的方式寫入同一個表,通過 LakeSoul 獲得高并發(fā)的寫入、高讀寫吞吐能力。我們來詳細講解具體的實現(xiàn)流程和關(guān)鍵原理。
2.1 主鍵設(shè)計
為了能夠支持高效的 Merge,LakeSoul 提供了設(shè)置主鍵的功能。對于一個表中的主鍵列,會根據(jù)哈希分桶數(shù),均勻的分片到指定個數(shù)的哈希桶中,而在每個桶內(nèi),對主鍵列進行排序后寫入。在這種情況下,讀時只需要對若干個增量文件按有序主鍵進行歸并,就可以得到 Merge 結(jié)果。
在推薦系統(tǒng)樣本庫的場景,通常在線請求時會生成一個請求 ID,所有特征、標簽的回流,都會帶上這個 ID。實際在離線 Join 的場景,也是通過這個 ID 作為 Join Key。因此我們可以使用請求 ID 作為 LakeSoul 樣本表的主鍵,并且以小時作為 Range 分區(qū)。我們可以通過下面的方式在 Spark 作業(yè)中創(chuàng)建 LakeSoul 表:
```scala
LakeSoulTable.createTable(data, path).shortTableName("sample").hashPartitions("request_id").hashBucketNum(100).rangePartitions("hour").create()
```
這樣就創(chuàng)建了一個以 `request_id` 作為主鍵,并且哈希分桶為 100,以小時作為 Range 分區(qū)的表。
2.2 數(shù)據(jù)寫入和并發(fā)更新
由于特征、標簽分別來自不同的流和批,我們需要多個流或批的作業(yè)并發(fā)對 sample 表進行更新。每一份數(shù)據(jù)都需要有 request_id 列和 hour 列 ,在執(zhí)行 LakeSoulTable.upsert 時,LakeSoul Spark Writer 會自動根據(jù) request_id 進行 repartition 分桶,并根據(jù) hour 列寫入對應(yīng)分區(qū)的對應(yīng)桶中,一批寫入數(shù)據(jù)可以存在多個 Range 分區(qū)的值。
LakeSoul 支持多流并發(fā)的 Upsert,可以很好地滿足樣本庫多流實時更新的需求。例如有兩個流,分別是特征回流和標簽回流數(shù)據(jù),只需要執(zhí)行 Upsert,就能夠?qū)崟r更新到樣本庫中:
```scala
// 讀取特征回流,更新樣本表
val featureStreamDF = spark.readStream...
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
lakeSoulTable.upsert(featureStreamDF)
// 讀取標簽回流,更新樣本表
val labelStreamDF = spark.readStream...
val lakeSoulTable = LakeSoulTable.forPath(tablePath)
lakeSoulTable.upsert(labelStreamDF)
```
由于寫入時并不需要進行 Merge 操作,只需要將當前的增量數(shù)據(jù)寫入即可,因此寫入可以有很高的吞吐,實際測試在云商對象存儲上可以達到每個 core 寫入速率 30MB/s 以上,即 30 個 Spark Executor,每個分配 1 個 CPU Core,就可以達到 1GB 的寫入速度。
2.3 Merge On Read
LakeSoul 對于 Upsert 的數(shù)據(jù),在讀取時會自動進行 Merge。因此讀取的接口和讀一個表沒有區(qū)別:
```scala
vallakeSoulTable = LakeSoulTable.forPath(path)lakeSoulTable.toDF.select("*").show()
```
也可以通過 SQL Select 語句來查詢。在底層實現(xiàn)中,對于每個哈希桶,由于主鍵已經(jīng)有序,只需要進行多個有序表的外部歸并,示意如下:

可以看到,樣本流、標簽流都分別執(zhí)行了多次 Upsert,而某個時刻有一個讀作業(yè)讀取時,LakeSoul 會根據(jù)元數(shù)據(jù)服務(wù)中的更新記錄,自動找到增量更新的文件,并執(zhí)行有序外部歸并。LakeSoul 實現(xiàn)了對 Parquet 文件的有序歸并,并通過優(yōu)化的小頂堆設(shè)計來提升多路有序歸并的性能。
2.4 數(shù)據(jù)回溯(backfill)
由于 LakeSoul 支持任意 Range 分區(qū)數(shù)據(jù)的 Upsert,因此回溯和流式寫入沒有區(qū)別,將要插入的數(shù)據(jù)準備好之后,通過 Spark 執(zhí)行 Upsert 就可以更新歷史數(shù)據(jù),LakeSoul 會自動識別 Schema 的變化,更新表的元信息,實現(xiàn) Schema 演進。LakeSoul 提供了完整的數(shù)倉表的存儲功能,每一個歷史分區(qū)都是可以查詢和更新的,相對于 Flink 進行窗口 Join 的方案,解決了中間狀態(tài)不可見的問題,可以很方便的實現(xiàn)歷史數(shù)據(jù)的大批量更新回溯。
結(jié)束語
本文介紹了 LakeSoul 在一個典型的流批一體場景:構(gòu)建推薦系統(tǒng)機器學習樣本庫 中的應(yīng)用。通過 LakeSoul 流批一體、Merge on Read 的能力,能夠支撐大規(guī)模、大窗口的多流實時更新,解決了 Hive 數(shù)倉大批 Join 和 Flink 窗口 Join 等方案存在的一些問題。
官方資料GitHub:
LakeSou: https://github.com/meta-soul/LakeSoul
MetaSpore: https://github.com/meta-soul/MetaSpore
官網(wǎng):元靈數(shù)智-云原生一站式數(shù)據(jù)智能平臺-北京數(shù)元靈科技有限公司 (dmetasoul.com)
官方交流群:微信群:關(guān)注公眾號,點擊“了解我們-用戶交流”或掃描下方二維碼
Slack:https://join.slack.com/t/dmetasoul-user/shared_invite/zt-1681xagg3-4YouyW0Y4wfhPnvji~OwFg