1. Spark Streaming概述
1.1 什么是Spark Streaming

Spark Streaming類似于Apache Storm,用于流式數(shù)據(jù)的處理。根據(jù)其官方文檔介紹,Spark Streaming有高吞吐量和容錯(cuò)能力強(qiáng)等特點(diǎn)。Spark Streaming支持的數(shù)據(jù)輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。數(shù)據(jù)輸入后可以用Spark的高度抽象語言的語法如:map、reduce、join、window等進(jìn)行運(yùn)算。而結(jié)果也能保存在很多地方,如HDFS,數(shù)據(jù)庫等。另外Spark Streaming也能和MLlib(機(jī)器學(xué)習(xí))以及Graphx完美融合。

和Spark基于RDD的概念很相似,Spark Streaming使用離散化流(discretized stream)作為抽象表示,叫作DStream。DStream 是隨時(shí)間推移而收到的數(shù)據(jù)的序列。在內(nèi)部,每個(gè)時(shí)間區(qū)間收到的數(shù)據(jù)都作為 RDD 存在,而 DStream 是由這些 RDD 所組成的序列(因此 得名“離散化”)。?

DStream 可以從各種輸入源創(chuàng)建,比如 Flume、Kafka 或者 HDFS。創(chuàng)建出來的DStream 支持兩種操作,一種是轉(zhuǎn)化操作(transformation),會(huì)生成一個(gè)新的DStream,另一種是輸出操作(output operation),可以把數(shù)據(jù)寫入外部系統(tǒng)中。DStream 提供了許多與 RDD 所支持的操作相類似的操作支持,還增加了與時(shí)間相關(guān)的新操作,比如滑動(dòng)窗口。
1.2 Spark Streaming的特點(diǎn)
易用?

容錯(cuò)?

易整合到Spark體系?

