內(nèi)容
- sparkStreaming簡介
- spark Streaming和Storm區(qū)別
- Spark Streaming算子
- Spark checkpointing
- Spark和kafka整合
Spark Streaming
- SparkStreaming是一種流式處理框架,是SparkAPI的擴展,支持可擴展、高吞吐、容錯的準實時數(shù)據(jù)處理,實時數(shù)據(jù)的來源可以事:
- Kafka、Flume、Twitter,zeroMQ或者TCP sockets;
- 并且可以使用高級功能的復(fù)雜算子來處理流數(shù)據(jù),例如:map,reduce,jion,window;最終處理后的數(shù)據(jù)可以來存放在文件系統(tǒng),數(shù)據(jù)庫,方便實時展現(xiàn);

SparkStreaming與Storm的區(qū)別
- Storm是純實時的流式處理框架,SparkStreaming是準實時的處理框架(微批處理);因為微批處理,SparkStreaming的吞吐量比Storm要高;
- Storm的事務(wù)機制要比SparkStreaming的要完善;
- Storm支持的動態(tài)資源調(diào)度(Spark1.2及以后也支持)
- SparkStreaming擅長復(fù)雜的業(yè)務(wù)處理,Storm不擅長復(fù)雜的業(yè)務(wù)處理,擅長簡單的匯總計算
SparkStreaming初始
SparkStreaming初始理解
Spark的各個子框架,都基于核心Spark的,SparkStreaming在內(nèi)部的機制如下:SparkStreaming接收到實時input數(shù)據(jù)流,并將數(shù)據(jù)分批成batch,然后由Spark Engine進行處理最后生成batch的輸出結(jié)果流

