簡單寫一下自己讀了Spark Streaming 2.1.0 Programming Guide之后的體驗,也可以說是自己對該編程指南的理解與翻譯。
https://spark.apache.org/docs/2.1.0/streaming-programming-guide.html
Overview
Spark Streaming(下稱streaming)是Spark core的拓展,一個易擴展、高吞吐、高容錯的流式數(shù)據(jù)處理系統(tǒng)。

streaming接收輸入數(shù)據(jù)(kafka等)然后根據(jù)設置的處理時長batch interval將其切割為一個個的小數(shù)據(jù)集,然后對小數(shù)據(jù)集進行spark core/sql/mllib的操作,最后將處理后的小數(shù)據(jù)集輸出。

streaming具有一個高度抽象概念叫離散化的流(即DStream),代表了一塊連續(xù)的數(shù)據(jù)流。
A DStream is represented as a sequence of RDDs.
A Quick Example

Basic Concepts
Linking
- jar依賴,高級源kafka、flume等
Initializing StreamingContext
- 可以用已有的SparkContext創(chuàng)建
val ssc = new StreamingContext(sc, Seconds(1)) - ssc創(chuàng)建之后,
- 定義數(shù)據(jù)源以產(chǎn)生DStreams(定義開始點)
- 使用transformation和output operations算子來計算(定義中間過程,定義結(jié)束點)
- 利用ssc.start()來啟動步驟1的和步驟2
- 利用ssc.awaitTermination(-1L)來hold住整個streaming程序(讓其超時關閉,或者自然報錯關閉)
- ssc.stop()用來關閉ssc或者sc
- 幾點注意,
- 一個JVM里面僅有一個ssc
- sc可以重復用來創(chuàng)建ssc,只要前ssc被關閉了
Discretized Streams (DStreams)
DStream可以是來自于接收到的上游source(kafka),也可以是經(jīng)過transformating轉(zhuǎn)換后的DStream。
Input DStreams and Receivers
Input DStream通過Receiver接收上游source的數(shù)據(jù),receiver負責將上游數(shù)據(jù)接住,同時將其保存在spark的內(nèi)存系統(tǒng)中以供后續(xù)transformation處理。
streaming提供的兩種內(nèi)建源和自定義源:
- 基礎源,文件系統(tǒng),socket連接
- 高級源,kafka,flume,kinesis(需要額外的jar依賴)
- 自定義源,extends Receiver來實現(xiàn)自定義源
如果streaming程序需要并行接收多個數(shù)據(jù)源,可以創(chuàng)建多個receiver。但是因為一個receiver是一個長期的任務伴隨著streaming的開始和結(jié)束,所以其會始終占用一個core。所以,streaming程序要分配足夠的core來接收數(shù)據(jù)(#receiver)和處理數(shù)據(jù)(#processer)。
注意:本地跑streaming程序,不要使用local或者local[1]。因為兩種設置都是只分配一個core/thread給streaming程序,而該core會被receiver占用,但processer就沒有額外的core來驅(qū)動,導致整個程序只接收數(shù)據(jù),但是不能夠處理數(shù)據(jù)。所以通常設置為local[n], n > #receiver。
Receiver Reliability
根據(jù)是否能夠發(fā)出acknowledgment(ack)到source來區(qū)分接收器的reliable/unreliable。
Transformation on DStreams
與RDD的transformation類似,是一種lazy操作。輸入的DStream可以經(jīng)過transformation轉(zhuǎn)換成另一種DStream。
| Transformation | Meaning |
|---|---|
| map | 作用于DStream里面的每一個元素 |
| flatMap | 先調(diào)用map,然后調(diào)用flatten展平 |
| filter | 符合filter條件的則保留 |
| repartition | 通過shuffle來修改并行度 |
| union | 合流,將多個DStream合并成一個DStream,多job合并可以提高并行度 |
| reduce | 所有元素及其中間結(jié)果逐一順序執(zhí)行,最后得到一個結(jié)果 |
| countByValue | 計算key[T]的frequency, DStream(T, Long) |
| reduceByKey | 根據(jù)key分組,再對每個key的pairs應用reduce |
| join | DStream(k1, v1) join DStream(k1, v2) = DStream(k1, (v1,v2)) |
| cogroup | DStream(k1, v1) join DStream(k1, v2) = DStream(k1, Seq[v1], Seq[v2]) |
| updateStateByKey | 記錄狀態(tài)的操作,需要initial state和定義state update function,需要開啟checkpoint |
| transform | 作用于DStream里面的每一個RDD |
| windows | 基于窗寬的窗口函數(shù) |

插入Spark Structured Streaming關于窗函數(shù)的使用
在流式處理中,有兩個時間概念,
- event time,即事件發(fā)生時間,如該日志產(chǎn)生的時間
- process time,即處理事件的實際時間,一般是Streaming程序當前batch的運行時間

上圖time1, time2, time3是process time,圖中方塊中的數(shù)字代表這個event time??赡苡捎诰W(wǎng)絡抖動導致部分機器的日志收集產(chǎn)生了延遲,在time3的batch中包含了event time為2的日志。kafka中不同partition的消息也是無序的,在實時處理過程中也就產(chǎn)生了兩個問題,
- Streaming從kafka中拉取的一批數(shù)據(jù)里面可能包含多個event time的數(shù)據(jù)
- 同一event time的數(shù)據(jù)可能出現(xiàn)在多個batch interval中
Structured Streaming可以在實時數(shù)據(jù)上進行sql查詢聚合,如查看不同設備的信號量的平均大小
avgSignalDf = eventsDF
.groupby("deviceId")
.avg("signal")
進一步地,如果不是在整個數(shù)據(jù)流上做聚合,而是想在時間窗口上聚合。如查看每過去5分鐘的不同平均信號量,這里的5分鐘時間指的是event time,而不是process time,
windowedAvgSignalDF1 = eventsDF
.groupBy("deviceId", window("eventTime", "5 minute"))
.count()

更進一步要求,每5分鐘統(tǒng)計過去10分鐘內(nèi)所有設備產(chǎn)生日志的條數(shù),也是按照event time聚合,
windowedAvgSignalDF2 = eventsDF
.groupBy("deviceId", window("eventTime", "10 minute", "5 minute"))
.count()

如果一條日志因為網(wǎng)絡原因遲到了怎么辦?Structured Streaming還是會將其統(tǒng)計到屬于它的分組里面。

上面強大的有狀態(tài)功能是通過Spark Sql內(nèi)部維護一個高容錯的中間狀態(tài)存儲,key-value pairs,key就是對應分組,value就是對應每次增量統(tǒng)計后的一個聚合結(jié)果。每次增量統(tǒng)計,就對應key-value的一個新版本,狀態(tài)就從舊版本遷移到新版本,所以才認為是有狀態(tài)的。
有狀態(tài)的數(shù)據(jù)存儲在內(nèi)存中是不可靠的,spark sql內(nèi)部使用write ahead log(WAL, 預寫式日志),然后間斷的進行checkpoint。如果系統(tǒng)在某個時間點上crash了,就從最近的checkpoint點恢復,再開始使用WAL進行重放replay。checkpoint的點更新了以后,才將WAL清空clean,然后重新累積WAL,再flush到checkpoint,再clean(類似于es的translog)。

當然,streaming的數(shù)據(jù)源是一個流,這個數(shù)據(jù)是無限的,為了資源和性能考慮,只能保存有限的狀態(tài)。即落后多久以后的數(shù)據(jù),即便來了,系統(tǒng)也不要了,watermarking概念就是用來定義這個等待時間。例如,如果系統(tǒng)最大延遲是10分鐘,意味著event time落后process time 10分鐘內(nèi)的日志會被拿來使用;如果超出10分鐘,該日志就會被丟棄。如現(xiàn)在process time = 12:33,那么12:23之前的key-value pair的狀態(tài)就不會再有改變,也就可以不用維護其狀態(tài)了。
windowedAvgSignalDF4 = eventsDF
.withWatermark("eventTime", "10 minutes")
.groupBy("deviceId", window("eventTime", "10 minute", "5 minute"))
.count()

x軸是process time,y軸是event time。然后有一條動態(tài)的水位線,如果在水位線下面的日志,Streaming系統(tǒng)就丟棄。
Output Operations on DStreams
將DStream推送至外部系統(tǒng),db,hdfs。是action,會trigger the actual execution of all the DStream transformations
| Output Operation | Meaning |
|---|---|
| 在driver端打印每個batch的前10個元素 | |
| saveAsTextFiles | 保存DStream內(nèi)容為文本文件 |
| saveAsObjectFiles | 保存DStream內(nèi)容為序列化對象文件 |
| saveAsHadoopFiles | 保存為hdfs文件 |
| foreachRDD | 作用于DStream里面的所有RDD,需要里面包含RDD的action算子才會被執(zhí)行 |
其中foreachRDD常用于寫DStream內(nèi)容到外部DB中,需要用到網(wǎng)絡連接,示例如下,

上面的是錯誤實例,因為connection產(chǎn)生在driver,但connection不能序列化到executor,所以
connection.send(record)報錯。

上面是不推薦方式,因為需要為DStream里面的每一個元素都產(chǎn)生和銷毀connection,而產(chǎn)生和銷毀connection是昂貴的操作。

上面的方式,為每個rdd的partition產(chǎn)生一個connection,該connection產(chǎn)生于executor,可以用于send數(shù)據(jù)。

上面的方式,有別于推薦方式1,利用連接池概念,每一個batch interval都可以重復利用這些connection(后續(xù)的每個batch都會利用該連接池,而非后續(xù)batch一直new connection下去)。連接池要求懶加載和設置超時,具體可以參考這個stackoverflow answer。
注意,
- 如果Streaming程序沒有output operation,或者有output operation但是里面沒有RDD的action算子,那么DSTream不會被執(zhí)行。系統(tǒng)僅僅接收數(shù)據(jù),然后丟棄之
- 默認情況下,output operation是串行執(zhí)行
DataFrame and SQL Operations
DStream可以使用core、sql、mllib
MLlib Operations
DStream可以使用core、sql、mllib,eg. StreamingLinearRegressionWithSGD
Caching/ Persistence
DStream.persist()可以持久化DStream里面的每一個RDD。其中reduceByWindow、reduceByKeyAndWindow、updateStateByKey是隱式帶上持久化的,不需要顯式調(diào)用persist()。
Checkpointing
為了解決24/7程序的容錯問題,需要checkpoint(cp)兩類數(shù)據(jù),
- Metadata,包括configuration,DStream operations,Incomplete batches。一般用于driver的恢復。
- RDDs,將生成的rdd保存到cp點,為了減少rdd lineage鏈的長度,也便于快速恢復
需要開啟cp的應用場景,
- driver需要自動恢復的場景
- 帶狀態(tài)轉(zhuǎn)換算子(stateful transformations);需要組合多個batch的數(shù)據(jù),如窗函數(shù),stateUpdateFunc
如何開啟cp,
- 設置cp目錄(用于帶狀態(tài)轉(zhuǎn)換算子)
- 設置functionToCreateContext(用于driver恢復)

cp的間隔時間需要謹慎設置,太頻繁會影響性能;相反太久會導致lineage鏈和task size太大。dstream.checkpoint(checkpointInterval),一般是窗寬的5到10倍比較好。
Accumulators, Broadcast Variables, and Checkpoints
累加器和廣播變量不能從cp中恢復,但是通過lazily instantiated singleton instances單例懶加載可以從cp中重新實例化。
Deploying Applications
Streaming應用的部署
Requirements
- 帶管理者的集群
- 編譯code為jar包
- 為executors分配足夠的內(nèi)存,received data must be stored in memory。如果窗寬是10分鐘,那么系統(tǒng)必須支持將不少于10分鐘的數(shù)據(jù)保存在內(nèi)存中
- 設置checkpoint,如果需要
- 配置driver的自動恢復,如果需要
- 配置WAL,如果需要,接收到的數(shù)據(jù)會先預寫到cp點,這可能會降低系統(tǒng)吞吐量,但是可以通過并行多個receiver來緩解。另外,開啟了WAL,那么spark的replication建議設置為0。
spark.streaming.receiver.writeAheadLog.enable,MEMORY_AND_DISK_SER_2 - 設置最大接收速率,防止process time大于batch interval,導致數(shù)據(jù)堆積,
spark.streaming.receiver.maxRate、spark.streaming.kafka.maxRatePerPartition。也可以開啟反壓機制來自動控速,spark.streaming.backpressure.enabled
Upgrading Application Code
如果需要更新running狀態(tài)的streaming程序的代碼或者配置,
- 新程序與舊程序同時運行,然后等新程序ready之后,kill掉舊程序。注意下游是否符合滿足冪等操作;否則需要設置兩個不同的output路徑,將數(shù)據(jù)發(fā)送到兩個不同的目的地(新舊各一個)
- 平滑關閉舊程序(不再接收新數(shù)據(jù),但是已接收的數(shù)據(jù)會處理完),然后啟動新程序接著舊程序的點開始處理。如果是帶狀態(tài)/窗寬大于batch interval的話,利用cp來恢復?如果不需要記錄狀態(tài)/窗寬,可以使用另外的cp目錄或者刪除舊cp目錄
Monitoring Applications
- Processing Time < Batch Interval 才算正常
- Scheduling Delay 越小越好

- In Input Rate row, you can show and hide details of each input stream
- Scheduling Delay is the time spent from when the collection of streaming jobs for a batch was submitted to when the first streaming job was started
- Processing Time is the time spent to complete all the streaming jobs of a batch
- Batch interval is user defined. such as 10s, 5s, 1s, etc.
- Total Delay is the time spent from submitting to complete all jobs of a batch
- Active Batches section presents waitingBatches and runningBatches together
- Completed Batches section presents retained completed batches (using completedBatchUIData)

Performance Tuning
- 減少每個batch interval的Processing Time
- 設置正確的batch size(每個batch interval的數(shù)據(jù)量大?。?/li>
Reducing the Batch Processing Times
Level of Parallelism in Data Receiving
- 創(chuàng)建多個receiver,并行接收單個source的數(shù)據(jù)或者多個source的數(shù)據(jù)
- 減少block interval,接收數(shù)據(jù)在存入spark前,是合并成一個個block的,一個batch interval里面的#block = batch interval/ block interval * #receiver,而#block = #task,task數(shù)量決定了processing的并行度
spark.streaming.blockInterval - 如果不設置block interval,可以使用repartition來設置并行度,但是所引起的shuffle耗時需要引起注意
Level of Parallelism in Data Processing
如果parallel task不足,那么core利用率不高。通過提高默認并行度來加速spark.default.parallelism,task數(shù)量也不宜過多,太多了,task的序列化與反序列化耗時也更高,適得其反。建議是#executors * #core_per_executor * 4
Data Serialization
- XXX_SER,使用帶序列化的持久化策略,數(shù)據(jù)序列化為字節(jié)數(shù)組以減少GC耗時
- 使用Kryo的序列化方式,需要注冊自定義類
- 在batch size不大的情況下,可以關閉序列化策略,這樣可以減少CPU的序列化與反序列化耗時
Task Launching Overheads
任務數(shù)不宜過多,driver發(fā)送任務也需耗時。
Setting the Right Batch Interval
一般以5~10s為初始值,然后觀察Streaming UI的Scheduling Delay和Processing time來調(diào)整。
Memory Tuning
內(nèi)存用量與GC策略的調(diào)優(yōu),
- XXX_SER這樣的帶序列化性質(zhì)的持久化策略有利于降低內(nèi)存用量與降低GC耗時,另外
spark.rdd.compress可以進一步降低內(nèi)存用量,但是CPU耗時會升高 - 清理舊數(shù)據(jù),Streaming程序會自動清理所有的輸入原數(shù)據(jù)與持久化過的RDDs。清理周期取決于該batch interval數(shù)據(jù)的使用時長(如窗寬/stateful),另外可以設置
streamingContext.remember來保存更長時間 - CMS收集器或者G1收集器
- 用堆外內(nèi)存來持久化RDDs,堆外沒有GC
- 使用more executors with small heap來替代less executors with large heap,heap小有助于GC快速回收
注意事項
- 一個DStream與一個receiver關聯(lián),為了增加系統(tǒng)吞吐量,可以增加receiver數(shù)量,而一個receiver占用一個core
- receiver接收到數(shù)據(jù)之后會產(chǎn)生一個個的block,每一個block interval都會產(chǎn)生一個新的block,在一個batch interval里,一共產(chǎn)生了N個block,N=batch interval/ block interval,N也即task數(shù)量,與Processing的并行度相關聯(lián)
- 如果block interval == batch interval,那么就會產(chǎn)生一個task,一個partition,并且很可能會在本地就被處理
- 更大的block interval,意味著更大的block數(shù)據(jù)塊,更高的
spark.locality.wait可以增加該任務slot的數(shù)據(jù)本地性的命中概率,但是等待時間也可能更高(PROCESS_LOCAL -> NODE_LOCAL -> RACK_LOCAL -> ANY) - 如果有多個DStreams,那么根據(jù)job是串行執(zhí)行的性質(zhì),會先處理第一個DStream,再處理另一個DStream,這樣不利于并行化,可以通過union來避免,這樣unionDStream被視為一個job而已
-
spark.streaming.receiver.maxRate來限制讀取source的速率,避免Processing Time大于batch interval,否則executor的內(nèi)存終會爆掉
Fault-tolerance Semantics
容錯語義
Background
RDD是不可變、明確可重復計算的、分布式的數(shù)據(jù)集合。每個RDD會記錄其確定性的操作血統(tǒng)lineage,這個血統(tǒng)用于在容錯的輸入數(shù)據(jù)集上恢復該RDD。
為了spark內(nèi)部產(chǎn)生的RDDs高容錯,設置replication,然后將該RDDs及其副本分發(fā)到不同的executor上。如果產(chǎn)生crash,那么有兩類數(shù)據(jù)恢復途徑,
- 從副本恢復
- 沒有副本的話,從數(shù)據(jù)源恢復,再根據(jù)lineage rebuild該RDD
這兩類錯誤需要關注,
- executor failure,executor里面的in-memory數(shù)據(jù)會lost
- driver failure,SparkContext會lost,然后所有executors的in-memory數(shù)據(jù)也會lost
Definitions
- at most once, 最多被執(zhí)行一次
- at least once, 至少被執(zhí)行一次
- exactly once, 有且僅有被執(zhí)行一次
Basic Semantics
每一個Streaming程序都可以分為三步,
- receiving the data
- transforming the data
- pushing out the data
如果一個系統(tǒng)要實現(xiàn)端到端的exactly once語義,那么上面三步的每一步都要保證是exactly once的。
Semantics of Received Data
- files
- reliable receiver, with ack
- unreliable receiver, without ack
- direct kafka api (1.3+),所有接收到的kafka數(shù)據(jù)都是exactly once的
為了避免丟失過去接收過的數(shù)據(jù),Spark引入了WAL,負責將接收到的數(shù)據(jù)保存到cp/log中,有了WAL和reliable receiver,我們可以做到零數(shù)據(jù)丟失和exactly once語義

Semantics of output operations
output operation輸出算子,如foreachRDD是at least once語義的,即同一份transformed數(shù)據(jù)在woker failure的情況下,可能會被多次寫入外部DB系統(tǒng),為了實現(xiàn)其exactly once語義,有以下做法,
- 冪等操作,如
saveAs***Files將數(shù)據(jù)保存到hdfs中,可以容忍被寫多次的,因為文件會被相同的數(shù)據(jù)覆蓋?如果兩個job同時寫一份數(shù)據(jù)呢?(不能,因為job串行。如果是開啟了speculation呢?) - 事務性的更新,利用一個唯一標識來控制輸出操作
val uniqueId = generateUniqueId(time.milliseconds, TaskContext.get.partitionId())