翻譯:Hadoop權威指南之Spark-3

本文原始地址

Resilient Distributed Datasets

RDD是每個spark程序的核心,本節(jié)我們來看看更多細節(jié)。

Creation

創(chuàng)建RDD有三種方式:從一個內存中的對象集合,被稱為并行化(parallelizing) 一個集合;使用一個外部存儲(比如HDFS)的數(shù)據(jù)集;轉變(transform)已存在的RDD。在對少量的輸入數(shù)據(jù)并行地進行CPU密集型運算時,第一種方式非常有用。例如,下面執(zhí)行從1到10的獨立運算:

val params = sc.parallelize(1 to 10)
val result = params.map(performExpensiveComputation)

函數(shù)performExpensiveComputation并行處理輸入數(shù)據(jù)。并行性的級別由屬性spark.default.parallelism決定,該屬性的默認值取決于Spark的運行方式。本地運行時,是本地機器的核心數(shù)量,集群運行時,是集群中所有執(zhí)行(executor)節(jié)點的核心總數(shù)量。

可以為某特定運算設置并行性級別,指定parallelize()方法的第二個參數(shù)即可:

sc.parallelize(1 to 10, 10)

創(chuàng)建RDD的第二種方式,是創(chuàng)建一個指向外部數(shù)據(jù)集的引用。我們已經(jīng)見過怎樣為一個文本文件創(chuàng)建String對象的RDD:

val text:RDD[String] = sc.textFile(inputPath)

路徑inputPath可以是任意的Hadoop文件系統(tǒng)路徑,比如本地文件系統(tǒng)或HDFS上的一個文件。內部來看,Spark使用舊的MapReduce API中的TextInputFormat來讀取這個文件。這就意味著文件切分行為與Hadoop是一樣的,因此在HDFS的情況下,一個Spark分區(qū)對應一個HDFS塊(block)。這個默認行為可以改變,傳入第二個參數(shù)來請求一個特殊的切分數(shù)量:

sc.textFile(inputPath, 10)

另外一個方法允許把多個文本文件作為一個整體來處理,返回的RDD中,是成對的string,第一個string是文件的路徑,第二個string是文件的內容。因為每個文件都會加載進內存,所以這種方式僅僅適合于小文件:

val files:RDD[(String, String)] = sc.wholeTextFiles(inputPath)

Spark能夠處理文本文件以外的其他文件格式,比如,序列文件可以這樣讀入:

sc.sequenceFile[IntWritable, Text](inputPath)

注意這里指定序列文件的鍵和值的Writable類型的方式。對于常用的Writable類型,Spark能夠映射到Java中的等價物,因此我們可以使用等價的方式:

sc.sequenceFile[Int, String](inputPath)

從任意的Hadoop InputFormat來創(chuàng)建RDD,有兩種方式:基于文件的格式,使用hadoopFile(),接收一個路徑;其他格式,比如HBase的TableInputFormat,使用hadoopRDD()。這些方法使用舊的MapReduce API。如果要用新的MapReduce API,使用newAPIHadoopFile()和newAPIHadoopRDD()。下面是讀取Avro數(shù)據(jù)文件的示例,使用特定的API和一個WeatherRecord類:

val job = new Job()
AvroJob.setInputKeySchema(job, WeatherRecord.getClassSchema)
val data = sc.newAPIHadoopFile(inputPath,
    classOf[AvroKeyInputFormat[WeatherRecord]],
    classOf[AvroKey[WeatherRecord]], classOf[NullWritable],
    job.getConfiguration)

除了路徑之外,newAPIHadoopFile()方法還需要InputFormat的類型、鍵的類型、值的類型,再加上Hadoop配置,該配置中帶有Avro模式,在第二行我們使用AvroJob幫助類做的設置。

創(chuàng)建RDD的第三種方式,是轉變(transform)已存在的RDD。

Transformations and Actions

