Spark Streaming

Spark Streaming是核心Spark API的一個擴展,它并不會像Storm那樣一次一個地處理數據流,而是在處理前按時間間隔預先將其切分為一段一段的批處理作業(yè)。Spark針對持續(xù)性數據流的抽象稱為DStream(DiscretizedStream),一個DStream是一個微批處理(micro-batching)的RDD(彈性分布式數據集);而RDD則是一種分布式數據集,能夠以兩種方式并行運作,分別是任意函數和滑動窗口數據的轉換。

圖片

Storm, Flink, Spark Streaming的對比圖

Storm, Flink, Spark Streaming的選擇

如果你想要的是一個允許增量計算的高速事件處理系統(tǒng),Storm會是最佳選擇。

如果你必須有狀態(tài)的計算,恰好一次的遞送,并且不介意高延遲的話,那么可以考慮Spark Streaming,特別如果你還計劃圖形操作、機器學習或者訪問SQL的話,Apache Spark的stack允許你將一些library與數據流相結合(Spark SQL,Mllib,GraphX),它們會提供便捷的一體化編程模型。尤其是數據流算法(例如:K均值流媒體)允許Spark實時決策的促進。

Flink支持增量迭代,具有對迭代自動優(yōu)化的功能,在迭代式數據處理上,比Spark更突出,Flink基于每個事件一行一行地流式處理,真正的流式計算,流式計算跟Storm性能差不多,支持毫秒級計算,而Spark則只能支持秒級計算。

Spark Streaming 簡介

Spark Streaming 是Spark 核心API的一個擴展,可以實現高吞吐量的、具備容錯機制的實時流數據的處理。支持多種數據源獲取數據,包括Kafka、Flume、Zero MQ,Kinesis以及TCP Sockets,從數據源獲取數據之后,可以使用諸如map、reduce、join和window等高級函數進行復雜算法的處理。最后還可以將處理結果存儲到文件系統(tǒng),數據庫和現場儀表盤。

在”O(jiān)ne Stack rule them all”的基礎上,可以使用Spark的其他子框架,如集群學習、圖計算等,對流數據進行處理。

Spark的各個子框架都是基于Spark Core的,Spark Streaming在內部的處理機制是,接收實時流的數據,并根據一定的時間間隔拆分成一批批的數據,然后通過Spark Enging處理這些批數據,最終得到處理后的一批批結果數據。對應的批數據,在Spark內核對應一個RDD實例,因此,對應流數據的DStream可以看成是一組RDDS,即RDD的一個序列。通俗點理解的話,在流數據分成一批一批后,通過一個先進先出的隊列,然后Spark Enging從該隊列中依次取出一個個批數據,把批數據封裝成一個個RDD,然后進行處理,這是一個典型的生產者/消費者模型,對應的就有生產者消費者模型的問題,即如何協(xié)調生產速率和消費速率。

離散流(discretized stream)或DStream
這是SparkStraming對內部持續(xù)的實時數據流的抽象描述,即我們處理的一個實時數據流,在Spark Streaming中對應于一個DStream實例。

批數據(batch data)
這是化整為零的第一步,將實時流數據以時間片為單位進行分批,將流處理轉化為時間片數據的批處理。隨著持續(xù)時間的推移,這些處理結果就形成了對應的結果數據流了。

時間片或批處理時間間隔(batch interval)
這是人為地對數據流進行定量的標準,以時間片作為我們拆分數據流的依據。一個時間片的數據對應一個RDD實例。

窗口長度(window length)
一個窗口覆蓋的流數據的時間長度。必須是批處理時間間隔的倍數。

滑動時間間隔
前一個窗口到后一個窗口所經過的時間長度。必須是批處理時間間隔的倍數。

Input DStream
一個input DStream是一個特殊的DStream,將Spark Streaming連接到一個外部數據源來讀取數據。

Spark Streaming 架構

在Spark Streaming中,數據處理是按批進行的,而數據采集是逐條進行的,因此在Spark Streaming中會事先設置好批處理間隔(batch duration),當超過批處理間隔的時候就會把采集到的數據匯總起來稱為一批數據交個系統(tǒng)區(qū)處理。

對于窗口操作而言,在其窗口內部會有N個批處理數據,批處理數據的大小由窗口間隔(window duration)決定,而窗口間隔指的就是窗口的持續(xù)時間,在窗口操作中,只有窗口的長度滿足了才會觸發(fā)批處理的處理。除了窗口的長度,窗口操作還有另一個重要的參數就是滑動間隔(slide duration),它指的是經過多長時間窗口滑動一次形成新的窗口,滑動窗口默認情況下和批次間隔的相同,而窗口間隔一般設置的要比它們兩個大。在這里必須注意的一點是滑動間隔和窗口間隔的大小一定得設置為批處理間隔的整數倍。


圖片

Spark Streaming是一個對實時數據流進行高通量、容錯處理的流式處理系統(tǒng),可以對多種數據源(如Kafka、Flume、Zero MQ和TCP套接字)進行類似Map、Reduce和Join等復雜操作,并將結果保存到外部文件系統(tǒng)、數據庫或應用到實時儀表盤。

