本文介紹了以 Pulsar 做流數(shù)據(jù)平臺(tái),使用 Spark 進(jìn)行批流一體數(shù)據(jù)處理的編程實(shí)踐。
(閱讀本文需要約 15 分鐘)
批流現(xiàn)狀
在大規(guī)模并行數(shù)據(jù)分析領(lǐng)域,AMPLab 的『One stack to rule them all』提出用 Apache Spark 作為統(tǒng)一的引擎支持批處理、流處理、交互查詢和機(jī)器學(xué)習(xí)等常見的數(shù)據(jù)處理場(chǎng)景。 2017 年 7 月,Spark 2.2.0 版本正式推出的 Spark structured streaming 將 Spark SQL 作為流處理、批處理底層統(tǒng)一的執(zhí)行引擎,提供對(duì)無界表(無邊界的源源不斷到達(dá)的流數(shù)據(jù))和有界表(靜態(tài)歷史數(shù)據(jù))的優(yōu)化查詢,而向用戶提供 Dataset/DataFrame API 對(duì)批流數(shù)據(jù)聯(lián)合處理,進(jìn)一步模糊了批流數(shù)據(jù)處理的邊界。
另一方面,Apache Flink 在 2016 年左右進(jìn)入大眾視野,憑借其當(dāng)時(shí)更優(yōu)的流處理引擎,原生的 Watermark 支持『Exaclty Once』的數(shù)據(jù)一致性保證,和批流一體計(jì)算等各種場(chǎng)景的支持,成為 Spark 的有力挑戰(zhàn)者。無論是使用 Spark 還是 Flink,用戶真正關(guān)心的是如何更好地使用數(shù)據(jù),更快地挖掘數(shù)據(jù)中的價(jià)值,流數(shù)據(jù)和靜態(tài)數(shù)據(jù)不再是分離的個(gè)體,而是一份數(shù)據(jù)的兩種不同表征方式。
然而在實(shí)踐中,構(gòu)建一個(gè)批流一體的數(shù)據(jù)平臺(tái)并不只是計(jì)算引擎層的任務(wù)。因?yàn)樵趥鹘y(tǒng)解決方案中,近實(shí)時(shí)的流、事件數(shù)據(jù)通常采用消息隊(duì)列(例如 RabbitMQ)、實(shí)時(shí)數(shù)據(jù)管道(例如 Apache Kafka)存儲(chǔ),而批處理所需要的靜態(tài)數(shù)據(jù)通常使用文件系統(tǒng)、對(duì)象存儲(chǔ)進(jìn)行保存。這就意味著,一方面,在數(shù)據(jù)分析過程中,為了保證結(jié)果的正確性和實(shí)時(shí)性,需要對(duì)分別存儲(chǔ)在兩類系統(tǒng)中數(shù)據(jù)進(jìn)行聯(lián)合查詢;另一方面,在運(yùn)維過程中,需要定期將流數(shù)據(jù)轉(zhuǎn)存到文件/對(duì)象存儲(chǔ)中,通過維持流形式的數(shù)據(jù)總量在閾值之下來保證消息隊(duì)列、數(shù)據(jù)管道的性能(因?yàn)檫@類系統(tǒng)的以分區(qū)為主的架構(gòu)設(shè)計(jì)緊耦合了消息服務(wù)和消息存儲(chǔ),而且多數(shù)都太過依賴文件系統(tǒng),隨著數(shù)據(jù)量的增加,系統(tǒng)性能會(huì)急劇下降),但人為的數(shù)據(jù)搬遷不但會(huì)提升系統(tǒng)的運(yùn)維成本,而且搬遷過程中的數(shù)據(jù)清洗、讀取、加載也是對(duì)集群資源的巨大消耗。
與此同時(shí),從 Mesos 和 YARN 的流行、Docker 的興起到現(xiàn)在的 Kubernetes 被廣泛采用,整個(gè)基礎(chǔ)架構(gòu)正在全面地向容器化方向發(fā)展,傳統(tǒng)緊耦合消息服務(wù)和消息計(jì)算的架構(gòu)并不能很好地適應(yīng)容器化的架構(gòu)。以 Kafka 為例,其以分區(qū)為中心的架構(gòu)緊耦合了消息服務(wù)和消息存儲(chǔ)。Kafka 的分區(qū)與一臺(tái)或者一組物理機(jī)強(qiáng)綁定,這帶來的問題是在機(jī)器失效或集群擴(kuò)容中,需要進(jìn)行昂貴且漫長(zhǎng)的分區(qū)數(shù)據(jù)重新均衡的過程;其以分區(qū)為粒度的存儲(chǔ)設(shè)計(jì)也不能很好利用已有的云存儲(chǔ)資源;此外,過于簡(jiǎn)單的設(shè)計(jì)導(dǎo)致其為了進(jìn)行容器化需要解決多租戶管理、IO 隔離等方面很多架構(gòu)上的缺陷。
Pulsar 簡(jiǎn)介
Apache Pulsar 是一個(gè)多租戶、高性能的企業(yè)級(jí)消息發(fā)布訂閱系統(tǒng),最初由 Yahoo 研發(fā), 2018 年 9 月從 Apache 孵化器畢業(yè),成為 Apache 基金會(huì)的頂級(jí)開源項(xiàng)目。Pulsar 基于發(fā)布訂閱模式(pub-sub)構(gòu)建,生產(chǎn)者(producer)發(fā)布消息(message)到主題(topic),消費(fèi)者可以訂閱主題,處理收到的消息,并在消息處理完成后發(fā)送確認(rèn)(Ack)。Pulsar 提供了四種訂閱類型,它們可以共存在同一個(gè)主題上,以訂閱名進(jìn)行區(qū)分:
- 獨(dú)享(exclusive)訂閱——一個(gè)訂閱名下同時(shí)只能有一個(gè)消費(fèi)者。
- 共享(shared)訂閱——可以由多個(gè)消費(fèi)者訂閱,每個(gè)消費(fèi)者接收其中一部分消息。
- 失效備援(failover)訂閱——允許多個(gè)消費(fèi)者連接到同一個(gè)主題,但只有一個(gè)消費(fèi)者能夠接收消息。只有在當(dāng)前消費(fèi)者發(fā)生失效時(shí),其他消費(fèi)者才開始接收消息。
- 鍵劃分(key-shared)訂閱(測(cè)試版功能)——多個(gè)消費(fèi)者連接到同一主題,相同 Key 總會(huì)發(fā)送給同一個(gè)消費(fèi)者。
Pulsar 從設(shè)計(jì)之初就支持多租戶(multi-tenancy)的概念,租戶(tenant)可以橫跨多個(gè)集群(clusters),每個(gè)租戶都有其認(rèn)證和鑒權(quán)方式,租戶也是存儲(chǔ)配額、消息生存時(shí)間(TTL)和隔離策略的管理單元。Pulsar 多租戶的特性可以在 topic URL 上得到充分體現(xiàn),其結(jié)構(gòu)是persistent://tenant/namespace/topic。命名空間(namespace)是 Pulsar 中最基本的管理單元,我們可以設(shè)置權(quán)限、調(diào)整復(fù)制選項(xiàng)、管理跨集群的數(shù)據(jù)復(fù)制、控制消息的過期時(shí)間或執(zhí)行其他關(guān)鍵任務(wù)。
Pulsar 獨(dú)特架構(gòu)
Pulsar 和其他消息系統(tǒng)的最根本區(qū)別在于其采用計(jì)算和存儲(chǔ)分離的分層架構(gòu)。Pulsar 集群由兩層組成:無狀態(tài)服務(wù)層,它由一組接受和傳遞消息的 broker 組成;分布式存儲(chǔ)層,它由一組名為 bookies 的 Apache BookKeeper 存儲(chǔ)節(jié)點(diǎn)組成,具備高可用、強(qiáng)一致、低延時(shí)的特點(diǎn)。
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來直接上傳(img-nKUt0340-1570764602625)(media/pulsar-spark/pulsar-partition-log-segment.png)]
和 Kafka 一樣,Pulsar 也是基于主題分區(qū)(Topic partition)的邏輯概念進(jìn)行主題數(shù)據(jù)的存儲(chǔ)。不同的是,Kafka 的物理存儲(chǔ)也是以分區(qū)為單位,每個(gè) partition 必須作為一個(gè)整體(一個(gè)目錄)被存儲(chǔ)在一個(gè) broker 上,而 Pulsar 的每個(gè)主題分區(qū)本質(zhì)上都是存儲(chǔ)在 BookKeeper 上的分布式日志,每個(gè)日志又被分成分段(Segment)。每個(gè) Segment 作為 BookKeeper 上的一個(gè) Ledger,均勻分布并存儲(chǔ)在多個(gè) bookie 中。存儲(chǔ)分層的架構(gòu)和以 Segment 為中心的分片存儲(chǔ)是 Pulsar 的兩個(gè)關(guān)鍵設(shè)計(jì)理念。以此為基礎(chǔ)為 Pulsar 提供了很多重要的優(yōu)勢(shì):無限制的主題分區(qū)、存儲(chǔ)即時(shí)擴(kuò)展,無需數(shù)據(jù)遷移 、無縫 broker 故障恢復(fù)、無縫集群擴(kuò)展、無縫的存儲(chǔ)(Bookie)故障恢復(fù)和獨(dú)立的可擴(kuò)展性。
消息系統(tǒng)解耦了生產(chǎn)者與消費(fèi)者,但實(shí)際的消息本質(zhì)上仍是有結(jié)構(gòu)的,因此生產(chǎn)者和消費(fèi)者之間需要一種協(xié)調(diào)機(jī)制,達(dá)到生產(chǎn)、消費(fèi)過程中對(duì)消息結(jié)構(gòu)的共識(shí),以達(dá)到類型安全的目的。Pulsar 有內(nèi)置的 Schema 注冊(cè)方式在消息系統(tǒng)端提供傳輸消息類型約定的方式,客戶端可以通過上傳 Schema 來約定主題級(jí)別的消息類型信息,而由 Pulsar 負(fù)責(zé)消息的類型檢查和有類型消息的自動(dòng)序列化、反序列化,從而降低多應(yīng)用間的消息解析代碼反復(fù)開發(fā)、維護(hù)的成本。當(dāng)然,Schema 定義與類型安全是一種可選的機(jī)制,并不會(huì)給非類型化消息的發(fā)布、消費(fèi)產(chǎn)生任何性能開銷。
在 Spark 中實(shí)現(xiàn)對(duì) Pulsar 數(shù)據(jù)的讀寫——Spark Pulsar Connector
自 Spark 2.2 版本 Structured Streaming 正式發(fā)布,Spark 只保留了 SparkSession 作為主程序入口,你只需編寫 DataSet/DataFrame API 程序,以聲明形式對(duì)數(shù)據(jù)的操作,而將具體的查詢優(yōu)化與批流處理執(zhí)行的細(xì)節(jié)交由 Spark SQL 引擎進(jìn)行處理。對(duì)于一個(gè)數(shù)據(jù)處理作業(yè),需要定義 DataFrame 的產(chǎn)生、變換和寫出三個(gè)部分,而將 Pulsar 作為流數(shù)據(jù)平臺(tái)與 Spark 進(jìn)行集成正是要解決如何從 Pulsar 中讀取數(shù)據(jù)(Source)和如何向 Pulsar 寫出運(yùn)算結(jié)果(Sink)兩個(gè)問題。
為了實(shí)現(xiàn)以 Pulsar 為源讀取批流數(shù)據(jù)與支持批流數(shù)據(jù)向 Pulsar 的寫入,我們構(gòu)建了 Spark Pulsar Connector。
對(duì) Structured Streaming 的支持
[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機(jī)制,建議將圖片保存下來直接上傳(img-qMYfG5Hn-1570764602651)(media/pulsar-spark/ssc.png)]
上圖展示了 Structured Streaming(以下簡(jiǎn)稱 SS )的主要組件:
- 輸入和輸出——為了提供細(xì)粒度的容錯(cuò),SS 要求輸入數(shù)據(jù)源(Source)是可重放(replayable)的;為了提供端到端的 Exactly-Once 的語義,需要輸出(Sink)支持冪等寫出(一條消息被多次寫入與一次寫入效果一致,可由 DBMS、KV 系統(tǒng)通過鍵約束的方式支持)。
- API——用戶通過編寫 Spark SQL 的 batch API(SQL 或 DataFrame)指定對(duì)一個(gè)或多個(gè)流、表的查詢,并定義一個(gè)輸出表保存所有的輸出結(jié)果,而引擎內(nèi)部決定如何將結(jié)果增量地寫到 Sink 中。為了支持流處理,SS 在原有的 Spark SQL API 上添加了一些接口:
- 觸發(fā)器(Trigger)——控制引擎觸發(fā)流處理執(zhí)行、在Sink中更新結(jié)果的頻率。
- 水印機(jī)制(Watermark policy)——用戶通過指定字段做 event time,來決定對(duì)晚到數(shù)據(jù)的處理。
- 有狀態(tài)算子(stateful operator)——用戶可以根據(jù) Key 跟蹤和更新算子內(nèi)部的可變狀態(tài),完成復(fù)雜的業(yè)務(wù)需求(例如,基于會(huì)話的窗口)。
- 執(zhí)行層——當(dāng)收到一個(gè)查詢時(shí),SS 決定它的增量執(zhí)行方式,進(jìn)行優(yōu)化、并開始執(zhí)行。SS 有兩種可選的執(zhí)行模型:
- Microbatch model(微批處理模式)——默認(rèn)的執(zhí)行方式,與 Spark Streaming 的 DStream 類似,將流切成 micro batch,對(duì)每個(gè) batch 分別處理。這種模式支持動(dòng)態(tài)負(fù)載均衡、故障恢復(fù)等機(jī)制,適合將吞吐率作為主要性能指標(biāo)的應(yīng)用。
- Continuous mode(持續(xù)模式)——在集群上啟動(dòng)長(zhǎng)時(shí)間運(yùn)行的算子,適合處理較為簡(jiǎn)單、延遲敏感類應(yīng)用。
- Log 和 State Store —— SS 利用兩種持久化存儲(chǔ)來提供容錯(cuò)保障:一個(gè) Write-ahead-Log(WAL),記錄被成功消費(fèi)且持久化寫出的每個(gè)數(shù)據(jù)源中的位置;一個(gè)大規(guī)模的 state store, 存儲(chǔ)長(zhǎng)期運(yùn)行的聚集算子內(nèi)部的狀態(tài)快照。當(dāng)故障發(fā)生時(shí),SS 會(huì)根據(jù)快照的位置,通過重放之后的消息完成流處理狀態(tài)的恢復(fù)。
具體到源碼層面,Source 接口定義了可重放數(shù)據(jù)源需要提供的功能。
trait Source {
def schema: StructType
def getOffset: Option[Offset]
def getBatch(start: Option[Offset], end: Offset): DataFrame
def commit(end: Offset): Unit
def stop(): Unit
}
trait Sink {
def addBatch(batchId: Long, data: DataFrame): Unit
}
以 microbatch 執(zhí)行模式為例:
- 在每個(gè) microbatch 的最開始,SS 會(huì)向 source 詢問當(dāng)前的最新進(jìn)度(
getOffset),并將其持久化到 WAL 中。 - 隨后,source 根據(jù) SS 提供的
startend偏移量,提供區(qū)間范圍的數(shù)據(jù)(getBatch)。 - SS 觸發(fā)計(jì)算邏輯的優(yōu)化和編譯,把計(jì)算結(jié)果寫出給 sink(addBatch),這時(shí)才觸發(fā)實(shí)際的取數(shù)據(jù)操作以及計(jì)算過程。
- 在數(shù)據(jù)完整寫出到 sink 后,SS 通知 source 可以廢棄數(shù)據(jù)(
commit),并將成功執(zhí)行的batchId寫入內(nèi)部維護(hù)的 commitLog 中。
具體到 Pulsar 的 connector 實(shí)現(xiàn)中:
- 在所有批次開始執(zhí)行前,SS 會(huì)調(diào)用 schema 方法返回消息的結(jié)構(gòu)信息,在 schema 方法內(nèi)部,我們從 Pulsar 的 Schema Registry 提取出所有主題的 Schema,并進(jìn)行一致性檢查。
- 隨后,我們?yōu)槊總€(gè)主題分區(qū)創(chuàng)建一個(gè)消費(fèi)者,按照 (start, end] 返回主題分區(qū)中的數(shù)據(jù)。
- 當(dāng)收到 SS 的 commit 通知時(shí),通過
topics中的resetCursor向 Pulsar 標(biāo)志消息消費(fèi)的完成。Sink 中構(gòu)建的生產(chǎn)者則將 addBatch 中獲取的實(shí)際數(shù)據(jù)以消息形式追加寫入相應(yīng)的主題中。
對(duì)批處理作業(yè)的支持
在某個(gè)時(shí)間點(diǎn)執(zhí)行的批作業(yè),可以看作是對(duì) Pulsar 平臺(tái)中的流數(shù)據(jù)在一個(gè)時(shí)間點(diǎn)的快照進(jìn)行的數(shù)據(jù)分析。Spark 對(duì)歷史數(shù)據(jù)的查詢是以 Relation 為單位,Spark Pulsar Connector 提供 createRelation 方法的實(shí)現(xiàn)根據(jù)用戶指定的多個(gè)主題分區(qū)構(gòu)建表,并返回包含 Schema 信息的 DataSet。在查詢計(jì)劃階段,Connector 的功能分成兩步:首先,根據(jù)用戶提供的一個(gè)或多個(gè)主題,在 Pulsar Schema Registry 中查找主題 Schema,并檢查多個(gè)主題 Schema 的一致性;其次,將用戶指定的所有主題分區(qū)進(jìn)行任務(wù)劃分(Partition),得到的分片即是 Spark source task 的執(zhí)行粒度。
Pulsar 提供了兩層的接口對(duì)其中的數(shù)據(jù)進(jìn)行訪問,基于主題分區(qū)的 Consumer/Reader 接口,以傳統(tǒng)消息接收為語義的順序數(shù)據(jù)讀??;Segment 級(jí)的讀接口,提供對(duì) Segment 數(shù)據(jù)的直接讀取。因此,相應(yīng)地從 Pulsar 讀數(shù)據(jù)執(zhí)行批作業(yè)可以分成兩種粒度(即讀取數(shù)據(jù)的并行度)進(jìn)行:以主題分區(qū)為粒度(每個(gè)主題分區(qū)作為一個(gè)分片);以 Segment 為粒度(將一個(gè)主題分區(qū)的多個(gè) Segment 組織成一個(gè)分片,因此一個(gè)主題分區(qū)會(huì)有多個(gè)對(duì)應(yīng)的分片)。你可以按照批作業(yè)的并行度需求和可分配計(jì)算資源選擇合適的消息讀取的并行粒度。另一方面,將批作業(yè)的執(zhí)行存儲(chǔ)到 Pulsar 也很直觀,你只需指定寫入的主題和消息路由規(guī)則(RoundRobin 或者按 Key 劃分),在 Sink task 中創(chuàng)建的每個(gè)生產(chǎn)者會(huì)將待寫出的消息送至對(duì)應(yīng)的主題分區(qū)。
如何使用 Spark Pulsar Connector
- 根據(jù)一個(gè)或多個(gè)主題創(chuàng)建流處理 Source。
val df = spark
.readStream
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.option("topicsPattern", "topic.*") // Subscribe to a pattern
// .option("topics", "topic1,topic2") // Subscribe to multiple topics
// .option("topic", "topic1"). //subscribe to a single topic
.option("startingOffsets", startingOffsets)
.load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
- 構(gòu)建批處理 Source。
val df = spark
.read
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("admin.url", "http://localhost:8080")
.option("topicsPattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
- 使用數(shù)據(jù)中本身的 topic 字段向多個(gè)主題進(jìn)行持續(xù) Sink。
val ds = df
.selectExpr("topic", "CAST(__key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.start()
- 將批處理結(jié)果寫回 Pulsar。
df.selectExpr("CAST(__key AS STRING)", "CAST(value AS STRING)")
.write
.format("pulsar")
.option("service.url", "pulsar://localhost:6650")
.option("topic", "topic1")
.save()
注意
由于 Spark Pulsar Connector 支持結(jié)構(gòu)化消息的消費(fèi)和寫入,為了避免消息負(fù)載中字段和消息元數(shù)據(jù)(event time、publish time、key 和 messageId)的潛在命名沖突,消息元數(shù)據(jù)字段在 Spark schema 中以雙下劃線做為前綴(例如,__eventTime)。
參考資料