一、Spark Streaming概述
Spark Streaming類似于Apache Storm,用于流式數(shù)據(jù)的處理,具有高吞吐量和容錯能力強(qiáng)等特點(diǎn)。Spark Streaming支持的數(shù)據(jù)輸入源很多,例如:Kafka、Flume、Twitter和簡單的TCP套接字等等,而結(jié)果也能保存在很多地方,比如HDFS、數(shù)據(jù)庫等。Spark Streaming使用離散化流作為抽象表示,叫做DStream。DStream是隨著時間推移而收到的數(shù)據(jù)的序列。在內(nèi)部,每個時間區(qū)間收到的數(shù)據(jù)都作為RDD存在,而DStream是由這些RDD所組成的序列。創(chuàng)建出來的DStream支持兩種操作,一種是轉(zhuǎn)化操作(transformation),會生成一個新的DStream,另一種是輸出操作(output operation),可以把數(shù)據(jù)寫入外部系統(tǒng)中。DStream提供了許多與RDD所支持的操作相類似的操作支持,還增加了與時間相關(guān)的新操作,比如滑動窗口。
二、Spark Streaming和Storm的區(qū)別

三、架構(gòu)與抽象
Spark Streaming使用“微批次”的架構(gòu),把流式計算當(dāng)作一系列連續(xù)的小規(guī)模批處理來對待。Spark Streaming從各種輸入源中讀取數(shù)據(jù),并把數(shù)據(jù)分組為小的批次。新的批次按均勻的時間間隔創(chuàng)建出來。在每個時間區(qū)間開始的時候,一個新的批次就創(chuàng)建出來,在該區(qū)間內(nèi)收到的數(shù)據(jù)都會被添加到這個批次中。在時間區(qū)間結(jié)束時,批次停止增長。時間區(qū)間的大小是由批次間隔這個參數(shù)決定的。批次間隔一般設(shè)置在500毫秒到幾秒之間,由應(yīng)用開發(fā)者配置。每個輸入批次都形成一個RDD,以spark作業(yè)的方式處理并生成其他的RDD。處理的結(jié)果可以以批處理的方式傳給外部系統(tǒng)。


