Spark Streaming

1. Spark Streaming概述

1.1 什么是Spark Streaming

Spark Streaming類似于Apache Storm,用于流式數(shù)據(jù)的處理。根據(jù)其官方文檔介紹,Spark Streaming有高吞吐量和容錯(cuò)能力強(qiáng)等特點(diǎn)。Spark Streaming支持的數(shù)據(jù)輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。數(shù)據(jù)輸入后可以用Spark的高度抽象語言的語法如:map、reduce、join、window等進(jìn)行運(yùn)算。而結(jié)果也能保存在很多地方,如HDFS,數(shù)據(jù)庫等。另外Spark Streaming也能和MLlib(機(jī)器學(xué)習(xí))以及Graphx完美融合。

和Spark基于RDD的概念很相似,Spark Streaming使用離散化流(discretized stream)作為抽象表示,叫作DStream。DStream 是隨時(shí)間推移而收到的數(shù)據(jù)的序列。在內(nèi)部,每個(gè)時(shí)間區(qū)間收到的數(shù)據(jù)都作為 RDD 存在,而 DStream 是由這些 RDD 所組成的序列(因此 得名“離散化”)。?

DStream 可以從各種輸入源創(chuàng)建,比如 Flume、Kafka 或者 HDFS。創(chuàng)建出來的DStream 支持兩種操作,一種是轉(zhuǎn)化操作(transformation),會(huì)生成一個(gè)新的DStream,另一種是輸出操作(output operation),可以把數(shù)據(jù)寫入外部系統(tǒng)中。DStream 提供了許多與 RDD 所支持的操作相類似的操作支持,還增加了與時(shí)間相關(guān)的新操作,比如滑動(dòng)窗口。

1.2 Spark Streaming的特點(diǎn)

易用?

容錯(cuò)?

易整合到Spark體系?

1.3 Spark 與 Storm 對比

1.3.1 對比

對比點(diǎn)StormSpark Streaming

實(shí)時(shí)計(jì)算模型純實(shí)時(shí),來一條數(shù)據(jù),處理一條數(shù)據(jù)準(zhǔn)實(shí)時(shí),對一個(gè)時(shí)間段內(nèi)的數(shù)據(jù)收集起來,作為一個(gè)RDD,再處理

實(shí)時(shí)計(jì)算延遲度毫秒級秒級

吞吐量低高

事務(wù)機(jī)制支持完善支持,但不夠完善

健壯性 / 容錯(cuò)性ZooKeeper,Acker,非常強(qiáng)Checkpoint,WAL,一般

動(dòng)態(tài)調(diào)整并行度支持不支持

1.3.2 Spark Streaming與Storm的應(yīng)用場景

Storm

建議在那種需要純實(shí)時(shí),不能忍受1秒以上延遲的場景下使用,比如實(shí)時(shí)金融系統(tǒng),要求純實(shí)時(shí)進(jìn)行金融交易和分析

此外,如果對于實(shí)時(shí)計(jì)算的功能中,要求可靠的事務(wù)機(jī)制和可靠性機(jī)制,即數(shù)據(jù)的處理完全精準(zhǔn),一條也不能多,一條也不能少,也可以考慮使用Storm

如果還需要針對高峰低峰時(shí)間段,動(dòng)態(tài)調(diào)整實(shí)時(shí)計(jì)算程序的并行度,以最大限度利用集群資源(通常是在小型公司,集群資源緊張的情況),也可以考慮用Storm

如果一個(gè)大數(shù)據(jù)應(yīng)用系統(tǒng),它就是純粹的實(shí)時(shí)計(jì)算,不需要在中間執(zhí)行SQL交互式查詢、復(fù)雜的transformation算子等,那么用Storm是比較好的選擇

Spark Streaming

如果對上述適用于Storm的三點(diǎn),一條都不滿足的實(shí)時(shí)場景,即,不要求純實(shí)時(shí),不要求強(qiáng)大可靠的事務(wù)機(jī)制,不要求動(dòng)態(tài)調(diào)整并行度,那么可以考慮使用Spark Streaming