計算流程

Spark Streaming是將流式計算分解成一系列短小的批處理作業(yè)。這里的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分成一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distrbute Dataset),然后將Spark Streaming中對DStream的Transformation操作變?yōu)獒槍park中對RDD的Transformation操作,將RDD經過操作變成中間結果保存在內存中。整個流式計算根據業(yè)務的需求可以對中間的結果進行疊加或者存儲到外部設備。

容錯性

對于流式計算來說,容錯性至關重要。首先我們要明確一下Spark中RDD的容錯性機制。每一個RDD都是一個不可變的分布式可重算的數據集,其記錄著確定性的操作繼承關系(lineage),所以只要輸入數據是可容錯的,那么任意一個RDD的分區(qū)(Partition)出錯或不可用,都是可以利用原始輸入數據通過轉換操作而重新算出的。

對于Spark Streaming來說,其RDD的傳承關系如下圖所示,圖中的每一個橢圓形表示一個RDD,橢圓形中的每個圓形代表一個RDD中的一個Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最后一個RDD則表示每一個Batch Size鎖產生的中間結果RDD。我們可以看到圖中的每一個RDD都是通過lineage相連接的,由于Spark Streaming輸入數據可以來自磁盤,例如HDFS(多份拷貝)或是來自與網絡的數據流(Spark Streaming會將網絡輸入數據的每一個數據流拷貝兩份到其他的機器)都能保證容錯性,所以RDD中任意的Partition出錯,都可以并行地在其他機器上將缺失的Partition計算出來。這個容錯恢復方式比連續(xù)計算模型(如Storm)的效率更高。

實時性

對于實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對于每一段數據的處理都會經過Spark DAG圖分解以及Spark的任務集的調度過程。對于目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5 ~ 2秒之間(Stom目前最小的延遲在100ms左右),所以Spark Streaming能夠滿足除對實時性要求非常高(如高頻實時交易)之外的所有流式準實時計算場景。

擴展性與吞吐量

Spark目前在EC2上已經能夠線性擴展到100個節(jié)點(每個節(jié)點4Core),可以以數秒的延遲處理6GB/s的數據量(60M records/s),其吞吐量也比流行的Storm高2~5倍,以下是Berkeley利用WordCount和Grep兩個用例所做的測試。

Spark Streaming 持久化

與RDD一樣,DStream同樣也能通過persist()方法將數據流存放在內存中,默認的持久化方法是MEMORY_ONLY_SER,也就是在內存中存放數據同時序列化的方式,這樣做的好處是遇到需要多次迭代計算的程序時,速度優(yōu)勢十分的明顯。而對于一些基于窗口的操作,如reduceByWindow、reduceByKeyAndWindow,以及基于狀態(tài)的操作,如updateStateByKey,其默認的持久化策略就是保存在內存中。

對于來自網絡的數據源(Kafka、Flume、sockets等),默認的持久化策略是將數據保存在兩臺機器上,這也是為了容錯性而設計的。

另外,對于窗口和有狀態(tài)的操作必須checkpont,通過StreamingContext的checkpoint來指定目錄,通過DStream的checkpoint指定間隔時間,間隔必須是滑動間隔(slide interval)的倍數。

Spark Streaming 性能優(yōu)化

1,優(yōu)化運行時間

增加并行度
確保使用整個集群的資源,而不是把任務集中在幾個特定的節(jié)點上。對于包含shuffle的操作,增加其并行度以確保更為充分的使用集群資源。

減少數據序列化,反序列化的負擔
Spark Streaming默認將接受到的數據序列化后存儲,以減少內存的使用。但是序列化和反序列化需要更多的CPU時間,因此更加高效的序列化方式和自定義的序列化接口以更高效的使用CPU。

設置合理的batch duration(批處理時間)
在Spark Streaming中,Job之間有可能存在依賴關系,后面的Job必須確保前面的作業(yè)執(zhí)行結束后才能提交。若前面的Job執(zhí)行的時間超出了批處理時間間隔,那么后面的Job就無法按時提交,這樣就會進一步拖延接下來的Job,造成后續(xù)Job的阻塞。因此設置一個合理的批處理間隔以確保作業(yè)能夠在這個批處理間隔內結束是必須的。

2,優(yōu)化內存使用
控制batch size(批處理間隔內的數據量)
Spark Streaming會把批處理間隔內接收到的所有數據存放在Spark內部的可用內存區(qū)域中,因此必須確保當前節(jié)點Spark的可用內存中至少能容納這個批處理時間間隔內的所有數據,否則必須增加新的資源以提高集群的處理能力。

及時清理不再使用的數據
前面講到Spark Streaming會將接受的數據應及時清理,以確保Spark Streaming有富余的可用內存空間。通過設置合理的spark.cleaner.ttl時長來及時清理超時的無用數據,這個參數需要小心設置以免后續(xù)操作中所需要的數據被超時錯誤處理。

觀察及適當調整GC策略
GC會影響Job的正常運行,可能延長Job的執(zhí)行時間,引起一系列不可預料的問題。觀察GC的運行情況,采用不同的GC策略以進一步減小內存回收對Job運行的影響。

?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容