四、Spark Streaming解析
1、初始化StreamingContext
import org.apache.spark._
import org.apache.spark.streaming._
val conf=new SparkConf().setAppName(appName).setMaster(master)
val ssc=new StreamingContext(conf,Second(1))
初始化完Context之后:
1、定義消息輸入源來創(chuàng)建DStreams.
2、定義DStreams的轉(zhuǎn)化操作和輸出操作。
3、通過streamingContext.start()來啟動消息采集和處理。
4、等待程序終止,可以通過streamingContext.awaitTermination()來設(shè)置
5、通過streamingContext.stop()來手動終止處理程序。
注意:
StreamingContext一旦啟動,對DStreams的操作就不能修改了。在同一時間一個JVM中只有一個StreamingContext可以啟動,stop()方法將同時停止SparkContext,可以傳入?yún)?shù)stopSparkContext用于停止StreamingContext。
2、什么是DStreams
Discretized Stream是Spark Streaming的基本抽象,代表持續(xù)性的數(shù)據(jù)流和經(jīng)過各種Spark原語操作后的結(jié)果數(shù)據(jù)流。在內(nèi)部實現(xiàn)上,DStream是一系列連續(xù)的RDD來表示。每個RDD含有一段時間間隔內(nèi)的數(shù)據(jù)。對數(shù)據(jù)的操作也是按照RDD為單位來進(jìn)行的。
3、DStreams輸入
Spark Streaming原生支持一些不同的數(shù)據(jù)源。每個接收器都以Spark執(zhí)行程序中一個長期運(yùn)行的任務(wù)的形式運(yùn)行,因此會占據(jù)分配給應(yīng)用的CPU核心。此外,我們還需要有可用的CPU核心來處理數(shù)據(jù)。這意味這如果要運(yùn)行多個接收器,就必須至少有和接收器數(shù)目相同的核心數(shù),還要加上用來完成計算所需的核心數(shù)。例如,如果我們想要在流計算應(yīng)用中運(yùn)行10個接收器,那么至少需要為應(yīng)用分配11個CPU核心。所以如果在本地模式運(yùn)行,不要使用local或者local[1]。
3.1基本數(shù)據(jù)源
3.1文件數(shù)據(jù)源
文件數(shù)據(jù)流:能夠讀取所有HDFS API兼容的文件系統(tǒng)文件,通過fileStream方法進(jìn)行讀取
streamingContext.fileStream[KeyClass,ValueClass,InputFormatClass](dataDirectory)
Spark Streaming將會監(jiān)控dataDirectory目錄并不斷處理移動進(jìn)來的文件,記住目前不支持嵌套目錄。
1、文件需要有相同的數(shù)據(jù)格式
2、文件進(jìn)入dataDirectory的方式需要通過移動或者重命名來實現(xiàn)
3、一旦文件移動進(jìn)目錄,則不能修改,即便修改了也不會讀取新數(shù)據(jù)。
如果文件比較簡單,則可以使用streamingContext.textFileStream(dataDirectory)方法來讀取文件。文件流不需要接收器,不需要單獨(dú)分配CPU核。
3.2 自定義數(shù)據(jù)源
通過繼承Receiver,并實現(xiàn)onStart、onStop方法來自定義數(shù)據(jù)源采集??梢酝ㄟ^streamingContext.receiverStream(<instance of custom receiver>)來使用自定義的數(shù)據(jù)采集源。
4、DStream轉(zhuǎn)換
DStream上的原語與RDD的類似,分為Transformations(轉(zhuǎn)換)和Output Operations(輸出)兩種,此外轉(zhuǎn)換操作中還有一些比較特殊的原語,比如:updateStateByKey()、transform()以及各種window相關(guān)的原語。
DStream的轉(zhuǎn)化操作可以分為無狀態(tài)(stateless)和有狀態(tài)(stateful)兩種:
(1)、在無狀態(tài)轉(zhuǎn)化操作中,每個批次的處理不依賴于之前批次的數(shù)據(jù)。常見的RDD轉(zhuǎn)化操作,例如map()、filter()、reduceByKey()等,都是無狀態(tài)轉(zhuǎn)化操作;
(2)、有狀態(tài)操作需要使用之前批次的數(shù)據(jù)或者是中間結(jié)果來計算當(dāng)前批次的數(shù)據(jù)。有狀態(tài)轉(zhuǎn)化操作包括基于滑動窗口的轉(zhuǎn)化操作和追蹤狀態(tài)變化的轉(zhuǎn)化操作。
4.1 無狀態(tài)轉(zhuǎn)化操作
無狀態(tài)轉(zhuǎn)化操作就是把簡單的RDD轉(zhuǎn)化操作應(yīng)用到每個批次上,也就是轉(zhuǎn)化DStream中的每一個RDD。注意,針對鍵值對的DStream轉(zhuǎn)化操作(比如reduceByKey())要添加import StreamingContext._才能在scala中使用。
#這里列舉一下無狀態(tài)轉(zhuǎn)化操作的例子
1、def map[U: ClassTag](mapFunc: T => U): DStream[U] 將源DStream中的每個元素通過一個函數(shù)func從而得到新的DStreams。
2、def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] 和map類似,但是每個輸入的項可以被映射為0或更多項。
3、def filter(filterFunc: T => Boolean): DStream[T] 選擇源DStream中函數(shù)func判為true的記錄作為新DStream
4、def repartition(numPartitions: Int): DStream[T] 通過創(chuàng)建更多或者更少的partition來改變此DStream的并行級別。
5、def union(that: DStream[T]): DStream[T] 將一個具有相同slideDuration新的DStream和當(dāng)前DStream進(jìn)行合并,返回新的DStream
6、def count(): DStream[Long] 統(tǒng)計源DStreams中每個RDD所含元素的個數(shù)得到單元素RDD的新DStreams。
7、def reduce(reduceFunc: (T, T) => T): DStream[T] 通過函數(shù)func(兩個參數(shù)一個輸出)來整合源DStreams中每個RDD元素得到單元素RDD的DStream。
8、def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null): DStream[(T, Long)] 對于DStreams中元素類型為K調(diào)用此函數(shù),得到包含(K,Long)對的新DStream,其中Long值表明相應(yīng)的K在源DStream中每個RDD出現(xiàn)的次數(shù)。
9、def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] 對(K,V)對的DStream調(diào)用此函數(shù),返回同樣(K,V)對的新DStream,但是新DStream中的對應(yīng)V為使用reduce函數(shù)整合而來
10、def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] 兩DStream分別為(K,V)和(K,W)對,返回(K,(V,W))對的新DStream。
11、def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] 兩DStream分別為(K,V)和(K,W)對,返回(K,(Seq[V],Seq[W])對新DStream
12、def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] 將RDD到RDD映射的函數(shù)func作用于源DStream中每個RDD上得到新DStream。這個可用于在DStream的RDD上做任意操作。注意的是,在這個轉(zhuǎn)換函數(shù)里面能夠應(yīng)用所有RDD的轉(zhuǎn)換操作。
4.2 有狀態(tài)轉(zhuǎn)換操作
1、def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)]
(1)、 S是你需要保存的狀態(tài)的類型。
(2)、updateFunc 是定義了每一批次RDD如何來更新的狀態(tài)值。 Seq[V] 是當(dāng)前批次相同key的值的集合。 Option[S] 是框架自動提供的,上一次保存的狀態(tài)的值。
(3)、updateStateByKey會返回一個新的DStream,該DStream中保存了(Key,State)的序列。
2、window 函數(shù)
(1)、def window(windowDuration: Duration, slideDuration: Duration): DStream[T] 基于對源DStream窗化的批次進(jìn)行計算返回一個新的DStream,windowDuration是窗口大小,slideDuration滑動步長。
(2)、def countByWindow( windowDuration: Duration, slideDuration: Duration): DStream[Long] 注意,返回的是window中記錄的條數(shù)。
(3)、def reduceByWindow( reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T] 通過使用自定義函數(shù)整合滑動區(qū)間流元素來創(chuàng)建一個新的單元素流。
(4)、 def reduceByKeyAndWindow(reduceFunc: (V, V) => V,windowDuration: Duration, slideDuration: Duration): DStream[(K, V)] 通過給定的窗口大小以滑動步長來應(yīng)用reduceFunc函數(shù),返回DStream[(K, V)], K就是DStream中相應(yīng)的K,V是window應(yīng)用了reduce之后產(chǎn)生的最終值。
(5)、def reduceByKeyAndWindow(reduceFunc: (V, V) => V,invReduceFunc: (V, V) => V,windowDuration: Duration,slideDuration: Duration = self.slideDuration,numPartitions: Int =ssc.sc.defaultParallelism,filterFunc: ((K, V)) => Boolean = null): DStream[(K, V)]
五、Spark Streaming應(yīng)用案例
這里我們使用spark streaming來編寫一個實時統(tǒng)計單詞的案例:


2、在pom.xml文件中添加一下依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sparkstreaming</groupId>
<artifactId>sparkstreamingdemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.4.1</spark.version>
</properties>
<dependencies>
<!--添加Scala依賴-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
<!--如果有provided存在,那么打包的時候該依賴不會打到j(luò)ar包中-->
<scope>provided</scope>
</dependency>
<!--添加spark-core依賴-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
<!--添加spark-streaming依賴-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!--添加編譯支持,都編譯成java1.8版本-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!--添加Scala編譯的支持-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!--添加打jar包的支持工具-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!--在你應(yīng)用maven package這個階段的時候,該插件會啟動-->
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<!--在你jar包中指定啟動類-->
<manifest>
<mainClass>com.SparkStreaming.WordCount</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
3、編寫Scala文件


接下來編寫scala代碼,每一行都有注釋,可以仔細(xì)看一下:
package com.SparkStreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WordCount extends App{
//需要新建一個sparkConf變量,來提供spark的配置
val sparkConf=new SparkConf().setAppName("StreamWordCount").setMaster("local[2]")
//新建一個StreamingContext入口
val ssc=new StreamingContext(sparkConf,Seconds(2))
//從master機(jī)器上的9999端口不斷的獲取輸入的文本數(shù)據(jù)
val lines=ssc.socketTextStream("master",9999)
//將每行文本通過空格分割多個單詞
val words=lines.flatMap(_.split(" "))
//將每一個單詞裝換成一個元組
val pairs=words.map((_,1))
//根據(jù)單詞來統(tǒng)計相同單詞的數(shù)量
val result=pairs.reduceByKey(_+_)
//打印結(jié)果
result.print()
//啟動你的流式處理程序
ssc.start()
//等待你的停止信號
ssc.awaitTermination()
}
4、寫完這個項目代碼之后,打包上傳到服務(wù)器


5、最后我們就是測試一下,因為是實時計算所以我們這里需要有一個輸入的地方,在代碼我們已經(jīng)給出了一個監(jiān)聽端口號,所以我們另打開一個終端輸入以下命令:
nc -lk 9999
//或者(注意這里的l不是1是小寫的L)
nc -l -p 9999

接下來我們就是開啟我們的程序,這里要注意:開啟的順序不能錯,先開啟監(jiān)聽端口號,然后再啟動程序:
./spark-submit --class com.SparkStreaming.WordCount /root/Pro-jar/sparkstreamingdemo-1.0-SNAPSHOT-jar-with-dependencies.jar





案例到這里就結(jié)束了。
這里主要是淺顯的講解了一下Spark Streaming,后期還會擴(kuò)展。