考慮使用Spark Streaming最主要的一個(gè)因素,應(yīng)該是針對整個(gè)項(xiàng)目進(jìn)行宏觀的考慮,即,如果一個(gè)項(xiàng)目除了實(shí)時(shí)計(jì)算之外,還包括了離線批處理、交互式查詢等業(yè)務(wù)功能,而且實(shí)時(shí)計(jì)算中,可能還會(huì)牽扯到高延遲批處理、交互式查詢等功能,那么就應(yīng)該首選Spark生態(tài),用Spark Core開發(fā)離線批處理,用Spark SQL開發(fā)交互式查詢,用Spark Streaming開發(fā)實(shí)時(shí)計(jì)算,三者可以無縫整合,給系統(tǒng)提供非常高的可擴(kuò)展性

1.3.3 Spark Streaming與Storm的優(yōu)劣分析

事實(shí)上,Spark Streaming絕對談不上比Storm優(yōu)秀。這兩個(gè)框架在實(shí)時(shí)計(jì)算領(lǐng)域中,都很優(yōu)秀,只是擅長的細(xì)分場景并不相同。?

Spark Streaming僅僅在吞吐量上比Storm要優(yōu)秀,而吞吐量這一點(diǎn),也是歷來挺Spark Streaming,貶Storm的人著重強(qiáng)調(diào)的。但是問題是,是不是在所有的實(shí)時(shí)計(jì)算場景下,都那么注重吞吐量?不盡然。因此,通過吞吐量說Spark Streaming強(qiáng)于Storm,不靠譜。?

事實(shí)上,Storm在實(shí)時(shí)延遲度上,比Spark Streaming就好多了,前者是純實(shí)時(shí),后者是準(zhǔn)實(shí)時(shí)。而且,Storm的事務(wù)機(jī)制、健壯性 / 容錯(cuò)性、動(dòng)態(tài)調(diào)整并行度等特性,都要比Spark Streaming更加優(yōu)秀。?

Spark Streaming,有一點(diǎn)是Storm絕對比不上的,就是:它位于Spark生態(tài)技術(shù)棧中,因此Spark Streaming可以和Spark Core、Spark SQL無縫整合,也就意味著,我們可以對實(shí)時(shí)處理出來的中間數(shù)據(jù),立即在程序中無縫進(jìn)行延遲批處理、交互式查詢等操作。這個(gè)特點(diǎn)大大增強(qiáng)了Spark Streaming的優(yōu)勢和功能。

1.4 Spark Streaming關(guān)鍵抽象

Discretized Stream或DStream是Spark Streaming提供的基本抽象。它表示連續(xù)的數(shù)據(jù)流,可以是從源接收的輸入數(shù)據(jù)流,也可以是通過轉(zhuǎn)換輸入流生成的已處理數(shù)據(jù)流。在內(nèi)部,DStream由一系列連續(xù)的RDD表示,這是Spark對不可變分布式數(shù)據(jù)集的抽象。DStream中的每個(gè)RDD都包含來自特定時(shí)間間隔的數(shù)據(jù),如下圖所示。?

應(yīng)用于DStream的任何操作都轉(zhuǎn)換為底層RDD上的操作。例如,在先前將行流轉(zhuǎn)換為字的示例中,flatMap操作應(yīng)用于linesDStream中的每個(gè)RDD 以生成DStream的 wordsRDD。如下圖所示。?

Spark Streaming接收實(shí)時(shí)輸入數(shù)據(jù)流并將數(shù)據(jù)分成批處理,然后由Spark引擎處理,以批量生成最終結(jié)果流。?

1.5 Spark Streaming 架構(gòu)

