概論
較高的層次上,每個Spark應用程序都包含一個驅(qū)動程序,該程序運行用戶的main功能并在集群上執(zhí)行各種并行操作。Spark提供的主要抽象是彈性分布式數(shù)據(jù)集(RDD),它是跨群集節(jié)點分區(qū)的元素集合,可以并行操作。RDD是通過從Hadoop文件系統(tǒng)(或任何其他Hadoop支持的文件系統(tǒng))中的文件或驅(qū)動程序中的現(xiàn)有Scala集合開始并對其進行轉(zhuǎn)換來創(chuàng)建的。用戶還可以要求Spark 在內(nèi)存中保留 RDD,允許它在并行操作中有效地重用。最后,RDD會自動從節(jié)點故障中恢復。
Spark中的第二個抽象是可以在并行操作中使用的共享變量。默認情況下,當Spark并行運行一個函數(shù)作為不同節(jié)點上的一組任務時,它會將函數(shù)中使用的每個變量的副本發(fā)送給每個任務。有時,變量需要跨任務共享,或者在任務和驅(qū)動程序之間共享。Spark支持兩種類型的共享變量:廣播變量,可用于緩存所有節(jié)點的內(nèi)存中的值; 累加器,它們是僅“添加”到的變量,例如計數(shù)器和總和。
本指南以Spark支持的每種語言顯示了這些功能。如果您啟動Spark的交互式shell,最簡單的方法就是 - bin/spark-shell對于Scala shell或 bin/pysparkPython。
連接Spark
Spark 2.3.2的構建和分發(fā)默認情況下與Scala 2.11一起使用。(Spark也可以構建為與其他版本的Scala一起使用。)要在Scala中編寫應用程序,您需要使用兼容的Scala版本(例如2.11.X)。
要編寫Spark應用程序,需要在Spark上添加Maven依賴項。Spark可通過Maven Central獲得:
groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.3.2
如果希望訪問HDFS群集,則需要hadoop-client為您的HDFS版本添加依賴關系 。
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最后,需要將一些Spark類導入程序中
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
初始化Spark
Spark程序必須做的第一件事是創(chuàng)建一個SparkContext對象,它告訴Spark如何訪問集群。要創(chuàng)建SparkContext您首先需要構建一個包含有關應用程序信息的SparkConf對象。
每個JVM只能激活一個SparkContext。stop()在創(chuàng)建新的SparkContext之前,您必須是活動的SparkContext。
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
該appName參數(shù)是應用程序在群集UI上顯示的名稱。 master是Spark,Mesos或YARN群集URL,或以本地模式運行的特殊“本地”字符串。
使用shell
在Spark shell中,已經(jīng)在名為的變量中為您創(chuàng)建了一個特殊的解釋器感知SparkContext sc。制作自己的SparkContext將無法正常工作??梢允褂?-master參數(shù)設置上下文連接到的主服務器,并且可以通過將逗號分隔的列表傳遞給參數(shù)來將JAR添加到類路徑中--jars??梢酝ㄟ^向參數(shù)提供以逗號分隔的Maven坐標列表,將依賴項(例如Spark包)添加到shell會話中--packages。任何可能存在依賴關系的其他存儲庫(例如Sonatype)都可以傳遞給--repositories參數(shù)。例如,要bin/spark-shell在四個核心上運行,請使用:
$ ./bin/spark-shell --master local[4]
或者,要添加code.jar到其類路徑,請使用:
$ ./bin/spark-shell --master local[4] --jars code.jar
要使用Maven坐標包含依賴項:
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
有關選項的完整列表,請運行spark-shell --help。在幕后, spark-shell調(diào)用更通用的spark-submit腳本。
彈性分布式數(shù)據(jù)集(RDD)
park圍繞彈性分布式數(shù)據(jù)集(RDD)的概念展開,RDD是可以并行操作的容錯的容錯集合。創(chuàng)建RDD有兩種方法:并行化 驅(qū)動程序中的現(xiàn)有集合,或引用外部存儲系統(tǒng)中的數(shù)據(jù)集,例如共享文件系統(tǒng),HDFS,HBase或提供Hadoop InputFormat的任何數(shù)據(jù)源。
并行化集合
并行集合通過調(diào)用創(chuàng)建SparkContext的parallelize一個現(xiàn)有的收集方法,在你的驅(qū)動程序(斯卡拉Seq)。復制集合的元素以形成可以并行操作的分布式數(shù)據(jù)集。例如,以下是如何創(chuàng)建包含數(shù)字1到5的并行化集合:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
一旦創(chuàng)建,分布式數(shù)據(jù)集(distData)可以并行操作。
并行集合的一個重要參數(shù)是將數(shù)據(jù)集切割為的分區(qū)數(shù)。Spark將為集群的每個分區(qū)運行一個任務。
外部數(shù)據(jù)集
Spark可以從Hadoop支持的任何存儲源創(chuàng)建分布式數(shù)據(jù)集,包括本地文件系統(tǒng),HDFS,Cassandra,HBase,Amazon S3等.Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat。
文本文件RDDS可以使用創(chuàng)建SparkContext的textFile方法。此方法需要一個URI的文件(本地路徑的機器上,或一個hdfs://,s3a://等URI),并讀取其作為行的集合。這是一個示例調(diào)用:
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
創(chuàng)建后,distFile可以通過數(shù)據(jù)集操作執(zhí)行操作。例如,我們可以使用map和reduce操作添加所有行的大小,如下所示:distFile.map(s => s.length).reduce((a, b) => a + b)。
有關使用Spark讀取文件的一些注意事項
如果在本地文件系統(tǒng)上使用路徑,則還必須可以在工作節(jié)點上的相同路徑上訪問該文件。將文件復制到所有工作者或使用網(wǎng)絡安裝的共享文件系統(tǒng)。
所有Spark的基于文件的輸入方法,包括textFile支持在目錄,壓縮文件和通配符上運行。例如,你可以使用textFile("/my/directory"),textFile("/my/directory/.txt")和textFile("/my/directory/.gz")。
該textFile方法還采用可選的第二個參數(shù)來控制文件的分區(qū)數(shù)。默認情況下,Spark為文件的每個塊創(chuàng)建一個分區(qū)(HDFS中默認為128MB),但您也可以通過傳遞更大的值來請求更多的分區(qū)。請注意,您不能擁有比塊少的分區(qū)。
除文本文件外,Spark的Scala API還支持其他幾種數(shù)據(jù)格式:
SparkContext.wholeTextFiles允許您讀取包含多個小文本文件的目錄,并將它們作為(文件名,內(nèi)容)對返回。這與之相反textFile,每個文件中每行返回一條記錄。分區(qū)由數(shù)據(jù)局部性決定,在某些情況下,可能導致分區(qū)太少。對于這些情況,wholeTextFiles提供可選的第二個參數(shù)來控制最小數(shù)量的分區(qū)。
對于SequenceFiles,使用SparkContext的
sequenceFile[K, V]方法,其中K和V是文件中鍵和值的類型。這些應該是Hadoop的Writable接口的子類,如IntWritable和Text。此外,Spark允許您為一些常見的Writable指定本機類型; 例如,sequenceFile[Int, String]將自動讀取IntWritables和文本。對于其他Hadoop InputFormats,您可以使用該SparkContext.hadoopRDD方法,該方法采用任意JobConf和輸入格式類,鍵類和值類。設置這些與使用輸入源的Hadoop作業(yè)相同。您還可以使用SparkContext.newAPIHadoopRDD基于“新”MapReduce API(org.apache.hadoop.mapreduce)的InputFormats 。
RDD.saveAsObjectFile并SparkContext.objectFile支持以包含序列化Java對象的簡單格式保存RDD。雖然這不像Avro這樣的專用格式有效,但它提供了一種保存任何RDD的簡便方法。
RDD操作
RDD支持兩種類型的操作:轉(zhuǎn)換(從現(xiàn)有數(shù)據(jù)集創(chuàng)建新數(shù)據(jù)集)和操作(在數(shù)據(jù)集上運行計算后將值返回到驅(qū)動程序)。例如,map是一個轉(zhuǎn)換,它通過一個函數(shù)傳遞每個數(shù)據(jù)集元素,并返回一個表示結果的新RDD。另一方面,reduce是一個使用某個函數(shù)聚合RDD的所有元素并將最終結果返回給驅(qū)動程序的動作(盡管還有一個reduceByKey返回分布式數(shù)據(jù)集的并行)。
Spark中的所有轉(zhuǎn)換都是懶惰的,因為它們不會立即計算結果。相反,他們只記得應用于某些基礎數(shù)據(jù)集的轉(zhuǎn)換(例如文件)。僅當操作需要將結果返回到驅(qū)動程序時才會計算轉(zhuǎn)換。這種設計使Spark能夠更有效地運行。例如,我們可以意識到通過創(chuàng)建的數(shù)據(jù)集map將用于a reduce并僅返回reduce驅(qū)動程序的結果,而不是更大的映射數(shù)據(jù)集。
默認情況下,每次對其執(zhí)行操作時,都可以重新計算每個轉(zhuǎn)換后的RDD。但是,您也可以使用(或)方法在內(nèi)存中保留 RDD ,在這種情況下,Spark會在群集上保留元素,以便在下次查詢時更快地訪問。還支持在磁盤上保留RDD,或在多個節(jié)點之間復制。persistcache
基本操作
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
第一行從外部文件定義基礎RDD。此數(shù)據(jù)集未加載到內(nèi)存中或以其他方式執(zhí)行:lines僅僅是指向文件的指針。第二行定義lineLengths為map轉(zhuǎn)換的結果。再次,lineLengths 是不是馬上計算,由于懶惰。最后,我們運行reduce,這是一個動作。此時,Spark將計算分解為在不同機器上運行的任務,并且每臺機器都運行其部分映射和本地縮減,僅返回其對驅(qū)動程序的答案。
如果我們lineLengths以后想再次使用,我們可以添加:
lineLengths.persist()
之前reduce,這將導致lineLengths在第一次計算之后保存在內(nèi)存中。
將函數(shù)傳遞給Spark
Spark的API在很大程度上依賴于在驅(qū)動程序中傳遞函數(shù)以在集群上運行。有兩種建議的方法可以做到這一點:
- 匿名函數(shù)語法,可用于短片代碼。
- 全局單例對象中的靜態(tài)方法。例如, 可以定義`object
MyFunctions`然后傳遞`MyFunctions.func1`,如下所示:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
請注意,雖然也可以將引用傳遞給類實例中的方法(而不是單例對象),但這需要發(fā)送包含該類的對象以及方法。例如,考慮:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
在這里,如果我們創(chuàng)建一個新的MyClass實例,并調(diào)用doStuff就可以了,map里面有引用的 func1方法是的MyClass實例,所以需要發(fā)送到群集的整個對象。它類似于寫作rdd.map(x => this.func1(x))。
以類似的方式,訪問外部對象的字段將引用整個對象:
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
相當于寫作rdd.map(x => this.field + x),它引用了所有this。要避免此問題,最簡單的方法是復制field到本地變量而不是從外部訪問它:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
了解閉包
Spark的一個難點是在跨集群執(zhí)行代碼時理解變量和方法的范圍和生命周期。修改其范圍之外的變量的RDD操作可能經(jīng)常引起混淆。在下面的示例中,我們將查看foreach()用于遞增計數(shù)器的代碼,但其他操作也可能出現(xiàn)類似問題。
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
本地與群集模式
上述代碼的行為未定義,可能無法按預期工作。為了執(zhí)行作業(yè),Spark將RDD操作的處理分解為任務,每個任務由執(zhí)行程序執(zhí)行。在執(zhí)行之前,Spark計算任務的關閉。閉包是那些變量和方法,它們必須是可見的,以便執(zhí)行者在RDD上執(zhí)行計算(在這種情況下foreach())。該閉包被序列化并發(fā)送給每個執(zhí)行者。
發(fā)送給每個執(zhí)行程序的閉包內(nèi)的變量現(xiàn)在是副本,因此,當在函數(shù)內(nèi)引用計數(shù)器時foreach,它不再是驅(qū)動程序節(jié)點上的計數(shù)器。驅(qū)動程序節(jié)點的內(nèi)存中仍然有一個計數(shù)器,但執(zhí)行程序不再可見!執(zhí)行程序只能看到序列化閉包中的副本。因此,計數(shù)器的最終值仍然為零,因為計數(shù)器上的所有操作都引用了序列化閉包內(nèi)的值。
在本地模式下,在某些情況下,該`foreach`函數(shù)實際上將在與驅(qū)動程序相同的JVM中執(zhí)行,并將引用相同的原始**計數(shù)器**,并且可能實際更新它。
為了確保在這些場景中明確定義的行為,應該使用[`Accumulator`](http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators)。Spark中的累加器專門用于提供一種機制,用于在跨集群中的工作節(jié)點拆分執(zhí)行時安全地更新變量。本指南的“累加器”部分更詳細地討論了這些內(nèi)容。
通常,閉包 - 類似循環(huán)或本地定義的方法的構造不應該用于改變某些全局狀態(tài)。Spark沒有定義或保證從閉包外部引用的對象的突變行為。執(zhí)行此操作的某些代碼可能在本地模式下工作,但這只是偶然的,并且此類代碼在分布式模式下不會按預期運行。如果需要某些全局聚合,請使用累加器。
打印RDD的元素
另一個常見的習慣用法是嘗試使用rdd.foreach(println)或打印出RDD的元素rdd.map(println)。在一臺機器上,這將生成預期的輸出并打印所有RDD的元素。但是,在cluster模式下,stdout執(zhí)行程序調(diào)用的輸出現(xiàn)在寫入執(zhí)行stdout程序,而不是驅(qū)動程序上的輸出,因此stdout驅(qū)動程序不會顯示這些!要打印驅(qū)動程序上的所有元素,可以使用該collect()方法首先將RDD帶到驅(qū)動程序節(jié)點:rdd.collect().foreach(println)。但是,這會導致驅(qū)動程序內(nèi)存不足,因為collect()將整個RDD提取到一臺機器上; 如果您只需要打印RDD的一些元素,更安全的方法是使用take():rdd.take(100).foreach(println)。
使用鍵值對
雖然大多數(shù)Spark操作都適用于包含任何類型對象的RDD,但一些特殊操作僅適用于鍵值對的RDD。最常見的是分布式“隨機”操作,例如通過密鑰對元素進行分組或聚合。
在Scala中,這些操作在包含Tuple2對象的RDD上自動可用 (語言中的內(nèi)置元組,通過簡單編寫創(chuàng)建(a, b))。PairRDDFunctions類中提供了鍵值對操作 ,它自動包裝元組的RDD。
例如,以下代碼使用reduceByKey鍵值對上的操作來計算文件中每行文本出現(xiàn)的次數(shù):
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
轉(zhuǎn)換
下表列出了Spark支持的一些常見轉(zhuǎn)換。有關詳細信息,請參閱RDD API文檔(Scala, Java, Python, R)并配對RDD函數(shù)doc(Scala,Java)。
| Transformation | Meaning |
| map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. |
| filter(func) | Return a new dataset formed by selecting those elements of the source on which funcreturns true. |
| flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so funcshould return a Seq rather than a single item). |
| mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. |
| mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T. |
| sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. |
| union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument. |
| intersection(otherDataset) | Return a new RDD that contains the intersection of elements in the source dataset and the argument. |
| distinct([numPartitions])) | Return a new dataset that contains the distinct elements of the source dataset. |
| groupByKey([numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks. |
| reduceByKey(func, [numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. |
| aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. |
| sortByKey([ascending], [numPartitions]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. |
| join(otherDataset, [numPartitions]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin. |
| cogroup(otherDataset, [numPartitions]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith. |
| cartesian(otherDataset) | When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). |
| pipe(command, [envVars]) | Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings. |
| coalesce(numPartitions) | Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset. |
| repartition(numPartitions) | Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network. |
| repartitionAndSortWithinPartitions(partitioner) | Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery. |
操作
下表列出了Spark支持的一些常見操作。請參閱RDD API文檔(Scala, Java, Python, R)
并配對RDD函數(shù)doc(Scala, Java)以獲取詳細信息。
| reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. |
| collect() | Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. |
| count() | Return the number of elements in the dataset. |
| first() | Return the first element of the dataset (similar to take(1)). |
| take(n) | Return an array with the first n elements of the dataset. |
| takeSample(withReplacement, num, [seed]) | Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed. |
| takeOrdered(n, [ordering]) | Return the first n elements of the RDD using either their natural order or a custom comparator. |
| saveAsTextFile(path) | Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. |
| saveAsSequenceFile(path)
(Java and Scala) | Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). |
| saveAsObjectFile(path)
(Java and Scala) | Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile(). |
| countByKey() | Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. |
| foreach(func) | Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details. |
隨機操作
Spark中的某些操作會觸發(fā)稱為shuffle的事件。隨機播放是Spark的重新分配數(shù)據(jù)的機制,因此它可以跨分區(qū)進行不同的分組。這通常涉及跨執(zhí)行程序和機器復制數(shù)據(jù),使得混洗成為復雜且昂貴的操作。
背景
為了理解在洗牌過程中發(fā)生的事情,我們可以考慮reduceByKey操作的例子 。該reduceByKey操作生成一個新的RDD,其中單個鍵的所有值都組合成一個元組 - 鍵和對與該鍵關聯(lián)的所有值執(zhí)行reduce函數(shù)的結果。挑戰(zhàn)在于,并非單個密鑰的所有值都必須位于同一個分區(qū),甚至是同一個機器上,但它們必須位于同一位置才能計算結果。
在Spark中,數(shù)據(jù)通常不跨分區(qū)分布,以便在特定操作的必要位置。在計算過程中,單個任務將在單個分區(qū)上運行 - 因此,為了組織reduceByKey執(zhí)行單個reduce任務的所有數(shù)據(jù),Spark需要執(zhí)行全部操作。它必須從所有分區(qū)讀取以查找所有鍵的所有值,然后將各個值組合在一起以計算每個鍵的最終結果 - 這稱為shuffle。
盡管新洗牌數(shù)據(jù)的每個分區(qū)中的元素集將是確定性的,并且分區(qū)本身的排序也是如此,但這些元素的排序不是。如果在隨機播放后需要可預測的有序數(shù)據(jù),則可以使用:
- mapPartitions 例如,使用以下方式對每個分區(qū)進行排序 .sorted
- repartitionAndSortWithinPartitions 在重新分區(qū)的同時有效地對分區(qū)進行排序
- sortBy 制作全局有序的RDD
可以導致混洗的操作包括重新分區(qū)操作,如 repartition和coalesce,'ByKey操作(除了計數(shù))之類的groupByKey和reduceByKey,以及 連接操作,如cogroup和join。
所述隨機播放是昂貴的操作,因為它涉及的磁盤I / O,數(shù)據(jù)序列,和網(wǎng)絡I / O。為了組織shuffle的數(shù)據(jù),Spark生成多組任務 - 映射任務以組織數(shù)據(jù),以及一組reduce任務來聚合它。這個術語來自MapReduce,并不直接與Spark map和reduce操作相關。
在內(nèi)部,各個地圖任務的結果會保留在內(nèi)存中,直到它們無法適應。然后,這些基于目標分區(qū)進行排序并寫入單個文件。在reduce方面,任務讀取相關的排序塊。
某些shuffle操作會消耗大量的堆內(nèi)存,因為它們使用內(nèi)存中的數(shù)據(jù)結構來在傳輸記錄之前或之后組織記錄。具體而言, reduceByKey并aggregateByKey創(chuàng)建在地圖上側這樣的結構,和'ByKey操作產(chǎn)生這些上減少側。當數(shù)據(jù)不適合內(nèi)存時,Spark會將這些表溢出到磁盤,從而導致磁盤I / O的額外開銷和垃圾收集增加。
Shuffle還會在磁盤上生成大量中間文件。從Spark 1.3開始,這些文件將被保留,直到不再使用相應的RDD并進行垃圾回收。這樣做是為了在重新計算譜系時不需要重新創(chuàng)建shuffle文件。如果應用程序保留對這些RDD的引用或GC不經(jīng)常啟動,則垃圾收集可能僅在很長一段時間后才會發(fā)生。這意味著長時間運行的Spark作業(yè)可能會占用大量磁盤空間。spark.local.dir配置Spark上下文時,配置參數(shù)指定臨時存儲目錄 。
可以通過調(diào)整各種配置參數(shù)來調(diào)整隨機行為。請參閱“ Spark配置指南 ”中的“隨機行為”部分。