從 Java / Scala 啟動(dòng) Spark jobs
在一個(gè)較高的概念上來(lái)說(shuō),每一個(gè) Spark 應(yīng)用程序由一個(gè)在集群上運(yùn)行著用戶的main函數(shù)和執(zhí)行各種并行操作的driver program(驅(qū)動(dòng)程序)組成。Spark 提供的主要抽象是一個(gè)彈性分布式數(shù)據(jù)集(RDD),它是可以執(zhí)行并行操作且跨集群節(jié)點(diǎn)的元素的集合。RDD 可以從一個(gè) Hadoop 文件系統(tǒng)(或者任何其它 Hadoop 支持的文件系統(tǒng)),或者一個(gè)在 driver program(驅(qū)動(dòng)程序)中已存在的 Scala 集合,以及通過 transforming(轉(zhuǎn)換)來(lái)創(chuàng)建一個(gè) RDD。用戶為了讓它在整個(gè)并行操作中更高效的重用,也許會(huì)讓 Spark persist(持久化)一個(gè) RDD 到內(nèi)存中。最后,RDD 會(huì)自動(dòng)的從節(jié)點(diǎn)故障中恢復(fù)。
在 Spark 中的第二個(gè)抽象是能夠用于并行操作的shared variables(共享變量),默認(rèn)情況下,當(dāng) Spark 的一個(gè)函數(shù)作為一組不同節(jié)點(diǎn)上的任務(wù)運(yùn)行時(shí),它將每一個(gè)變量的副本應(yīng)用到每一個(gè)任務(wù)的函數(shù)中去。有時(shí)候,一個(gè)變量需要在整個(gè)任務(wù)中,或者在任務(wù)和 driver program(驅(qū)動(dòng)程序)之間來(lái)共享。Spark 支持兩種類型的共享變量 :broadcast variables(廣播變量),它可以用于在所有節(jié)點(diǎn)上緩存一個(gè)值,和accumulators(累加器),他是一個(gè)只能被 “added(增加)” 的變量,例如 counters 和 sums。
本指南介紹了每一種 Spark 所支持的語(yǔ)言的特性。如果您啟動(dòng) Spark 的交互式 shell - 針對(duì) Scala shell 使用bin/spark-shell或者針對(duì) Python 使用bin/pyspark是很容易來(lái)學(xué)習(xí)的。
Spark 2.2.0 默認(rèn)使用 Scala 2.11 來(lái)構(gòu)建和發(fā)布直到運(yùn)行。(當(dāng)然,Spark 也可以與其它的 Scala 版本一起運(yùn)行)。為了使用 Scala 編寫應(yīng)用程序,您需要使用可兼容的 Scala 版本(例如,2.11.X)。
要編寫一個(gè) Spark 的應(yīng)用程序,您需要在 Spark 上添加一個(gè) Maven 依賴。Spark 可以通過 Maven 中央倉(cāng)庫(kù)獲取:
groupId = org.apache.spark
artifactId = spark-core_2.11
version = 2.2.0
此外,如果您想訪問一個(gè) HDFS 集群,則需要針對(duì)您的 HDFS 版本添加一個(gè)hadoop-client(hadoop 客戶端)依賴。
groupId = org.apache.hadoop
artifactId = hadoop-client
version =
最后,您需要導(dǎo)入一些 Spark classes(類)到您的程序中去。添加下面幾行:
importorg.apache.spark.SparkContextimportorg.apache.spark.SparkConf
(在 Spark 1.3.0 之前,您需要明確導(dǎo)入org.apache.spark.SparkContext._來(lái)啟用必要的的隱式轉(zhuǎn)換。)
Spark 程序必須做的第一件事情是創(chuàng)建一個(gè)SparkContext對(duì)象,它會(huì)告訴 Spark 如何訪問集群。要?jiǎng)?chuàng)建一個(gè)SparkContext,首先需要構(gòu)建一個(gè)包含應(yīng)用程序的信息的SparkConf對(duì)象。
每一個(gè) JVM 可能只能激活一個(gè) SparkContext 對(duì)象。在創(chuàng)新一個(gè)新的對(duì)象之前,必須調(diào)用stop()該方法停止活躍的 SparkContext。
valconf=newSparkConf().setAppName(appName).setMaster(master)newSparkContext(conf)
這個(gè)appName參數(shù)是一個(gè)在集群 UI 上展示應(yīng)用程序的名稱。master是一個(gè)Spark, Mesos 或 YARN 的 cluster URL,或者指定為在 local mode(本地模式)中運(yùn)行的 “l(fā)ocal” 字符串。在實(shí)際工作中,當(dāng)在集群上運(yùn)行時(shí),您不希望在程序中將 master 給硬編碼,而是用使用spark-submit啟動(dòng)應(yīng)用并且接收它。然而,對(duì)于本地測(cè)試和單元測(cè)試,您可以通過 “l(fā)ocal” 來(lái)運(yùn)行 Spark 進(jìn)程。
在 Spark Shell 中,一個(gè)特殊的 interpreter-aware(可用的解析器)SparkContext 已經(jīng)為您創(chuàng)建好了,稱之為sc的變量。創(chuàng)建您自己的 SparkContext 將不起作用。您可以使用--master參數(shù)設(shè)置這個(gè) SparkContext 連接到哪一個(gè) master 上,并且您可以通過--jars參數(shù)傳遞一個(gè)逗號(hào)分隔的列表來(lái)添加 JARs 到 classpath 中。也可以通過--packages參數(shù)應(yīng)用一個(gè)用逗號(hào)分隔的 maven coordinates(maven 坐標(biāo))方式來(lái)添加依賴(例如,Spark 包)到您的 shell session 中去。任何額外存在且依賴的倉(cāng)庫(kù)(例如 Sonatype)可以傳遞到--repositories參數(shù)。例如,要明確使用四個(gè)核(CPU)來(lái)運(yùn)行bin/spark-shell,使用:
$ ./bin/spark-shell --master local[4]
或者, 也可以添加code.jar到它的 classpath 中去, 使用:
$ ./bin/spark-shell --master local[4]--jars code.jar
為了包含一個(gè)依賴,使用 Maven 坐標(biāo):
$ ./bin/spark-shell --master local[4]--packages"org.example:example:0.1"
有關(guān)選項(xiàng)的完整列表, 請(qǐng)運(yùn)行spark-shell --help. 在幕后,spark-shell調(diào)用了常用的spark-submit腳本.
Spark 主要以一個(gè)彈性分布式數(shù)據(jù)集(RDD)的概念為中心,它是一個(gè)容錯(cuò)且可以執(zhí)行并行操作的元素的集合。有兩種方法可以創(chuàng)建 RDD : 在你的 driver program(驅(qū)動(dòng)程序)中parallelizing一個(gè)已存在的集合,或者在外部存儲(chǔ)系統(tǒng)中引用一個(gè)數(shù)據(jù)集,例如,一個(gè)共享文件系統(tǒng),HDFS,HBase,或者提供 Hadoop InputFormat 的任何數(shù)據(jù)源。
可以在您的 driver program (a ScalaSeq) 中已存在的集合上通過調(diào)用SparkContext的parallelize方法來(lái)創(chuàng)建并行集合。該集合的元素從一個(gè)可以并行操作的 distributed dataset(分布式數(shù)據(jù)集)中復(fù)制到另一個(gè) dataset(數(shù)據(jù)集)中去。例如,這里是一個(gè)如何去創(chuàng)建一個(gè)保存數(shù)字 1 ~ 5 的并行集合。
valdata=Array(1,2,3,4,5)valdistData=sc.parallelize(data)
在創(chuàng)建后,該 distributed dataset(分布式數(shù)據(jù)集)(distData)可以并行的執(zhí)行操作。例如,我們可以調(diào)用distData.reduce((a, b) => a + b) 來(lái)合計(jì)數(shù)組中的元素。后面我們將介紹 distributed dataset(分布式數(shù)據(jù)集)上的操作。
并行集合中一個(gè)很重要參數(shù)是partitions(分區(qū))的數(shù)量,它可用來(lái)切割 dataset(數(shù)據(jù)集)。Spark 將在集群中的每一個(gè)分區(qū)上運(yùn)行一個(gè)任務(wù)。通常您希望群集中的每一個(gè) CPU 計(jì)算 2-4 個(gè)分區(qū)。一般情況下,Spark 會(huì)嘗試根據(jù)您的群集情況來(lái)自動(dòng)的設(shè)置的分區(qū)的數(shù)量。當(dāng)然,您也可以將分區(qū)數(shù)作為第二個(gè)參數(shù)傳遞到parallelize(e.g.sc.parallelize(data, 10)) 方法中來(lái)手動(dòng)的設(shè)置它。注意: 代碼中的一些地方會(huì)使用 term slices (a synonym for partitions) 以保持向后兼容.
Spark 可以從 Hadoop 所支持的任何存儲(chǔ)源中創(chuàng)建 distributed dataset(分布式數(shù)據(jù)集),包括本地文件系統(tǒng),HDFS,Cassandra,HBase,Amazon S3等等。 Spark 支持文本文件,SequenceFiles,以及任何其它的 HadoopInputFormat。
可以使用SparkContext的textFile方法來(lái)創(chuàng)建文本文件的 RDD。此方法需要一個(gè)文件的 URI(計(jì)算機(jī)上的本地路徑 ,hdfs://,s3n://等等的 URI),并且讀取它們作為一個(gè) lines(行)的集合。下面是一個(gè)調(diào)用示例:
scala>valdistFile=sc.textFile("data.txt")distFile:org.apache.spark.rdd.RDD[String]=data.txtMapPartitionsRDD[10]attextFileat:26
在創(chuàng)建后,distFile可以使用 dataset(數(shù)據(jù)集)的操作。例如,我們可以使用下面的 map 和 reduce 操作來(lái)合計(jì)所有行的數(shù)量:distFile.map(s => s.length).reduce((a, b) => a + b)。
使用 Spark 讀取文件時(shí)需要注意:
如果使用本地文件系統(tǒng)的路徑,所工作節(jié)點(diǎn)的相同訪問路徑下該文件必須可以訪問。復(fù)制文件到所有工作節(jié)點(diǎn)上,或著使用共享的網(wǎng)絡(luò)掛載文件系統(tǒng)。
所有 Spark 基于文件的 input 方法, 包括textFile, 支持在目錄上運(yùn)行, 壓縮文件, 和通配符. 例如, 您可以使用textFile("/my/directory"),textFile("/my/directory/*.txt"), andtextFile("/my/directory/*.gz").
textFile方法也可以通過第二個(gè)可選的參數(shù)來(lái)控制該文件的分區(qū)數(shù)量. 默認(rèn)情況下, Spark 為文件的每一個(gè) block(塊)創(chuàng)建的一 個(gè) partition 分區(qū)(HDFS 中塊大小默認(rèn)是 128MB),當(dāng)然你也可以通過傳遞一個(gè)較大的值來(lái)要求一個(gè)較高的分區(qū)數(shù)量。請(qǐng)注意,分區(qū)的數(shù)量不能夠小于塊的數(shù)量。
除了文本文件之外,Spark 的 Scala API 也支持一些其它的數(shù)據(jù)格式:
SparkContext.wholeTextFiles可以讀取包含多個(gè)小文本文件的目錄, 并且將它們作為一個(gè) (filename, content) pairs 來(lái)返回. 這與textFile相比, 它的每一個(gè)文件中的每一行將返回一個(gè)記錄. 分區(qū)由數(shù)據(jù)量來(lái)確定, 某些情況下, 可能導(dǎo)致分區(qū)太少. 針對(duì)這些情況,wholeTextFiles在第二個(gè)位置提供了一個(gè)可選的參數(shù)用戶控制分區(qū)的最小數(shù)量.
針對(duì)SequenceFiles, 使用 SparkContext 的sequenceFile[K, V]方法,其中K和V指的是文件中 key 和 values 的類型. 這些應(yīng)該是 Hadoop 的Writable接口的子類, 像IntWritableandText. 此外, Spark 可以讓您為一些常見的 Writables 指定原生類型; 例如,sequenceFile[Int, String]會(huì)自動(dòng)讀取 IntWritables 和 Texts.
針對(duì)其它的 Hadoop InputFormats, 您可以使用SparkContext.hadoopRDD方法, 它接受一個(gè)任意的JobConf和 input format class, key class 和 value class. 通過相同的方法你可以設(shè)置你的 input source(輸入源). 你還可以針對(duì) InputFormats 使用基于 “new” MapReduce API (org.apache.hadoop.mapreduce) 的SparkContext.newAPIHadoopRDD.
RDD.saveAsObjectFile和SparkContext.objectFile支持使用簡(jiǎn)單的序列化的 Java objects 來(lái)保存 RDD. 雖然這不像 Avro 這種專用的格式一樣高效,但其提供了一種更簡(jiǎn)單的方式來(lái)保存任何的 RDD。.
RDDs support 兩種類型的操作:transformations(轉(zhuǎn)換), 它會(huì)在一個(gè)已存在的 dataset 上創(chuàng)建一個(gè)新的 dataset, 和actions(動(dòng)作), 將在 dataset 上運(yùn)行的計(jì)算后返回到 driver 程序. 例如,map是一個(gè)通過讓每個(gè)數(shù)據(jù)集元素都執(zhí)行一個(gè)函數(shù),并返回的新 RDD 結(jié)果的 transformation,reducereduce 通過執(zhí)行一些函數(shù),聚合 RDD 中所有元素,并將最終結(jié)果給返回驅(qū)動(dòng)程序(雖然也有一個(gè)并行reduceByKey返回一個(gè)分布式數(shù)據(jù)集)的 action.
Spark 中所有的 transformations 都是lazy(懶加載的), 因此它不會(huì)立刻計(jì)算出結(jié)果. 相反, 他們只記得應(yīng)用于一些基本數(shù)據(jù)集的轉(zhuǎn)換 (例如. 文件). 只有當(dāng)需要返回結(jié)果給驅(qū)動(dòng)程序時(shí),transformations 才開始計(jì)算. 這種設(shè)計(jì)使 Spark 的運(yùn)行更高效. 例如, 我們可以了解到,map所創(chuàng)建的數(shù)據(jù)集將被用在reduce中,并且只有reduce的計(jì)算結(jié)果返回給驅(qū)動(dòng)程序,而不是映射一個(gè)更大的數(shù)據(jù)集.
默認(rèn)情況下,每次你在 RDD 運(yùn)行一個(gè) action 的時(shí), 每個(gè) transformed RDD 都會(huì)被重新計(jì)算。但是,您也可用persist(或cache) 方法將 RDD persist(持久化)到內(nèi)存中;在這種情況下,Spark 為了下次查詢時(shí)可以更快地訪問,會(huì)把數(shù)據(jù)保存在集群上。此外,還支持持續(xù)持久化 RDDs 到磁盤,或復(fù)制到多個(gè)結(jié)點(diǎn)。
為了說(shuō)明 RDD 基礎(chǔ),請(qǐng)思考下面這個(gè)的簡(jiǎn)單程序:
vallines=sc.textFile("data.txt")vallineLengths=lines.map(s=>s.length)valtotalLength=lineLengths.reduce((a,b)=>a+b)
第一行從外部文件中定義了一個(gè)基本的 RDD,但這個(gè)數(shù)據(jù)集并未加載到內(nèi)存中或即將被行動(dòng):line僅僅是一個(gè)類似指針的東西,指向該文件. 第二行定義了lineLengths作為maptransformation 的結(jié)果。請(qǐng)注意,由于laziness(延遲加載)lineLengths不會(huì)被立即計(jì)算. 最后,我們運(yùn)行reduce,這是一個(gè) action。此時(shí),Spark 分發(fā)計(jì)算任務(wù)到不同的機(jī)器上運(yùn)行,每臺(tái)機(jī)器都運(yùn)行在 map 的一部分并本地運(yùn)行 reduce,僅僅返回它聚合后的結(jié)果給驅(qū)動(dòng)程序.
如果我們也希望以后再次使用lineLengths,我們還可以添加:
lineLengths.persist()
在reduce之前,這將導(dǎo)致lineLengths在第一次計(jì)算之后就被保存在 memory 中。
當(dāng) driver 程序在集群上運(yùn)行時(shí),Spark 的 API 在很大程度上依賴于傳遞函數(shù)。有 2 種推薦的方式來(lái)做到這一點(diǎn):
Anonymous function syntax(匿名函數(shù)語(yǔ)法), 它可以用于短的代碼片斷.
在全局單例對(duì)象中的靜態(tài)方法. 例如, 您可以定義object MyFunctions然后傳遞MyFunctions.func1, 如下:
objectMyFunctions{deffunc1(s:String):String={...}}myRdd.map(MyFunctions.func1)
請(qǐng)注意,雖然也有可能傳遞一個(gè)類的實(shí)例(與單例對(duì)象相反)的方法的引用,這需要發(fā)送整個(gè)對(duì)象,包括類中其它方法。例如,考慮:
classMyClass{deffunc1(s:String):String={...}defdoStuff(rdd:RDD[String]):RDD[String]={rdd.map(func1)}}
這里,如果我們創(chuàng)建一個(gè)MyClass的實(shí)例,并調(diào)用doStuff,在map內(nèi)有MyClass實(shí)例的func1方法的引用,所以整個(gè)對(duì)象需要被發(fā)送到集群的。它類似于rdd.map(x => this.func1(x))
類似的方式,訪問外部對(duì)象的字段將引用整個(gè)對(duì)象:
classMyClass{valfield="Hello"defdoStuff(rdd:RDD[String]):RDD[String]={rdd.map(x=>field+x)}}
相當(dāng)于寫rdd.map(x => this.field + x), 它引用this所有的東西. 為了避免這個(gè)問題, 最簡(jiǎn)單的方式是復(fù)制field到一個(gè)本地變量,而不是外部訪問它:
defdoStuff(rdd:RDD[String]):RDD[String]={valfield_=this.fieldrdd.map(x=>field_+x)}
在集群中執(zhí)行代碼時(shí),一個(gè)關(guān)于 Spark 更難的事情是理解變量和方法的范圍和生命周期. 修改其范圍之外的變量 RDD 操作可以混淆的常見原因。在下面的例子中,我們將看一下使用的foreach()代碼遞增累加計(jì)數(shù)器,但類似的問題,也可能會(huì)出現(xiàn)其他操作上.
考慮一個(gè)簡(jiǎn)單的 RDD 元素求和,以下行為可能不同,具體取決于是否在同一個(gè) JVM 中執(zhí)行. 一個(gè)常見的例子是當(dāng) Spark 運(yùn)行在local本地模式(--master = local[n])時(shí),與部署 Spark 應(yīng)用到群集(例如,通過 spark-submit 到 YARN):
varcounter=0varrdd=sc.parallelize(data)// Wrong: Don't do this!!rdd.foreach(x=>counter+=x)println("Counter value: "+counter)
上面的代碼行為是不確定的,并且可能無(wú)法按預(yù)期正常工作。執(zhí)行作業(yè)時(shí),Spark 會(huì)分解 RDD 操作到每個(gè) executor 中的 task 里。在執(zhí)行之前,Spark 計(jì)算任務(wù)的closure(閉包)。而閉包是在 RDD 上的 executor 必須能夠訪問的變量和方法(在此情況下的foreach())。閉包被序列化并被發(fā)送到每個(gè)執(zhí)行器。
閉包的變量副本發(fā)給每個(gè)counter,當(dāng)counter被foreach函數(shù)引用的時(shí)候,它已經(jīng)不再是 driver node 的counter了。雖然在 driver node 仍然有一個(gè) counter 在內(nèi)存中,但是對(duì) executors 已經(jīng)不可見。executor 看到的只是序列化的閉包一個(gè)副本。所以counter最終的值還是 0,因?yàn)閷?duì)counter所有的操作均引用序列化的 closure 內(nèi)的值。
在local本地模式,在某些情況下的foreach功能實(shí)際上是同一 JVM 上的驅(qū)動(dòng)程序中執(zhí)行,并會(huì)引用同一個(gè)原始的counter計(jì)數(shù)器,實(shí)際上可能更新.
為了確保這些類型的場(chǎng)景明確的行為應(yīng)該使用的Accumulator累加器。當(dāng)一個(gè)執(zhí)行的任務(wù)分配到集群中的各個(gè) worker 結(jié)點(diǎn)時(shí),Spark 的累加器是專門提供安全更新變量的機(jī)制。本指南的累加器的部分會(huì)更詳細(xì)地討論這些。
在一般情況下,closures - constructs 像循環(huán)或本地定義的方法,不應(yīng)該被用于改動(dòng)一些全局狀態(tài)。Spark 沒有規(guī)定或保證突變的行為,以從封閉件的外側(cè)引用的對(duì)象。一些代碼,這可能以本地模式運(yùn)行,但是這只是偶然和這樣的代碼如預(yù)期在分布式模式下不會(huì)表現(xiàn)。如果需要一些全局的聚合功能,應(yīng)使用 Accumulator(累加器)。
另一種常見的語(yǔ)法用于打印 RDD 的所有元素使用rdd.foreach(println)或rdd.map(println)。在一臺(tái)機(jī)器上,這將產(chǎn)生預(yù)期的輸出和打印 RDD 的所有元素。然而,在集群cluster模式下,stdout輸出正在被執(zhí)行寫操作 executors 的stdout代替,而不是在一個(gè)驅(qū)動(dòng)程序上,因此stdout的driver程序不會(huì)顯示這些!要打印driver程序的所有元素,可以使用的collect()方法首先把 RDD 放到 driver 程序節(jié)點(diǎn)上:rdd.collect().foreach(println)。這可能會(huì)導(dǎo)致 driver 程序耗盡內(nèi)存,雖說(shuō),因?yàn)閏ollect()獲取整個(gè) RDD 到一臺(tái)機(jī)器; 如果你只需要打印 RDD 的幾個(gè)元素,一個(gè)更安全的方法是使用take():rdd.take(100).foreach(println)。
雖然大多數(shù) Spark 操作工作在包含任何類型對(duì)象的 RDDs 上,只有少數(shù)特殊的操作可用于 Key-Value 對(duì)的 RDDs. 最常見的是分布式 “shuffle” 操作,如通過元素的 key 來(lái)進(jìn)行 grouping 或 aggregating 操作.
在 Scala 中,這些操作在 RDD 上是自動(dòng)可用,它包含了Tuple2objects (the built-in tuples in the language, created by simply writing(a, b)). 在PairRDDFunctionsclass 中該 key-value pair 操作是可用的, 其中圍繞 tuple 的 RDD 進(jìn)行自動(dòng)封裝.
例如,下面的代碼使用的Key-Value對(duì)的reduceByKey操作統(tǒng)計(jì)文本文件中每一行出現(xiàn)了多少次:
vallines=sc.textFile("data.txt")valpairs=lines.map(s=>(s,1))valcounts=pairs.reduceByKey((a,b)=>a+b)
我們也可以使用counts.sortByKey(),例如,在對(duì)按字母順序排序,最后counts.collect()把他們作為一個(gè)數(shù)據(jù)對(duì)象返回給 driver 程序。
Note(注意):當(dāng)在 key-value pair 操作中使用自定義的 objects 作為 key 時(shí), 您必須確保有一個(gè)自定義的equals()方法有一個(gè)hashCode()方法相匹配. 有關(guān)詳情, 請(qǐng)參閱Object.hashCode() documentation中列出的約定.
下表列出了一些 Spark 常用的 transformations(轉(zhuǎn)換). 詳情請(qǐng)參考 RDD API 文檔 (Scala,Java,Python,R) 和 pair RDD 函數(shù)文檔 (Scala,Java).
Transformation(轉(zhuǎn)換)Meaning(含義)
map(func)返回一個(gè)新的 distributed dataset(分布式數(shù)據(jù)集),它由每個(gè) source(數(shù)據(jù)源)中的元素應(yīng)用一個(gè)函數(shù)func來(lái)生成.
filter(func)返回一個(gè)新的 distributed dataset(分布式數(shù)據(jù)集),它由每個(gè) source(數(shù)據(jù)源)中應(yīng)用一個(gè)函數(shù)func且返回值為 true 的元素來(lái)生成.
flatMap(func)與 map 類似,但是每一個(gè)輸入的 item 可以被映射成 0 個(gè)或多個(gè)輸出的 items(所以func應(yīng)該返回一個(gè) Seq 而不是一個(gè)單獨(dú)的 item).
mapPartitions(func)與 map 類似,但是單獨(dú)的運(yùn)行在在每個(gè) RDD 的 partition(分區(qū),block)上,所以在一個(gè)類型為 T 的 RDD 上運(yùn)行時(shí)func必須是 Iterator => Iterator 類型.
mapPartitionsWithIndex(func)與 mapPartitions 類似,但是也需要提供一個(gè)代表 partition 的 index(索引)的 interger value(整型值)作為參數(shù)的func,所以在一個(gè)類型為 T 的 RDD 上運(yùn)行時(shí)func必須是 (Int, Iterator) => Iterator 類型.
sample(withReplacement,fraction,seed)樣本數(shù)據(jù),設(shè)置是否放回(withReplacement), 采樣的百分比(fraction)、使用指定的隨機(jī)數(shù)生成器的種子(seed).
union(otherDataset)反回一個(gè)新的 dataset,它包含了 source dataset(源數(shù)據(jù)集)和 otherDataset(其它數(shù)據(jù)集)的并集.
intersection(otherDataset)返回一個(gè)新的 RDD,它包含了 source dataset(源數(shù)據(jù)集)和 otherDataset(其它數(shù)據(jù)集)的交集.
distinct([numTasks]))返回一個(gè)新的 dataset,它包含了 source dataset(源數(shù)據(jù)集)中去重的元素.
groupByKey([numTasks])在一個(gè) (K, V) pair 的 dataset 上調(diào)用時(shí),返回一個(gè) (K, Iterable) .
Note:如果分組是為了在每一個(gè) key 上執(zhí)行聚合操作(例如,sum 或 average),此時(shí)使用reduceByKey或aggregateByKey來(lái)計(jì)算性能會(huì)更好.
Note:默認(rèn)情況下,并行度取決于父 RDD 的分區(qū)數(shù)??梢詡鬟f一個(gè)可選的numTasks參數(shù)來(lái)設(shè)置不同的任務(wù)數(shù).
reduceByKey(func, [numTasks])在 (K, V) pairs 的 dataset 上調(diào)用時(shí), 返回 dataset of (K, V) pairs 的 dataset, 其中的 values 是針對(duì)每個(gè) key 使用給定的函數(shù)func來(lái)進(jìn)行聚合的, 它必須是 type (V,V) => V 的類型. 像groupByKey一樣, reduce tasks 的數(shù)量是可以通過第二個(gè)可選的參數(shù)來(lái)配置的.
aggregateByKey(zeroValue)(seqOp,combOp, [numTasks])在 (K, V) pairs 的 dataset 上調(diào)用時(shí), 返回 (K, U) pairs 的 dataset,其中的 values 是針對(duì)每個(gè) key 使用給定的 combine 函數(shù)以及一個(gè) neutral "0" 值來(lái)進(jìn)行聚合的. 允許聚合值的類型與輸入值的類型不一樣, 同時(shí)避免不必要的配置. 像groupByKey一樣, reduce tasks 的數(shù)量是可以通過第二個(gè)可選的參數(shù)來(lái)配置的.
sortByKey([ascending], [numTasks])在一個(gè) (K, V) pair 的 dataset 上調(diào)用時(shí),其中的 K 實(shí)現(xiàn)了 Ordered,返回一個(gè)按 keys 升序或降序的 (K, V) pairs 的 dataset, 由 boolean 類型的ascending參數(shù)來(lái)指定.
join(otherDataset, [numTasks])在一個(gè) (K, V) 和 (K, W) 類型的 dataset 上調(diào)用時(shí),返回一個(gè) (K, (V, W)) pairs 的 dataset,它擁有每個(gè) key 中所有的元素對(duì)。Outer joins 可以通過leftOuterJoin,rightOuterJoin和fullOuterJoin來(lái)實(shí)現(xiàn).
cogroup(otherDataset, [numTasks])在一個(gè) (K, V) 和的 dataset 上調(diào)用時(shí),返回一個(gè) (K, (Iterable, Iterable)) tuples 的 dataset. 這個(gè)操作也調(diào)用了groupWith.
cartesian(otherDataset)在一個(gè) T 和 U 類型的 dataset 上調(diào)用時(shí),返回一個(gè) (T, U) pairs 類型的 dataset(所有元素的 pairs,即笛卡爾積).
pipe(command,[envVars])通過使用 shell 命令來(lái)將每個(gè) RDD 的分區(qū)給 Pipe。例如,一個(gè) Perl 或 bash 腳本。RDD 的元素會(huì)被寫入進(jìn)程的標(biāo)準(zhǔn)輸入(stdin),并且 lines(行)輸出到它的標(biāo)準(zhǔn)輸出(stdout)被作為一個(gè)字符串型 RDD 的 string 返回.
coalesce(numPartitions)Decrease(降低)RDD 中 partitions(分區(qū))的數(shù)量為 numPartitions。對(duì)于執(zhí)行過濾后一個(gè)大的 dataset 操作是更有效的.
repartition(numPartitions)Reshuffle(重新洗牌)RDD 中的數(shù)據(jù)以創(chuàng)建或者更多的 partitions(分區(qū))并將每個(gè)分區(qū)中的數(shù)據(jù)盡量保持均勻. 該操作總是通過網(wǎng)絡(luò)來(lái) shuffles 所有的數(shù)據(jù).
repartitionAndSortWithinPartitions(partitioner)根據(jù)給定的 partitioner(分區(qū)器)對(duì) RDD 進(jìn)行重新分區(qū),并在每個(gè)結(jié)果分區(qū)中,按照 key 值對(duì)記錄排序。這比每一個(gè)分區(qū)中先調(diào)用repartition然后再 sorting(排序)效率更高,因?yàn)樗梢詫⑴判蜻^程推送到 shuffle 操作的機(jī)器上進(jìn)行.
下表列出了一些 Spark 常用的 actions 操作。詳細(xì)請(qǐng)參考 RDD API 文檔 (Scala,Java,Python,R)
和 pair RDD 函數(shù)文檔 (Scala,Java).
Action(動(dòng)作)Meaning(含義)
reduce(func)使用函數(shù)func聚合 dataset 中的元素,這個(gè)函數(shù)func輸入為兩個(gè)元素,返回為一個(gè)元素。這個(gè)函數(shù)應(yīng)該是可交換(commutative )和關(guān)聯(lián)(associative)的,這樣才能保證它可以被并行地正確計(jì)算.
collect()在 driver 程序中,以一個(gè) array 數(shù)組的形式返回 dataset 的所有元素。這在過濾器(filter)或其他操作(other operation)之后返回足夠?。╯ufficiently small)的數(shù)據(jù)子集通常是有用的.
count()返回 dataset 中元素的個(gè)數(shù).
first()返回 dataset 中的第一個(gè)元素(類似于 take(1).
take(n)將數(shù)據(jù)集中的前n個(gè)元素作為一個(gè) array 數(shù)組返回.
takeSample(withReplacement,num, [seed])對(duì)一個(gè) dataset 進(jìn)行隨機(jī)抽樣,返回一個(gè)包含num個(gè)隨機(jī)抽樣(random sample)元素的數(shù)組,參數(shù) withReplacement 指定是否有放回抽樣,參數(shù) seed 指定生成隨機(jī)數(shù)的種子.
takeOrdered(n,[ordering])返回 RDD 按自然順序(natural order)或自定義比較器(custom comparator)排序后的前n個(gè)元素.
saveAsTextFile(path)將 dataset 中的元素以文本文件(或文本文件集合)的形式寫入本地文件系統(tǒng)、HDFS 或其它 Hadoop 支持的文件系統(tǒng)中的給定目錄中。Spark 將對(duì)每個(gè)元素調(diào)用 toString 方法,將數(shù)據(jù)元素轉(zhuǎn)換為文本文件中的一行記錄.
saveAsSequenceFile(path)
(Java and Scala)將 dataset 中的元素以 Hadoop SequenceFile 的形式寫入到本地文件系統(tǒng)、HDFS 或其它 Hadoop 支持的文件系統(tǒng)指定的路徑中。該操作可以在實(shí)現(xiàn)了 Hadoop 的 Writable 接口的鍵值對(duì)(key-value pairs)的 RDD 上使用。在 Scala 中,它還可以隱式轉(zhuǎn)換為 Writable 的類型(Spark 包括了基本類型的轉(zhuǎn)換,例如 Int, Double, String 等等).
saveAsObjectFile(path)
(Java and Scala)使用 Java 序列化(serialization)以簡(jiǎn)單的格式(simple format)編寫數(shù)據(jù)集的元素,然后使用SparkContext.objectFile()進(jìn)行加載.
countByKey()僅適用于(K,V)類型的 RDD 。返回具有每個(gè) key 的計(jì)數(shù)的 (K , Int)pairs 的 hashmap.
foreach(func)對(duì) dataset 中每個(gè)元素運(yùn)行函數(shù)func。這通常用于副作用(side effects),例如更新一個(gè)Accumulator(累加器)或與外部存儲(chǔ)系統(tǒng)(external storage systems)進(jìn)行交互。Note:修改除foreach()之外的累加器以外的變量(variables)可能會(huì)導(dǎo)致未定義的行為(undefined behavior)。詳細(xì)介紹請(qǐng)閱讀Understanding closures(理解閉包)部分.
該 Spark RDD API 還暴露了一些 actions(操作)的異步版本,例如針對(duì)foreach的foreachAsync,它們會(huì)立即返回一個(gè)FutureAction到調(diào)用者,而不是在完成 action 時(shí)阻塞。 這可以用于管理或等待 action 的異步執(zhí)行。.
Spark 里的某些操作會(huì)觸發(fā) shuffle。shuffle 是spark 重新分配數(shù)據(jù)的一種機(jī)制,使得這些數(shù)據(jù)可以跨不同的區(qū)域進(jìn)行分組。這通常涉及在 executors 和 機(jī)器之間拷貝數(shù)據(jù),這使得 shuffle 成為一個(gè)復(fù)雜的、代價(jià)高的操作。
為了明白reduceByKey操作的過程,我們以reduceByKey為例。reduceBykey 操作產(chǎn)生一個(gè)新的 RDD,其中 key 所有相同的的值組合成為一個(gè) tuple - key 以及與 key 相關(guān)聯(lián)的所有值在 reduce 函數(shù)上的執(zhí)行結(jié)果。面臨的挑戰(zhàn)是,一個(gè) key 的所有值不一定都在一個(gè)同一個(gè) paritition 分區(qū)里,甚至是不一定在同一臺(tái)機(jī)器里,但是它們必須共同被計(jì)算。
在 spark 里,特定的操作需要數(shù)據(jù)不跨分區(qū)分布。在計(jì)算期間,一個(gè)任務(wù)在一個(gè)分區(qū)上執(zhí)行,為了所有數(shù)據(jù)都在單個(gè)reduceByKey的 reduce 任務(wù)上運(yùn)行,我們需要執(zhí)行一個(gè) all-to-all 操作。它必須從所有分區(qū)讀取所有的 key 和 key對(duì)應(yīng)的所有的值,并且跨分區(qū)聚集去計(jì)算每個(gè) key 的結(jié)果 - 這個(gè)過程就叫做shuffle.。
Although the set of elements in each partition of newly shuffled data will be deterministic, and so is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably ordered data following shuffle then it’s possible to use:
盡管每個(gè)分區(qū)新 shuffle 的數(shù)據(jù)集將是確定的,分區(qū)本身的順序也是這樣,但是這些數(shù)據(jù)的順序是不確定的。如果希望 shuffle 后的數(shù)據(jù)是有序的,可以使用:
mapPartitions對(duì)每個(gè) partition 分區(qū)進(jìn)行排序,例如,.sorted
repartitionAndSortWithinPartitions在分區(qū)的同時(shí)對(duì)分區(qū)進(jìn)行高效的排序.
sortBy對(duì) RDD 進(jìn)行全局的排序
觸發(fā)的 shuffle 操作包括repartition操作,如repartition和coalesce,‘ByKey操作 (除了 counting 之外) 像groupByKey和reduceByKey, 和join操作, 像cogroup和join.
該Shuffle是一個(gè)代價(jià)比較高的操作,它涉及磁盤 I/O、數(shù)據(jù)序列化、網(wǎng)絡(luò) I/O。為了準(zhǔn)備 shuffle 操作的數(shù)據(jù),Spark 啟動(dòng)了一系列的任務(wù),map任務(wù)組織數(shù)據(jù),reduce完成數(shù)據(jù)的聚合。這些術(shù)語(yǔ)來(lái)自 MapReduce,跟 Spark 的map操作和reduce操作沒有關(guān)系。
在內(nèi)部,一個(gè) map 任務(wù)的所有結(jié)果數(shù)據(jù)會(huì)保存在內(nèi)存,直到內(nèi)存不能全部存儲(chǔ)為止。然后,這些數(shù)據(jù)將基于目標(biāo)分區(qū)進(jìn)行排序并寫入一個(gè)單獨(dú)的文件中。在 reduce 時(shí),任務(wù)將讀取相關(guān)的已排序的數(shù)據(jù)塊。
某些 shuffle 操作會(huì)大量消耗堆內(nèi)存空間,因?yàn)?shuffle 操作在數(shù)據(jù)轉(zhuǎn)換前后,需要在使用內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)對(duì)數(shù)據(jù)進(jìn)行組織。需要特別說(shuō)明的是,reduceByKey和aggregateByKey在 map 時(shí)會(huì)創(chuàng)建這些數(shù)據(jù)結(jié)構(gòu),'ByKey操作在 reduce 時(shí)創(chuàng)建這些數(shù)據(jù)結(jié)構(gòu)。當(dāng)內(nèi)存滿的時(shí)候,Spark 會(huì)把溢出的數(shù)據(jù)存到磁盤上,這將導(dǎo)致額外的磁盤 I/O 開銷和垃圾回收開銷的增加。
shuffle 操作還會(huì)在磁盤上生成大量的中間文件。在 Spark 1.3 中,這些文件將會(huì)保留至對(duì)應(yīng)的 RDD 不在使用并被垃圾回收為止。這么做的好處是,如果在 Spark 重新計(jì)算 RDD 的血統(tǒng)關(guān)系(lineage)時(shí),shuffle 操作產(chǎn)生的這些中間文件不需要重新創(chuàng)建。如果 Spark 應(yīng)用長(zhǎng)期保持對(duì) RDD 的引用,或者垃圾回收不頻繁,這將導(dǎo)致垃圾回收的周期比較長(zhǎng)。這意味著,長(zhǎng)期運(yùn)行 Spark 任務(wù)可能會(huì)消耗大量的磁盤空間。臨時(shí)數(shù)據(jù)存儲(chǔ)路徑可以通過 SparkContext 中設(shè)置參數(shù)spark.local.dir進(jìn)行配置。
shuffle 操作的行為可以通過調(diào)節(jié)多個(gè)參數(shù)進(jìn)行設(shè)置。詳細(xì)的說(shuō)明請(qǐng)看Spark 配置指南中的 “Shuffle 行為” 部分。
Spark 中一個(gè)很重要的能力是將數(shù)據(jù)persisting持久化(或稱為caching緩存),在多個(gè)操作間都可以訪問這些持久化的數(shù)據(jù)。當(dāng)持久化一個(gè) RDD 時(shí),每個(gè)節(jié)點(diǎn)的其它分區(qū)都可以使用 RDD 在內(nèi)存中進(jìn)行計(jì)算,在該數(shù)據(jù)上的其他 action 操作將直接使用內(nèi)存中的數(shù)據(jù)。這樣會(huì)讓以后的 action 操作計(jì)算速度加快(通常運(yùn)行速度會(huì)加速 10 倍)。緩存是迭代算法和快速的交互式使用的重要工具。
RDD 可以使用persist()方法或cache()方法進(jìn)行持久化。數(shù)據(jù)將會(huì)在第一次 action 操作時(shí)進(jìn)行計(jì)算,并緩存在節(jié)點(diǎn)的內(nèi)存中。Spark 的緩存具有容錯(cuò)機(jī)制,如果一個(gè)緩存的 RDD 的某個(gè)分區(qū)丟失了,Spark 將按照原來(lái)的計(jì)算過程,自動(dòng)重新計(jì)算并進(jìn)行緩存。
另外,每個(gè)持久化的 RDD 可以使用不同的storage level存儲(chǔ)級(jí)別進(jìn)行緩存,例如,持久化到磁盤、已序列化的 Java 對(duì)象形式持久化到內(nèi)存(可以節(jié)省空間)、跨節(jié)點(diǎn)間復(fù)制、以 off-heap 的方式存儲(chǔ)在 Tachyon。這些存儲(chǔ)級(jí)別通過傳遞一個(gè)StorageLevel對(duì)象 (Scala,Java,Python) 給persist()方法進(jìn)行設(shè)置。cache()方法是使用默認(rèn)存儲(chǔ)級(jí)別的快捷設(shè)置方法,默認(rèn)的存儲(chǔ)級(jí)別是StorageLevel.MEMORY_ONLY(將反序列化的對(duì)象存儲(chǔ)到內(nèi)存中)。詳細(xì)的存儲(chǔ)級(jí)別介紹如下:
Storage Level(存儲(chǔ)級(jí)別)Meaning(含義)
MEMORY_ONLY將 RDD 以反序列化的 Java 對(duì)象的形式存儲(chǔ)在 JVM 中. 如果內(nèi)存空間不夠,部分?jǐn)?shù)據(jù)分區(qū)將不再緩存,在每次需要用到這些數(shù)據(jù)時(shí)重新進(jìn)行計(jì)算. 這是默認(rèn)的級(jí)別.
MEMORY_AND_DISK將 RDD 以反序列化的 Java 對(duì)象的形式存儲(chǔ)在 JVM 中。如果內(nèi)存空間不夠,將未緩存的數(shù)據(jù)分區(qū)存儲(chǔ)到磁盤,在需要使用這些分區(qū)時(shí)從磁盤讀取.
MEMORY_ONLY_SER
(Java and Scala)將 RDD 以序列化的 Java 對(duì)象的形式進(jìn)行存儲(chǔ)(每個(gè)分區(qū)為一個(gè) byte 數(shù)組)。這種方式會(huì)比反序列化對(duì)象的方式節(jié)省很多空間,尤其是在使用fast serializer時(shí)會(huì)節(jié)省更多的空間,但是在讀取時(shí)會(huì)增加 CPU 的計(jì)算負(fù)擔(dān).
MEMORY_AND_DISK_SER
(Java and Scala)類似于 MEMORY_ONLY_SER ,但是溢出的分區(qū)會(huì)存儲(chǔ)到磁盤,而不是在用到它們時(shí)重新計(jì)算.
DISK_ONLY只在磁盤上緩存 RDD.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.與上面的級(jí)別功能相同,只不過每個(gè)分區(qū)在集群中兩個(gè)節(jié)點(diǎn)上建立副本.
OFF_HEAP (experimental 實(shí)驗(yàn)性)類似于 MEMORY_ONLY_SER, 但是將數(shù)據(jù)存儲(chǔ)在off-heap memory中. 這需要啟用 off-heap 內(nèi)存.
Note:在 Python 中, stored objects will 總是使用Picklelibrary 來(lái)序列化對(duì)象, 所以無(wú)論你選擇序列化級(jí)別都沒關(guān)系. 在 Python 中可用的存儲(chǔ)級(jí)別有MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY, 和DISK_ONLY_2.
在 shuffle 操作中(例如reduceByKey),即便是用戶沒有調(diào)用persist方法,Spark 也會(huì)自動(dòng)緩存部分中間數(shù)據(jù).這么做的目的是,在 shuffle 的過程中某個(gè)節(jié)點(diǎn)運(yùn)行失敗時(shí),不需要重新計(jì)算所有的輸入數(shù)據(jù)。如果用戶想多次使用某個(gè) RDD,強(qiáng)烈推薦在該 RDD 上調(diào)用 persist 方法.
Spark 的存儲(chǔ)級(jí)別的選擇,核心問題是在 memory 內(nèi)存使用率和 CPU 效率之間進(jìn)行權(quán)衡。建議按下面的過程進(jìn)行存儲(chǔ)級(jí)別的選擇:
如果您的 RDD 適合于默認(rèn)存儲(chǔ)級(jí)別 (MEMORY_ONLY), leave them that way. 這是CPU效率最高的選項(xiàng),允許RDD上的操作盡可能快地運(yùn)行.
如果不是, 試著使用MEMORY_ONLY_SER和selecting a fast serialization library以使對(duì)象更加節(jié)省空間,但仍然能夠快速訪問。 (Java和Scala)
不要溢出到磁盤,除非計(jì)算您的數(shù)據(jù)集的函數(shù)是昂貴的, 或者它們過濾大量的數(shù)據(jù). 否則, 重新計(jì)算分區(qū)可能與從磁盤讀取分區(qū)一樣快.
如果需要快速故障恢復(fù),請(qǐng)使用復(fù)制的存儲(chǔ)級(jí)別 (e.g. 如果使用Spark來(lái)服務(wù) 來(lái)自網(wǎng)絡(luò)應(yīng)用程序的請(qǐng)求).All存儲(chǔ)級(jí)別通過重新計(jì)算丟失的數(shù)據(jù)來(lái)提供完整的容錯(cuò)能力,但復(fù)制的數(shù)據(jù)可讓您繼續(xù)在 RDD 上運(yùn)行任務(wù),而無(wú)需等待重新計(jì)算一個(gè)丟失的分區(qū).
Spark 會(huì)自動(dòng)監(jiān)視每個(gè)節(jié)點(diǎn)上的緩存使用情況,并使用 least-recently-used(LRU)的方式來(lái)丟棄舊數(shù)據(jù)分區(qū)。 如果您想手動(dòng)刪除 RDD 而不是等待它掉出緩存,使用RDD.unpersist()方法。
通常情況下,一個(gè)傳遞給 Spark 操作(例如map或reduce)的函數(shù) func 是在遠(yuǎn)程的集群節(jié)點(diǎn)上執(zhí)行的。該函數(shù) func 在多個(gè)節(jié)點(diǎn)執(zhí)行過程中使用的變量,是同一個(gè)變量的多個(gè)副本。這些變量的以副本的方式拷貝到每個(gè)機(jī)器上,并且各個(gè)遠(yuǎn)程機(jī)器上變量的更新并不會(huì)傳播回 driver program(驅(qū)動(dòng)程序)。通用且支持 read-write(讀-寫) 的共享變量在任務(wù)間是不能勝任的。所以,Spark 提供了兩種特定類型的共享變量 : broadcast variables(廣播變量)和 accumulators(累加器)。
Broadcast variables(廣播變量)允許程序員將一個(gè) read-only(只讀的)變量緩存到每臺(tái)機(jī)器上,而不是給任務(wù)傳遞一個(gè)副本。它們是如何來(lái)使用呢,例如,廣播變量可以用一種高效的方式給每個(gè)節(jié)點(diǎn)傳遞一份比較大的 input dataset(輸入數(shù)據(jù)集)副本。在使用廣播變量時(shí),Spark 也嘗試使用高效廣播算法分發(fā) broadcast variables(廣播變量)以降低通信成本。
Spark 的 action(動(dòng)作)操作是通過一系列的 stage(階段)進(jìn)行執(zhí)行的,這些 stage(階段)是通過分布式的 “shuffle” 操作進(jìn)行拆分的。Spark 會(huì)自動(dòng)廣播出每個(gè) stage(階段)內(nèi)任務(wù)所需要的公共數(shù)據(jù)。這種情況下廣播的數(shù)據(jù)使用序列化的形式進(jìn)行緩存,并在每個(gè)任務(wù)運(yùn)行前進(jìn)行反序列化。這也就意味著,只有在跨越多個(gè) stage(階段)的多個(gè)任務(wù)會(huì)使用相同的數(shù)據(jù),或者在使用反序列化形式的數(shù)據(jù)特別重要的情況下,使用廣播變量會(huì)有比較好的效果。
廣播變量通過在一個(gè)變量v上調(diào)用SparkContext.broadcast(v)方法來(lái)進(jìn)行創(chuàng)建。廣播變量是v的一個(gè) wrapper(包裝器),可以通過調(diào)用value方法來(lái)訪問它的值。代碼示例如下:
scala>valbroadcastVar=sc.broadcast(Array(1,2,3))broadcastVar:org.apache.spark.broadcast.Broadcast[Array[Int]]=Broadcast(0)scala>broadcastVar.valueres0:Array[Int]=Array(1,2,3)
在創(chuàng)建廣播變量之后,在集群上執(zhí)行的所有的函數(shù)中,應(yīng)該使用該廣播變量代替原來(lái)的v值,所以節(jié)點(diǎn)上的v最多分發(fā)一次。另外,對(duì)象v在廣播后不應(yīng)該再被修改,以保證分發(fā)到所有的節(jié)點(diǎn)上的廣播變量具有同樣的值(例如,如果以后該變量會(huì)被運(yùn)到一個(gè)新的節(jié)點(diǎn))。
Accumulators(累加器)是一個(gè)僅可以執(zhí)行 “added”(添加)的變量來(lái)通過一個(gè)關(guān)聯(lián)和交換操作,因此可以高效地執(zhí)行支持并行。累加器可以用于實(shí)現(xiàn) counter( 計(jì)數(shù),類似在 MapReduce 中那樣)或者 sums(求和)。原生 Spark 支持?jǐn)?shù)值型的累加器,并且程序員可以添加新的支持類型。
作為一個(gè)用戶,您可以創(chuàng)建 accumulators(累加器)并且重命名. 如下圖所示, 一個(gè)命名的 accumulator 累加器(在這個(gè)例子中是counter)將顯示在 web UI 中,用于修改該累加器的階段。 Spark 在 “Tasks” 任務(wù)表中顯示由任務(wù)修改的每個(gè)累加器的值.

