5. 第四章 鍵值對(duì)操作

1. 創(chuàng)建Pair RDD

//Scala中使用第一個(gè)單詞作為鍵創(chuàng)建出一個(gè)pair RDD
val lines = sc.textFile("/path/README.md")
val pairs = lines.map(x => (x.split(" ")(0), x))
//Java中使用第一個(gè)單詞作為鍵創(chuàng)建出一個(gè)pair RDD
JavaRDD<String> lines = sc.textFile("/path/README.md")
JavaPairRDD<String, String> pairs = lines.mapToPair(x -> new Tuple2(x.split(" ")[0], x))

當(dāng)用Scala 和Python 從一個(gè)內(nèi)存中的數(shù)據(jù)集創(chuàng)建pair RDD 時(shí),只需要對(duì)這個(gè)由二元組組成的集合調(diào)用SparkContext.parallelize() 方法。而要使用Java 從內(nèi)存數(shù)據(jù)集創(chuàng)建pair RDD的話,則需要使用SparkContext.parallelizePairs()。

//Java中使用parallelizePairs創(chuàng)建pair RDD
List<Tuple2<String,Integer>> lt = new ArrayList<>();
Tuple2<String,Integer> tp1 = new Tuple2<>("pinda", 2);
Tuple2<String,Integer> tp2 = new Tuple2<>("qank", 6);
Tuple2<String,Integer> tp3 = new Tuple2<>("panda", 5);
lt.add(tp1);
lt.add(tp2);
lt.add(tp3);
JavaPairRDD<String,Integer> data = js.parallelizePairs(lt);

2. Pair RDD的轉(zhuǎn)化操作

2.1 基本轉(zhuǎn)化操作

Pair RDD可以使用所有標(biāo)準(zhǔn)RDD上的可用的轉(zhuǎn)化操作。

//Scala篩選掉長(zhǎng)度超過20個(gè)字符的行
pairs.filter{case (key, value) => value.length < 20}
//Java篩選掉長(zhǎng)度超過20個(gè)字符的行
Function<Tuple2<String, String>, Boolean> longWordFilter =
  new Function<Tuple2<String, String>, Boolean>() {
    public Boolean call(Tuple2<String, String> keyValue) {
      return (keyValue._2().length() < 20);
  }
};
JavaPairRDD<String, String> result = pairs.filter(longWordFilter);
Pair RDD的轉(zhuǎn)化操作
針對(duì)兩個(gè)Pair RDD的轉(zhuǎn)化操作
2.2 聚合操作
//在Scala 中使用reduceByKey() 和mapValues() 計(jì)算每個(gè)鍵對(duì)應(yīng)的平均值
val rdd = sc.parallelize(List(("panda", 0), ("pink", 3), ("priate", 3), ("panda", 1), ("pink", 4)))
val keyMean = rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
  • 使用reduceByKey() 和mapValues() 計(jì)算每個(gè)鍵對(duì)應(yīng)的平均值的數(shù)據(jù)流
key value =>mapValues=> key value =>reduceByKey=> key value
panda 0 panda (0, 1) panda (1, 2)
pink 3 pink (3, 1) pink (7, 2)
pirate 3 pirate (3, 1) pirate (3, 1)
panda 1 panda (1, 1)
pink 4 pink (4, 1)
  • 使用flatMap() 和 map()來生成以單詞為鍵、以數(shù)字1 為值的pair RDD,然后使用reduceByKey() 對(duì)所有的單詞進(jìn)行計(jì)數(shù)。