Spark Streaming使用“微批次”的架構(gòu),把流式計(jì)算當(dāng)作一系列連續(xù)的小規(guī)模批處理來對待。Spark Streaming從各種輸入源中讀取數(shù)據(jù),并把數(shù)據(jù)分組為小的批次。新的批次按均勻的時(shí)間間隔創(chuàng)建出來。在每個(gè)時(shí)間區(qū)間開始的時(shí)候,一個(gè)新的批次就創(chuàng)建出來,在該區(qū)間內(nèi)收到的數(shù)據(jù)都會(huì)被添加到這個(gè)批次中。在時(shí)間區(qū)間結(jié)束時(shí),批次停止增長。時(shí)間區(qū)間的大小是由批次間隔這個(gè)參數(shù)決定的。批次間隔一般設(shè)在500毫秒到幾秒之間,由應(yīng)用開發(fā)者配置。每個(gè)輸入批次都形成一個(gè)RDD,以 Spark 作業(yè)的方式處理并生成其他的 RDD。 處理的結(jié)果可以以批處理的方式傳給外部系統(tǒng)。高層次的架構(gòu)如圖?


Spark Streaming在Spark的驅(qū)動(dòng)器程序—工作節(jié)點(diǎn)的結(jié)構(gòu)的執(zhí)行過程如下圖所示。Spark Streaming為每個(gè)輸入源啟動(dòng)對 應(yīng)的接收器。接收器以任務(wù)的形式運(yùn)行在應(yīng)用的執(zhí)行器進(jìn)程中,從輸入源收集數(shù)據(jù)并保存為 RDD。它們收集到輸入數(shù)據(jù)后會(huì)把數(shù)據(jù)復(fù)制到另一個(gè)執(zhí)行器進(jìn)程來保障容錯(cuò)性(默 認(rèn)行為)。數(shù)據(jù)保存在執(zhí)行器進(jìn)程的內(nèi)存中,和緩存 RDD 的方式一樣。驅(qū)動(dòng)器程序中的 StreamingContext 會(huì)周期性地運(yùn)行 Spark 作業(yè)來處理這些數(shù)據(jù),把數(shù)據(jù)與之前時(shí)間區(qū)間中的 RDD 進(jìn)行整合。?

1.6 背壓機(jī)制

???????默認(rèn)情況下,Spark Streaming通過Receiver以生產(chǎn)者生產(chǎn)數(shù)據(jù)的速率接收數(shù)據(jù),計(jì)算過程中會(huì)出現(xiàn)batch processing time > batch interval的情況,其中batch processing time 為實(shí)際計(jì)算一個(gè)批次花費(fèi)時(shí)間, batch interval為Streaming應(yīng)用設(shè)置的批處理間隔。這意味著Spark Streaming的數(shù)據(jù)接收速率高于Spark從隊(duì)列中移除數(shù)據(jù)的速率,也就是數(shù)據(jù)處理能力低,在設(shè)置間隔內(nèi)不能完全處理當(dāng)前接收速率接收的數(shù)據(jù)。如果這種情況持續(xù)過長的時(shí)間,會(huì)造成數(shù)據(jù)在內(nèi)存中堆積,導(dǎo)致Receiver所在Executor內(nèi)存溢出等問題(如果設(shè)置StorageLevel包含disk, 則內(nèi)存存放不下的數(shù)據(jù)會(huì)溢寫至disk, 加大延遲)。Spark 1.5以前版本,用戶如果要限制Receiver的數(shù)據(jù)接收速率,可以通過設(shè)置靜態(tài)配制參數(shù)“spark.streaming.receiver.maxRate”的值來實(shí)現(xiàn),此舉雖然可以通過限制接收速率,來適配當(dāng)前的處理能力,防止內(nèi)存溢出,但也會(huì)引入其它問題。比如:producer數(shù)據(jù)生產(chǎn)高于maxRate,當(dāng)前集群處理能力也高于maxRate,這就會(huì)造成資源利用率下降等問題。為了更好的協(xié)調(diào)數(shù)據(jù)接收速率與資源處理能力,Spark Streaming 從v1.5開始引入反壓機(jī)制(back-pressure),通過動(dòng)態(tài)控制數(shù)據(jù)接收速率來適配集群數(shù)據(jù)處理能力。

???????背壓機(jī)制: 根據(jù)JobScheduler反饋?zhàn)鳂I(yè)的執(zhí)行信息來動(dòng)態(tài)調(diào)整Receiver數(shù)據(jù)接收率。通過屬性“spark.streaming.backpressure.enabled”來控制是否啟用backpressure機(jī)制,默認(rèn)值false,即不啟用。