1.3 Spark 與 Storm 對比
1.3.1 對比
對比點(diǎn)StormSpark Streaming
實(shí)時(shí)計(jì)算模型純實(shí)時(shí),來一條數(shù)據(jù),處理一條數(shù)據(jù)準(zhǔn)實(shí)時(shí),對一個(gè)時(shí)間段內(nèi)的數(shù)據(jù)收集起來,作為一個(gè)RDD,再處理
實(shí)時(shí)計(jì)算延遲度毫秒級秒級
吞吐量低高
事務(wù)機(jī)制支持完善支持,但不夠完善
健壯性 / 容錯(cuò)性ZooKeeper,Acker,非常強(qiáng)Checkpoint,WAL,一般
動(dòng)態(tài)調(diào)整并行度支持不支持
1.3.2 Spark Streaming與Storm的應(yīng)用場景
Storm
建議在那種需要純實(shí)時(shí),不能忍受1秒以上延遲的場景下使用,比如實(shí)時(shí)金融系統(tǒng),要求純實(shí)時(shí)進(jìn)行金融交易和分析
此外,如果對于實(shí)時(shí)計(jì)算的功能中,要求可靠的事務(wù)機(jī)制和可靠性機(jī)制,即數(shù)據(jù)的處理完全精準(zhǔn),一條也不能多,一條也不能少,也可以考慮使用Storm
如果還需要針對高峰低峰時(shí)間段,動(dòng)態(tài)調(diào)整實(shí)時(shí)計(jì)算程序的并行度,以最大限度利用集群資源(通常是在小型公司,集群資源緊張的情況),也可以考慮用Storm
如果一個(gè)大數(shù)據(jù)應(yīng)用系統(tǒng),它就是純粹的實(shí)時(shí)計(jì)算,不需要在中間執(zhí)行SQL交互式查詢、復(fù)雜的transformation算子等,那么用Storm是比較好的選擇
Spark Streaming
如果對上述適用于Storm的三點(diǎn),一條都不滿足的實(shí)時(shí)場景,即,不要求純實(shí)時(shí),不要求強(qiáng)大可靠的事務(wù)機(jī)制,不要求動(dòng)態(tài)調(diào)整并行度,那么可以考慮使用Spark Streaming
考慮使用Spark Streaming最主要的一個(gè)因素,應(yīng)該是針對整個(gè)項(xiàng)目進(jìn)行宏觀的考慮,即,如果一個(gè)項(xiàng)目除了實(shí)時(shí)計(jì)算之外,還包括了離線批處理、交互式查詢等業(yè)務(wù)功能,而且實(shí)時(shí)計(jì)算中,可能還會(huì)牽扯到高延遲批處理、交互式查詢等功能,那么就應(yīng)該首選Spark生態(tài),用Spark Core開發(fā)離線批處理,用Spark SQL開發(fā)交互式查詢,用Spark Streaming開發(fā)實(shí)時(shí)計(jì)算,三者可以無縫整合,給系統(tǒng)提供非常高的可擴(kuò)展性
1.3.3 Spark Streaming與Storm的優(yōu)劣分析
事實(shí)上,Spark Streaming絕對談不上比Storm優(yōu)秀。這兩個(gè)框架在實(shí)時(shí)計(jì)算領(lǐng)域中,都很優(yōu)秀,只是擅長的細(xì)分場景并不相同。?
Spark Streaming僅僅在吞吐量上比Storm要優(yōu)秀,而吞吐量這一點(diǎn),也是歷來挺Spark Streaming,貶Storm的人著重強(qiáng)調(diào)的。但是問題是,是不是在所有的實(shí)時(shí)計(jì)算場景下,都那么注重吞吐量?不盡然。因此,通過吞吐量說Spark Streaming強(qiáng)于Storm,不靠譜。?
事實(shí)上,Storm在實(shí)時(shí)延遲度上,比Spark Streaming就好多了,前者是純實(shí)時(shí),后者是準(zhǔn)實(shí)時(shí)。而且,Storm的事務(wù)機(jī)制、健壯性 / 容錯(cuò)性、動(dòng)態(tài)調(diào)整并行度等特性,都要比Spark Streaming更加優(yōu)秀。?
Spark Streaming,有一點(diǎn)是Storm絕對比不上的,就是:它位于Spark生態(tài)技術(shù)棧中,因此Spark Streaming可以和Spark Core、Spark SQL無縫整合,也就意味著,我們可以對實(shí)時(shí)處理出來的中間數(shù)據(jù),立即在程序中無縫進(jìn)行延遲批處理、交互式查詢等操作。這個(gè)特點(diǎn)大大增強(qiáng)了Spark Streaming的優(yōu)勢和功能。
1.4 Spark Streaming關(guān)鍵抽象
Discretized Stream或DStream是Spark Streaming提供的基本抽象。它表示連續(xù)的數(shù)據(jù)流,可以是從源接收的輸入數(shù)據(jù)流,也可以是通過轉(zhuǎn)換輸入流生成的已處理數(shù)據(jù)流。在內(nèi)部,DStream由一系列連續(xù)的RDD表示,這是Spark對不可變分布式數(shù)據(jù)集的抽象。DStream中的每個(gè)RDD都包含來自特定時(shí)間間隔的數(shù)據(jù),如下圖所示。?

應(yīng)用于DStream的任何操作都轉(zhuǎn)換為底層RDD上的操作。例如,在先前將行流轉(zhuǎn)換為字的示例中,flatMap操作應(yīng)用于linesDStream中的每個(gè)RDD 以生成DStream的 wordsRDD。如下圖所示。?

Spark Streaming接收實(shí)時(shí)輸入數(shù)據(jù)流并將數(shù)據(jù)分成批處理,然后由Spark引擎處理,以批量生成最終結(jié)果流。?

