1. spark如何實(shí)現(xiàn)容錯性?
spark的容錯性有2個手段, 檢查點(diǎn)和RDD血統(tǒng)
- 檢查點(diǎn): checkpoint, 將RDD數(shù)據(jù)持久化到hdfs中, 并打斷rdd的linage
- RDD血統(tǒng): 每個RDD都記錄有自己的血統(tǒng), 一個計(jì)算序列, 一旦有哪個RDD分區(qū)丟失或其它問題, 可以根據(jù)自身的RDD重新計(jì)算,
重算可以在某個幾點(diǎn)分區(qū)下執(zhí)行, 不會導(dǎo)致系統(tǒng)回滾
2. spark和hadoop的mr有什么區(qū)別
首先, 兩者都是基于MR的計(jì)算模型, 但有很多不同
- hadoop的一個job, 只有map操作和reduce操作, 多數(shù)據(jù)的表達(dá)能力欠缺, 每次reduce后要將數(shù)據(jù)存到hdfs上; 為了完成一個目標(biāo)往往要串行執(zhí)行多個job, 造成大量的HDFS IO浪費(fèi)
- spark將1個job劃分為多個stage, stage的劃分是通過RDD之間的依賴關(guān)系判定的; 每個stage產(chǎn)生的結(jié)果會存在內(nèi)存中, 避免多個stage串行的情況下對hdfs的讀寫請求. 而且spark擁有很多算子, 數(shù)據(jù)表現(xiàn)力上很強(qiáng)
3. RDD中的寬窄依賴? 有什么作用?
-
窄依賴是指父RDD的每個分區(qū)都只被子RDD的一個或幾個分區(qū)所使用(不是依賴所有的父RDD分區(qū))。
寬依賴就是指父RDD的分區(qū)被多個子RDD的分區(qū)所依賴。
例如,map就是一種窄依賴,而join則會導(dǎo)致寬依賴,主要是看有沒有shuffle操作。 - 寬窄依賴的作用是用來劃分stage。
- 窄依賴分為:
-
OneToOneDependency: 子RDD的一個分區(qū)只依賴與父RDD的一個分區(qū) -
RangeDependency: 子RDD的一個分區(qū)依賴父RDD的多個分區(qū), 這種依賴只在UnionRDD所使用。
-
4. cache和persist的區(qū)別?
它們都是用來進(jìn)行緩存RDD的。
- cache是特定的persist,rdd中cache的緩存級別是MEMORY_ONLY,cache調(diào)用了persist;
- persist可以設(shè)置不同的緩存級別。
- cache和persist, checkponit都是懶計(jì)算的, 需要執(zhí)行action算子時才真正觸發(fā)執(zhí)行動作;
因此需要在cache(), checkpint()后面加上類似foreach(_ => Unit)的算子進(jìn)行觸發(fā)
一共有幾種緩存級別
- MEMORY_ONLY:數(shù)據(jù)保存在內(nèi)存中,如果內(nèi)存不夠,數(shù)據(jù)可能就不會持久化;
- MEMORY_AND_DISK:數(shù)據(jù)優(yōu)先保存在內(nèi)存中,如果內(nèi)存不夠則會存到磁盤中;
- MEMORY_ONLY_SER:和MEMORY_ONLY類似,區(qū)別是會將RDD中的數(shù)據(jù)進(jìn)行序列化,這種方式更加節(jié)省內(nèi)存;
- MEMORY_AND_DISK_SER:和MEMORY_AND_DISK類似,區(qū)別是會將RDD中的數(shù)據(jù)進(jìn)行序列化,這種方式更加節(jié)省內(nèi)存;
- DISK_ONLY:將數(shù)據(jù)全部寫入磁盤文件中;
- MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等:這種有后綴_2的,代表的是將每個持久化的數(shù)據(jù),都復(fù)制一份副本,并將副本保存到其他節(jié)點(diǎn)上。這種基于副本的持久化機(jī)制主要用于進(jìn)行容錯。
5. 廣播變量和累加器?
-
廣播變量
廣播變量是把一個集合, 通過driver節(jié)點(diǎn)發(fā)給每個executor, executor只能使用這個集合但不能修改這個集合. 使用廣播變量時, driver要先把一個RDD進(jìn)行collect(), 然后再廣播發(fā)送出去, 因此, driver的網(wǎng)絡(luò)壓力就會很大 -
累加器
累加器是在Driver端記錄的一個全局值, 在driver聲明, 在executor端更新, 且只能觸發(fā)add()方法, 最終由driver端調(diào)用collect得到最終結(jié)果
6. groupByKey和reduceByKey哪個好
reduceByKey更好
- reduceByKey中指定的聚合函數(shù), 會在map端先進(jìn)性聚合, 再發(fā)往reducer進(jìn)行計(jì)算;
- 而groupByKey直接將數(shù)據(jù)發(fā)到reducer, 網(wǎng)絡(luò)通信量會大很多
7. kafka
-
Kafka 分布式的情況下,如何保證消息的順序?
kafka分布式情況下, 數(shù)據(jù)只能保證局部有序, 不能保證全局有序. 什么意思?- kafka是劃分partition的, 一個partition因?yàn)楸灰粋€write agead log記錄, 因此同一個partition下的數(shù)據(jù)可以有序;
- 不同partition之間, 不能保證順序。
不過絕大多數(shù)用戶都可以通過 message key 來定義,因?yàn)橥粋€ key 的 message 可以保證只發(fā)送到同一個 Partition。
比如說 key 是 user id,table row id 等等,所以同一個 user 或者同一個 record 的消息永遠(yuǎn)只會發(fā)送到同一個 Partition上,保證了同一個 user 或 record 的順序。 - SparkStreaming可以設(shè)置手動提交offset, 已處理過的offset可以保存在zookeeper中
-
Spark streaming 讀取kafka數(shù)據(jù)的兩種方式?
- 基于Receiver方式
需要使用單獨(dú)的Receiver線程來異步獲取Kafka數(shù)據(jù)。Spark Streaming啟動時,會在Executor中同時啟動Receiver異步線程用于從Kafka持續(xù)獲取數(shù)據(jù),獲取的數(shù)據(jù)先存儲在Receiver中(存儲方式由StorageLevel決定),后續(xù),當(dāng)Batch Job觸發(fā)后,這些數(shù)據(jù)會被轉(zhuǎn)移到剩下的Executor中被處理。處理完畢后,Receiver會自動更新Zookeeper中的Offset。 - 基于Direct(No Receiver)方式
不需要使用單獨(dú)的Receiver線程從Kafka獲取數(shù)據(jù)。Spark Streaming Batch Job觸發(fā)時,Driver端確定要讀取的Topic-Partition的OffsetRange,然后由Executor并行從Kafka各Partition讀取數(shù)據(jù)并計(jì)算。
- 基于Receiver方式
-
如果Spark Streaming停掉了,如何保證Kafka的重新運(yùn)作是合理的呢
這主要依賴Spark Streaming的快速故障恢復(fù)機(jī)制.- 傳統(tǒng)流處理系統(tǒng)會在其他節(jié)點(diǎn)上重啟失敗的連續(xù)算子,這種方法需要等待轉(zhuǎn)移到的節(jié)點(diǎn)先處理完之前的任務(wù)才能繼續(xù)執(zhí)行新任務(wù);
- 但在Spark Streaming中, 這個節(jié)點(diǎn)的任務(wù)將均勻地分散到集群中的節(jié)點(diǎn)進(jìn)行計(jì)算,相對于傳遞故障恢復(fù)機(jī)制能夠更快地恢復(fù)。
-
說說Spark Streaming 是如何收集和處理數(shù)據(jù)的,一句話?
在 Spark Streaming 中,數(shù)據(jù)采集是逐條進(jìn)行的,而數(shù)據(jù)處理是按批 mini batch進(jìn)行的,因此 Spark Streaming 會先設(shè)置好批處理間隔 batch duration,當(dāng)超過批處理間隔就會把采集到的數(shù)據(jù)匯總起來成為一批數(shù)據(jù)交給系統(tǒng)去處理。
8. Spark處理數(shù)據(jù)傾斜有什么好方法
簡單一句: Spark 數(shù)據(jù)傾斜的幾種場景以及對應(yīng)的解決方案,包括避免數(shù)據(jù)源傾斜,調(diào)整并行度,使用自定義 Partitioner,使用 Map 側(cè) Join 代替 Reduce 側(cè) Join(內(nèi)存表合并),給傾斜 Key 加上隨機(jī)前綴等。下面分情況討論:
-
如果是多個不同的key落在一個分區(qū)導(dǎo)致的數(shù)據(jù)傾斜
- 可以調(diào)大并行度:
spark.default.parallelism : rdd分區(qū)數(shù) spark.sql.shuffle.partitions: SparkSQL通過通過這個值修改并發(fā)數(shù)。- 或者可以自定義Partitioner
spark比hive快的原因主要有2, 一個是shuffle后的結(jié)果可以存儲在磁盤中, 另一個是DAG執(zhí)行計(jì)劃里, 多個job串聯(lián)時結(jié)果不必回寫到HDFS中
-
如果是join產(chǎn)生的數(shù)據(jù)傾斜, 且發(fā)現(xiàn)其中有一個表很小
- 可以將小表通過broadcast廣播出去
這樣完全避免了shuffle, 讓join完全變成maper端的join - 或者可以把大表的key增加隨機(jī)前綴, 小表擴(kuò)大n倍進(jìn)行join, 最后再刪除key的前綴即可
- 可以將小表通過broadcast廣播出去
-
如果是1個非常非常大的key導(dǎo)致的數(shù)據(jù)傾斜
- 可以將這個key拿出來單獨(dú)處理, 給這個key加上隨機(jī)前綴, 打散這個key
- 或者把這些傾斜的key過濾出來后的RDD或DataFrame廣播出去, 讓其進(jìn)行map端的聚合
9. Spark為什么快,Spark SQL 一定比 Hive 快嗎
spark比hive快的原因主要有2, 一個是shuffle后的結(jié)果可以存儲在磁盤中, 另一個是DAG執(zhí)行計(jì)劃里, 多個job串聯(lián)時結(jié)果不必回寫到HDFS中.
因此, spark是否比hive快要是有一定條件的, 一個反例是: 當(dāng)查詢只有1次shuffle操作的時候, hive往往比spark還快, 因?yàn)镸R的map任務(wù)往往生成的更多
Select month_id, sum(sales) from T group by month_id; --反例
10. Job 和 Task 怎么理解
-
Job
Job來源于每次的action算子, 每用action算子獲取一次結(jié)果就會生成一個Job, Job提交給DAGScheduler后, 會分解成task和stage -
Task
Task是在每個Executor的CPU上執(zhí)行的任務(wù), Task 分為 ShuffleMapTask 和 ResultTask 兩種- ShuffleMapStage 中的 Task 為 ShuffleMapTask
- ResultStage 中的 Task 為 ResultTask
11. Spark作業(yè)提交-執(zhí)行流程是怎么樣的
-
spark-submit提交代碼后,執(zhí)行new SparkContext(),在SparkContext里構(gòu)造DAGScheduler和TaskScheduler。 -
DAGScheduler為1個Job生成多個Stage(stage分為ShuffleMapStage和ResultStage), 每個Stage創(chuàng)建一個TaskSet. -
TaskScheduler會把每一個TaskSet里的Task,提交到Executor上執(zhí)行。 - Executor上有線程池(CachedTHreadPool), 每收到一個Task. 先封裝成RunnerTask, 再從線程池中取出一個線程執(zhí)行這個Task
12. Spark UDF?
Spark SQL UDF 其實(shí)是一個Scala函數(shù),被catalyst封裝成一個 Expression 結(jié)點(diǎn),最后通過 eval 方法計(jì)根據(jù)當(dāng)前 Row 計(jì)算 UDF 的結(jié)果。
UDF 對表中的單行進(jìn)行轉(zhuǎn)換,以便為每行生成單個對應(yīng)的輸出值。
-
執(zhí)行步驟:
- 定義函數(shù)
- 將函數(shù)注冊為UDF
以下示例代碼使用 SQL 別名為 CTOF 來注冊我們的轉(zhuǎn)換 UDF,然后在 SQL 查詢使用它來轉(zhuǎn)換每個城市的溫度。
- 將函數(shù)注冊為UDF
val df = sqlContext.read.json("temperatures.json") df.registerTempTable("citytemps") // Register the UDF with our SQLContext sqlContext.udf.register("CTOF", (degreesCelcius: Double) => ((degreesCelcius * 9.0 / 5.0) + 32.0)) sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show() Spark SQL定義了
UDF1到UDF22共22個類,UDF 最多支持22個輸入?yún)?shù)。
13. Spark UDAF?
- 用戶定義的聚合函數(shù)(User-defined aggregate functions, UDAF)同時處理多行,并且返回一個結(jié)果,通常結(jié)合使用 GROUP BY 語句(例如 COUNT 或 SUM)。為了簡單起見,我們將實(shí)現(xiàn)一個叫 SUMPRODUCT 的 UDAF 來計(jì)算以庫存來分組的所有車輛零售價值
- 使用方法: 通過擴(kuò)展
UserDefinedAggregateFunction類使用
下面例子中我們定義了一個名為 SumProductAggregateFunction 的類,并且為它取了一個名為 SUMPRODUCT 的別名- 輸入文件格式
{"Make":"Honda","Model":"Pilot","RetailValue":32145.0,"Stock":4} {"Make":"Honda","Model":"Civic","RetailValue":19575.0,"Stock":11} {"Make":"Honda","Model":"Ridgeline","RetailValue":42870.0,"Stock":2} {"Make":"Jeep","Model":"Cherokee","RetailValue":23595.0,"Stock":13} {"Make":"Jeep","Model":"Wrangler","RetailValue":27895.0,"Stock":4} {"Make":"Volkswagen","Model":"Passat","RetailValue":22440.0,"Stock":2}- 使用UDAF
// 定義SparkSQL UDAF邏輯 private class SumProductAggregateFunction extends UserDefinedAggregateFunction { // 定義UDF輸入schema: (Double price, Long quantity) def inputSchema: StructType = new StructType().add("price", DoubleType).add("quantity", LongType) // 定義輸出schema: Output = (Double total) def bufferSchema: StructType = new StructType().add("total", DoubleType) def dataType: DataType = DoubleType def deterministic: Boolean = true // true: our UDAF's output given an input is deterministic def initialize(buffer: MutableAggregationBuffer): Unit = { buffer.update(0, 0.0) // 初始化 result to 0.0 } def update(buffer: MutableAggregationBuffer, input: Row): Unit = { val sum = buffer.getDouble(0) // Intermediate result to be updated val price = input.getDouble(0) // First input parameter val qty = input.getLong(1) // Second input parameter buffer.update(0, sum + (price * qty)) // Update the intermediate result } // Merge intermediate result sums by adding them def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0)) } // THe final result will be contained in 'buffer' def evaluate(buffer: Row): Any = { buffer.getDouble(0) } } def main (args: Array[String]) { val conf = new SparkConf().setAppName("Scala UDAF Example") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val testDF = sqlContext.read.json("inventory.json") testDF.registerTempTable("inventory") // 注冊UDAF到SQLContext sqlContext.udf.register("SUMPRODUCT", new SumProductAggregateFunction) sqlContext.sql("SELECT Make, SUMPRODUCT(RetailValue,Stock) AS InventoryValuePerMake FROM inventory GROUP BY Make").show() }