在原架構(gòu)的基礎(chǔ)上加上一個(gè)新的組件RateController,這個(gè)組件負(fù)責(zé)監(jiān)聽“OnBatchCompleted ”事件,然后從中抽取processingDelay 及schedulingDelay信息. Estimator依據(jù)這些信息估算出最大處理速度(rate),最后由基于Receiver的Input Stream將rate通過ReceiverTracker與ReceiverSupervisorImpl轉(zhuǎn)發(fā)給BlockGenerator(繼承自RateLimiter).?

流量控制點(diǎn)

當(dāng)Receiver開始接收數(shù)據(jù)時(shí),會(huì)通過supervisor.pushSingle()方法將接收的數(shù)據(jù)存入currentBuffer等待BlockGenerator定時(shí)將數(shù)據(jù)取走,包裝成block. 在將數(shù)據(jù)存放入currentBuffer之時(shí),要獲取許可(令牌)。如果獲取到許可就可以將數(shù)據(jù)存入buffer, 否則將被阻塞,進(jìn)而阻塞Receiver從數(shù)據(jù)源拉取數(shù)據(jù)。?

其令牌投放采用令牌桶機(jī)制進(jìn)行, 原理如下圖所示:?

令牌桶機(jī)制

大小固定的令牌桶可自行以恒定的速率源源不斷地產(chǎn)生令牌。如果令牌不被消耗,或者被消耗的速度小于產(chǎn)生的速度,令牌就會(huì)不斷地增多,直到把桶填滿。后面再產(chǎn)生的令牌就會(huì)從桶中溢出。最后桶中可以保存的最大令牌數(shù)永遠(yuǎn)不會(huì)超過桶的大小。當(dāng)進(jìn)行某操作時(shí)需要令牌時(shí)會(huì)從令牌桶中取出相應(yīng)的令牌數(shù),如果獲取到則繼續(xù)操作,否則阻塞。用完之后不用放回。

2. Spark Streaming 簡單應(yīng)用

2.1 安裝Telnet向端口發(fā)送消息

2.2 使用SparkStreaming監(jiān)控端口數(shù)據(jù)展示到控制臺(tái)

3. DStream 的輸入

Spark Streaming原生支持一些不同的數(shù)據(jù)源。一些“核心”數(shù)據(jù)源已經(jīng)被打包到Spark Streaming 的 Maven 工件中,而其他的一些則可以通過 spark-streaming-kafka 等附加工件獲取。每個(gè)接收器都以 Spark 執(zhí)行器程序中一個(gè)長期運(yùn)行的任務(wù)的形式運(yùn)行,因此會(huì)占據(jù)分配給應(yīng)用的 CPU 核心。此外,我們還需要有可用的 CPU 核心來處理數(shù)據(jù)。這意味著如果要運(yùn)行多個(gè)接收器,就必須至少有和接收器數(shù)目相同的核心數(shù),還要加上用來完成計(jì)算所需要的核心數(shù)。例如,如果我們想要在流計(jì)算應(yīng)用中運(yùn)行 10 個(gè)接收器,那么至少需要為應(yīng)用分配 11 個(gè) CPU 核心。所以如果在本地模式運(yùn)行,不要使用local或者local[1]。?

3.1 文件數(shù)據(jù)源

文件數(shù)據(jù)流:能夠讀取所有HDFS API兼容的文件系統(tǒng)文件,通過fileStream方法進(jìn)行讀取?

Spark Streaming 將會(huì)監(jiān)控 dataDirectory 目錄并不斷處理移動(dòng)進(jìn)來的文件,記住目前不支持嵌套目錄。

文件需要有相同的數(shù)據(jù)格式

文件進(jìn)入 dataDirectory的方式需要通過移動(dòng)或者重命名來實(shí)現(xiàn)。

一旦文件移動(dòng)進(jìn)目錄,則不能再修改,即便修改了也不會(huì)讀取新數(shù)據(jù)。?