1.5 Spark Streaming 架構(gòu)
Spark Streaming使用“微批次”的架構(gòu),把流式計(jì)算當(dāng)作一系列連續(xù)的小規(guī)模批處理來對待。Spark Streaming從各種輸入源中讀取數(shù)據(jù),并把數(shù)據(jù)分組為小的批次。新的批次按均勻的時(shí)間間隔創(chuàng)建出來。在每個(gè)時(shí)間區(qū)間開始的時(shí)候,一個(gè)新的批次就創(chuàng)建出來,在該區(qū)間內(nèi)收到的數(shù)據(jù)都會(huì)被添加到這個(gè)批次中。在時(shí)間區(qū)間結(jié)束時(shí),批次停止增長。時(shí)間區(qū)間的大小是由批次間隔這個(gè)參數(shù)決定的。批次間隔一般設(shè)在500毫秒到幾秒之間,由應(yīng)用開發(fā)者配置。每個(gè)輸入批次都形成一個(gè)RDD,以 Spark 作業(yè)的方式處理并生成其他的 RDD。 處理的結(jié)果可以以批處理的方式傳給外部系統(tǒng)。高層次的架構(gòu)如圖?

Spark Streaming在Spark的驅(qū)動(dòng)器程序—工作節(jié)點(diǎn)的結(jié)構(gòu)的執(zhí)行過程如下圖所示。Spark Streaming為每個(gè)輸入源啟動(dòng)對 應(yīng)的接收器。接收器以任務(wù)的形式運(yùn)行在應(yīng)用的執(zhí)行器進(jìn)程中,從輸入源收集數(shù)據(jù)并保存為 RDD。它們收集到輸入數(shù)據(jù)后會(huì)把數(shù)據(jù)復(fù)制到另一個(gè)執(zhí)行器進(jìn)程來保障容錯(cuò)性(默 認(rèn)行為)。數(shù)據(jù)保存在執(zhí)行器進(jìn)程的內(nèi)存中,和緩存 RDD 的方式一樣。驅(qū)動(dòng)器程序中的 StreamingContext 會(huì)周期性地運(yùn)行 Spark 作業(yè)來處理這些數(shù)據(jù),把數(shù)據(jù)與之前時(shí)間區(qū)間中的 RDD 進(jìn)行整合。?

1.6 背壓機(jī)制
???????默認(rèn)情況下,Spark Streaming通過Receiver以生產(chǎn)者生產(chǎn)數(shù)據(jù)的速率接收數(shù)據(jù),計(jì)算過程中會(huì)出現(xiàn)batch processing time > batch interval的情況,其中batch processing time 為實(shí)際計(jì)算一個(gè)批次花費(fèi)時(shí)間, batch interval為Streaming應(yīng)用設(shè)置的批處理間隔。這意味著Spark Streaming的數(shù)據(jù)接收速率高于Spark從隊(duì)列中移除數(shù)據(jù)的速率,也就是數(shù)據(jù)處理能力低,在設(shè)置間隔內(nèi)不能完全處理當(dāng)前接收速率接收的數(shù)據(jù)。如果這種情況持續(xù)過長的時(shí)間,會(huì)造成數(shù)據(jù)在內(nèi)存中堆積,導(dǎo)致Receiver所在Executor內(nèi)存溢出等問題(如果設(shè)置StorageLevel包含disk, 則內(nèi)存存放不下的數(shù)據(jù)會(huì)溢寫至disk, 加大延遲)。Spark 1.5以前版本,用戶如果要限制Receiver的數(shù)據(jù)接收速率,可以通過設(shè)置靜態(tài)配制參數(shù)“spark.streaming.receiver.maxRate”的值來實(shí)現(xiàn),此舉雖然可以通過限制接收速率,來適配當(dāng)前的處理能力,防止內(nèi)存溢出,但也會(huì)引入其它問題。比如:producer數(shù)據(jù)生產(chǎn)高于maxRate,當(dāng)前集群處理能力也高于maxRate,這就會(huì)造成資源利用率下降等問題。為了更好的協(xié)調(diào)數(shù)據(jù)接收速率與資源處理能力,Spark Streaming 從v1.5開始引入反壓機(jī)制(back-pressure),通過動(dòng)態(tài)控制數(shù)據(jù)接收速率來適配集群數(shù)據(jù)處理能力。
???????背壓機(jī)制: 根據(jù)JobScheduler反饋?zhàn)鳂I(yè)的執(zhí)行信息來動(dòng)態(tài)調(diào)整Receiver數(shù)據(jù)接收率。通過屬性“spark.streaming.backpressure.enabled”來控制是否啟用backpressure機(jī)制,默認(rèn)值false,即不啟用。
在原架構(gòu)的基礎(chǔ)上加上一個(gè)新的組件RateController,這個(gè)組件負(fù)責(zé)監(jiān)聽“OnBatchCompleted ”事件,然后從中抽取processingDelay 及schedulingDelay信息. Estimator依據(jù)這些信息估算出最大處理速度(rate),最后由基于Receiver的Input Stream將rate通過ReceiverTracker與ReceiverSupervisorImpl轉(zhuǎn)發(fā)給BlockGenerator(繼承自RateLimiter).?