在 UI 中跟蹤累加器可以有助于了解運(yùn)行階段的進(jìn)度(注: 這在 Python 中尚不支持).
可以通過調(diào)用SparkContext.longAccumulator()或SparkContext.doubleAccumulator()方法創(chuàng)建數(shù)值類型的accumulator(累加器)以分別累加 Long 或 Double 類型的值。集群上正在運(yùn)行的任務(wù)就可以使用add方法來(lái)累計(jì)數(shù)值。然而,它們不能夠讀取它的值。只有 driver program(驅(qū)動(dòng)程序)才可以使用value方法讀取累加器的值。
下面的代碼展示了一個(gè) accumulator(累加器)被用于對(duì)一個(gè)數(shù)組中的元素求和:
scala>valaccum=sc.longAccumulator("My Accumulator")accum:org.apache.spark.util.LongAccumulator=LongAccumulator(id:0,name:Some(MyAccumulator),value:0)scala>sc.parallelize(Array(1,2,3,4)).foreach(x=>accum.add(x))...10/09/2918:41:08INFOSparkContext:Tasksfinishedin0.317106sscala>accum.valueres2:Long=10
雖然此代碼使用 Long 類型的累加器的內(nèi)置支持, 但是開發(fā)者通過AccumulatorV2它的子類來(lái)創(chuàng)建自己的類型. AccumulatorV2 抽象類有幾個(gè)需要 override(重寫)的方法:reset方法可將累加器重置為 0,add方法可將其它值添加到累加器中,merge方法可將其他同樣類型的累加器合并為一個(gè). 其他需要重寫的方法可參考API documentation. 例如, 假設(shè)我們有一個(gè)表示數(shù)學(xué)上 vectors(向量)的MyVector類,我們可以寫成:
classVectorAccumulatorV2extendsAccumulatorV2[MyVector,MyVector]{privatevalmyVector:MyVector=MyVector.createZeroVectordefreset():Unit={myVector.reset()}defadd(v:MyVector):Unit={myVector.add(v)}...}// Then, create an Accumulator of this type:valmyVectorAcc=newVectorAccumulatorV2// Then, register it into spark context:sc.register(myVectorAcc,"MyVectorAcc1")
注意,在開發(fā)者定義自己的 AccumulatorV2 類型時(shí), resulting type(返回值類型)可能與添加的元素的類型不一致。
累加器的更新只發(fā)生在action操作中,Spark 保證每個(gè)任務(wù)只更新累加器一次,例如,重啟任務(wù)不會(huì)更新值。在 transformations(轉(zhuǎn)換)中, 用戶需要注意的是,如果 task(任務(wù))或 job stages(階段)重新執(zhí)行,每個(gè)任務(wù)的更新操作可能會(huì)執(zhí)行多次。
累加器不會(huì)改變 Spark lazy evaluation(懶加載)的模式。如果累加器在 RDD 中的一個(gè)操作中進(jìn)行更新,它們的值僅被更新一次,RDD 被作為 action 的一部分來(lái)計(jì)算。因此,在一個(gè)像map()這樣的 transformation(轉(zhuǎn)換)時(shí),累加器的更新并沒有執(zhí)行。下面的代碼片段證明了這個(gè)特性:
valaccum=sc.longAccumulatordata.map{x=>accum.add(x);x}// Here, accum is still 0 because no actions have caused the map operation to be computed.
該應(yīng)用提交指南描述了如何將應(yīng)用提交到集群中. 簡(jiǎn)單的說(shuō), 在您將應(yīng)用打包成一個(gè)JAR(針對(duì) Java/Scala) 或者一組.py或.zip文件 (針對(duì)Python), 該bin/spark-submit腳本可以讓你提交它到任何所支持的 cluster manager 上去.
從 Java / Scala 啟動(dòng) Spark jobs
該org.apache.spark.launcherpackage 提供了 classes 用于使用簡(jiǎn)單的 Java API 來(lái)作為一個(gè)子進(jìn)程啟動(dòng) Spark jobs.
Spark 可以友好的使用流行的單元測(cè)試框架進(jìn)行單元測(cè)試。在將 master URL 設(shè)置為local來(lái)測(cè)試時(shí)會(huì)簡(jiǎn)單的創(chuàng)建一個(gè)SparkContext,運(yùn)行您的操作,然后調(diào)用SparkContext.stop()將該作業(yè)停止。因?yàn)?Spark 不支持在同一個(gè)程序中并行的運(yùn)行兩個(gè) contexts,所以需要確保使用 finally 塊或者測(cè)試框架的tearDown方法將 context 停止。
您可以在 Spark 網(wǎng)站上看一下Spark 程序示例. 此外, Spark 在examples目錄中包含了許多示例 (Scala,Java,Python,R). 您可以通過傳遞 class name 到 Spark 的 bin/run-example 腳本以運(yùn)行 Java 和 Scala 示例; 例如:
./bin/run-example SparkPi
針對(duì) Python 示例,使用spark-submit來(lái)代替:
./bin/spark-submit examples/src/main/python/pi.py
針對(duì) R 示例,使用spark-submit來(lái)代替:
./bin/spark-submit examples/src/main/r/dataframe.R
針對(duì)應(yīng)用程序的優(yōu)化, 該配置和優(yōu)化指南一些最佳實(shí)踐的信息. 這些優(yōu)化建議在確保你的數(shù)據(jù)以高效的格式存儲(chǔ)在內(nèi)存中尤其重要. 針對(duì)部署參考, 該集群模式概述描述了分布式操作和支持的 cluster managers 集群管理器的組件.
最后,所有的 API 文檔可在Scala,Java,PythonandR中獲取.

原文地址: http://spark.apachecn.org/docs/cn/2.2.0/rdd-programming-guide.html
網(wǎng)頁(yè)地址: http://spark.apachecn.org/
github: https://github.com/apachecn/spark-doc-zh(覺得不錯(cuò)麻煩給個(gè) Star,謝謝!~)