深入理解Spark Streaming的執(zhí)行模型(全局性word count)

本文譯自《Diving into Apache Spark Streaming’s Execution Model》,作者: Tathagata Das, Matei Zaharia , Patrick Wendell 發(fā)布于 ENGINEERING BLOGJuly 30, 2015

有了這么多的分布式流處理引擎,人們經(jīng)常問我們Apache Spark Streaming的獨特優(yōu)點。從早期開始,Apache Spark提供了一個統(tǒng)一的引擎,原生支持批處理和流工作負載。這不同于其他系統(tǒng),其具有僅設(shè)計用于流的處理引擎,或者具有類似的批處理和流API,但是在內(nèi)部編譯到不同的引擎。 Spark的單執(zhí)行引擎和統(tǒng)一的批處理和流式編程模型比其他傳統(tǒng)的流系統(tǒng)帶來了一些獨特的優(yōu)勢。特別是,四個主要方面是:

  • 從故障和分離器快速恢復(fù)
  • 更好的負載平衡和資源使用
  • 結(jié)合流數(shù)據(jù)與靜態(tài)數(shù)據(jù)集和交互式查詢
  • 與高級處理庫的本地集成(SQL,機器學(xué)習(xí),圖形處理)

在這篇文章中,我們概述了Spark Streaming的架構(gòu),并解釋它如何提供上述好處。我們還討論了項目中利用執(zhí)行模型的一些有趣的正在進行的工作。

Stream Processing Architectures – The Old and the New

在高級別,現(xiàn)代分布式流處理流水線執(zhí)行如下:

  1. 從數(shù)據(jù)源(例如活日志,系統(tǒng)遙測數(shù)據(jù),IoT設(shè)備數(shù)據(jù)等)接收流數(shù)據(jù)到像Apache Kafka,Amazon Kinesis等一些數(shù)據(jù)攝取系統(tǒng)。
  2. 在集群上并行處理數(shù)據(jù)。 這是流處理引擎設(shè)計的目的,我們將在下面詳細討論。
  3. 將結(jié)果輸出到下游系統(tǒng),如HBase,Cassandra,Kafka等。

為了處理數(shù)據(jù),大多數(shù)傳統(tǒng)的流處理系統(tǒng)設(shè)計有連續(xù)的運算符模型,其工作如下:

  • 有一組工作節(jié)點,每個節(jié)點運行一個或多個連續(xù)運算符。
  • 每個連續(xù)運算符一次處理流數(shù)據(jù)一個記錄,并將該記錄轉(zhuǎn)發(fā)到流水線中的其他運算符。
  • 存在用于從攝取系統(tǒng)接收數(shù)據(jù)的“源”運算符和輸出到下游系統(tǒng)的“sink”運算符。

連續(xù)運算符是一個簡單而自然的模型。然而,隨著當今趨向于更大規(guī)模和更復(fù)雜的實時分析,這種傳統(tǒng)架構(gòu)也遇到了一些挑戰(zhàn)。我們設(shè)計Spark Streaming以滿足以下要求:

  • 快速故障和分段恢復(fù) - 隨著規(guī)模的增大,集群節(jié)點出現(xiàn)故障或不可預(yù)測的減速(即分段)的可能性更高。系統(tǒng)必須能夠自動從故障和分離器中恢復(fù),以實時提供結(jié)果。不幸的是,連續(xù)運算符到工作節(jié)點的靜態(tài)分配使得傳統(tǒng)系統(tǒng)很難從故障和分離器中恢復(fù)。
  • 負載平衡 - 工人之間處理負載的不均勻分配可能導(dǎo)致連續(xù)運營商系統(tǒng)中的瓶頸。這更可能發(fā)生在大型集群和動態(tài)變化的工作負載中。系統(tǒng)需要能夠基于工作負載動態(tài)地調(diào)整資源分配。
  • 流式處理,批處理和交互式工作負載的統(tǒng)一 - 在許多使用情況下,以交互方式查詢流式數(shù)據(jù)(畢竟流式處理系統(tǒng)在內(nèi)存中擁有所有內(nèi)容)或?qū)⑵渑c靜態(tài)數(shù)據(jù)集(例如預(yù)先計算楷模)。這在連續(xù)運營商系統(tǒng)中是困難的,因為它們不被設(shè)計為動態(tài)地引入用于ad-hoc查詢的新運營商。這需要一個可以組合批量,流式和交互式查詢的引擎。
  • 高級分析(如機器學(xué)習(xí)和SQL查詢) - 更復(fù)雜的工作負載需要不斷學(xué)習(xí)和更新數(shù)據(jù)模型,甚至可以使用SQL查詢查詢流式數(shù)據(jù)的“最新”視圖。同樣,在這些分析任務(wù)之間有一個共同的抽象,使開發(fā)人員的工作更容易。

為了滿足這些要求,Spark Streaming使用一種稱為離散化流的新架構(gòu),它直接利用Spark引擎的豐富庫和容錯。

