Structured Streaming 編程指南

鑒于有些詞沒有很好的中文對應(yīng),保留英文原詞 比如?exactly-once

概覽

--快速實例

--編程模型

----基本概念

----處理即時(event_time)和遲到數(shù)據(jù)

----容錯語法

使用DataSets(暫時翻譯為數(shù)據(jù)集,簡寫ds) 和 DataFrame(暫時翻譯數(shù)據(jù)幀,簡寫df)

--創(chuàng)建數(shù)據(jù)幀和數(shù)據(jù)集

----輸入源

----表概要推斷和流式數(shù)據(jù)集和數(shù)據(jù)幀的分區(qū)


概覽

? ?結(jié)構(gòu)化數(shù)據(jù)流(structed streaming,后續(xù)簡寫為ss)是一個基于spark sql引擎的 ,可變的,容錯的流式處理引擎。你可以用處理靜態(tài)數(shù)據(jù)批次計算的方式處理流式數(shù)據(jù)。spark SQL引擎 將會逐漸和持續(xù)的運行struced streaming,并且隨著數(shù)據(jù)持續(xù)到達更新最后的結(jié)果。你可以使用數(shù)據(jù)集或者數(shù)據(jù)幀提供的接口(scala,java,python 或者R 都可以),實現(xiàn)流式的聚合,即時窗口,流式到批式的聚合等。計算的流程在相同的Spark SQL引擎上執(zhí)行。最終系統(tǒng)通過checkpointing 和WAL(write ahead log ,預(yù)寫日志)確保end-to end,exactly-once,容錯。

快速示例

比方說,你現(xiàn)在需要 統(tǒng)計單詞數(shù)量,這些單詞是從一個監(jiān)聽tcp socker的服務(wù)上輸入的。讓我們來看一下你應(yīng)該使用ss如何實現(xiàn)。這塊比較簡單先不翻譯了。

編程模型

ss關(guān)鍵的思路是像處理一個表的數(shù)據(jù)一樣處理實時的數(shù)據(jù)流。然而這個表的數(shù)據(jù)是持續(xù)增加的。這就導(dǎo)致了一個新的處理模型,非常類似于目前的批處理模型。你可以像在靜態(tài)表的批量查詢一樣,實現(xiàn)你的流式計算就。并且spark 運行這種查詢是在在一個無限的輸入表中以一種遞增的方式查詢。讓我們一起更深入的理解這個模型。

基礎(chǔ)概念

把輸入的數(shù)據(jù)流當(dāng)做 Input Table(輸入表) ,到達數(shù)據(jù)流的每個數(shù)據(jù)項就像是一個新增到這個input table 新的一行。


對于輸入的查詢將會產(chǎn)生一個 Result Table (結(jié)果表)。每隔觸發(fā)器時間(比方說,每隔1s),新的行會被追加到輸入表中,這些新的行最終會更新結(jié)果表。無論任何時候結(jié)果表被更新,我們需要把那些變化的結(jié)果行寫到一個外部的存儲。


輸出 被定義為被寫入到外部存儲的內(nèi)容。輸出有多種不同的模式。

complete mode-全部更新的結(jié)果表將會被寫入到外部存儲。這個取決于存儲的連接決定如何處理全部表的寫入。

append mode-只有從上一次觸發(fā)后結(jié)果表中新增的行才會被寫入到外部存儲。適用于結(jié)果表中的數(shù)據(jù)不會改變的查詢場景。

update mode-只有從上一次觸發(fā)后結(jié)果表中被更新的行才會寫入到外部存儲(從2.1.1開始支持)。需要注意的是,這種模式不同于complete mode的地方是,這種模式只會輸出從上次觸發(fā)后改變的行。如果查詢不包含聚合操作,則等價于append mode。

注意:這些模式適用于一些特征的查詢場景。后續(xù)我們會討論。

