簡介
Spark Streaming是核心Spark API的擴(kuò)展,可對(duì)實(shí)時(shí)數(shù)據(jù)流進(jìn)行可擴(kuò)展,高吞吐量,容錯(cuò)處理。實(shí)時(shí)流可以有許多數(shù)據(jù)來源(例如Kafka,F(xiàn)lume,Kinesis或TCP套接字)等,并可以使用高級(jí)功能(如map,reduce,join和window)組成的復(fù)雜算法來處理數(shù)據(jù)。經(jīng)過處理后的數(shù)據(jù)可以寫入到文件系統(tǒng)、數(shù)據(jù)庫、實(shí)時(shí)儀表盤等。
在Spark內(nèi)部,Spark Streaming接收實(shí)時(shí)輸入流,并將其分成若干批,這些批次被送進(jìn)Spark Engine中處理,最后按批次產(chǎn)生最后的結(jié)果。Spark Streaming的處理管道示意圖如下:

背景知識(shí)
什么是流處理
在正式介紹Spark Streaming之前,我們先來介紹一下什么叫做數(shù)據(jù)流。

流處理是連續(xù)處理新到來的數(shù)據(jù)以更新計(jì)算結(jié)果的行為。在流處理中,輸入數(shù)據(jù)是無邊界的,沒有預(yù)定的開始或結(jié)束。它是一系列到達(dá)流處理系統(tǒng)的事件(例如,信用卡交易,單擊網(wǎng)站動(dòng)作,或從物聯(lián)網(wǎng)I o T傳感器讀取的數(shù)據(jù)),用戶應(yīng)用程序?qū)Υ耸录骺梢詧?zhí)行各種查詢操作(例如,跟蹤每種事件類型的發(fā)生次數(shù),或?qū)⑦@些事件按照某時(shí)間窗口聚合)。應(yīng)用程序在運(yùn)行時(shí)將輸出多個(gè)版本的結(jié)果,或者在某外部系統(tǒng) (如鍵值存儲(chǔ))中持續(xù)保持最新的聚合結(jié)果。
當(dāng)然,我們可以將流處理(stream processing)與批處理(batch processing)進(jìn)行比較,批處理是在固定大小輸入數(shù)據(jù)集上進(jìn)行計(jì)算的。通常,這可能是數(shù)據(jù)倉庫中的大規(guī)模數(shù)據(jù)集,其包含來自應(yīng)用程序的所有歷史事件(例如,過去一個(gè)月的所有網(wǎng)站訪問記錄或傳感器記錄的 數(shù)據(jù))。批處理也可以進(jìn)行查詢計(jì)算,與流處理差不多,但只計(jì)算一次結(jié)果。
流處理的應(yīng)用場景
流處理系統(tǒng)主要有以下6個(gè)應(yīng)用場景:
- 通知和警報(bào)
- 實(shí)時(shí)報(bào)告
- 增量ETL(Extract-Transform-Load)
最常見的流處理應(yīng)用程序之一是減少公司在信息檢索時(shí)必須忍受的延遲時(shí)間,簡而言之,“把批處理任務(wù)用流處理方式執(zhí)行”。 - 實(shí)時(shí)數(shù)據(jù)更新來提供實(shí)時(shí)服務(wù)
- 實(shí)時(shí)決策
流處理系統(tǒng)的實(shí)時(shí)決策包括分析新的輸入,根據(jù)業(yè)務(wù)邏輯自動(dòng)作出決策來應(yīng)對(duì)新數(shù)據(jù)。 - 在線機(jī)器學(xué)習(xí)
結(jié)合用戶的實(shí)時(shí)流數(shù)據(jù)和歷史數(shù)據(jù)來進(jìn)行在線實(shí)時(shí)推斷。
流處理的優(yōu)點(diǎn)
在大多數(shù)情況下,批處理更容易理解、更容易調(diào)試、也更容易編寫應(yīng)用程序。此外,批量處理數(shù)據(jù)也使得數(shù)據(jù)處理的吞吐量大大高于許多流處理系統(tǒng)。然而,流處理對(duì)于以下兩種情況非常必要。
-
流處理可以降低系統(tǒng)延遲時(shí)間
當(dāng)你的應(yīng)用程序需要快速響應(yīng)時(shí)間(在分鐘、秒或毫秒級(jí)別上),你需要一個(gè)可以將狀態(tài)保存在內(nèi)存中的流處理系統(tǒng),以獲得更好的性能。 -
自動(dòng)增量計(jì)算,高效更新結(jié)果
流處理系統(tǒng)可以記住以前計(jì)算的狀態(tài),只計(jì)算新數(shù)據(jù)。而對(duì)于傳統(tǒng)批處理系統(tǒng)來說,必須加載所有時(shí)間段的數(shù)據(jù)才可以做到,并且還要添加額外的代碼來實(shí)現(xiàn)相應(yīng)功能。
Spark Streaming
Spark Streaming是用來處理實(shí)時(shí)數(shù)據(jù)流數(shù)據(jù)的,它是Spark Core API的一個(gè)非常有用的擴(kuò)展。Spark Streaming可以對(duì)實(shí)時(shí)數(shù)據(jù)流進(jìn)行高吞吐量、包含容錯(cuò)的流式處理。
Spark Streaming提供了一個(gè)高級(jí)抽象對(duì)象,名為discretized stream或者也叫DStream,用于代表連續(xù)的數(shù)據(jù)流。DStreams可以從輸入數(shù)據(jù)源比如Kafka, Flume, and Kinesis等中創(chuàng)建得到,也可以通過對(duì)已有的DStream進(jìn)行高階操作得到。DStream代表的其實(shí)是一系列的RDDs。
Spark Streaming特性
Spark 流處理主要有以下6個(gè)特點(diǎn):
- 可擴(kuò)展性
Spark Streaming可以很容易地拓展到成千上百個(gè)節(jié)點(diǎn)上 - 速度
Spark Streaming實(shí)現(xiàn)了低延遲 - 容錯(cuò)能力
Spark 能快讀地從錯(cuò)誤中恢復(fù) - 易集成到Spark體系
Spark集成了批處理和實(shí)時(shí)處理功能 - 商業(yè)分析
Spark Streaming可以用在商業(yè)數(shù)據(jù)分析中,用來追蹤用戶行為數(shù)據(jù)
Spark Streaming工作流程
Spark Streaming工作流程分為四個(gè)階段。第一個(gè)是從各種數(shù)據(jù)源中獲取流數(shù)據(jù)。這些數(shù)據(jù)源可以是流式數(shù)據(jù)源比如Akka, Kafka, Flume, AWS或者Parquet這樣的實(shí)時(shí)數(shù)據(jù)流;也包括HBase, MySQL, PostgreSQL, Elastic Search, Mongo DB 以及 Cassandra這些用于靜態(tài)或者批量流的數(shù)據(jù)源。一旦得到了數(shù)據(jù)流,Spakr可以在其之上使用Spark MLib API來進(jìn)行機(jī)器學(xué)習(xí)算法處理,也可以使用Spark SQL來執(zhí)行相關(guān)操作。最終,這些流的輸出可以被存儲(chǔ)在各種類型的數(shù)據(jù)存儲(chǔ)系統(tǒng)中,比如HBase, Cassandra, MemSQL, Kafka, Elastic Search, HDFS 以及本地文件系統(tǒng)。
Spark Streaming 基礎(chǔ)組件
1. Streaming Context
Streaming Context用來消耗數(shù)據(jù)流。通過對(duì)它注冊(cè)一個(gè)輸入數(shù)據(jù)流,可以產(chǎn)生一個(gè)Receiver對(duì)象。這是使用Spark Streaming流處理功能的入口。Spark對(duì)包括Twitter、Akka和ZeroMQ等一些列數(shù)據(jù)源提供了默認(rèn)的數(shù)據(jù)流讀取實(shí)現(xiàn)。
一個(gè)StreamContext對(duì)象可以從SparkContext對(duì)象構(gòu)造出來。SparkContext對(duì)象表示與Spark集群的連接關(guān)系,可以用來在該集群上創(chuàng)建RDD、累加器和廣播變量。
可以通過以下兩種方式來創(chuàng)建一個(gè)StreamingContext對(duì)象。
- 通過已有的SparkContext對(duì)象來創(chuàng)建:
import org.apache.spark._
import org.apache.spark.streaming._
var ssc = new StreamingContext(sc,Seconds(1)) //這個(gè)處理間隔時(shí)間要根據(jù)具體業(yè)務(wù)來設(shè)定
- 通過SparkCconf對(duì)象來創(chuàng)建:
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(1)) //這個(gè)處理間隔時(shí)間要根據(jù)具體業(yè)務(wù)來設(shè)定
在定義好StreamingContext對(duì)象之后,可以做以下事情:
- 通過創(chuàng)建輸入DStreams來定義輸入源
- 通過對(duì)DSteams應(yīng)用Transformation和Action操作來定義流相關(guān)操作
- 使用streamingContext.start()來開啟數(shù)據(jù)接收以及處理
- 使用streamingContext.awaitTermination()來等待數(shù)據(jù)處理停止(主動(dòng)停止或者錯(cuò)誤發(fā)生)
- 使用StreamContext.stop()來手動(dòng)停止數(shù)據(jù)處理
注意事項(xiàng):
- 一旦一個(gè)StreamingContext被啟動(dòng)了,就無法再繼續(xù)向它添加新的計(jì)算規(guī)則了
- 一旦一個(gè)StreamingContext被啟動(dòng)了,它就無法再被重新啟動(dòng)
- 同一個(gè)時(shí)間段,只能有一個(gè)StreamContext存在與JVM中
- 對(duì)StreamingContext執(zhí)行stop操作,同樣也適用于SparkContext。如果只想停止StreamContext,那么請(qǐng)使用stopSparkContext
- 一個(gè)SparkContext可以被多次使用來創(chuàng)建多個(gè)StreamingContext,只要之前的StreamingContext已經(jīng)被停止了。
2. DStream
Discretized Stream 或者 DStream 是Spark Streaming提供的基礎(chǔ)抽象對(duì)象。它代表了連續(xù)的數(shù)據(jù)流,可以是從數(shù)據(jù)源中獲取到的數(shù)據(jù)流,也可以是通過對(duì)輸入流進(jìn)行Transformation操作之后得到的處理后的數(shù)據(jù)流。在Spark內(nèi)部,DStream代表的是一個(gè)RDD序列。DStream中的每個(gè)RDD包含了數(shù)據(jù)流按照指定時(shí)間間隔分割后的某一段。如下圖所示:

對(duì)DStream的Transformation操作與RDD類似,并且在普通RDD上的Transformation操作大多數(shù)在DStream上也可用。


這里再補(bǔ)充一些Transformation操作:
transform操作
Transform允許在DStream 上執(zhí)行任意的 RDD-to-RDD 函數(shù),即使這些函數(shù)并沒有在 DStream 的 API 中暴露出來,通過該函數(shù)可以方便的擴(kuò)展 Spark API。該函數(shù)每一批次調(diào)度一次。其實(shí)也就是對(duì) DStream 中的 RDD 應(yīng)用轉(zhuǎn)換。join操作
兩個(gè)流之間的 join 需要兩個(gè)流的批次大小一致,這樣才能做到同時(shí)觸發(fā)計(jì)算。計(jì)算過程就是 對(duì)當(dāng)前批次的兩個(gè)流中各自的 RDD 進(jìn)行 join,與兩個(gè) RDD的join效果相同。除此之外,也可以同樣對(duì)DStream使用leftOuterJoin, rightOuterJoin, fullOuterJoin等操作。-
UpdateStateByKey操作
UpdateStateByKey 原語用于記錄歷史記錄,有時(shí),我們需要在 DStream中跨批次維護(hù)狀態(tài)(例 如流計(jì)算中累加 wordcount)。針對(duì)這種情況,updateStateByKey()為我們提供了對(duì)一個(gè)狀態(tài)變量的訪問,用于鍵值對(duì)形式的 DStream。給定一個(gè)由(鍵,事件)對(duì)構(gòu)成的 DStream,并傳遞一個(gè)指定如何根據(jù)新的事件更新每個(gè)鍵對(duì)應(yīng)狀態(tài)的函數(shù),它可以構(gòu)建出一個(gè)新的 DStream,其內(nèi)部數(shù)據(jù)為(鍵,狀態(tài)) 對(duì)。UpdateStateByKey允許開發(fā)者保存任意信息,并且根據(jù)新的數(shù)據(jù)對(duì)信息進(jìn)行持續(xù)更新。要想使用這個(gè)功能,分為以下兩步:- 定義狀態(tài): 狀態(tài)可以是任意類型的
- 定義狀態(tài)更新函數(shù): 定義一個(gè)函數(shù),它來決定如何根據(jù)歷史數(shù)據(jù)和從輸入流中得到的新數(shù)據(jù)來對(duì)狀態(tài)進(jìn)行更新
在每個(gè)批次中,Spark都會(huì)對(duì)所有的key應(yīng)用狀態(tài)更新函數(shù),不管這些key在一個(gè)批次中是否有數(shù)據(jù)到來。如果更新函數(shù)返回None,那么這個(gè)key-value將會(huì)被刪除。
注意:使用 updateStateByKey 需要對(duì)檢查點(diǎn)目錄進(jìn)行配置,因?yàn)檫@個(gè)操作會(huì)使用檢查點(diǎn)來保存狀態(tài)。
-
Window操作
Spark Streaming也提供了windowed操作,它允許開發(fā)者在一個(gè)滑動(dòng)窗口上應(yīng)用Transformation操作。示意圖如下:
如上圖所示,每當(dāng)窗口在源DStreram上滑動(dòng)時(shí),落入到窗口內(nèi)的源RDD就會(huì)被合并起來,然后進(jìn)行相應(yīng)操作,最終落入到windowed DStream中。在上圖的例子中,操作被應(yīng)用在了最近的3個(gè)時(shí)間單位,并且窗口每次向前滑動(dòng)2個(gè)時(shí)間單位。這表明任何的window操作都需要指定兩個(gè)參數(shù):window操作- 窗口長度: 即窗口的持續(xù)時(shí)間,即窗口包含的時(shí)間單位,上圖中是3
- 滑動(dòng)間隔時(shí)間: 即執(zhí)行窗口操作的周期,即窗口每滑動(dòng)多少個(gè)時(shí)間單位就執(zhí)行一次窗口操作,上圖中是2
注意:
這兩個(gè)參數(shù)必須是源DStream批處理周期的整數(shù)倍。
關(guān)于Window操作還有以下方法:
(1)window(windowLength, slideInterval): 基于對(duì)源 DStream 窗化的批次進(jìn)行計(jì)算返回一個(gè)新的 Dstream
(2)countByWindow(windowLength, slideInterval): 返回一個(gè)滑動(dòng)窗口計(jì)數(shù)流中的元素個(gè)數(shù)
(3)reduceByWindow(func, windowLength, slideInterval): 通過使用自定義函數(shù)整合滑動(dòng)區(qū)間流元素來創(chuàng)建一個(gè)新的單元素流
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 當(dāng)在一個(gè)(K,V)對(duì)的 DStream 上調(diào)用此函數(shù),會(huì)返回一個(gè)新(K,V)對(duì) DStream,此處通過對(duì)滑動(dòng)窗口中批次數(shù)據(jù)使用 reduce 函數(shù)來整合每個(gè) key 的 value 值。
3. Input DStream
Input DStream是代表從原始數(shù)據(jù)流源頭得到的數(shù)據(jù)流。每個(gè)Input DStream都有一個(gè)Receiver對(duì)象與之搭配,Receiver對(duì)象負(fù)責(zé)從源頭獲取數(shù)據(jù)流然后保存在Spark內(nèi)存中用于處理。
Spark Streaming的內(nèi)置數(shù)據(jù)源有兩種分類,如下:
- 基礎(chǔ)源:這些數(shù)據(jù)源直接在SparkContext中可用,比如內(nèi)置的文件系統(tǒng)和Socket連接
- 高級(jí)數(shù)據(jù)源:比如像 Kafka, Flume, Kinesis等,這些數(shù)據(jù)源需要通過單獨(dú)的工具程序類獲取
4. Output DStream
輸出操作允許將DStream的數(shù)據(jù)寫入到外部的系統(tǒng)中,比如數(shù)據(jù)庫,文件系統(tǒng)。與RDD中的惰性求值類似,OutPut操作執(zhí)行的時(shí)候,才會(huì)觸發(fā)所有應(yīng)用在DStream上的Transformation操作的實(shí)際執(zhí)行。
輸出操作一般有:
- print()
在運(yùn)行流程序的驅(qū)動(dòng)結(jié)點(diǎn)上打印DStream中每一批次數(shù)據(jù)的最開始10個(gè)元素。這用于開發(fā)和調(diào)試. - saveAsTextFiles(prefix, [suffix])
以 text 文件形式存儲(chǔ)這個(gè) DStream 的內(nèi)容。每一批次的存儲(chǔ)文件名基于參數(shù)中的 prefix 和 suffix。 - saveAsObjectFiles(prefix, [suffix])
以 Java 對(duì)象序列化的方式將 Stream 中的數(shù)據(jù)保存為SequenceFiles . 每一批次的存儲(chǔ)文件名基于參數(shù)中的為"prefix-TIME_IN_MS[.suffix]". - saveAsHadoopFiles(prefix, [suffix])
將 Stream 中的數(shù)據(jù)保存為 Hadoop files. 每一批次的存儲(chǔ)文件名基于參數(shù)中的為"prefix-TIME_IN_MS[.suffix]" - foreachRDD(func)
這是最通用的輸出操作,即將函數(shù) func 用于產(chǎn)生于 stream 的每一個(gè)RDD。其中參數(shù)傳入的函數(shù) func 應(yīng)該實(shí)現(xiàn)將每一個(gè) RDD 中數(shù)據(jù)推送到外部系統(tǒng),如將RDD 存入文件或者通過網(wǎng)絡(luò)將其寫入數(shù)據(jù)庫。
5. Caching
DStream可以允許開發(fā)者在內(nèi)存中緩存或者持久化流數(shù)據(jù),如果DStream中的數(shù)據(jù)會(huì)被多次計(jì)算的話,這通常是很有用的一種做法??梢酝ㄟ^調(diào)用DStream的persist()方法來實(shí)現(xiàn)緩存操作。
對(duì)于從網(wǎng)絡(luò)中接收到的數(shù)據(jù)流,比如Kafka,F(xiàn)lume,Socket等,默認(rèn)的數(shù)據(jù)持久化級(jí)別是將數(shù)據(jù)復(fù)制兩份,然后存儲(chǔ)到兩個(gè)節(jié)點(diǎn)中以實(shí)現(xiàn)容錯(cuò)機(jī)制。
6. CheckPoint
一個(gè)流程序必須24/7全天候運(yùn)行,因此必須能夠抵抗與應(yīng)用程序邏輯無關(guān)的故障,比如系統(tǒng)錯(cuò)誤、JVM崩潰等。為了實(shí)現(xiàn)這種功能,Spark Streaming需要保存足夠的信息到容錯(cuò)存儲(chǔ)系統(tǒng)中,以便能夠從故障中恢復(fù)。CheckPoint有兩種類型:
- Metadata checkpointing
- Data checkpointing
簡單描述一下,Metadata checkpointing 主要是用于從驅(qū)動(dòng)錯(cuò)誤等中恢復(fù);對(duì)于某些有狀態(tài)的Transformation操作,如果期間出現(xiàn)了錯(cuò)誤,可以使用Data checkpointing從錯(cuò)誤中恢復(fù)。
如果應(yīng)用程序有以下要求,那么必須使用Checkpointing技術(shù):
- 使用了有狀態(tài)的Transformation操作
比如使用了updateStateByKey或者reduceByKeyAndWindow等操作,那么就必須提供checkpoint的存儲(chǔ)路徑,以便允許定期執(zhí)行RDD checkpointing操作,從而保存相應(yīng)的信息。 - 需要從應(yīng)用程序的驅(qū)動(dòng)錯(cuò)誤中恢復(fù)
對(duì)于普通的流式程序,即沒有執(zhí)行帶狀態(tài)的Transformation操作,那么無需打開checkpointing操作。
通過在容錯(cuò)的、可靠的文件系統(tǒng)上設(shè)置一個(gè)檢查點(diǎn)目錄即可啟動(dòng)Spark的CheckPointing功能,即可以保存檢查點(diǎn)的信息,以便令程序可以從錯(cuò)誤中恢復(fù)??梢酝ㄟ^執(zhí)行streamingContext.checkpoint(checkpointDirectory)來實(shí)現(xiàn)這個(gè)功能。如果應(yīng)用程序要使用帶狀態(tài)的Transformation操作,這一步是必須的。創(chuàng)建Checkpointing的示意代碼如下:
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
如果checkpointDirectory目錄存在,那么context會(huì)從checkpoint數(shù)據(jù)中重建;如果目錄不存在,那么函數(shù)functionToCreateContext會(huì)被調(diào)用以創(chuàng)建一個(gè)新的context對(duì)象,并且設(shè)置DStream。
