Spark面試題

1. spark如何實(shí)現(xiàn)容錯性?

spark的容錯性有2個手段, 檢查點(diǎn)和RDD血統(tǒng)

  • 檢查點(diǎn): checkpoint, 將RDD數(shù)據(jù)持久化到hdfs中, 并打斷rdd的linage
  • RDD血統(tǒng): 每個RDD都記錄有自己的血統(tǒng), 一個計(jì)算序列, 一旦有哪個RDD分區(qū)丟失或其它問題, 可以根據(jù)自身的RDD重新計(jì)算,
    重算可以在某個幾點(diǎn)分區(qū)下執(zhí)行, 不會導(dǎo)致系統(tǒng)回滾

2. spark和hadoop的mr有什么區(qū)別

首先, 兩者都是基于MR的計(jì)算模型, 但有很多不同

  • hadoop的一個job, 只有map操作和reduce操作, 多數(shù)據(jù)的表達(dá)能力欠缺, 每次reduce后要將數(shù)據(jù)存到hdfs上; 為了完成一個目標(biāo)往往要串行執(zhí)行多個job, 造成大量的HDFS IO浪費(fèi)
  • spark將1個job劃分為多個stage, stage的劃分是通過RDD之間的依賴關(guān)系判定的; 每個stage產(chǎn)生的結(jié)果會存在內(nèi)存中, 避免多個stage串行的情況下對hdfs的讀寫請求. 而且spark擁有很多算子, 數(shù)據(jù)表現(xiàn)力上很強(qiáng)

3. RDD中的寬窄依賴? 有什么作用?

  • 窄依賴是指父RDD的每個分區(qū)都只被子RDD的一個或幾個分區(qū)所使用(不是依賴所有的父RDD分區(qū))。
    寬依賴就是指父RDD的分區(qū)被多個子RDD的分區(qū)所依賴。
    例如,map就是一種窄依賴,而join則會導(dǎo)致寬依賴,主要是看有沒有shuffle操作。
  • 寬窄依賴的作用是用來劃分stage。
  • 窄依賴分為:
    • OneToOneDependency: 子RDD的一個分區(qū)只依賴與父RDD的一個分區(qū)
    • RangeDependency: 子RDD的一個分區(qū)依賴父RDD的多個分區(qū), 這種依賴只在UnionRDD所使用。

4. cache和persist的區(qū)別?

它們都是用來進(jìn)行緩存RDD的。

  • cache是特定的persist,rdd中cache的緩存級別是MEMORY_ONLY,cache調(diào)用了persist;
  • persist可以設(shè)置不同的緩存級別。
  • cache和persist, checkponit都是懶計(jì)算的, 需要執(zhí)行action算子時才真正觸發(fā)執(zhí)行動作;
    因此需要在cache(), checkpint()后面加上類似foreach(_ => Unit)的算子進(jìn)行觸發(fā)

一共有幾種緩存級別

  • MEMORY_ONLY:數(shù)據(jù)保存在內(nèi)存中,如果內(nèi)存不夠,數(shù)據(jù)可能就不會持久化;
  • MEMORY_AND_DISK:數(shù)據(jù)優(yōu)先保存在內(nèi)存中,如果內(nèi)存不夠則會存到磁盤中;
  • MEMORY_ONLY_SER:和MEMORY_ONLY類似,區(qū)別是會將RDD中的數(shù)據(jù)進(jìn)行序列化,這種方式更加節(jié)省內(nèi)存;
  • MEMORY_AND_DISK_SER:和MEMORY_AND_DISK類似,區(qū)別是會將RDD中的數(shù)據(jù)進(jìn)行序列化,這種方式更加節(jié)省內(nèi)存;
  • DISK_ONLY:將數(shù)據(jù)全部寫入磁盤文件中;
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等:這種有后綴_2的,代表的是將每個持久化的數(shù)據(jù),都復(fù)制一份副本,并將副本保存到其他節(jié)點(diǎn)上。這種基于副本的持久化機(jī)制主要用于進(jìn)行容錯。