流量控制點(diǎn)
當(dāng)Receiver開始接收數(shù)據(jù)時(shí),會(huì)通過supervisor.pushSingle()方法將接收的數(shù)據(jù)存入currentBuffer等待BlockGenerator定時(shí)將數(shù)據(jù)取走,包裝成block. 在將數(shù)據(jù)存放入currentBuffer之時(shí),要獲取許可(令牌)。如果獲取到許可就可以將數(shù)據(jù)存入buffer, 否則將被阻塞,進(jìn)而阻塞Receiver從數(shù)據(jù)源拉取數(shù)據(jù)。?
其令牌投放采用令牌桶機(jī)制進(jìn)行, 原理如下圖所示:?

令牌桶機(jī)制
大小固定的令牌桶可自行以恒定的速率源源不斷地產(chǎn)生令牌。如果令牌不被消耗,或者被消耗的速度小于產(chǎn)生的速度,令牌就會(huì)不斷地增多,直到把桶填滿。后面再產(chǎn)生的令牌就會(huì)從桶中溢出。最后桶中可以保存的最大令牌數(shù)永遠(yuǎn)不會(huì)超過桶的大小。當(dāng)進(jìn)行某操作時(shí)需要令牌時(shí)會(huì)從令牌桶中取出相應(yīng)的令牌數(shù),如果獲取到則繼續(xù)操作,否則阻塞。用完之后不用放回。
2. Spark Streaming 簡單應(yīng)用
2.1 安裝Telnet向端口發(fā)送消息
2.2 使用SparkStreaming監(jiān)控端口數(shù)據(jù)展示到控制臺(tái)
3. DStream 的輸入
Spark Streaming原生支持一些不同的數(shù)據(jù)源。一些“核心”數(shù)據(jù)源已經(jīng)被打包到Spark Streaming 的 Maven 工件中,而其他的一些則可以通過 spark-streaming-kafka 等附加工件獲取。每個(gè)接收器都以 Spark 執(zhí)行器程序中一個(gè)長期運(yùn)行的任務(wù)的形式運(yùn)行,因此會(huì)占據(jù)分配給應(yīng)用的 CPU 核心。此外,我們還需要有可用的 CPU 核心來處理數(shù)據(jù)。這意味著如果要運(yùn)行多個(gè)接收器,就必須至少有和接收器數(shù)目相同的核心數(shù),還要加上用來完成計(jì)算所需要的核心數(shù)。例如,如果我們想要在流計(jì)算應(yīng)用中運(yùn)行 10 個(gè)接收器,那么至少需要為應(yīng)用分配 11 個(gè) CPU 核心。所以如果在本地模式運(yùn)行,不要使用local或者local[1]。?

