Spark core
一、 spark是一個分布式同通用計算框架,可用于離線大數(shù)據(jù)處理、實時流計算、交互式計算、圖計算等,集成了SparkSQL、SparkStream、MLLib等庫,提供了豐富的API。
spark中針對彈性分布數(shù)據(jù)集RDD進行計算,RDD可緩存在內(nèi)存或磁盤中,程序運行結(jié)束時刪除??赏ㄟ^sc.testFile()、其他RDD通過轉(zhuǎn)化因子轉(zhuǎn)化,DF/DS轉(zhuǎn)化等方式構(gòu)建成RDD。RDD會有多個分區(qū),如果從HDFS上讀取的數(shù)據(jù),如果不指定分區(qū),默認分區(qū)與block塊一一對應(yīng),如果不指定最小分區(qū)數(shù),默認為2。
spark提供兩種算子對RDD進行計算,Transformation 和 Action,Transformation 為惰性執(zhí)行,只記錄關(guān)系,當執(zhí)行Action算子時,才會觸發(fā)Transformatin的一系列執(zhí)行操作,生成新的RDD。一個Action對應(yīng)一個job。
spark與mapReduce的相比,具有優(yōu)勢:
1.計算速度快,spark采用了緩存機制,中間結(jié)果不用寫入HDFS再讀取,減少了IO消耗。
2.spark采用DAG引擎、以及執(zhí)行任務(wù)時,excutor中采用多線程方式處理,提升了任務(wù)執(zhí)行速度。container中不能多線程嗎?
2.mr只有map 和 reduce兩種語義,實現(xiàn)復(fù)雜的邏輯不夠靈活,spark提供了豐富的Transformation 和 action 算子,實現(xiàn)較為方便
二、spark的程序運行結(jié)構(gòu)
1. Driver:Driver中封裝了SparkContext,一個程序?qū)?yīng)一個driver,main函數(shù)運行在driver中,driver負責(zé)程序的解析。sparkContext加載程序運行的環(huán)境,及創(chuàng)建內(nèi)部的DAGscheduler和TaskSchedule。
TaskSchedule將任務(wù)調(diào)度到WordNode的Executor中,一和節(jié)點可以啟動多個Executor中,每個Executor中可以以多線程方式運行多個Task。
spark程序內(nèi)部執(zhí)行流程:
1.根據(jù)Transformation算子,查詢RDD間的依賴關(guān)系,構(gòu)建邏輯查詢計劃
2.構(gòu)建物理查詢計劃:由DAGSchedule 構(gòu)建DAG圖,根據(jù)算子是否為寬依賴進行Stage劃分,一個Stage中生成一組Task,組成一個TaskSet,task可并行執(zhí)行,提交給TaskSchedule
3.TaskSchedule將任務(wù)調(diào)度到Executor,以及負責(zé)任務(wù)失敗的重新調(diào)度,執(zhí)行較慢的任務(wù)啟動備份執(zhí)行等。
三、Spark-shuffle
劃分Stage時,類似MR的Shuffle,sparkShuffle分為 shuffleWrite 和 shuffleRead階段,
shuffleWrite:將Task中間結(jié)果寫入磁盤
shuffleRead:從磁盤拉取數(shù)據(jù)到內(nèi)存中進行并行計算
shuffleWrite 方式有:
基于Hash:將每一個mapTask的結(jié)果對ReduceTaskNum取余,將結(jié)果寫入本地小文件,例,一個Executor中有2個MapTask,3個ReduceTask,則每個MapTask會生成3個小文件,分別對應(yīng)3個ReduceTask,在寫入小文件時,會先將數(shù)據(jù)寫到buffer緩沖區(qū),再寫入小文件?;贖ash的方式會生成大量小文件,增加IO消耗,且消耗緩沖區(qū)。
基于Hash的優(yōu)化版:如果一個Excutor中有多個MapTask,余數(shù)相同的會寫入相同的小文件中,例,一個Executor中有2個MapTask,3個ReduceTask,則該Executor中會生成3個小文件,減少了小文件個數(shù),但未減少緩沖區(qū)消耗。
基于Sort:
shuffleRead 方式:
基于Hash的優(yōu)化版、基于Sort 的shuffleWrite 采用相同的shuffleRead實現(xiàn),將拉取的數(shù)據(jù)寫入到HashMap中,如果需要對key進行排序,則排序后寫入HashMap中(HashMap本身是無序的,用的是linkHashMap?),HasHMap寫滿后,溢出一個小文件到磁盤中,最后將多個小文件進行歸并排序合成大文件,進行處理輸出。
四、Spark運行模型
1.本地local方式運行 spark-shell
2.獨立集群模式
3.結(jié)合其他調(diào)度系統(tǒng),例如Spark-on-YARN
參見?https://blog.csdn.net/swing2008/article/details/60869183
五、常用算子
reduceBykey、groupByKey等
SparkSQL
一、sparkSql:基于spark的SQL引擎,提供更加豐富的數(shù)據(jù)源及API:DF、DS;可通過SQL、DF、DS來對數(shù)據(jù)進行處理。
使用SQL操作Spark組件,進行數(shù)據(jù)分析。
與spark core相比:
1. 提供數(shù)據(jù)集類型DF(schema +RDD)、DS 處理結(jié)構(gòu)化數(shù)據(jù),比直接操作RDD簡單方便
2. 提供更多靈活A(yù)PI,例如讀寫多種數(shù)據(jù)源的數(shù)據(jù),json等,可以通過API直接解析
3.內(nèi)置優(yōu)化器catalyst自動優(yōu)化程序(優(yōu)化邏輯查詢計劃等),而且效率較高,若直接操作RDD,還需要自己優(yōu)化程序。
相比直接利用Spark SQL處理數(shù)據(jù),利用 spark DF/DS 處理數(shù)據(jù),可以實現(xiàn)復(fù)雜的語義邏輯,例如實現(xiàn)算法等,用SQL處理較復(fù)雜的邏輯實現(xiàn)很困難。
RDD的算子,DF、DS基本都支持,RDD可處理 結(jié)構(gòu)化/非結(jié)構(gòu)化 數(shù)據(jù),DF/DS 只能處理結(jié)構(gòu)化數(shù)據(jù)。三者可相互轉(zhuǎn)化。
spark SQL: 適用于交互式查詢,實時查詢,數(shù)據(jù)量不太大,響應(yīng)時間快的操作。
Hive:適用于大規(guī)模的數(shù)據(jù)查詢分析,對響應(yīng)時間要求較低的操作。(底層是mr)
廣泛應(yīng)用場景:
部署 spark thrift server, 接受BI系統(tǒng)的SQL請求,轉(zhuǎn)化為spark 引擎處理,讀取hive中的數(shù)據(jù)。
二、程序流程
三、Spark Streaming
將流式計算轉(zhuǎn)化為一批很小的,確定的批處理作業(yè)。以秒(分鐘等)為單位將作業(yè)切分成很小的離散的作業(yè)(以時間為單位切分數(shù)據(jù)流)。 ?低延遲,秒級別。
特有算子:
window
mapWithStatus:類似Spark自己維護的分布式HashMap(不需要再借助外部存儲,節(jié)約與外部存儲交互的開銷),狀態(tài)信息以KeyValue形式存儲狀態(tài)信息。實時操作中的狀態(tài)信息。適合保存短時間內(nèi)的狀態(tài)信息用于分析。