Spark 3. RDD 操作一 基礎(chǔ) ,放入方法,閉包,輸出元素, 使用 K-V 工作

RDD 操作一 基礎(chǔ) ,放入方法,閉包,輸出元素,使用 Key-Value 工作

原文地址: http://spark.apache.org/docs/latest/programming-guide.html
僅限交流使用,轉(zhuǎn)載請注明出處。如有錯(cuò)誤,歡迎指出!

Henvealf/譯

RDD 提供了兩種類型的操作:

  • transformations :從已經(jīng)存在的 RDD 中創(chuàng)建出一個(gè)新的 RDD。
  • actions: 在集群上運(yùn)行了一個(gè)計(jì)算后,最終返回一個(gè)值給設(shè)備中的程序。

transformation 的一個(gè)例子就是map,對 RDD 中的每個(gè)元素進(jìn)行相同的操作,返回一個(gè)新的 RDD。
action 的一個(gè)例子就是 reduce,使用相同的函數(shù)來聚合 RDD 中的元素。

在 Spark 中,所有的 transformation 都是懶惰的(lazy),以至于他不會(huì)立刻計(jì)算出他們結(jié)果。代替的是,他們僅僅記住這個(gè) transformation 應(yīng)用在哪些基礎(chǔ)的數(shù)據(jù)集上(比如一個(gè)文件)。transformation 計(jì)算僅僅是在程序中的一個(gè)動(dòng)作需要一個(gè)返回值的時(shí)候才開始。這個(gè)設(shè)計(jì)讓能夠讓 Spark 更加高效。舉個(gè)例子,我們能夠意識(shí)到一個(gè) map 生成的數(shù)據(jù)集只會(huì)用在一個(gè) reduce 上,并且僅僅返回 reduce 的結(jié)果給設(shè)備,而不會(huì)是一個(gè) map 后的很大的數(shù)據(jù)集給設(shè)備。

默認(rèn)情況下,在你每次重新運(yùn)行一個(gè)通過轉(zhuǎn)換(transforme)得到的RDD的action 的時(shí)候,轉(zhuǎn)換每次都可能重新再運(yùn)行一次。然而,你也可以使用 persist(或者 cache)方法將一個(gè) RDD 持久化在內(nèi)存中,這樣就能讓 Spark 把這些元素維持在集群中,讓下一次的存取速度變得飛快。這里也同樣支持持久話 RDD 在磁盤中,或者備份在多個(gè)節(jié)點(diǎn)中。

基礎(chǔ)

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
lineLengths.persist()
val totalLength = lineLengths.reduce((a, b) => a + b)

往 Spark 中放入方法

scala

Spark 的 API 最信任的就是在集群上運(yùn)行的方法上放方法。下面有兩個(gè)建議:

  • 使用匿名方法語法,能夠減少代碼量。
  • 靜態(tài)化在全局的單例對象上的函數(shù),就是定義一個(gè)object,可以把它理解為直接創(chuàng)建了一個(gè)對象,不需要 new 就可以使用。也可以把他理解為一個(gè)類,而其中的函數(shù)都默認(rèn)為靜態(tài)的。里面有你用到所有函數(shù)/方法。比如,你可以定義 object MyFunctions ,之后通過 MyFunxtions 來使用方法:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

注意你也可能傳入一個(gè)引用給一個(gè)類(class) 中的函數(shù)(與單例 object 的做法是相反的),他需要向 Spark 中傳入包含了要使用的方法的類的對象。比如下面:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

著這里,我們要 new 一個(gè) MyClass 的對象才能使用 doSuff 方法。我們要將這一整個(gè)對象傳送入集群中才可以,然后書寫方式和 rdd.map(x => this.func1(x)) 很像。

用很相似的方式,外部的對象存取字段就會(huì)引用到整個(gè)對象:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

可以發(fā)現(xiàn)這樣其實(shí)就是 rdd.map(x => this.field + x), 這樣在外部就得到了他的 this 引用,這樣很不安全,容易出錯(cuò),為了解決這個(gè)問題,下面有一個(gè)簡單方式, 就是先將字段賦給一個(gè)局部變量中:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

理解閉包(closures)

在 Spark 中比較難理解的就是當(dāng)在集群上運(yùn)行代碼的時(shí)候,變量和方法的作用域與生命周期。在其作用域之外修改變量的 RDD 操作可以是一個(gè)混淆的常用資源( can be a frequent source of confusion)。下面我們使用 foreach 來遞增一個(gè)計(jì)數(shù)器,相同的問題也能同樣出現(xiàn)在其他的操作上。

例子

考慮原生的 RDD 元素的加和操作,這個(gè)操作在不同的虛擬機(jī)上執(zhí)行可能會(huì)呈現(xiàn)出不同的行為。這個(gè)普通的例子是將 Spark 運(yùn)行在 local 模式下(--master = local[n])的情況與運(yùn)行在集群上的情況做比較(通過 park-submit 給 YARN)。

Scala

var counter = 0;
var rdd = sc.parallelize(data)

// Wrong: 不會(huì)執(zhí)行他
rdd.foreach(x => counter + x)

println("Counter: " + counter)

本地模式 Vs. 集群模式

在這之前先明確一下一些概念:

Driver: 驅(qū)動(dòng)器,一個(gè) job 只有一個(gè),主要負(fù)責(zé) job 的解析,與 task 的調(diào)度等。

Executor:執(zhí)行器,實(shí)際運(yùn)行 task 的地方,一個(gè) job 有多個(gè)。

上面這段代碼的行為是不確定的,可能不像預(yù)想中那樣工作。為了執(zhí)行 job ,Spark 會(huì)將處理 RDD 的操作拆分到許多 task 中,且每一個(gè) task 被一個(gè)執(zhí)行器執(zhí)行。在執(zhí)行之前, Spark 會(huì)計(jì)算 task 的閉包。閉包是一些必須讓執(zhí)行器可見的變量和方法,這樣執(zhí)行器才能執(zhí)行他們在 RDD 上的操作(這里就是 foreach)。這個(gè)閉包是被序列化并傳送到了每個(gè)執(zhí)行器。

在集群上的變量會(huì)立刻被送到每個(gè)執(zhí)行器中,事實(shí)上,當(dāng) counter 被引用使用在 foreach 方法里面時(shí),他就不再是驅(qū)動(dòng)(driver)節(jié)點(diǎn)上的 counter 了。也就是說在驅(qū)動(dòng)節(jié)點(diǎn)的內(nèi)存中也一直會(huì)有一個(gè) counter ,可他對執(zhí)行器來說,已經(jīng)不可見了。執(zhí)行器僅僅能看到序列化了的閉包中的拷貝。事實(shí)上, 驅(qū)動(dòng)器上的 final 的 counter 的值在操作執(zhí)行的時(shí)候一直都是0,執(zhí)行器操作的只是引用的序列化的閉包中的值。

在 local 模式下,foreach 方法實(shí)際會(huì)運(yùn)行在作為驅(qū)動(dòng)器的 JVM 中,也就是說運(yùn)行程序的 JVM 和運(yùn)行驅(qū)動(dòng)器的 JVM 是同一個(gè)。所以操作就會(huì)引用到原始的 countercounter 的值就被改變了。

如果想要確?,F(xiàn)在說的這種情況有確定的行為,一種就是使用一個(gè) Accumulator(積累器)。Accumulator 常常使用于在執(zhí)行被分片到不同的 worker 時(shí)需要安全的對變量進(jìn)行更新的情況。 Accumulator 以后詳細(xì)介紹。

一般情況下,閉包--構(gòu)建循環(huán)或者局部函數(shù),應(yīng)該不要用于改變一些全局的狀態(tài)。 Spark 不能確定或者保證修改閉包之外的的對象引用時(shí)的行為。一些代碼在本地模式下運(yùn)行的好好的,在放到集群上運(yùn)行時(shí)就可能得不到期望的結(jié)果。如果需要使用全局的聚合,就使用一個(gè) Accumulator 來代替他。

輸出一個(gè) RDD 的元素

另一個(gè)老事件就是試圖使用 rdd.foreach(println) 或者 rdd.map(println) 打印出元素的值。在一個(gè)機(jī)器上,輸出 RDD 所有的元素的將會(huì)生成你期望的輸出。然而,在 cluster 模式下是,stdout 會(huì)由執(zhí)行器來調(diào)用,寫在了執(zhí)行器的標(biāo)準(zhǔn)輸出上,而不是驅(qū)動(dòng)器上。所以在驅(qū)動(dòng)器上你就看不到 stdout 的輸出類了。

為了在驅(qū)動(dòng)器上輸出所有的元素,一個(gè)你可以使用 collect 方法,先把這個(gè) RDD 帶到驅(qū)動(dòng)器節(jié)點(diǎn)上: rdd.collect().foreach(println)。不過這中方法容易造成內(nèi)存不足。因?yàn)?collect() 會(huì)把 RDD 實(shí)體拿進(jìn)一個(gè)單獨(dú)的機(jī)器中;如果僅僅需要輸出 RDD 的一小部分元素,最安全的方式是使用 take(): rdd.take().foreach(println).

使用 Key-Value 工作

Scala

RDDs 中包括任何類型的對象,有一些特殊的操作是能用于 RDDs 的鍵值對上。 最普遍的就是集群上的 “洗牌” 過程,就是使用 key 來進(jìn)行分組和聚合。

在 Scala 中,這些操作在 包含 ** Tuple2** (二元組)對象的 RDDs 中是自動(dòng)(直接?)可用的(在本語言中,之間寫一個(gè)(a,b) 就能創(chuàng)建 tuples )。鍵值對操作可以在 PairRDDFunction 中得到,他是自動(dòng)包裝了一個(gè)元組RDD。

舉個(gè)例子,下面的代碼就在鍵值對上使用 reduceByKey 操作來計(jì)算當(dāng)前文件的行數(shù)。

val lines = sc.textFile("data.txt")
val pairs = line.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

我們也可以使用 counts.sortByKey(),在這個(gè)例子中,機(jī)會(huì)按字母排序這些簡直對,最后 count.collect() 就將他們帶會(huì)驅(qū)動(dòng)器程序,作為一個(gè)對象數(shù)值使用。

注意 :當(dāng)你使用自定義的對象來作為鍵值對的鍵值,你必須保證這個(gè)自定義的該對象的 equals 方法和與之聯(lián)合匹配的 hashCode() 方法。

End !!!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容