3.1 文件數(shù)據(jù)源
文件數(shù)據(jù)流:能夠讀取所有HDFS API兼容的文件系統(tǒng)文件,通過fileStream方法進(jìn)行讀取?
Spark Streaming 將會(huì)監(jiān)控 dataDirectory 目錄并不斷處理移動(dòng)進(jìn)來的文件,記住目前不支持嵌套目錄。
文件需要有相同的數(shù)據(jù)格式
文件進(jìn)入 dataDirectory的方式需要通過移動(dòng)或者重命名來實(shí)現(xiàn)。
一旦文件移動(dòng)進(jìn)目錄,則不能再修改,即便修改了也不會(huì)讀取新數(shù)據(jù)。?
如果文件比較簡單,則可以使用 streamingContext.textFileStream(dataDirectory)方法來讀取文件。文件流不需要接收器,不需要單獨(dú)分配CPU核。
案例實(shí)操
## 導(dǎo)入相應(yīng)的jar包
scala> import org.apache.spark.streaming._
## 創(chuàng)建StreamingContext操作對象
scala> val ssc = new StreamingContext(sc,Seconds(5))
scala> val lines = ssc.textFileStream("hdfs://master:9000/spark/data")
scala> val wordCount = lines.flatMap(_.split("\t")).map(x=>(x,1)).reduceByKey(_+_)
scala> wordCount.print
scala> ssc.start
3.2 自定義數(shù)據(jù)源
通過繼承Receiver,并實(shí)現(xiàn)onStart、onStop方法來自定義數(shù)據(jù)源采集。
案例實(shí)操
3.3 RDD隊(duì)列
可以通過使用streamingContext.queueStream(queueOfRDDs)來創(chuàng)建DStream,每一個(gè)推送到這個(gè)隊(duì)列中的RDD,都會(huì)作為一個(gè)DStream處理。
案例實(shí)操
3.4 Kafka
Spark 與 Kafka集成指南?


下面我們進(jìn)行一個(gè)實(shí)例,演示SparkStreaming如何從Kafka讀取消息,如果通過連接池方法把消息處理完成后再寫會(huì)Kafka?