如果文件比較簡單,則可以使用 streamingContext.textFileStream(dataDirectory)方法來讀取文件。文件流不需要接收器,不需要單獨(dú)分配CPU核。

案例實(shí)操

## 導(dǎo)入相應(yīng)的jar包

scala> import org.apache.spark.streaming._

## 創(chuàng)建StreamingContext操作對象

scala> val ssc = new StreamingContext(sc,Seconds(5))

scala> val lines = ssc.textFileStream("hdfs://master:9000/spark/data")

scala> val wordCount = lines.flatMap(_.split("\t")).map(x=>(x,1)).reduceByKey(_+_)

scala> wordCount.print

scala> ssc.start

3.2 自定義數(shù)據(jù)源

通過繼承Receiver,并實(shí)現(xiàn)onStart、onStop方法來自定義數(shù)據(jù)源采集。

案例實(shí)操

3.3 RDD隊(duì)列

可以通過使用streamingContext.queueStream(queueOfRDDs)來創(chuàng)建DStream,每一個(gè)推送到這個(gè)隊(duì)列中的RDD,都會(huì)作為一個(gè)DStream處理。

案例實(shí)操

3.4 Kafka

Spark 與 Kafka集成指南?

下面我們進(jìn)行一個(gè)實(shí)例,演示SparkStreaming如何從Kafka讀取消息,如果通過連接池方法把消息處理完成后再寫會(huì)Kafka?

整合

1.引入jar包依賴


? ? org.apache.spark

? ? spark-streaming-kafka-0-10_2.11

? ? ${spark.version}

2.代碼編寫

//Stream2Kafka

import kafka.serializer.StringDecoder

import org.apache.kafka.clients.consumer.ConsumerConfig

import org.apache.kafka.clients.producer.ProducerRecord

import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

import org.apache.spark.streaming.{Seconds, StreamingContext}

object Stream2Kafka extends App {

? //創(chuàng)建配置對象

? val conf = new SparkConf().setAppName("kafka").setMaster("local[3]")

? //創(chuàng)建SparkStreaming操作對象

? val ssc = new StreamingContext(conf,Seconds(5))

? //連接Kafka就需要Topic

? //輸入的topic

? val fromTopic = "source"

? //輸出的Topic

? val toTopic = "target"

? //創(chuàng)建brokers的地址

? val brokers = "master:9092,slave1:9092,slave3:9092,slave2:9092"

? //Kafka消費(fèi)者配置對象

? val kafkaParams = Map[String, Object](

? ? //用于初始化鏈接到集群的地址

? ? ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,

? ? //Key與VALUE的序列化類型

? ? ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],

? ? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],

? ? //用于標(biāo)識這個(gè)消費(fèi)者屬于哪個(gè)消費(fèi)團(tuán)體

? ? ConsumerConfig.GROUP_ID_CONFIG->"kafka",

? ? //如果沒有初始化偏移量或者當(dāng)前的偏移量不存在任何服務(wù)器上,可以使用這個(gè)配置屬性

? ? //可以使用這個(gè)配置,latest自動(dòng)重置偏移量為最新的偏移量

? ? ConsumerConfig.AUTO_OFFSET_RESET_CONFIG->"latest",

? ? //如果是true,則這個(gè)消費(fèi)者的偏移量會(huì)在后臺(tái)自動(dòng)提交

? ? ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG->(false: java.lang.Boolean)

? )

? //創(chuàng)建DStream,連接到Kafka,返回接收到的輸入數(shù)據(jù)

? val inputStream = {

? ? KafkaUtils.createDirectStream[String, String](

? ? ? ssc,

? ? ? //位置策略(可用的Executor上均勻分配分區(qū))

? ? ? LocationStrategies.PreferConsistent,

? ? ? //消費(fèi)策略(訂閱固定的主題集合)

? ? ? ConsumerStrategies.Subscribe[String, String](Array(fromTopic), kafkaParams))

? }