Spark提供兩種類型的操作:transformationsactions。transformations從已存在的RDD生成新的RDD,而actions會觸發(fā)運算并輸出結果——返回給用戶,或者保存到外部存儲。

Actions會立刻產(chǎn)生影響,而transformations不會——它們是懶惰的,它們不做任何工作,直到action被觸發(fā)。下面的例子,把文本文件中的每一行轉為小寫:

val text = sc.textFile(inputPath)
val lower: RDD[String] = text.map(_.toLowerCase())
lower.foreach(println(_))

map()方法是個transformation,Spark內部這樣處理:稍晚的時候,一個函數(shù)(這里是toLowerCase())會被調用,來處理RDD中的每一個元素。這個函數(shù)實際上沒有執(zhí)行,直到foreach()方法(這是個action)被調用,然后Spark會運行一個job,讀取輸入的文件,對文件中的每一行調用toLowerCase(),然后把結果寫到控制臺。

怎樣分辨一個操作究竟是transformation還是action呢?一個方法是看它的返回類型:如果返回類型是RDD,這是個transformation;否則就是action。當你查閱RDD的文檔時,這種方法是很有用的。對RDD執(zhí)行的大多數(shù)操作,可以在RDD的文檔(org.apache.spark.rdd包)中找到,更多的操作在PairRDDFunctions里,這里包含了處理鍵值對RDD的transformations和actions。

Spark的庫中包含了豐富的操作,有transformations者諸如映射(mapping)、分組(grouping)、聚合(aggregating)、再分配(repartitioning)、取樣(sampling)、連接(joining)多個RDD、把RDDs作為集合(sets)對待。還有actions者諸如把RDDs物化(materializing)為集合(collections)、對RDD進行計算統(tǒng)計、從RDD中取樣出固定數(shù)目的元素,把RDD保存到外部存儲。細節(jié)內容,查看文檔。

MapReduce in Spark

盡管名字很有暗示性,Spark中的map()和reduce()操作,與Hadoop MapReduce中相同名字的函數(shù),不是直接對應的。Hadoop MapReduce中的map和reduce的通常形式是:

map: (K1, V1) -> list(K2, V2)
reduce: (K2, list(V2)) -> list(K3, V3)

從list標記可以看出,這兩個函數(shù)都可以返回多個輸出對。這種操作在Spark(Scala)中被實現(xiàn)為flatMap(),與map()很像,但是移除了一層嵌套:

scala> val l = List(1, 2, 3)
l: List[Int] = List(1, 2, 3)

scala> l.map(a => List(a))
res0: List[List[Int]] = List(List(1), List(2), List(3))

scala> l.flatMap(a => List(a))
res1: List[Int] = List(1, 2, 3)

有一種樸素的方式,可以在Spark中模擬Hadoop MapReduce。用兩個flatMap()操作,中間用groupByKey()和sortByKey()來執(zhí)行MapReduce的混洗(shuffle)和排序:

val input: RDD[(K1, V1)] = ...
val mapOutput: RDD[(K2, V2)] = input.flatMap(mapFn)
val shuffled: RDD[(K2, Iterable[V2])] = mapOutput.groupByKey().sortByKey()
val output: RDD[(K3, V3)] = shuffled.flatMap(reduceFn)

這里key的類型K2要繼承自Scala的Ordering類型,以滿足sortByKey()。

這個例子可以幫助我們理解MapReduce和Spark的關系,但是不能盲目應用。首先,這里的語義和Hadoop的MapReduce有微小的差別,sortByKey()執(zhí)行的是全量排序。使用repartitionAndSortWithinPartitions()方法來執(zhí)行部分排序,可以避免這個問題。然而,這樣還是無效的,因為Spark有兩次混洗的過程(一次groupByKey(),一次sort)。

與其重造MapReduce,不如僅僅使用那些你實際需要的操作。比如,如果不需要按key排序,你可以省略sortByKey(),這在Hadoop MapReduce中是不可能的。