為了演示一下這個模式的使用,讓我們一起理解一下上文中的快速示例。第一個lines 的dataFrame 就是輸入表,最后的wordCounts 的dataFrame 就是結(jié)果表。需要指出的是,從流式lines DataFrame到 wordCounts 的DataFrame 的查詢將會和靜態(tài)DataFrame的一樣。然而,當(dāng)查詢啟動的時候,Spark 將會持續(xù)的檢查來自socket連接的新的數(shù)據(jù)。如果有新的數(shù)據(jù),spark 將會運行一個"遞增"的查詢,這個查詢將會把以前的運行中的統(tǒng)計值和新到的數(shù)據(jù)進行一個計算,并更新統(tǒng)計值,就像下面演示的一樣。


這個模型明顯與其他的流式處理引擎不同。許多流式系統(tǒng)需要用戶自己保持一個運行時的聚合,因而必須考慮容錯,和數(shù)據(jù)的一致性(至少一次 at-least-once,至多一次 at-most-once,絕對一次 exactl-once)。在這個模型中,spark 負責(zé)當(dāng)新數(shù)據(jù)到達時,更新結(jié)果表,因而用戶不需要在考慮這些問題了。舉個例子,我們來看看 這個模型是如何處理即刻數(shù)據(jù)以及遲到數(shù)據(jù)的。

處理即刻和遲到數(shù)據(jù)

event-time 就是數(shù)據(jù)本身的時間。對很多應(yīng)用來說,你想要在event-time這個層面進行操作。舉個例子,如果你想要獲取每分鐘lot設(shè)備產(chǎn)生的事件數(shù)量,你可能需要使用的是數(shù)據(jù)產(chǎn)生的時間(數(shù)據(jù)中的即刻時間,好拗口),而不是spark 收到這些數(shù)據(jù)的時間。這個即刻時間在這個模型中天生支持-每個來自設(shè)備的事件是表中的一行,而event-time 是這行數(shù)據(jù)的一列。這樣就使基于窗口的聚合成為可能,由于把時間這個列作為一種特殊的grouping 和聚合。每個時間窗口就是一個group,并且每行數(shù)據(jù)都可以屬于多個組(或窗口)。因而,event-time-window-based 的聚合的查詢可以定義為靜態(tài)的數(shù)據(jù)集也可以是數(shù)據(jù)流,使得用戶的生活更簡單(原文就這么說的)。

另外 這個模型天生處理那些相比即刻數(shù)據(jù)遲到的數(shù)據(jù)。由于Spark 一直會更新結(jié)果表,它可以在有數(shù)據(jù)遲到時完全的控制更新老的聚合結(jié)果,并且為了限制中間狀態(tài)數(shù)據(jù)的大小,可以清除老的聚合數(shù)據(jù)。從spark 2.1 開始,我們支持wartermarking (水印),水印用于標(biāo)識 遲到數(shù)據(jù)的一個門限值,并且允許引擎以此來清除老狀態(tài)的數(shù)據(jù)。 在窗口操作的章節(jié)將會有更詳盡的介紹。

容錯語義

傳輸end-to-end exactly-once 語義是structed streaming設(shè)計的一個關(guān)鍵目標(biāo)。為了達到這個目標(biāo),我們設(shè)計了structed Streaming 輸入源,輸出和執(zhí)行引擎用來可靠的追蹤準(zhǔn)確的處理。因而它可以通過重啟或者重新運行來處理任何失敗。每個streaming的輸入源 都被假定有一個偏移量(類似kafka的偏移量,或者kinesis的序列號)用于追蹤。

API 使用DataSets 和 DataFrames

從spark 2.0,DataFrames 和Datasets 可以代表靜態(tài)的,有限的數(shù)據(jù),也可以是流式的無限的數(shù)據(jù)。和靜態(tài)的df,ds 類似的,你可以使用通用的入口 SparkSession從流式的輸入源中 去創(chuàng)建流式的df和ds,并且像在使用靜態(tài)的ds,df一樣。如果你不熟悉df和ds,強烈建議你去熟悉一下,可以使用這個DataFrame/Dataset Programming Guide.

創(chuàng)建流式的ds和流式的df