? inputStream.map{record => "hehe--"+record.value}.foreachRDD { rdd =>

? ? //在這里將RDD寫回Kafka,需要使用Kafka連接池

? ? rdd.foreachPartition { items =>

? ? ? val kafkaProxyPool = KafkaPool(brokers)

? ? ? val kafkaProxy = kafkaProxyPool.borrowObject()

? ? ? for (item <- items) {

? ? ? ? //使用這個(gè)連接池

? ? ? ? kafkaProxy.kafkaClient.send(new ProducerRecord[String, String](toTopic, item))

? ? ? }

? ? ? kafkaProxyPool.returnObject(kafkaProxy)

? ? }

? }

? ssc.start()

? ssc.awaitTermination()

}

//Kafka連接池

import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}

import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

import org.apache.kafka.common.serialization.StringSerializer

//因?yàn)橐獙cala的集合類型轉(zhuǎn)換成Java的

import scala.collection.JavaConversions._

class KafkaProxy(broker:String){

? val conf = Map(

? ? //用于初始化鏈接到集群的地址

? ? ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> broker,

? ? //Key與VALUE的序列化類型

? ? ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG->classOf[StringSerializer],

? ? ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG->classOf[StringSerializer]

? )

? val kafkaClient = new KafkaProducer[String,String](conf)

}

//創(chuàng)建一個(gè)創(chuàng)建KafkaProxy的工廠

class KafkaProxyFactory(broker:String) extends? BasePooledObjectFactory[KafkaProxy]{

? //創(chuàng)建實(shí)例

? override def create(): KafkaProxy = new KafkaProxy(broker)

? //包裝實(shí)例

? override def wrap(t: KafkaProxy): PooledObject[KafkaProxy] = new DefaultPooledObject[KafkaProxy](t)

}

object KafkaPool {

? private var kafkaPool:GenericObjectPool[KafkaProxy]=null

? def apply(broker:String): GenericObjectPool[KafkaProxy] ={

? ? if(kafkaPool == null){

? ? ? this.kafkaPool = new GenericObjectPool[KafkaProxy](new KafkaProxyFactory(broker))

? ? }

? ? kafkaPool

? }

}

3.啟動(dòng)zookeeper

4.啟動(dòng)kafka

kafka-server-start.sh /opt/apps/Kafka/kafka_2.11_2.0.0/config/server.properties &

5.創(chuàng)建兩個(gè)主題

[root@master ~]# kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181,slave4:2181 --replication-factor 2 --partitions 2 --topic source

[root@master ~]# kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181,slave4:2181 --replication-factor 2 --partitions 2 --topic target

6.啟動(dòng)producer 寫入數(shù)據(jù)到source

[root@master ~]# kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092,slave3:9092,slave4:9092 --topic source

7.啟動(dòng)consumer 監(jiān)聽target的數(shù)據(jù)

[root@master ~]# kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave2:9092,slave3:9092,slave4:9092 --topic target

手動(dòng)設(shè)置offset


4. DStream 轉(zhuǎn)換

DStream上的原語與RDD的類似,分為Transformations(轉(zhuǎn)換)和Output Operations(輸出)兩種,此外轉(zhuǎn)換操作中還有一些比較特殊的語法,如:updateStateByKey()、transform()以及各種Window相關(guān)的語法。

TransformationMeaning

map(func)將源DStream中的每個(gè)元素通過一個(gè)函數(shù)func從而得到新的DStreams。

flatMap(func)和map類似,但是每個(gè)輸入的項(xiàng)可以被映射為0或更多項(xiàng)。

filter(func)選擇源DStream中函數(shù)func判為true的記錄作為新DStreams

repartition(numPartitions)通過創(chuàng)建更多或者更少的partition來改變此DStream的并行級別。

union(otherStream)聯(lián)合源DStreams和其他DStreams來得到新DStream

count()統(tǒng)計(jì)源DStreams中每個(gè)RDD所含元素的個(gè)數(shù)得到單元素RDD的新DStreams。

reduce(func)通過函數(shù)func(兩個(gè)參數(shù)一個(gè)輸出)來整合源DStreams中每個(gè)RDD元素得到單元素RDD的DStreams。這個(gè)函數(shù)需要關(guān)聯(lián)從而可以被并行計(jì)算。