5. 廣播變量和累加器?

  • 廣播變量
    廣播變量是把一個集合, 通過driver節(jié)點(diǎn)發(fā)給每個executor, executor只能使用這個集合但不能修改這個集合. 使用廣播變量時, driver要先把一個RDD進(jìn)行collect(), 然后再廣播發(fā)送出去, 因此, driver的網(wǎng)絡(luò)壓力就會很大
  • 累加器
    累加器是在Driver端記錄的一個全局值, 在driver聲明, 在executor端更新, 且只能觸發(fā)add()方法, 最終由driver端調(diào)用collect得到最終結(jié)果

6. groupByKey和reduceByKey哪個好

reduceByKey更好

  • reduceByKey中指定的聚合函數(shù), 會在map端先進(jìn)性聚合, 再發(fā)往reducer進(jìn)行計(jì)算;
  • 而groupByKey直接將數(shù)據(jù)發(fā)到reducer, 網(wǎng)絡(luò)通信量會大很多

7. kafka

  • Kafka 分布式的情況下,如何保證消息的順序?
    kafka分布式情況下, 數(shù)據(jù)只能保證局部有序, 不能保證全局有序. 什么意思?
    • kafka是劃分partition的, 一個partition因?yàn)楸灰粋€write agead log記錄, 因此同一個partition下的數(shù)據(jù)可以有序;
    • 不同partition之間, 不能保證順序。
      不過絕大多數(shù)用戶都可以通過 message key 來定義,因?yàn)橥粋€ key 的 message 可以保證只發(fā)送到同一個 Partition。
      比如說 key 是 user id,table row id 等等,所以同一個 user 或者同一個 record 的消息永遠(yuǎn)只會發(fā)送到同一個 Partition上,保證了同一個 user 或 record 的順序。
    • SparkStreaming可以設(shè)置手動提交offset, 已處理過的offset可以保存在zookeeper中
  • Spark streaming 讀取kafka數(shù)據(jù)的兩種方式?
    1. 基于Receiver方式
      需要使用單獨(dú)的Receiver線程來異步獲取Kafka數(shù)據(jù)。Spark Streaming啟動時,會在Executor中同時啟動Receiver異步線程用于從Kafka持續(xù)獲取數(shù)據(jù),獲取的數(shù)據(jù)先存儲在Receiver中(存儲方式由StorageLevel決定),后續(xù),當(dāng)Batch Job觸發(fā)后,這些數(shù)據(jù)會被轉(zhuǎn)移到剩下的Executor中被處理。處理完畢后,Receiver會自動更新Zookeeper中的Offset。
    2. 基于Direct(No Receiver)方式
      不需要使用單獨(dú)的Receiver線程從Kafka獲取數(shù)據(jù)。Spark Streaming Batch Job觸發(fā)時,Driver端確定要讀取的Topic-Partition的OffsetRange,然后由Executor并行從Kafka各Partition讀取數(shù)據(jù)并計(jì)算。
  • 如果Spark Streaming停掉了,如何保證Kafka的重新運(yùn)作是合理的呢
    這主要依賴Spark Streaming的快速故障恢復(fù)機(jī)制.
    • 傳統(tǒng)流處理系統(tǒng)會在其他節(jié)點(diǎn)上重啟失敗的連續(xù)算子,這種方法需要等待轉(zhuǎn)移到的節(jié)點(diǎn)先處理完之前的任務(wù)才能繼續(xù)執(zhí)行新任務(wù);
    • 但在Spark Streaming中, 這個節(jié)點(diǎn)的任務(wù)將均勻地分散到集群中的節(jié)點(diǎn)進(jìn)行計(jì)算,相對于傳遞故障恢復(fù)機(jī)制能夠更快地恢復(fù)。
  • 說說Spark Streaming 是如何收集和處理數(shù)據(jù)的,一句話?
    在 Spark Streaming 中,數(shù)據(jù)采集是逐條進(jìn)行的,而數(shù)據(jù)處理是按批 mini batch進(jìn)行的,因此 Spark Streaming 會先設(shè)置好批處理間隔 batch duration,當(dāng)超過批處理間隔就會把采集到的數(shù)據(jù)匯總起來成為一批數(shù)據(jù)交給系統(tǒng)去處理。

