http://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations
Spark中的某些操作會觸發(fā)稱為shuffle的事件。 shuffle是Spark的重新分配數據的機制,因此它可以跨分區(qū)進行不同的分組。 這通常涉及跨執(zhí)行程序和機器復制數據,使得shuffle成為復雜且昂貴的操作。
為了理解在shuffle期間發(fā)生的事情,我們可以考慮reduceByKey操作的示例。 reduceByKey操作生成一個新的RDD(重新分區(qū)),其中單個鍵的所有值都組合成一個元組,是 key 和 對與該key關聯(lián)的所有value執(zhí)行reduce函數的結果。 挑戰(zhàn)在于,并非單個key的所有值都必須位于同一個分區(qū),甚至是同一個機器上,但它們必須位于同一位置才能計算結果。
背景:
在Spark中,數據通常不跨分區(qū)分布,以便在特定操作的必要位置。 在計算過程中,單個任務將在單個分區(qū)上運行 - 因此,要組織單個reduceByKey reduce任務執(zhí)行的所有數據,Spark需要執(zhí)行全部操作。 它必須從所有分區(qū)讀取以查找所有鍵的所有值,然后將各個值組合在一起以計算每個鍵的最終結果 - 這稱為shuffle。
盡管新shuffle數據的每個分區(qū)中的元素集將是確定性的,并且分區(qū)本身的排序也是如此,但這些元素的排序不是。 如果在shuffle后需要可預測的有序數據,則可以使用:
? ? ? ? ? ? ? mapPartitions使用例如.sorted對每個分區(qū)進行排序;
? ? ? ? ? ? ? repartitionAndSortWithinPartitions在同時重新分區(qū)的同時有效地對分區(qū)進行排序;
? ? ? ? ? ? ? sortBy來創(chuàng)建一個全局排序的RDD
可以導致shuffle的操作包括重新分區(qū)操作,例如重新分區(qū)和合并,“ByKey操作(計數除外)”,如groupByKey和reduceByKey,以及聯(lián)合操作,如?cogroup?and?join.
性能影響:
Shuffle是一項昂貴的操作,因為它涉及磁盤I / O,數據序列化和網絡I / O. 為了組織shuffle的數據,
Spark生成多組任務 -map?tasks以組織數據,以及一組?reduce?tasks來聚合它。 這個術語來自MapReduce,并不直接與Spark的map和reduce操作有關。
在內部,各個map任務的結果會保留在內存中,直到它們無法fit。 然后,這些基于目標分區(qū)進行排序并寫入單個文件。 在reduce方面,tasks 讀取相關的排序的塊。
某些shuffle操作會消耗大量的堆內存,因為它們使用內存中的數據結構來在傳輸記錄之前或之后組織記錄。 具體來說,reduceByKey和aggregateByKey在map側創(chuàng)建這些結構,并且'ByKey操作在reduce側生成這些結構。 當數據不適合內存時,Spark會將這些表溢出到磁盤,從而導致磁盤I / O的額外開銷和垃圾收集增加。
Shuffle還會在磁盤上生成大量中間文件。 從Spark 1.3開始,這些文件將被保留,直到不再使用相應的RDD并進行垃圾回收。 這樣做是為了在重新計算 lineage 時不需要重新創(chuàng)建shuffle文件。 如果應用程序保留對這些RDD的引用或者GC不經常啟動,則垃圾收集可能僅在很長一段時間后才會發(fā)生。 這意味著長時間運行的Spark作業(yè)可能會占用大量磁盤空間。
配置Spark上下文時,spark.local.dir配置參數指定臨時存儲目錄。
可以通過調整各種配置參數來調整shuffle行為。See the ‘Shuffle Behavior’ section within the?Spark Configuration Guide.