countByValue()對于DStreams中元素類型為K調(diào)用此函數(shù),得到包含(K,Long)對的新DStream,其中Long值表明相應(yīng)的K在源DStream中每個(gè)RDD出現(xiàn)的頻率。

reduceByKey(func, [numTasks])對(K,V)對的DStream調(diào)用此函數(shù),返回同樣(K,V)對的新DStream,但是新DStream中的對應(yīng)V為使用reduce函數(shù)整合而來。Note:默認(rèn)情況下,這個(gè)操作使用Spark默認(rèn)數(shù)量的并行任務(wù)(本地模式為2,集群模式中的數(shù)量取決于配置參數(shù)spark.default.parallelism)。你也可以傳入可選的參數(shù)numTaska來設(shè)置不同數(shù)量的任務(wù)。

join(otherStream, [numTasks])兩DStream分別為(K,V)和(K,W)對,返回(K,(V,W))對的新DStream。

cogroup(otherStream, [numTasks])兩DStream分別為(K,V)和(K,W)對,返回(K,(Seq[V],Seq[W])對新DStreams

transform(func)將RDD到RDD映射的函數(shù)func作用于源DStream中每個(gè)RDD上得到新DStream。這個(gè)可用于在DStream的RDD上做任意操作。

updateStateByKey(func)得到”狀態(tài)”DStream,其中每個(gè)key狀態(tài)的更新是通過將給定函數(shù)用于此key的上一個(gè)狀態(tài)和新值而得到。這個(gè)可用于保存每個(gè)key值的任意狀態(tài)數(shù)據(jù)。

DStream 的轉(zhuǎn)化操作可以分為無狀態(tài)(stateless)和有狀態(tài)(stateful)兩種。

在無狀態(tài)轉(zhuǎn)化操作中,每個(gè)批次的處理不依賴于之前批次的數(shù)據(jù)。常見的 RDD 轉(zhuǎn)化操作,例如 map()、filter()、reduceByKey() 等,都是無狀態(tài)轉(zhuǎn)化操作。

相對地,有狀態(tài)轉(zhuǎn)化操作需要使用之前批次的數(shù)據(jù)或者是中間結(jié)果來計(jì)算當(dāng)前批次的數(shù)據(jù)。有狀態(tài)轉(zhuǎn)化操作包括基于滑動(dòng)窗口的轉(zhuǎn)化操作和追蹤狀態(tài)變化的轉(zhuǎn)化操作。

4.1 無狀態(tài)轉(zhuǎn)化操作

無狀態(tài)轉(zhuǎn)化操作就是把簡單的 RDD 轉(zhuǎn)化操作應(yīng)用到每個(gè)批次上,也就是轉(zhuǎn)化 DStream 中的每一個(gè) RDD。部分無狀態(tài)轉(zhuǎn)化操作列在了下表中。 注意,針對鍵值對的 DStream 轉(zhuǎn)化操作(比如 reduceByKey())要添加import StreamingContext._ 才能在 Scala中使用。?


需要記住的是,盡管這些函數(shù)看起來像作用在整個(gè)流上一樣,但事實(shí)上每個(gè) DStream 在內(nèi)部是由許多 RDD(批次)組成,且無狀態(tài)轉(zhuǎn)化操作是分別應(yīng)用到每個(gè) RDD 上的。例如, reduceByKey() 會(huì)歸約每個(gè)時(shí)間區(qū)間中的數(shù)據(jù),但不會(huì)歸約不同區(qū)間之間的數(shù)據(jù)。?

舉個(gè)例子,在之前的wordcount程序中,我們只會(huì)統(tǒng)計(jì)1秒內(nèi)接收到的數(shù)據(jù)的單詞個(gè)數(shù),而不會(huì)累加。?

無狀態(tài)轉(zhuǎn)化操作也能在多個(gè) DStream 間整合數(shù)據(jù),不過也是在各個(gè)時(shí)間區(qū)間內(nèi)。例如,鍵 值對 DStream 擁有和 RDD 一樣的與連接相關(guān)的轉(zhuǎn)化操作,也就是 cogroup()、join()、 leftOuterJoin() 等。我們可以在 DStream 上使用這些操作,這樣就對每個(gè)批次分別執(zhí)行了對應(yīng)的 RDD 操作。?

我們還可以像在常規(guī)的 Spark 中一樣使用 DStream 的 union() 操作將它和另一個(gè) DStream 的內(nèi)容合并起來,也可以使用 StreamingContext.union() 來合并多個(gè)流。

4.2 有狀態(tài)轉(zhuǎn)化操作

4.2.1 追蹤狀態(tài)變化UpdateStateByKey

UpdateStateByKey原語用于記錄歷史記錄,有時(shí),我們需要在 DStream 中跨批次維護(hù)狀態(tài)(例如流計(jì)算中累加wordcount)。針對這種情況,updateStateByKey() 為我們提供了對一個(gè)狀態(tài)變量的訪問,用于鍵值對形式的 DStream。給定一個(gè)由(鍵,事件)對構(gòu)成的 DStream,并傳遞一個(gè)指定如何根據(jù)新的事件 更新每個(gè)鍵對應(yīng)狀態(tài)的函數(shù),它可以構(gòu)建出一個(gè)新的 DStream,其內(nèi)部數(shù)據(jù)為(鍵,狀態(tài)) 對。?

updateStateByKey() 的結(jié)果會(huì)是一個(gè)新的 DStream,其內(nèi)部的 RDD 序列是由每個(gè)時(shí)間區(qū)間對應(yīng)的(鍵,狀態(tài))對組成的。?

updateStateByKey操作使得我們可以在用新信息進(jìn)行更新時(shí)保持任意的狀態(tài)。為使用這個(gè)功能,你需要做下面兩步:?

1. 定義狀態(tài),狀態(tài)可以是一個(gè)任意的數(shù)據(jù)類型。?

2. 定義狀態(tài)更新函數(shù),用此函數(shù)闡明如何使用之前的狀態(tài)和來自輸入流的新值對狀態(tài)進(jìn)行更新。?

使用updateStateByKey需要對檢查點(diǎn)目錄進(jìn)行配置,會(huì)使用檢查點(diǎn)來保存狀態(tài)。

4.2.2 Window

Window Operations有點(diǎn)類似于Storm中的State,可以設(shè)置窗口的大小和滑動(dòng)窗口的間隔來動(dòng)態(tài)的獲取當(dāng)前Steaming的允許狀態(tài)。?

基于窗口的操作會(huì)在一個(gè)比 StreamingContext 的批次間隔更長的時(shí)間范圍內(nèi),通過整合多個(gè)批次的結(jié)果,計(jì)算出整個(gè)窗口的結(jié)果。?


所有基于窗口的操作都需要兩個(gè)參數(shù),分別為窗口時(shí)長以及滑動(dòng)步長,兩者都必須是 StreamContext 的批次間隔的整數(shù)倍。窗口時(shí)長控制每次計(jì)算最近的多少個(gè)批次的數(shù)據(jù),其實(shí)就是最近的 windowDuration/batchInterval 個(gè)批次。如果有一個(gè)以 10 秒為批次間隔的源 DStream,要?jiǎng)?chuàng)建一個(gè)最近 30 秒的時(shí)間窗口(即最近 3 個(gè)批次),就應(yīng)當(dāng)把 windowDuration 設(shè)為 30 秒。而滑動(dòng)步長的默認(rèn)值與批次間隔相等,用來控制對新的 DStream 進(jìn)行計(jì)算的間隔。如果源 DStream 批次間隔為 10 秒,并且我們只希望每兩個(gè)批次計(jì)算一次窗口結(jié)果, 就應(yīng)該把滑動(dòng)步長設(shè)置為 20 秒。?

假設(shè),你想拓展前例從而每隔十秒對持續(xù)30秒的數(shù)據(jù)生成word count。為做到這個(gè),我們需要在持續(xù)30秒數(shù)據(jù)的(word,1)對DStream上應(yīng)用reduceByKey。使用操作reduceByKeyAndWindow.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容