8. Spark處理數(shù)據(jù)傾斜有什么好方法

簡單一句: Spark 數(shù)據(jù)傾斜的幾種場景以及對應(yīng)的解決方案,包括避免數(shù)據(jù)源傾斜,調(diào)整并行度,使用自定義 Partitioner,使用 Map 側(cè) Join 代替 Reduce 側(cè) Join(內(nèi)存表合并),給傾斜 Key 加上隨機(jī)前綴等。下面分情況討論:

  • 如果是多個不同的key落在一個分區(qū)導(dǎo)致的數(shù)據(jù)傾斜

    • 可以調(diào)大并行度:
    spark.default.parallelism : rdd分區(qū)數(shù)
    spark.sql.shuffle.partitions: SparkSQL通過通過這個值修改并發(fā)數(shù)。
    
    • 或者可以自定義Partitioner

    spark比hive快的原因主要有2, 一個是shuffle后的結(jié)果可以存儲在磁盤中, 另一個是DAG執(zhí)行計(jì)劃里, 多個job串聯(lián)時結(jié)果不必回寫到HDFS中

  • 如果是join產(chǎn)生的數(shù)據(jù)傾斜, 且發(fā)現(xiàn)其中有一個表很小
    • 可以將小表通過broadcast廣播出去
      這樣完全避免了shuffle, 讓join完全變成maper端的join
    • 或者可以把大表的key增加隨機(jī)前綴, 小表擴(kuò)大n倍進(jìn)行join, 最后再刪除key的前綴即可
  • 如果是1個非常非常大的key導(dǎo)致的數(shù)據(jù)傾斜
    • 可以將這個key拿出來單獨(dú)處理, 給這個key加上隨機(jī)前綴, 打散這個key
    • 或者把這些傾斜的key過濾出來后的RDD或DataFrame廣播出去, 讓其進(jìn)行map端的聚合

9. Spark為什么快,Spark SQL 一定比 Hive 快嗎

spark比hive快的原因主要有2, 一個是shuffle后的結(jié)果可以存儲在磁盤中, 另一個是DAG執(zhí)行計(jì)劃里, 多個job串聯(lián)時結(jié)果不必回寫到HDFS中.
因此, spark是否比hive快要是有一定條件的, 一個反例是: 當(dāng)查詢只有1次shuffle操作的時候, hive往往比spark還快, 因?yàn)镸R的map任務(wù)往往生成的更多

Select month_id, sum(sales) from T group by month_id;  --反例

10. Job 和 Task 怎么理解

  • Job
    Job來源于每次的action算子, 每用action算子獲取一次結(jié)果就會生成一個Job, Job提交給DAGScheduler后, 會分解成task和stage
  • Task
    Task是在每個Executor的CPU上執(zhí)行的任務(wù), Task 分為 ShuffleMapTask 和 ResultTask 兩種
    • ShuffleMapStage 中的 Task 為 ShuffleMapTask
    • ResultStage 中的 Task 為 ResultTask

11. Spark作業(yè)提交-執(zhí)行流程是怎么樣的

  1. spark-submit 提交代碼后,執(zhí)行new SparkContext(),在SparkContext里構(gòu)造DAGSchedulerTaskScheduler。
  2. DAGScheduler為1個Job生成多個Stage(stage分為ShuffleMapStage和ResultStage), 每個Stage創(chuàng)建一個TaskSet.
  3. TaskScheduler會把每一個TaskSet里的Task,提交到Executor上執(zhí)行。
  4. Executor上有線程池(CachedTHreadPool), 每收到一個Task. 先封裝成RunnerTask, 再從線程池中取出一個線程執(zhí)行這個Task