Spark Streaming將產(chǎn)生高度分離的數(shù)據(jù)流叫DStream(discretized Stream);DStream既可以從輸入數(shù)據(jù)源創(chuàng)建得來,(如Kafka、Flume或者Kinesis)也可以從其他DStream經(jīng)過一些算子操作得來;
在內(nèi)部,一個DStream就包含一系列的RDDs
(對python來說,有部分API尚不支持,或者和Scala、Java不同

- receiver task是一直在執(zhí)行的,一直在接受數(shù)據(jù),將一段時間內(nèi)接收到的數(shù)據(jù)保存到batch中(默認為5秒)那么會將接受來的數(shù)據(jù)每隔5秒封裝到一個batch中,batch沒有分布式計算的特性,這一個batch的數(shù)據(jù)又被封裝到一個RDD中最終封裝到一個DStream中
- 例如:
- 假設(shè)批處理間隔(batchInterval)為5秒,每隔5秒通過SparkStreaming將得到的一個DStream,在第6秒的時候開始計算這個DStream;假設(shè)執(zhí)行任務(wù)的時間事3秒,那么第6-9秒一邊接受數(shù)據(jù),一邊計算任務(wù),一邊在計算任務(wù),9~10秒只是在接收數(shù)據(jù)。然后在第11秒的時候重復(fù)上面的操作
- 如果job的執(zhí)行的時間大于批處理間隔,接收到的數(shù)據(jù)會越積越多,最好可能導(dǎo)致OOM;
DStream 它表示的事連續(xù)數(shù)據(jù)流,可以是源數(shù)據(jù)接收的輸入流,也可以事通過轉(zhuǎn)換輸入流生成的已處理的數(shù)據(jù)流;在內(nèi)部,DStream由一系列的RDD組成;DStream中的每個RDD都包含來自特定間隔的數(shù)據(jù)

任何作用于DStream的算子,其實都會被轉(zhuǎn)化為對其內(nèi)部RDD的操作。例如,我們將 lines 這個DStream轉(zhuǎn)成words DStream對象,其實作用于lines上的flatMap算子,會施加于lines中的每個RDD上,并生成新的對應(yīng)的RDD,而這些新生成的RDD對象就組成了words這個DStream對象。其過程如下圖所示

Spark Streaming主要的兩種內(nèi)建的流式數(shù)據(jù)源:
- 基礎(chǔ)數(shù)據(jù)源:在StreamingContext API中可以直接使用源:如文件系統(tǒng),套接字連接或者Akka actor
- 高級數(shù)據(jù)源:需要依賴額外工具類的源,如:Kafka、Flume;
SparkStreaming算子
- foreachRDD
- output operation算子,必須對抽去出來的RDD執(zhí)行action類算子,代碼才能執(zhí)行
- transfor
- transformation類算子
- 可以通過transform算子,對Dstream做RDD到RDD的任意操作
- updateStateByKey
- 為SparkStreaming中的每一個key維護一份state狀態(tài),state類型可以事任意類型,可以是一個自定義的對象,更新函數(shù)也是自定義的
- 通過更新函數(shù)對該key的狀態(tài)不斷更新,對每個新的batch而言,SparkStreaming會在使用updateStateByKey的時候為已經(jīng)存在的key進行state的狀態(tài)更新;
- 使用updateStateByKey要開啟checkpoint機制和功能
- 如果批處理間隔設(shè)置的時間小于10,那么10秒寫入磁盤一份
- 如果批處理間隔設(shè)置的大于10秒,就會在批處理間隔時間寫入磁盤一份
窗口操作(Window Operations)
Spark Streaming提供了窗口操作,你可以在滑動的窗口對數(shù)據(jù)使用transformations算子進行操作

每次窗口滑動時,DStream中落入窗口的RDD就會被合并計算成新的windowed DStream
- 參數(shù):
- window length:窗口覆蓋的時間長度
- sliding interval:窗口啟動的時間間隔
- 注意 : 這兩個參數(shù)必須事batchInterval的整數(shù)倍;
窗口優(yōu)化

優(yōu)化后的window操作要保存狀態(tài)所以要設(shè)置checkpoint路徑,沒有優(yōu)化的window操作可以不設(shè)置chekpoint路徑
//Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
Driver HA(Standalone或者Mesos)
因為SparkStream是7*24小時運行的,Driver只是一個簡單的進程,有可能掛掉,所以實現(xiàn)Driver的HA就又必要(如果使用Client模式就無法實現(xiàn)DriverHA)Yarn平臺cluster模式提交任務(wù),AM(AplicationMaster)相當于Driver,如果掛掉會自動啟動AM。這里所說的DriverHA針對的是Spark standalone和Mesos資源調(diào)度的情況下。實現(xiàn)Driver的高可用有兩個步驟:
- 提交任務(wù)層面,在提交任務(wù)的時候加上選項 --supervise,當Driver掛掉的時候會自動重啟Driver;
- 代碼層面,使用JavaStreamingContex。getOrCreate(checkpoint)路徑,JavaStreamingContextFactory
- Driver中元數(shù)據(jù)包括:
- 創(chuàng)建應(yīng)用程序的配置信息;
- DStream的操作邏輯;
- job中沒有完成的批次數(shù)據(jù),也就是job的執(zhí)行進度
- Driver中元數(shù)據(jù)包括:
SparkStreaming2.2以前+Kafka
receiver模式
在SparkStreaming程序運行后,Executor中會又receiver task接收kafka推送過來的數(shù)據(jù),數(shù)據(jù)會被持久化,默認級別為MEMORY_AND_DISK_SER_2,這個級別可以修改;receiver task對接收到的數(shù)據(jù)進行存儲和備份,這個過程會又節(jié)點之間的數(shù)據(jù)傳輸,備份完成后zookeeper中更新偏移量,然后向Driver中的receiver tracket匯報數(shù)據(jù)的位置,最后Driver根據(jù)數(shù)據(jù)本地化將task分發(fā)到不同節(jié)點上執(zhí)行;

- receiver模式采用了Receiver接收器模式,需要一個線程一直接受數(shù)據(jù),將數(shù)據(jù)接收到Executor中,默認存儲級別是MEMORY_AND_DISK_SER_2
- receiver模式自動使用zookeeper管理消費者offset
- receiver模式底層讀取kafka采用High Level Consumer API實現(xiàn),這種模式不關(guān)心offset,只要數(shù)據(jù);
- receiver模式當Driver掛掉時有數(shù)據(jù)丟失問題,可以開啟WAL機制,避免丟失數(shù)據(jù),但是開啟后加大數(shù)據(jù)延遲,并存在數(shù)據(jù)重復(fù)消費等風(fēng)險;
- receiver模式并行度由spark.stream.blockInterbal=200ms,可以減少這個參數(shù)增大并行度,最小不能低于50ms;
- Receiver模式不被使用
- 被動將數(shù)據(jù)接受到Executor,當有任務(wù)堆積實,數(shù)據(jù)存在問題
- 這種模式不能手動維護消費者offset
Direct模式

無接收器(receiver-less)的直接(driect)方式,以確保更強的端對端傳輸;該方法不需要接收器來接收數(shù)據(jù),而是定期向kafka查詢每個topic和partition中的最新偏移量,并且相應(yīng)定義了每個批次要處理的偏移量范圍;
- direct模式?jīng)]有receiver,每批次處理數(shù)據(jù)直接獲取當前批次數(shù)據(jù)處理;
- direct模式?jīng)]有使用zookeeper管理消費者offset,使用的是Spark自己管理,默認存在內(nèi)存中,也可以設(shè)置checkpoint,也會保存到checkpoint中一份;
- 簡化并行性:無需創(chuàng)建多個Kafka流并將它們合并,使用directStream,Spark Streaming將創(chuàng)建(partition)與要使用的Kafka分區(qū)(topic)一樣多的RDD分區(qū),這些分區(qū)一一對應(yīng);
- direct模式底層讀取kafka使用Simple Consumer API,可以手動維護消費者offset
- 可以使用設(shè)置chechpoint的方式管理消費著offset,使用StreamingContext.getOrCreate(ckDir,CreateStreamingContext)恢復(fù)
- 當代碼邏輯改變時,無法從checkpoint來恢復(fù)offset
- 將偏移量輸出到外部系統(tǒng),如redis,hbase
- 當從checkpoint中恢復(fù)數(shù)據(jù)時,有可能造成重復(fù)的消費,需要我們寫代碼來保證數(shù)據(jù)的輸出冪等;
- 保證輸出的冪等性或使用事務(wù)
- 當代碼邏輯改變時,無法從checkpoint來恢復(fù)offset
Kafka0.11
kafka0.8.2消費者offset存儲在zookeeper中,對于zookeeper而言每次操作代價很昂貴的,而且zookeeper集群實不能擴展寫能力,kafka0.11版本默認使用新等消費者api,消費者offset會更新到一個kafka自帶等topic[__consumer_offsets]中
SparkStream2.3 + Kafka0.11
- 丟掉receiver模式
- 采用了新消費者api實現(xiàn):類似于0.8中等Diect Stream方式;(API未來可能會發(fā)生更改)
LocationStrategies(路由策略):
- 新等kafka api將消息預(yù)存到緩沖區(qū)中,因此出于性能考慮,Spark將消費者產(chǎn)生等消息存在在excutors上
- 大多數(shù)情況下,SparkStreaming讀取數(shù)據(jù)使用LocationStrategies.PreferConsistent,可以在可用的Executor平均分配分區(qū);
- 如果你Executor和Kafka代理在同一臺主機上,請使用PreferBrokers,它將首選在Kafka leader上為數(shù)據(jù)計劃分區(qū),如果分區(qū)間等復(fù)合存在明顯差異等話請使用PreferFixed,可以通過一個map(explicit mapping 顯式映射)將topic分區(qū)指定到對應(yīng)哪些主機(hosts);
- 默認等消費者緩存大小為64Kb,默認緩存在Executor中:
- spark.streaming.kafka.consumer.cache.maxCapacity :增大緩存
- spark.streaming.kafka.consumer.cache.enabled : 設(shè)置成false關(guān)閉緩存機制
消費者策略(ConsumerStrategies.Subscribe)
即使從checkpoint重新啟動,spark也可以獲取正確配置等消費者
- ConsumerStrategies.Subscribe:允許你訂閱topics的固定collection,
- SubscribePattern運行你使用正則表達式來指定特定的topics;
- 與0.8不同在運行期間使用Subscribe或SubscribePattern應(yīng)該響應(yīng)(respond)添加的分區(qū)
- Assign:允許指定固定等分區(qū)集合;
偏移量(Offset)
-
如果設(shè)置了checkpoint,offset就會存儲在checkpoint中:
- 當代碼邏輯改變實,無法從checkpoint中恢復(fù)offset;
- 當從checkpoint中恢復(fù)數(shù)據(jù)時,有可能造成重復(fù)等消費,需要我們寫代碼來保證輸出冪等
-
依靠kafka來存儲消費者offset,kafka中有一個特殊等topic來存儲消費者offset,新的消費者api中,會定期提交offset,自動提交offset等頻率由參數(shù)auto.commit.interval.ms決定,默認5s;為了保證消費者數(shù)據(jù)的精準性,我們可以關(guān)閉自動提交,改成異步等手動提交消費者offset;
- offset存儲在kafka中由參數(shù)offsets.retention.minutes=1440控制是否過期參數(shù)(60247=1440,默認7天刪除),如果停機沒有消費達到時長,存儲在kafka中等消費者組會被清空連同offset;
- 無法保證有且僅有一次語義,因為偏移量等提交時異步的,所以結(jié)果等輸出依然要自己實現(xiàn)冪等性;
自己存儲offset:處理邏輯時,保存數(shù)據(jù)處理等事務(wù);如果在失敗等情況下,就不保存offset,處理成功則保存offset,這樣就可以做到數(shù)據(jù)等一致性;