整合
1.引入jar包依賴
? ? org.apache.spark
? ? spark-streaming-kafka-0-10_2.11
? ? ${spark.version}
2.代碼編寫
//Stream2Kafka
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Stream2Kafka extends App {
? //創(chuàng)建配置對象
? val conf = new SparkConf().setAppName("kafka").setMaster("local[3]")
? //創(chuàng)建SparkStreaming操作對象
? val ssc = new StreamingContext(conf,Seconds(5))
? //連接Kafka就需要Topic
? //輸入的topic
? val fromTopic = "source"
? //輸出的Topic
? val toTopic = "target"
? //創(chuàng)建brokers的地址
? val brokers = "master:9092,slave1:9092,slave3:9092,slave2:9092"
? //Kafka消費(fèi)者配置對象
? val kafkaParams = Map[String, Object](
? ? //用于初始化鏈接到集群的地址
? ? ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
? ? //Key與VALUE的序列化類型
? ? ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],
? ? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],
? ? //用于標(biāo)識這個(gè)消費(fèi)者屬于哪個(gè)消費(fèi)團(tuán)體
? ? ConsumerConfig.GROUP_ID_CONFIG->"kafka",
? ? //如果沒有初始化偏移量或者當(dāng)前的偏移量不存在任何服務(wù)器上,可以使用這個(gè)配置屬性
? ? //可以使用這個(gè)配置,latest自動(dòng)重置偏移量為最新的偏移量
? ? ConsumerConfig.AUTO_OFFSET_RESET_CONFIG->"latest",
? ? //如果是true,則這個(gè)消費(fèi)者的偏移量會(huì)在后臺(tái)自動(dòng)提交
? ? ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG->(false: java.lang.Boolean)
? )
? //創(chuàng)建DStream,連接到Kafka,返回接收到的輸入數(shù)據(jù)
? val inputStream = {
? ? KafkaUtils.createDirectStream[String, String](
? ? ? ssc,
? ? ? //位置策略(可用的Executor上均勻分配分區(qū))
? ? ? LocationStrategies.PreferConsistent,
? ? ? //消費(fèi)策略(訂閱固定的主題集合)
? ? ? ConsumerStrategies.Subscribe[String, String](Array(fromTopic), kafkaParams))
? }
? inputStream.map{record => "hehe--"+record.value}.foreachRDD { rdd =>
? ? //在這里將RDD寫回Kafka,需要使用Kafka連接池
? ? rdd.foreachPartition { items =>
? ? ? val kafkaProxyPool = KafkaPool(brokers)
? ? ? val kafkaProxy = kafkaProxyPool.borrowObject()
? ? ? for (item <- items) {
? ? ? ? //使用這個(gè)連接池
? ? ? ? kafkaProxy.kafkaClient.send(new ProducerRecord[String, String](toTopic, item))
? ? ? }
? ? ? kafkaProxyPool.returnObject(kafkaProxy)
? ? }
? }
? ssc.start()
? ssc.awaitTermination()
}
//Kafka連接池
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer
//因?yàn)橐獙cala的集合類型轉(zhuǎn)換成Java的
import scala.collection.JavaConversions._
class KafkaProxy(broker:String){
? val conf = Map(
? ? //用于初始化鏈接到集群的地址
? ? ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> broker,
? ? //Key與VALUE的序列化類型
? ? ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG->classOf[StringSerializer],
? ? ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG->classOf[StringSerializer]
? )
? val kafkaClient = new KafkaProducer[String,String](conf)
}
//創(chuàng)建一個(gè)創(chuàng)建KafkaProxy的工廠
class KafkaProxyFactory(broker:String) extends? BasePooledObjectFactory[KafkaProxy]{
? //創(chuàng)建實(shí)例
? override def create(): KafkaProxy = new KafkaProxy(broker)
? //包裝實(shí)例
? override def wrap(t: KafkaProxy): PooledObject[KafkaProxy] = new DefaultPooledObject[KafkaProxy](t)
}
object KafkaPool {
? private var kafkaPool:GenericObjectPool[KafkaProxy]=null
? def apply(broker:String): GenericObjectPool[KafkaProxy] ={
? ? if(kafkaPool == null){
? ? ? this.kafkaPool = new GenericObjectPool[KafkaProxy](new KafkaProxyFactory(broker))
? ? }
? ? kafkaPool
? }
}
3.啟動(dòng)zookeeper
4.啟動(dòng)kafka
kafka-server-start.sh /opt/apps/Kafka/kafka_2.11_2.0.0/config/server.properties &
5.創(chuàng)建兩個(gè)主題
[root@master ~]# kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181,slave4:2181 --replication-factor 2 --partitions 2 --topic source
[root@master ~]# kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181,slave4:2181 --replication-factor 2 --partitions 2 --topic target
6.啟動(dòng)producer 寫入數(shù)據(jù)到source
[root@master ~]# kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092,slave3:9092,slave4:9092 --topic source
7.啟動(dòng)consumer 監(jiān)聽target的數(shù)據(jù)
[root@master ~]# kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092,slave3:9092,slave4:9092 --topic target
手動(dòng)設(shè)置offset

