1 Spark Streaming 透徹理解之一 - 簡書
http://www.itdecent.cn/p/8de6ec8513ca
1.Spark Streaming另類實驗與 Spark Streaming本質(zhì)解析 - 聽風(fēng)居士 - 博客園
http://www.cnblogs.com/zhouyf/p/5471477.html
1 Spark源碼定制選擇從Spark Streaming入手
我們從第一課就選擇Spark子框架中的SparkStreaming。
那么,我們?yōu)槭裁匆x擇從SparkStreaming入手開始我們的Spark源碼版本定制之路?
有下面幾個方面的理由:
1)Spark大背景
Spark 最開始沒有我們今天看到的Spark Streaming、GraphX、Machine Learning、Spark SQL和Spark R等相關(guān)子框架內(nèi)容,最開始就只有很原始的Spark Core。我們要做Spark源碼定制,做自己的發(fā)行版本,以SparkStreaming為切入點,Spark Streaming本身是 Spark Core上的一個子框架,所以我們透過一個子框架的徹底研究,肯定可以精通Spark力量的源泉和所有問題的解決之道;
2)為什么不選Spark SQL?
我們知道,Spark有很多子框架,現(xiàn)在除了基于Spark Core編程之外,用得最多的就是SparkSQL。Spark SQL由于涉及了太多的SQL語法細(xì)節(jié)的解析或者說優(yōu)化,其實這些解析或優(yōu)化,對于我們集 中精力去研究Spark而言,它是一件重要的事情,但其實不是最重要的一件事情。由于它有太多的SQL語法解析,這個不是一個合適的子框架來讓我們研究。
3)為什么不選Spark R?
Spark R現(xiàn)在很不成熟,而且支持功能有限,這個也從我們的候選列表中刪除掉。
4)為什么不選Spark GraphX(圖計算)?
如果大家關(guān)注了Spark的演進(jìn)或發(fā)展的話,Spark最近發(fā)布的幾個版本,Spark圖計算基本沒有改進(jìn)。如果按照這個趨勢的話,Spark官方機(jī)構(gòu)似乎 在透露一個信號,圖計算已經(jīng)發(fā)展到盡頭了。所以說,我們?nèi)绻芯康脑?,肯定不會去做一個看上去發(fā)展到盡頭的東西。另外,至于圖計算而言,它有很多數(shù)學(xué)級 別的算法,而我們是要把Spark做到極致,這樣的話,數(shù)學(xué)這件事情很重要,但對我們來說卻不是最重要的。
5)為什么不選Spark MLlib(機(jī)器學(xué)習(xí))?
Spark機(jī)器學(xué)習(xí)在封裝了Vector(向量)和Metrics基礎(chǔ)之上,加上Spark的RDD,構(gòu)建了它的眾多的庫。這個也由于涉及到了太多的數(shù)學(xué)的知識,所以我們選機(jī)器學(xué)習(xí)其實也不是一個太好的選擇。
綜上所述,我們篩選之下,Spark Streaming是我們唯一的選擇。
我 們回顧過去,2015年是Spark最火的一年,最火的國家主要是美國。其實,2015年也是流式處理最火的一年。從從業(yè)人員的待遇上看,不論2015年 還是2016年,在搞大數(shù)據(jù)開發(fā)的公司中,以Spark崗位招聘的待遇一定是最高的。2016上半年,據(jù)StackOverflow開展的一項調(diào)查結(jié)果顯 示,在大數(shù)據(jù)領(lǐng)域,Spark從業(yè)人員的待遇是最高的。在調(diào)查中,50%以上的人認(rèn)為,Spark中最吸引人的是Spark Streaming??傊蠹铱紤]用Spark,主要是因為Spark Streaming。
Spark Streaming到底有什么魔力?
1)它是流式計算
這是一個流處理的時代,一切數(shù)據(jù)如果不是流式的處理或者跟流式的處理不相關(guān)的話,都是無效的數(shù)據(jù)。這句話會不斷地被社會的發(fā)展所證實。
2)流式處理才是真正的我們對大數(shù)據(jù)的初步印象
一方面,數(shù)據(jù)流進(jìn)來,立即給我們一個反饋,這不是批處理或者數(shù)據(jù)挖掘能做到的。另一方面,Spark非常強(qiáng)大的地方在于它的流式處理可以在線的利用機(jī)器學(xué)習(xí)、圖計算、Spark SQL或者Spark R的成果,這得益于Spark多元化、一體化的基礎(chǔ)架構(gòu)設(shè)計。也就是說,在Spark技術(shù)堆棧中,Spark Streaming可以調(diào)用任何的API接口,不需要做任何的設(shè)置。這是Spark無可匹敵之處,也是Spark Streaming必將一統(tǒng)天下的根源。這個時代的流處理單打獨斗已經(jīng)不行了,Spark Streaming必然會跟多個Spark子框架聯(lián)合起來,稱霸大數(shù)據(jù)領(lǐng)域。
3)流式處理“魅力和復(fù)雜”的雙重體
如果你精通SparkStreaming,你就知道Spark Streaming以及它背后的兄弟框架,展示了Spark和大數(shù)據(jù)的無窮魅力。不過,在Spark的所有程序中,肯定是基于SparkStreaming的應(yīng)用程序最容易出問題。為什么?因為數(shù)據(jù)不斷流進(jìn)來,它要動態(tài)控制數(shù)據(jù)的流入,作業(yè)的切分還有數(shù)據(jù)的處理。這些都會帶來極大的復(fù)雜性。
4)與其他Spark子框架的巨大區(qū)別
如果你仔細(xì)觀察,你會發(fā)現(xiàn),Spark Streaming很像是基于Spark Core之上的一個應(yīng)用程序。不像其他子框架,比如機(jī)器學(xué)習(xí)是把數(shù)學(xué)算法直接應(yīng)用在Spark的RDD之上,Spark Streaming更像一般的應(yīng)用程序那樣,感知流進(jìn)來的數(shù)據(jù)并進(jìn)行相應(yīng)的處理。
所以如果要做Spark的定制開發(fā),Spark Streaming則提供了最好的參考,掌握了Spark Streaming也就容易開發(fā)任意其他的程序。當(dāng)然想掌握SparkStreaming,但不去精通Spark Core的話,那是不可能的。Spark Core加Spark Streaming更是雙劍合璧,威力無窮。我們選擇SparkStreaming來入手,等于是找到了關(guān)鍵點。如果對照風(fēng)水學(xué)的說法,對于Spark,我們算是已經(jīng)幸運地找到了龍脈。如果要尋龍點穴,那么Spark Streaming就是龍穴之所在。找到了穴位,我們就能一日千里。
2 Spark Streaming另類在線實驗
我們在研究Spark Streaming的過程中,會有困惑的事情:如何清晰的看到數(shù)據(jù)的流入、被處理的過程?
使用一個小技巧,通過調(diào)節(jié)放大Batch Interval的方式,來降低批處理次數(shù),以方便看清楚各個環(huán)節(jié)。
我們從已寫過的廣告點擊的在線黑名單過濾的Spark Streaming應(yīng)用程序入手。
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object OnlineBlackListFilter { def main(args: Array[String]) { /** * 第1步:創(chuàng)建Spark的配置對象SparkConf,設(shè)置Spark程序的運行時的配置信息。 * 例如說通過setMaster來設(shè)置程序要鏈接的Spark集群的Master的URL,如果設(shè)置 * 為local,則代表Spark程序在本地運行,特別適合于機(jī)器配置條件非常差(例如 * 只有1G的內(nèi)存)的初學(xué)者 /val conf = new SparkConf() //創(chuàng)建SparkConf對象 conf.setAppName("OnlineBlackListFilter") //設(shè)置應(yīng)用程序的名稱,在程序運行的監(jiān)控界面可以看到名稱 conf.setMaster("spark://Master:7077") //此時,程序在Spark集群val ssc = new StreamingContext(conf,Seconds(300)) /* * 黑名單數(shù)據(jù)準(zhǔn)備,實際上黑名單一般都是動態(tài)的,例如在Redis或者數(shù)據(jù)庫中,黑名單的生成往往有復(fù)雜的業(yè)務(wù) * 邏輯,具體情況算法不同,但是在Spark Streaming進(jìn)行處理的時候每次都能工訪問完整的信息 /val blackList = Array(("hadoop",true),("mahout",true)) val blackListRDD = ssc.sparkContext.parallelize(blackList,8) //監(jiān)聽主機(jī)Master上的9999端口,接收數(shù)據(jù)val adsClickStream = ssc.socketTextStream("Master" ,9999) /* * 此處模擬的廣告點擊的每條數(shù)據(jù)的格式為:time、name * 此處map操作的結(jié)果是name、(time,name)的格式 /val adsClientStreamFormated = adsClickStream.map(ads=>(ads.split(" ")(1),ads)) adsClientStreamFormated.transform(userClickRDD => { //通過leftOuterJoin操作既保留了左側(cè)用戶廣告點擊內(nèi)容的RDD的所有內(nèi)容,又獲得了相應(yīng)點擊內(nèi)容是否在黑名單中val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD) /* * 進(jìn)行filter過濾的時候,其輸入元素是一個Tuple:(name,((time,name), boolean)) * 其中第一個元素是黑名單的名稱,第二元素的第二個元素是進(jìn)行l(wèi)eftOuterJoin的時候是否存在在值 * 如果存在的話,表面當(dāng)前廣告點擊是黑名單,需要過濾掉,否則的話則是有效點擊內(nèi)容; /val validClicked = joinedBlackListRDD.filter(joinedItem=>{ if(joinedItem._2._2.getOrElse(false)){ false }else{ true } }) validClicked.map(validClick => {validClick._2._1}) }).print() /* * 計算后的有效數(shù)據(jù)一般都會寫入Kafka中,下游的計費系統(tǒng)會從kafka中pull到有效數(shù)據(jù)進(jìn)行計費 */ ssc.start() ssc.awaitTermination() }}
把程序的Batch Interval設(shè)置從30秒改成300秒:
val ssc = new StreamingContext(conf, Seconds(300))
重新生成一下jar包 。
Spark集群有5臺機(jī)器:Master、Worker1、Worker2、Worker3、Worker4。
啟動HDFS集群:start-dfs.sh啟動Spark集群:start-all.sh啟動Spark的History Server:start-history-server.sh
打開數(shù)據(jù)發(fā)送的端口:nc -lk 9999。
用spark-submit運行前面生成的jar包。
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.sparkstreaming.OnlineBlackListFilter --master spark://Master:7077 /root/Documents/SparkApps/OnlineBlackListFilter.jar
在數(shù)據(jù)發(fā)送端口輸入若干數(shù)據(jù),比如:
1375864674543 Tom
1375864674553 Spy
1375864674571 Andy
1375864688436 Cheater
1375864784240 Kelvin
1375864853892 Steven
1375864979347 John
打開瀏覽器,看History Server的日志信息:
點擊最新的應(yīng)用,看我們目前運行的應(yīng)用程序中有些什么Job:
總共竟然有5個Job。這完全不是我們此前做Spark SQL之類的應(yīng)用程序時看到的樣子。
我們接下來看一看這些Job的內(nèi)容,主要揭示一些現(xiàn)象,不會做完全深入的剖析,只是為了先讓大家進(jìn)行一些思考。
Job 0:此Job不體現(xiàn)我們的業(yè)務(wù)邏輯代碼。這個Job是出于對后面計算的負(fù)載均衡的考慮。
Job 1:運行時間比較長,耗時1.5分鐘。
從DAG Visualization部分,就知道此Job實際就是啟動了一個接收數(shù)據(jù)的Receiver:
看來,Spark Streaming應(yīng)用程序啟動后,自己會啟動一些Job。默認(rèn)啟動了一個Job來接收數(shù)據(jù),為后續(xù)處理做準(zhǔn)備。
重要啟示:一個Spark應(yīng)用程序中可以啟動很多Job,而這些不同的Job之間可以相互配合。這一認(rèn)識為我們寫復(fù)雜Spark程序奠定了良好的基礎(chǔ)。
Job 2:看Details可以發(fā)現(xiàn)有我們程序的主要業(yè)務(wù)邏輯,體現(xiàn)在Stag 3、Stag4、Stag 5中。
Job4:****也體現(xiàn)了我們應(yīng)用程序中的業(yè)務(wù)邏輯 。有Stage 9、Stage 10、Stage 11。其中Stage 9、Stage 10被跳過。
我們的神奇之旅才剛剛開始。
3 瞬間理解Spark Streaming本質(zhì)
我們先看一張圖:
以上的連續(xù)4個圖,分別對應(yīng)以下4個段落的描述:
Spark Streaming接收Kafka、Flume、HDFS和Kinesis等各種來源的實時輸入數(shù)據(jù),進(jìn)行處理后,處理結(jié)果保存在HDFS、Databases等各種地方。
Spark Streaming接收這些實時輸入數(shù)據(jù)流,會將它們按批次劃分,然后交給Spark引擎處理,生成按照批次劃分的結(jié)果流。
Spark Streaming提供了表示連續(xù)數(shù)據(jù)流的、高度抽象的被稱為離散流的DStream。DStream本質(zhì)上表示RDD的序列。任何對DStream的操作都會轉(zhuǎn)變?yōu)閷Φ讓覴DD的操作。
Spark Streaming使用數(shù)據(jù)源產(chǎn)生的數(shù)據(jù)流創(chuàng)建DStream,也可以在已有的DStream上使用一些操作來創(chuàng)建新的DStream。
在我們前面的實驗中,每300秒會接收一批數(shù)據(jù),基于這批數(shù)據(jù)會生成RDD,進(jìn)而觸發(fā)Job,執(zhí)行處理。
DStream是一個沒有邊界的集合,沒有大小的限制。
DStream代表了時空的概念。隨著時間的推移,里面不斷產(chǎn)生RDD。
鎖定到時間片后,就是空間的操作,也就是對本時間片的對應(yīng)批次的數(shù)據(jù)的處理。
下面用實例來講解數(shù)據(jù)處理過程。
從Spark Streaming程序轉(zhuǎn)換為Spark執(zhí)行的作業(yè)的過程中,使用了DStreamGraph。
Spark Streaming程序中一般會有若干個對DStream的操作。DStreamGraph就是由這些操作的依賴關(guān)系構(gòu)成。
看來我們的學(xué)習(xí),將從Spark Streaming的現(xiàn)象開始,深入到Spark Core和Spark Streaming的本質(zhì)。
備注:本博客內(nèi)容來源于Spark發(fā)行版本定制課程