Shuffle操作

http://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations

Spark中的某些操作會觸發(fā)稱為shuffle的事件。 shuffle是Spark的重新分配數據的機制,因此它可以跨分區(qū)進行不同的分組。 這通常涉及跨執(zhí)行程序和機器復制數據,使得shuffle成為復雜且昂貴的操作。

為了理解在shuffle期間發(fā)生的事情,我們可以考慮reduceByKey操作的示例。 reduceByKey操作生成一個新的RDD(重新分區(qū)),其中單個鍵的所有值都組合成一個元組,是 key 和 對與該key關聯(lián)的所有value執(zhí)行reduce函數的結果。 挑戰(zhàn)在于,并非單個key的所有值都必須位于同一個分區(qū),甚至是同一個機器上,但它們必須位于同一位置才能計算結果。

背景:

在Spark中,數據通常不跨分區(qū)分布,以便在特定操作的必要位置。 在計算過程中,單個任務將在單個分區(qū)上運行 - 因此,要組織單個reduceByKey reduce任務執(zhí)行的所有數據,Spark需要執(zhí)行全部操作。 它必須從所有分區(qū)讀取以查找所有鍵的所有值,然后將各個值組合在一起以計算每個鍵的最終結果 - 這稱為shuffle。

盡管新shuffle數據的每個分區(qū)中的元素集將是確定性的,并且分區(qū)本身的排序也是如此,但這些元素的排序不是。 如果在shuffle后需要可預測的有序數據,則可以使用:

? ? ? ? ? ? ? mapPartitions使用例如.sorted對每個分區(qū)進行排序;

? ? ? ? ? ? ? repartitionAndSortWithinPartitions在同時重新分區(qū)的同時有效地對分區(qū)進行排序;

? ? ? ? ? ? ? sortBy來創(chuàng)建一個全局排序的RDD

可以導致shuffle的操作包括重新分區(qū)操作,例如重新分區(qū)和合并,“ByKey操作(計數除外)”,如groupByKey和reduceByKey,以及聯(lián)合操作,如?cogroup?and?join.

性能影響:

Shuffle是一項昂貴的操作,因為它涉及磁盤I / O,數據序列化和網絡I / O. 為了組織shuffle的數據,

Spark生成多組任務 -map?tasks以組織數據,以及一組?reduce?tasks來聚合它。 這個術語來自MapReduce,并不直接與Spark的map和reduce操作有關。

在內部,各個map任務的結果會保留在內存中,直到它們無法fit。 然后,這些基于目標分區(qū)進行排序并寫入單個文件。 在reduce方面,tasks 讀取相關的排序的塊。

某些shuffle操作會消耗大量的堆內存,因為它們使用內存中的數據結構來在傳輸記錄之前或之后組織記錄。 具體來說,reduceByKey和aggregateByKey在map側創(chuàng)建這些結構,并且'ByKey操作在reduce側生成這些結構。 當數據不適合內存時,Spark會將這些表溢出到磁盤,從而導致磁盤I / O的額外開銷和垃圾收集增加。

Shuffle還會在磁盤上生成大量中間文件。 從Spark 1.3開始,這些文件將被保留,直到不再使用相應的RDD并進行垃圾回收。 這樣做是為了在重新計算 lineage 時不需要重新創(chuàng)建shuffle文件。 如果應用程序保留對這些RDD的引用或者GC不經常啟動,則垃圾收集可能僅在很長一段時間后才會發(fā)生。 這意味著長時間運行的Spark作業(yè)可能會占用大量磁盤空間。

配置Spark上下文時,spark.local.dir配置參數指定臨時存儲目錄。

可以通過調整各種配置參數來調整shuffle行為。See the ‘Shuffle Behavior’ section within the?Spark Configuration Guide.

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容