流式的df 可以通過DataStreamReader 的接口 SparkSession.readStream().如果是R語言,使用read.stream() 方法。與創(chuàng)建靜態(tài)的df相似,你可以定義一個輸入的詳情,數(shù)據(jù)的格式,綱要和選項等。

輸入源

在spark2.0中,有許多支持的數(shù)據(jù)源。

File Source - 讀取一個文件夾下的文件作為數(shù)據(jù)流 支持的文件格式有 text,csv,json,parquet。參考DataStreamReader的文檔獲得最新的支持列表,和其他文件格式的一些選項。需要指出的是,所有的文件必須自動放置在給定的目錄下,

Kafka source-從kafka獲取數(shù)據(jù),支持kafka 0.10.0和更高的版本。更詳細的細節(jié)參考.Kafka Integration Guide

Socket source(for testing) -從Socketde 連接中讀取UTF-8格式的數(shù)據(jù)。堅挺的服務(wù)端的socket在dirver端。注意這個僅僅用于測試,不支持end-to-end的容錯保證。

一些來源不能保證容錯因為他們不能在失敗的時候使用checkpointed 的偏移量來保證數(shù)據(jù)可以重放。查看前面的章節(jié)fault-tolerance semantics。下面是所有spark的輸入源的細節(jié)



輸入源? ? 選項? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?容錯? ? ? ? ? ? ? ? ? ? ? ? ? ?注意


文件源? ?path:輸入的文件目錄,對所有的格式都是一樣的。? ? yes? ? 支持glob paths,但是不支持多個逗號分隔的文件夾。

socket 源 :host:必填

? ? ? ? ? ? ? ? ? ? ? port:必填

kafka: 參考Kafka Integration Guide.

示例:

valspark:SparkSession=...// Read text from socketvalsocketDF=spark.readStream

.format("socket")

.option("host","localhost")

.option("port",9999).load()

socketDF.isStreaming// Returns True for DataFrames that have streaming sourcessocketDF.printSchema// Read all the csv files written atomically in a directoryvaluserSchema=newStructType().add("name","string").add("age","integer")

val csvDF=spark.readStream.option("sep",";")

.schema(userSchema)// Specify schema of the csv files

.csv("/path/to/directory")// Equivalent to format("csv").load("/path/to/directory")

語義推斷和流式df/ds的分區(qū)

默認,基于文件源的strucete streaming 需要指定(schema),而不是自動依賴spark。這個限制使得使用streaming query時候的schema保持一致,甚至當(dāng)發(fā)生失敗的情況。對于點對點的使用情況,你可以通過設(shè)置spark.sql.streaming.schemaInference 為true的方式開啟語義推斷。

當(dāng)以/key=value/命名的子目錄準(zhǔn)備好的的時候,分區(qū)發(fā)現(xiàn)開始執(zhí)行,并且自動的遞歸進這些目錄。如果這些列在用戶提供的表頭中出現(xiàn),這些數(shù)據(jù)將會被spark根據(jù)文件讀入的路徑進行填充。?當(dāng)查詢開始的時候決定分區(qū)的目錄必須是準(zhǔn)備好的并且是靜態(tài)的。比方說,?當(dāng)前是 /data/year=2015/的時候,添加/data/year=2016/是ok的。但是改變分區(qū)的列是無效的,比如創(chuàng)建目錄?/data/date=2016-04-17/

基本操作:選擇,設(shè)計,聚合




實時的窗口操作

使用structed? Streaming 在滑動窗口上的聚合操作是直截了當(dāng)?shù)?,和分組的聚合很相似。在一個分組的聚合中,對于每個用戶指定的分組列中特定值,這些聚合的值都會被保持?;诖翱诘木酆希總€窗口的聚合值隨著每行數(shù)據(jù)的進入都會保持著。通過示例來學(xué)習(xí)一下。

假設(shè)快速示例改變了,現(xiàn)在流包含產(chǎn)生時間的多行數(shù)據(jù),這次我們不計算單詞的數(shù)量,我們想要統(tǒng)計10分鐘窗口的單詞數(shù)量,并且每五分鐘更新一次。





最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容