Architecture of Spark Streaming: Discretized Streams

Spark Streaming不是一次處理流數(shù)據(jù)一條記錄,而是將流數(shù)據(jù)離散化成微小的亞秒級微批次。 換句話說,Spark Streaming的接收器并行接受數(shù)據(jù)并將其緩存在Spark的工作節(jié)點的內(nèi)存中。 然后,延遲優(yōu)化的Spark引擎運行短任務(wù)(幾十毫秒)來處理批次并將結(jié)果輸出到其他系統(tǒng)。 注意,與傳統(tǒng)的連續(xù)運算符模型不同,其中計算被靜態(tài)地分配給一個節(jié)點,Spark任務(wù)根據(jù)數(shù)據(jù)的位置和可用資源被動態(tài)地分配給工作者。 這可以實現(xiàn)更好的負載平衡和更快的故障恢復(fù),我們將在下面說明。

此外,每批數(shù)據(jù)是彈性分布式數(shù)據(jù)集(RDD),它是Spark中容錯數(shù)據(jù)集的基本抽象。 這允許使用任何Spark代碼或庫處理流數(shù)據(jù)。

Benefits of Discretized Stream Processing

讓我們看看這個架構(gòu)如何允許Spark Streaming實現(xiàn)我們之前設(shè)置的目標。

Dynamic load balancing

將數(shù)據(jù)劃分為小的微批次允許對資源的計算的細粒度分配。 例如,考慮一個簡單的工作負載,其中輸入數(shù)據(jù)流需要通過密鑰進行分區(qū)和處理。 在大多數(shù)其他系統(tǒng)采用的傳統(tǒng)的一次記錄方法中,如果其中一個分區(qū)比其他分區(qū)的計算密集程度更高,則靜態(tài)分配以處理該分區(qū)的節(jié)點將成為瓶頸并減慢管道。 在Spark Streaming中,作業(yè)的任務(wù)將自然地在工作負載之間平衡負載 - 一些工作負載將處理幾個更長的任務(wù),另一些工作將處理更多的更短任務(wù)。

Fast failure and straggler recovery

在節(jié)點故障的情況下,傳統(tǒng)系統(tǒng)必須在另一個節(jié)點上重新啟動故障的連續(xù)運算符,并重放數(shù)據(jù)流的某些部分以重新計算丟失的信息。 注意,只有一個節(jié)點正在處理重新計算,并且管道不能繼續(xù),直到新節(jié)點在重放之后趕上。 在Spark中,計算已經(jīng)離散化為小的,確定性的任務(wù),可以在任何地方運行而不影響正確性。 因此,故障任務(wù)可以在集群中的所有其他節(jié)點上并行重新啟動,從而在所有節(jié)點上均勻分布所有重新計算,并且比傳統(tǒng)方法更快地從故障中恢復(fù)。

Unification of batch, streaming and interactive analytics

Spark Streaming中的關(guān)鍵編程抽象是DStream或分布式流。 每批流數(shù)據(jù)由RDD表示,RDD是Spark對分布式數(shù)據(jù)集的概念。 因此,DStream只是一系列的RDD。 這種通用表示允許批處理和流工作負載無縫地互操作。 用戶可以對每批流數(shù)據(jù)應(yīng)用任意Spark函數(shù):例如,很容易將DStream與預(yù)先計算的靜態(tài)數(shù)據(jù)集(作為RDD)連接。

// Create data set from Hadoop file
val dataset = sparkContext.hadoopFile("file")
// Join each batch in stream with the dataset
kafkaDStream.transform { batchRDD =>
  batchRDD.join(dataset).filter(...)
}

由于批處理的流數(shù)據(jù)存儲在Spark的工作內(nèi)存中,因此可以按需交互式查詢。 例如,您可以通過Spark SQL JDBC服務(wù)器公開所有的流狀態(tài),我們將在下一節(jié)中顯示。 Spark中批處理,流式處理和交互式工作負載的這種統(tǒng)一非常簡單,但在沒有這些工作負載的通用抽象的系統(tǒng)中很難實現(xiàn)。

Advanced analytics like machine learning and interactive SQL

Spark互操作性擴展到豐富的庫,如MLlib(機器學(xué)習(xí)),SQL,DataFrames和GraphX。 讓我們探討一些用例:

Streaming + SQL and DataFrames

DStreams生成的RDD可以轉(zhuǎn)換為DataFrames(Spark SQL的編程接口),并使用SQL查詢。 例如,使用Spark SQL的JDBC服務(wù)器,您可以將流的狀態(tài)公開給任何談?wù)揝QL的外部應(yīng)用程序。
val hiveContext = new HiveContext(sparkContext)

// ...
wordCountsDStream.foreachRDD { rdd =>
  // Convert RDD to DataFrame and register it as a SQL table
  val wordCountsDataFrame = rdd.toDF("word", "count") 
  wordCountsDataFrame.registerTempTable("word_counts") 
}
// ...
// Start the JDBC server
HiveThriftServer2.startWithContext(hiveContext)