同樣的,大多數(shù)情況下groupByKey()太普遍了。通常只在聚合數(shù)據(jù)時需要混洗,因此應該使用reduceByKey(),foldByKey(),或者aggregateByKey(),這些函數(shù)比groupByKey()更有效率,因為它們可以在map任務中作為combiner運行。最后,flatMap()可能總是不需要的,如果總有一個返回值,map()是首選,如果有0或1個返回值,使用filter()。

Aggregation transformations

根據(jù)key來聚合鍵值對RDD的三個主要的transformations是reduceByKey(),foldByKey(),和aggregateByKey()。它們的工作方式稍有不同,但它們都是根據(jù)鍵來聚合值的,為每一個鍵生成一個單獨的值。對應的actions是reduce(),fold()和aggregate(),它們以類似的方式運行,為整個RDD輸出一個單獨的值。

最簡單的是reduceByKey(),它對成對兒的值反復執(zhí)行一個函數(shù),直到生成一個單獨的值。例如:

val pairs: RDD[(String, Int)] =
    sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))
val sums: RDD[(String, Int)] = pairs.reduceByKey(_+_)
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))

鍵 a 對應的值,使用相加函數(shù)(+)聚合起來,(3 + 1)+ 5 = 9,而鍵 b 對應的值只有一個,因此不需要聚合。一般來說,這些操作是分布式的,在RDD的不同分區(qū)對應的任務中分別執(zhí)行,因此這些函數(shù)要具有互換性和連接性。換句話說,操作的順序和分組是不重要的。這種情況下,聚合函數(shù)可以這樣執(zhí)行 5 +(3 + 1),或者 3 + (1 + 5),都會返回相同的結果。

在assert語句中使用的三聯(lián)相等操作符(===),來自ScalaTest,比通常的 == 操作符提供更多有用的失敗信息。

下面是用foldByKey()來執(zhí)行相同的操作:

val sums: RDD[(String, Int)] = pairs.foldByKey(0)(_+_)
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))

注意到這次我們需要提供一個零值,整數(shù)相加時是0,但如果是別的類型和操作,零值將是其他不同的東西。這一次,鍵 a 對應的值聚合的方式是((0 + 3)+ 1)+ 5)= 9(也可能是其他的順序,不過加 0 總是第一個操作)。對于 b 是0 + 7 = 7。

使用foldByKey(),并不比reduceByKey()更強或更弱。特別地,也不能改變聚合結果的值類型。為此我們需要aggregateByKey(),例如,我們可以把那些整數(shù)值聚合到一個集合里:

val sets: RDD[(String, HashSet[Int])] =
    pairs.aggregateByKey(new HashSet[Int])(_+=_, _++=_)
assert(sets.collect.toSet === Set(("a", Set(1, 3, 5)), ("b", Set(7))))

集合相加時,零值是空集合,因此我們用new HashSet[Int]來創(chuàng)建一個新的可變集合。我們需要向aggregateByKey()提供兩個函數(shù)作為參數(shù)。第一個函數(shù)用來控制怎樣把一個Int和一個HashSet[Int]相加,本例中我們用加等函數(shù) += 把整數(shù)加到集合里面(+ 會返回一個新集合,舊集合不會改變)。

第二個函數(shù)用來控制怎樣把兩個HashSet[Int]相加(這種情況發(fā)生在map任務的combiner執(zhí)行之后,reduce任務把兩個分區(qū)聚合之時),這里我們使用 ++= 把第二個集合的所有元素加到第一個集合里。

對于鍵 a,操作的順序可能是:
(( ? + 3) + 1) + 5) = (1, 3, 5)
或者:
( ? + 3) + 1) ++ ( ? + 5) = (1, 3) ++ (5) = (1, 3, 5)
如果Spark使用了組合器(combiner)的話。

轉變后的RDD可以持久化到內存中,因此后續(xù)的操作效率很高。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容