//用Scala 實(shí)現(xiàn)單詞計(jì)數(shù)
val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
//對(duì)RDD input使用countByValue() 函數(shù),以更快地實(shí)現(xiàn)單詞計(jì)數(shù)
val result = input.flatMap(x => x.split(" ")).countByValue()
//用Java 實(shí)現(xiàn)單詞計(jì)數(shù)
JavaRDD<String> input = sc.textFile("s3://...")
JavaRDD<String> words = input.flatMap(line -> line.split(" "));
JavaPairRDD<String, Integer> result = words.mapToPair(x -> new Tuple2(x, 1)).
    reduceByKey((x, y) -> x + y);
  • combineByKey() 是最為常用的基于鍵進(jìn)行聚合的函數(shù)。大多數(shù)基于鍵聚合的函數(shù)都是用它實(shí)現(xiàn)的。和aggregate() 一樣,combineByKey() 可以讓用戶返回與輸入數(shù)據(jù)的類型不同的返回值。
    combineByKey() 有多個(gè)參數(shù)分別對(duì)應(yīng)聚合操作的各個(gè)階段,因而非常適合用來解釋聚合操作各個(gè)階段的功能劃分。

    1. combineByKey的定義
    def combineByKey[C](  
        createCombiner: V => C,  
        mergeValue: (C, V) => C,  
        mergeCombiners: (C, C) => C,  
        partitioner: Partitioner,  
        mapSideCombine: Boolean = true,  
        serializer: Serializer = null )  
    
    1. 解釋下3個(gè)重要的函數(shù)參數(shù):
    • createCombiner: V => C ,這個(gè)函數(shù)把當(dāng)前的值作為參數(shù),此時(shí)我們可以對(duì)其做些附加操作(類型轉(zhuǎn)換)并把它返回 (這一步類似于初始化操作)
    • mergeValue: (C, V) => C,該函數(shù)把元素V合并到之前的元素C(createCombiner)上 (這個(gè)操作在每個(gè)分區(qū)內(nèi)進(jìn)行)
    • mergeCombiners: (C, C) => C,該函數(shù)把2個(gè)元素C合并 (這個(gè)操作在不同分區(qū)間進(jìn)行)
    1. 使用combineByKey來求解平均數(shù)的例子
    //在Scala 中使用combineByKey() 求每個(gè)鍵對(duì)應(yīng)的平均值
    val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), 
       ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))  
    val rdd = sc.parallelize(initialScores)  
    type MVType = (Int, Double) //定義一個(gè)元組類型(科目計(jì)數(shù)器,分?jǐn)?shù))  
    rdd.combineByKey(  
      score => (1, score),  
      (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),  
      (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)  
    ).map { case (name, (num, socre)) => (name, socre / num) }.collect  
    
    1. 參數(shù)含義的解釋
    • score => (1, score),我們把分?jǐn)?shù)作為參數(shù),并返回了附加的元組類型。 以"Fred"為列,當(dāng)前其分?jǐn)?shù)為88.0 =>(1,88.0) 1表示當(dāng)前科目的計(jì)數(shù)器,此時(shí)只有一個(gè)科目
    • (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),注意這里的c1就是createCombiner初始化得到的(1,88.0)。在一個(gè)分區(qū)內(nèi),我們又碰到了"Fred"的一個(gè)新的分?jǐn)?shù)91.0。當(dāng)然我們要把之前的科目分?jǐn)?shù)和當(dāng)前的分?jǐn)?shù)加起來即c1._2 + newScorez,然后把科目計(jì)算器加1即c1._1 + 1
    • (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2),注意"Fred"可能是個(gè)學(xué)霸,他選修的科目可能過多而分散在不同的分區(qū)中。所有的分區(qū)都進(jìn)行mergeValue后,接下來就是對(duì)分區(qū)間進(jìn)行合并了,分區(qū)間科目數(shù)和科目數(shù)相加分?jǐn)?shù)和分?jǐn)?shù)相加就得到了總分和總科目數(shù)
    1. 執(zhí)行結(jié)果
    res1: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333))  
    
combineByKey()數(shù)據(jù)流示意圖
//Java 中使用combineByKey() 求每個(gè)鍵對(duì)應(yīng)的平均值
public static class AvgCount implements Serializable {
  public AvgCount(int total, int num) { total_ = total; num_ = num; }
  public int total_;
  public int num_;
  public float avg() { returntotal_/(float)num_; }
}
Function<Integer, AvgCount> createAcc = new Function<Integer, AvgCount>() {
  public AvgCount call(Integer x) {
    return new AvgCount(x, 1);
  }
};
Function2<AvgCount, Integer, AvgCount> addAndCount =
  new Function2<AvgCount, Integer, AvgCount>() {
    public AvgCount call(AvgCount a, Integer x) {
      a.total_ += x;
      a.num_ += 1;
      return a;
   }
};
Function2<AvgCount, AvgCount, AvgCount> combine =
  new Function2<AvgCount, AvgCount, AvgCount>() {
    public AvgCount call(AvgCount a, AvgCount b) {
    a.total_ += b.total_;
    a.num_ += b.num_;
    return a;
  }
};
AvgCount initial = new AvgCount(0,0);
JavaPairRDD<String, AvgCount> avgCounts =
    nums.combineByKey(createAcc, addAndCount, combine);
Map<String, AvgCount> countMap = avgCounts.collectAsMap();
for (Entry<String, AvgCount> entry : countMap.entrySet()) {
  System.out.println(entry.getKey() + ":" + entry.getValue().avg());
}
  • 數(shù)據(jù)分組 groupByKey() cogroup()
  • 連接 join() rightOuterJoin() leftOuterJoin()
  • 數(shù)據(jù)排序 sortByKey()

3. Pair RDD的行動(dòng)操作

和轉(zhuǎn)化操作一樣,所有基礎(chǔ)RDD支持的行動(dòng)操作也都在pair RDD上可用。

一些額外的行動(dòng)操作

4. 數(shù)據(jù)分區(qū)

4.1 獲取RDD的數(shù)據(jù)分區(qū)

你可以使用RDD 的partitioner 屬性(Java 中使用partitioner() 方法)來獲取RDD 的分區(qū)方式。它會(huì)返回一個(gè)scala.Option 對(duì)象,這是Scala 中用來存放可能存在的對(duì)象的容器類。你可以對(duì)這個(gè)Option 對(duì)象調(diào)用isDefined() 來檢查其中是否有值,調(diào)用get() 來獲取其中的值

scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
//創(chuàng)建一個(gè)由(Int, Int) 對(duì)組成的RDD
scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12
//初始時(shí)沒有分區(qū)方式信息(一個(gè)值為None 的Option 對(duì)象)。
scala> pairs.partitioner
res0: Option[spark.Partitioner] = None
//對(duì)第一個(gè)RDD 進(jìn)行哈希分區(qū),創(chuàng)建出了第二個(gè)RDD
scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2)).persist()
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14
//對(duì)RDD 完成哈希分區(qū)操作
scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)
4.2 從分區(qū)中獲益的操作

Spark 的許多操作都引入了將數(shù)據(jù)根據(jù)鍵跨節(jié)點(diǎn)進(jìn)行混洗的過程。所有這些操作都會(huì)從數(shù)據(jù)分區(qū)中獲益。能夠從數(shù)據(jù)分區(qū)中獲益的操作有cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey() 以及l(fā)ookup()。

4.3 影響分區(qū)方式的操作

會(huì)為生成的結(jié)果RDD設(shè)好分區(qū)方式的操作:cogroup()、groupWith()、join()、lef tOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、partitionBy()、sort()、mapValues()(如果父RDD 有分區(qū)方式的話)、flatMapValues()(如果父RDD 有分區(qū)方式的話),以及filter()(如果父RDD 有分區(qū)方式的話)。其他所有的操作生成的結(jié)果都不會(huì)存在特定的分區(qū)方式。
最后,對(duì)于二元操作,輸出數(shù)據(jù)的分區(qū)方式取決于父RDD 的分區(qū)方式。默認(rèn)情況下,結(jié)果會(huì)采用哈希分區(qū),分區(qū)的數(shù)量和操作的并行度一樣。不過,如果其中的一個(gè)父RDD 已經(jīng)設(shè)置過分區(qū)方式,那么結(jié)果就會(huì)采用那種分區(qū)方式;如果兩個(gè)父RDD 都設(shè)置過分區(qū)方式,結(jié)果RDD 會(huì)采用第一個(gè)父RDD 的分區(qū)方式。

4.4 自定義分區(qū)

要實(shí)現(xiàn)自定義的分區(qū)器,你需要繼承org.apache.spark.Partitioner 類并實(shí)現(xiàn)下面三個(gè)方法。

  • numPartitions: Int:返回創(chuàng)建出來的分區(qū)數(shù)。
  • getPartition(key: Any): Int:返回給定鍵的分區(qū)編號(hào)(0 到 numPartitions-1)。
  • equals():Java 判斷相等性的標(biāo)準(zhǔn)方法。這個(gè)方法的實(shí)現(xiàn)非常重要,Spark 需要用這個(gè)方法來檢查你的分區(qū)器對(duì)象是否和其他分區(qū)器實(shí)例相同,這樣Spark 才可以判斷兩個(gè)RDD 的分區(qū)方式是否相同。
4.4.1 數(shù)據(jù)分區(qū)示例

舉個(gè)例子,假設(shè)我們要在一個(gè)網(wǎng)頁(yè)的集合上運(yùn)行前一節(jié)中的PageRank 算法。在這里,每個(gè)頁(yè)面的ID(RDD 中的鍵)是頁(yè)面的URL。當(dāng)我們使用簡(jiǎn)單的哈希函數(shù)進(jìn)行分區(qū)時(shí),擁有相似的URL 的頁(yè)面(比如http://www.cnn.com/WORLDhttp://www.cnn.com/US)可能會(huì)被分到完全不同的節(jié)點(diǎn)上。然而,我們知道在同一個(gè)域名下的網(wǎng)頁(yè)更有可能相互鏈接。由于PageRank 需要在每次迭代中從每個(gè)頁(yè)面向它所有相鄰的頁(yè)面發(fā)送一條消息,因此把這些頁(yè)面分組到同一個(gè)分區(qū)中會(huì)更好。可以使用自定義的分區(qū)器來實(shí)現(xiàn)僅根據(jù)域名而不是整個(gè)URL 來分區(qū)。

//使用Scala自定義分區(qū)
class DomainNamePartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts
  override def getPartition(key: Any): Int = {
    val domain = new Java.net.URL(key.toString).getHost()
    val code = (domain.hashCode % numPartitions)
    if(code < 0) {
      code + numPartitions // 使其非負(fù)
    }else{
      code
    }
  }
  // 用來讓Spark區(qū)分分區(qū)函數(shù)對(duì)象的Java equals方法
  override def equals(other: Any): Boolean = other match {
    case dnp: DomainNamePartitioner =>
      dnp.numPartitions == numPartitions
    case _ =>
      false
  }
}

注意,在equals() 方法中,使用Scala 的模式匹配操作符(match)來檢查other 是否是DomainNamePartitioner,并在成立時(shí)自動(dòng)進(jìn)行類型轉(zhuǎn)換;這和Java 中的instanceof() 是一樣的。

使用自定義的Partitioner 是很容易的:只要把它傳給partitionBy() 方法即可。Spark 中有許多依賴于數(shù)據(jù)混洗的方法,比如join() 和groupByKey(),它們也可以接收一個(gè)可選的Partitioner 對(duì)象來控制輸出數(shù)據(jù)的分區(qū)方式。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容