然后,您可以使用Spark附帶的beeline客戶端或Tableau等工具,通過JDBC服務(wù)器交互式查詢不斷更新的“word_counts”表。

show tables;
+--------------+--------------+
| tableName | isTemporary |
+--------------+--------------+
| word_counts | true |
+--------------+--------------+
1 row selected (0.102 seconds)

select * from word_counts;
+-----------+--------+
| word | count |
+-----------+--------+
| 2015 | 264 |
| PDT | 264 |
| 21:45:41 | 27 |

Streaming + MLlib

使用MLlib離線生成的機器學(xué)習(xí)模型可以應(yīng)用于流數(shù)據(jù)。 例如,以下代碼使用一些靜態(tài)數(shù)據(jù)訓(xùn)練KMeans聚類模型,然后使用該模型對Kafka數(shù)據(jù)流中的事件進行分類。

// Learn model offline
val model = KMeans.train(dataset, ...)
// Apply model online on stream
val kafkaStream = KafkaUtils.createDStream(...)
kafkaStream.map { event => model.predict(featurize(event)) }

我們在我們的Spark Summit 2014 Databricks演示中演示了這種離線學(xué)習(xí)在線預(yù)測。 從那時起,我們還在MLLib中添加了流機器學(xué)習(xí)算法,可以從標記的數(shù)據(jù)流中連續(xù)訓(xùn)練。 其他Spark庫也可以很容易地從Spark Streaming中調(diào)用。

Performance

鑒于Spark Streaming的獨特設(shè)計,它運行速度有多快?實際上,Spark Streaming對批處理數(shù)據(jù)和利用Spark引擎的能力導(dǎo)致與其他流系統(tǒng)相當或更高的吞吐量。在延遲方面,Spark Streaming可以實現(xiàn)低至幾百毫秒的延遲。開發(fā)人員有時會問,微批處理是否會增加太多的延遲。在實踐中,批處理延遲只是端到端流水線延遲的一個小組件。例如,許多應(yīng)用程序通過滑動窗口計算結(jié)果,并且甚至在連續(xù)的操作員系統(tǒng)中,該窗口僅周期性地更新(例如,每2秒滑動的20秒窗口)。許多管道從多個源收集記錄,并等待短時間來處理延遲或亂序數(shù)據(jù)。最后,任何自動觸發(fā)算法都傾向于等待一段時間來觸發(fā)觸發(fā)。因此,與端到端延遲相比,批處理很少增加顯著的開銷。事實上,DStream的吞吐量增益通常意味著您需要更少的機器來處理相同的工作負載。
Future Directions for Spark Streaming
Spark Streaming是Spark中使用最廣泛的組件之一,而且流媒體用戶還有更多的選擇。我們團隊正在處理的一些最重要的項目將在下面討論。你可以期望這些在接下來的幾個版本的Spark:

  • 背壓 - 流工作負載通??梢杂型话l(fā)數(shù)據(jù)(例如在奧斯卡期間tweet中的突然尖峰),并且處理系統(tǒng)必須能夠優(yōu)雅地處理它們。在即將到來的Spark 1.5版本(下個月)中,Spark將添加更好的反壓機制,允許Spark Streaming動態(tài)控制此類突發(fā)的吞吐速率。此功能代表我們在Databricks和Typesafe的工程師之間的聯(lián)合工作。
  • 動態(tài)縮放 - 控制攝取速率可能不足以處理數(shù)據(jù)速率的較長期變化(例如白天夜間的持續(xù)較高的推特率)。可以通過基于處理需求動態(tài)地縮放集群資源來處理這樣的變化。這在Spark Streaming架構(gòu)中非常容易做到 - 由于計算已經(jīng)分為小任務(wù),如果從集群管理器(YARN,Mesos,Amazon EC2等)獲取更多節(jié)點,它們可以動態(tài)重新分配到更大的集群, 。我們計劃添加對自動動態(tài)縮放的支持。
  • 事件時間和亂序數(shù)據(jù) - 在實踐中,用戶有時會記錄不按順序傳送,或者時間戳與提取時間不同。 Spark流將通過允許用戶定義的時間提取功能支持“事件時間”。這將包括延遲或亂序數(shù)據(jù)的松弛持續(xù)時間。
  • UI增強 - 最后,我們希望讓開發(fā)人員輕松調(diào)試其流應(yīng)用程序。為此,在Spark 1.4中,我們向流式Spark UI添加了新的可視化,使開發(fā)人員密切監(jiān)視其應(yīng)用程序的性能。在Spark 1.5中,我們通過顯示更多輸入信息(例如每個批處理中處理的Kafka偏移量)來進一步改進。

要了解有關(guān)Spark Streaming的更多信息,請閱讀官方編程指南或介紹其執(zhí)行和容錯模型的Spark Streaming研究論文。

完。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容