12. Spark UDF?

  • Spark SQL UDF 其實(shí)是一個Scala函數(shù),被catalyst封裝成一個 Expression 結(jié)點(diǎn),最后通過 eval 方法計(jì)根據(jù)當(dāng)前 Row 計(jì)算 UDF 的結(jié)果。

  • UDF 對表中的單行進(jìn)行轉(zhuǎn)換,以便為每行生成單個對應(yīng)的輸出值。

  • 執(zhí)行步驟:

      1. 定義函數(shù)
      1. 將函數(shù)注冊為UDF
        以下示例代碼使用 SQL 別名為 CTOF 來注冊我們的轉(zhuǎn)換 UDF,然后在 SQL 查詢使用它來轉(zhuǎn)換每個城市的溫度。
    val df = sqlContext.read.json("temperatures.json")
    df.registerTempTable("citytemps")
    // Register the UDF with our SQLContext
    
    sqlContext.udf.register("CTOF", (degreesCelcius: Double) => ((degreesCelcius * 9.0 / 5.0) + 32.0))
    sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()
    
    
  • Spark SQL定義了UDF1UDF22共22個類,UDF 最多支持22個輸入?yún)?shù)。

13. Spark UDAF?

  • 用戶定義的聚合函數(shù)(User-defined aggregate functions, UDAF)同時處理多行,并且返回一個結(jié)果,通常結(jié)合使用 GROUP BY 語句(例如 COUNT 或 SUM)。為了簡單起見,我們將實(shí)現(xiàn)一個叫 SUMPRODUCT 的 UDAF 來計(jì)算以庫存來分組的所有車輛零售價值
  • 使用方法: 通過擴(kuò)展UserDefinedAggregateFunction類使用
    下面例子中我們定義了一個名為 SumProductAggregateFunction 的類,并且為它取了一個名為 SUMPRODUCT 的別名
    • 輸入文件格式
    {"Make":"Honda","Model":"Pilot","RetailValue":32145.0,"Stock":4}
    {"Make":"Honda","Model":"Civic","RetailValue":19575.0,"Stock":11}
    {"Make":"Honda","Model":"Ridgeline","RetailValue":42870.0,"Stock":2}
    {"Make":"Jeep","Model":"Cherokee","RetailValue":23595.0,"Stock":13}
    {"Make":"Jeep","Model":"Wrangler","RetailValue":27895.0,"Stock":4}
    {"Make":"Volkswagen","Model":"Passat","RetailValue":22440.0,"Stock":2}
    
    • 使用UDAF
      // 定義SparkSQL UDAF邏輯
      private class SumProductAggregateFunction extends UserDefinedAggregateFunction {
        // 定義UDF輸入schema: (Double price, Long quantity)
        def inputSchema: StructType = new StructType().add("price", DoubleType).add("quantity", LongType)
        // 定義輸出schema: Output = (Double total)
        def bufferSchema: StructType = new StructType().add("total", DoubleType)
    
        def dataType: DataType = DoubleType
        def deterministic: Boolean = true // true: our UDAF's output given an input is deterministic
    
        def initialize(buffer: MutableAggregationBuffer): Unit = {
          buffer.update(0, 0.0)           // 初始化 result to 0.0
        }
    
        def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
          val sum   = buffer.getDouble(0) // Intermediate result to be updated
          val price = input.getDouble(0)  // First input parameter
          val qty   = input.getLong(1)    // Second input parameter
          buffer.update(0, sum + (price * qty))   // Update the intermediate result
        }
        // Merge intermediate result sums by adding them
        def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
          buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))
        }
        // THe final result will be contained in 'buffer'
        def evaluate(buffer: Row): Any = {
          buffer.getDouble(0)
        }
      }
    
      def main (args: Array[String]) {
        val conf       = new SparkConf().setAppName("Scala UDAF Example")
        val sc         = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
    
        val testDF = sqlContext.read.json("inventory.json")
        testDF.registerTempTable("inventory") 
        // 注冊UDAF到SQLContext
        sqlContext.udf.register("SUMPRODUCT", new SumProductAggregateFunction)
        sqlContext.sql("SELECT Make, SUMPRODUCT(RetailValue,Stock) AS InventoryValuePerMake FROM inventory GROUP BY Make").show()
      }
    
    
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容