4. DStream 轉(zhuǎn)換
DStream上的原語與RDD的類似,分為Transformations(轉(zhuǎn)換)和Output Operations(輸出)兩種,此外轉(zhuǎn)換操作中還有一些比較特殊的語法,如:updateStateByKey()、transform()以及各種Window相關(guān)的語法。
TransformationMeaning
map(func)將源DStream中的每個(gè)元素通過一個(gè)函數(shù)func從而得到新的DStreams。
flatMap(func)和map類似,但是每個(gè)輸入的項(xiàng)可以被映射為0或更多項(xiàng)。
filter(func)選擇源DStream中函數(shù)func判為true的記錄作為新DStreams
repartition(numPartitions)通過創(chuàng)建更多或者更少的partition來改變此DStream的并行級別。
union(otherStream)聯(lián)合源DStreams和其他DStreams來得到新DStream
count()統(tǒng)計(jì)源DStreams中每個(gè)RDD所含元素的個(gè)數(shù)得到單元素RDD的新DStreams。
reduce(func)通過函數(shù)func(兩個(gè)參數(shù)一個(gè)輸出)來整合源DStreams中每個(gè)RDD元素得到單元素RDD的DStreams。這個(gè)函數(shù)需要關(guān)聯(lián)從而可以被并行計(jì)算。
countByValue()對于DStreams中元素類型為K調(diào)用此函數(shù),得到包含(K,Long)對的新DStream,其中Long值表明相應(yīng)的K在源DStream中每個(gè)RDD出現(xiàn)的頻率。
reduceByKey(func, [numTasks])對(K,V)對的DStream調(diào)用此函數(shù),返回同樣(K,V)對的新DStream,但是新DStream中的對應(yīng)V為使用reduce函數(shù)整合而來。Note:默認(rèn)情況下,這個(gè)操作使用Spark默認(rèn)數(shù)量的并行任務(wù)(本地模式為2,集群模式中的數(shù)量取決于配置參數(shù)spark.default.parallelism)。你也可以傳入可選的參數(shù)numTaska來設(shè)置不同數(shù)量的任務(wù)。
join(otherStream, [numTasks])兩DStream分別為(K,V)和(K,W)對,返回(K,(V,W))對的新DStream。
cogroup(otherStream, [numTasks])兩DStream分別為(K,V)和(K,W)對,返回(K,(Seq[V],Seq[W])對新DStreams
transform(func)將RDD到RDD映射的函數(shù)func作用于源DStream中每個(gè)RDD上得到新DStream。這個(gè)可用于在DStream的RDD上做任意操作。
updateStateByKey(func)得到”狀態(tài)”DStream,其中每個(gè)key狀態(tài)的更新是通過將給定函數(shù)用于此key的上一個(gè)狀態(tài)和新值而得到。這個(gè)可用于保存每個(gè)key值的任意狀態(tài)數(shù)據(jù)。
DStream 的轉(zhuǎn)化操作可以分為無狀態(tài)(stateless)和有狀態(tài)(stateful)兩種。
在無狀態(tài)轉(zhuǎn)化操作中,每個(gè)批次的處理不依賴于之前批次的數(shù)據(jù)。常見的 RDD 轉(zhuǎn)化操作,例如 map()、filter()、reduceByKey() 等,都是無狀態(tài)轉(zhuǎn)化操作。
相對地,有狀態(tài)轉(zhuǎn)化操作需要使用之前批次的數(shù)據(jù)或者是中間結(jié)果來計(jì)算當(dāng)前批次的數(shù)據(jù)。有狀態(tài)轉(zhuǎn)化操作包括基于滑動(dòng)窗口的轉(zhuǎn)化操作和追蹤狀態(tài)變化的轉(zhuǎn)化操作。
4.1 無狀態(tài)轉(zhuǎn)化操作
無狀態(tài)轉(zhuǎn)化操作就是把簡單的 RDD 轉(zhuǎn)化操作應(yīng)用到每個(gè)批次上,也就是轉(zhuǎn)化 DStream 中的每一個(gè) RDD。部分無狀態(tài)轉(zhuǎn)化操作列在了下表中。 注意,針對鍵值對的 DStream 轉(zhuǎn)化操作(比如 reduceByKey())要添加import StreamingContext._ 才能在 Scala中使用。?

需要記住的是,盡管這些函數(shù)看起來像作用在整個(gè)流上一樣,但事實(shí)上每個(gè) DStream 在內(nèi)部是由許多 RDD(批次)組成,且無狀態(tài)轉(zhuǎn)化操作是分別應(yīng)用到每個(gè) RDD 上的。例如, reduceByKey() 會(huì)歸約每個(gè)時(shí)間區(qū)間中的數(shù)據(jù),但不會(huì)歸約不同區(qū)間之間的數(shù)據(jù)。?
舉個(gè)例子,在之前的wordcount程序中,我們只會(huì)統(tǒng)計(jì)1秒內(nèi)接收到的數(shù)據(jù)的單詞個(gè)數(shù),而不會(huì)累加。?
無狀態(tài)轉(zhuǎn)化操作也能在多個(gè) DStream 間整合數(shù)據(jù),不過也是在各個(gè)時(shí)間區(qū)間內(nèi)。例如,鍵 值對 DStream 擁有和 RDD 一樣的與連接相關(guān)的轉(zhuǎn)化操作,也就是 cogroup()、join()、 leftOuterJoin() 等。我們可以在 DStream 上使用這些操作,這樣就對每個(gè)批次分別執(zhí)行了對應(yīng)的 RDD 操作。?
我們還可以像在常規(guī)的 Spark 中一樣使用 DStream 的 union() 操作將它和另一個(gè) DStream 的內(nèi)容合并起來,也可以使用 StreamingContext.union() 來合并多個(gè)流。
4.2 有狀態(tài)轉(zhuǎn)化操作
4.2.1 追蹤狀態(tài)變化UpdateStateByKey
UpdateStateByKey原語用于記錄歷史記錄,有時(shí),我們需要在 DStream 中跨批次維護(hù)狀態(tài)(例如流計(jì)算中累加wordcount)。針對這種情況,updateStateByKey() 為我們提供了對一個(gè)狀態(tài)變量的訪問,用于鍵值對形式的 DStream。給定一個(gè)由(鍵,事件)對構(gòu)成的 DStream,并傳遞一個(gè)指定如何根據(jù)新的事件 更新每個(gè)鍵對應(yīng)狀態(tài)的函數(shù),它可以構(gòu)建出一個(gè)新的 DStream,其內(nèi)部數(shù)據(jù)為(鍵,狀態(tài)) 對。?
updateStateByKey() 的結(jié)果會(huì)是一個(gè)新的 DStream,其內(nèi)部的 RDD 序列是由每個(gè)時(shí)間區(qū)間對應(yīng)的(鍵,狀態(tài))對組成的。?
updateStateByKey操作使得我們可以在用新信息進(jìn)行更新時(shí)保持任意的狀態(tài)。為使用這個(gè)功能,你需要做下面兩步:?
1. 定義狀態(tài),狀態(tài)可以是一個(gè)任意的數(shù)據(jù)類型。?
2. 定義狀態(tài)更新函數(shù),用此函數(shù)闡明如何使用之前的狀態(tài)和來自輸入流的新值對狀態(tài)進(jìn)行更新。?
使用updateStateByKey需要對檢查點(diǎn)目錄進(jìn)行配置,會(huì)使用檢查點(diǎn)來保存狀態(tài)。
4.2.2 Window
Window Operations有點(diǎn)類似于Storm中的State,可以設(shè)置窗口的大小和滑動(dòng)窗口的間隔來動(dòng)態(tài)的獲取當(dāng)前Steaming的允許狀態(tài)。?
基于窗口的操作會(huì)在一個(gè)比 StreamingContext 的批次間隔更長的時(shí)間范圍內(nèi),通過整合多個(gè)批次的結(jié)果,計(jì)算出整個(gè)窗口的結(jié)果。?

所有基于窗口的操作都需要兩個(gè)參數(shù),分別為窗口時(shí)長以及滑動(dòng)步長,兩者都必須是 StreamContext 的批次間隔的整數(shù)倍。窗口時(shí)長控制每次計(jì)算最近的多少個(gè)批次的數(shù)據(jù),其實(shí)就是最近的 windowDuration/batchInterval 個(gè)批次。如果有一個(gè)以 10 秒為批次間隔的源 DStream,要?jiǎng)?chuàng)建一個(gè)最近 30 秒的時(shí)間窗口(即最近 3 個(gè)批次),就應(yīng)當(dāng)把 windowDuration 設(shè)為 30 秒。而滑動(dòng)步長的默認(rèn)值與批次間隔相等,用來控制對新的 DStream 進(jìn)行計(jì)算的間隔。如果源 DStream 批次間隔為 10 秒,并且我們只希望每兩個(gè)批次計(jì)算一次窗口結(jié)果, 就應(yīng)該把滑動(dòng)步長設(shè)置為 20 秒。?
假設(shè),你想拓展前例從而每隔十秒對持續(xù)30秒的數(shù)據(jù)生成word count。為做到這個(gè),我們需要在持續(xù)30秒數(shù)據(jù)的(word,1)對DStream上應(yīng)用reduceByKey。使用操作reduceByKeyAndWindow.