1. 創(chuàng)建Pair RDD
//Scala中使用第一個(gè)單詞作為鍵創(chuàng)建出一個(gè)pair RDD
val lines = sc.textFile("/path/README.md")
val pairs = lines.map(x => (x.split(" ")(0), x))
//Java中使用第一個(gè)單詞作為鍵創(chuàng)建出一個(gè)pair RDD
JavaRDD<String> lines = sc.textFile("/path/README.md")
JavaPairRDD<String, String> pairs = lines.mapToPair(x -> new Tuple2(x.split(" ")[0], x))
當(dāng)用Scala 和Python 從一個(gè)內(nèi)存中的數(shù)據(jù)集創(chuàng)建pair RDD 時(shí),只需要對(duì)這個(gè)由二元組組成的集合調(diào)用SparkContext.parallelize() 方法。而要使用Java 從內(nèi)存數(shù)據(jù)集創(chuàng)建pair RDD的話,則需要使用SparkContext.parallelizePairs()。
//Java中使用parallelizePairs創(chuàng)建pair RDD
List<Tuple2<String,Integer>> lt = new ArrayList<>();
Tuple2<String,Integer> tp1 = new Tuple2<>("pinda", 2);
Tuple2<String,Integer> tp2 = new Tuple2<>("qank", 6);
Tuple2<String,Integer> tp3 = new Tuple2<>("panda", 5);
lt.add(tp1);
lt.add(tp2);
lt.add(tp3);
JavaPairRDD<String,Integer> data = js.parallelizePairs(lt);
2. Pair RDD的轉(zhuǎn)化操作
2.1 基本轉(zhuǎn)化操作
Pair RDD可以使用所有標(biāo)準(zhǔn)RDD上的可用的轉(zhuǎn)化操作。
//Scala篩選掉長(zhǎng)度超過20個(gè)字符的行
pairs.filter{case (key, value) => value.length < 20}
//Java篩選掉長(zhǎng)度超過20個(gè)字符的行
Function<Tuple2<String, String>, Boolean> longWordFilter =
new Function<Tuple2<String, String>, Boolean>() {
public Boolean call(Tuple2<String, String> keyValue) {
return (keyValue._2().length() < 20);
}
};
JavaPairRDD<String, String> result = pairs.filter(longWordFilter);



2.2 聚合操作
//在Scala 中使用reduceByKey() 和mapValues() 計(jì)算每個(gè)鍵對(duì)應(yīng)的平均值
val rdd = sc.parallelize(List(("panda", 0), ("pink", 3), ("priate", 3), ("panda", 1), ("pink", 4)))
val keyMean = rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
- 使用reduceByKey() 和mapValues() 計(jì)算每個(gè)鍵對(duì)應(yīng)的平均值的數(shù)據(jù)流
| key | value | =>mapValues=> | key | value | =>reduceByKey=> | key | value |
|---|---|---|---|---|---|---|---|
| panda | 0 | panda | (0, 1) | panda | (1, 2) | ||
| pink | 3 | pink | (3, 1) | pink | (7, 2) | ||
| pirate | 3 | pirate | (3, 1) | pirate | (3, 1) | ||
| panda | 1 | panda | (1, 1) | ||||
| pink | 4 | pink | (4, 1) |
- 使用flatMap() 和 map()來生成以單詞為鍵、以數(shù)字1 為值的pair RDD,然后使用reduceByKey() 對(duì)所有的單詞進(jìn)行計(jì)數(shù)。
//用Scala 實(shí)現(xiàn)單詞計(jì)數(shù)
val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
//對(duì)RDD input使用countByValue() 函數(shù),以更快地實(shí)現(xiàn)單詞計(jì)數(shù)
val result = input.flatMap(x => x.split(" ")).countByValue()
//用Java 實(shí)現(xiàn)單詞計(jì)數(shù)
JavaRDD<String> input = sc.textFile("s3://...")
JavaRDD<String> words = input.flatMap(line -> line.split(" "));
JavaPairRDD<String, Integer> result = words.mapToPair(x -> new Tuple2(x, 1)).
reduceByKey((x, y) -> x + y);
-
combineByKey() 是最為常用的基于鍵進(jìn)行聚合的函數(shù)。大多數(shù)基于鍵聚合的函數(shù)都是用它實(shí)現(xiàn)的。和aggregate() 一樣,combineByKey() 可以讓用戶返回與輸入數(shù)據(jù)的類型不同的返回值。
combineByKey() 有多個(gè)參數(shù)分別對(duì)應(yīng)聚合操作的各個(gè)階段,因而非常適合用來解釋聚合操作各個(gè)階段的功能劃分。- combineByKey的定義
def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null )- 解釋下3個(gè)重要的函數(shù)參數(shù):
- createCombiner: V => C ,這個(gè)函數(shù)把當(dāng)前的值作為參數(shù),此時(shí)我們可以對(duì)其做些附加操作(類型轉(zhuǎn)換)并把它返回 (這一步類似于初始化操作)
- mergeValue: (C, V) => C,該函數(shù)把元素V合并到之前的元素C(createCombiner)上 (這個(gè)操作在每個(gè)分區(qū)內(nèi)進(jìn)行)
- mergeCombiners: (C, C) => C,該函數(shù)把2個(gè)元素C合并 (這個(gè)操作在不同分區(qū)間進(jìn)行)
- 使用combineByKey來求解平均數(shù)的例子
//在Scala 中使用combineByKey() 求每個(gè)鍵對(duì)應(yīng)的平均值 val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0)) val rdd = sc.parallelize(initialScores) type MVType = (Int, Double) //定義一個(gè)元組類型(科目計(jì)數(shù)器,分?jǐn)?shù)) rdd.combineByKey( score => (1, score), (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore), (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2) ).map { case (name, (num, socre)) => (name, socre / num) }.collect- 參數(shù)含義的解釋
- score => (1, score),我們把分?jǐn)?shù)作為參數(shù),并返回了附加的元組類型。 以"Fred"為列,當(dāng)前其分?jǐn)?shù)為88.0 =>(1,88.0) 1表示當(dāng)前科目的計(jì)數(shù)器,此時(shí)只有一個(gè)科目
- (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),注意這里的c1就是createCombiner初始化得到的(1,88.0)。在一個(gè)分區(qū)內(nèi),我們又碰到了"Fred"的一個(gè)新的分?jǐn)?shù)91.0。當(dāng)然我們要把之前的科目分?jǐn)?shù)和當(dāng)前的分?jǐn)?shù)加起來即c1._2 + newScorez,然后把科目計(jì)算器加1即c1._1 + 1
- (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2),注意"Fred"可能是個(gè)學(xué)霸,他選修的科目可能過多而分散在不同的分區(qū)中。所有的分區(qū)都進(jìn)行mergeValue后,接下來就是對(duì)分區(qū)間進(jìn)行合并了,分區(qū)間科目數(shù)和科目數(shù)相加分?jǐn)?shù)和分?jǐn)?shù)相加就得到了總分和總科目數(shù)
- 執(zhí)行結(jié)果
res1: Array[(String, Double)] = Array((Wilma,95.33333333333333), (Fred,91.33333333333333))

//Java 中使用combineByKey() 求每個(gè)鍵對(duì)應(yīng)的平均值
public static class AvgCount implements Serializable {
public AvgCount(int total, int num) { total_ = total; num_ = num; }
public int total_;
public int num_;
public float avg() { returntotal_/(float)num_; }
}
Function<Integer, AvgCount> createAcc = new Function<Integer, AvgCount>() {
public AvgCount call(Integer x) {
return new AvgCount(x, 1);
}
};
Function2<AvgCount, Integer, AvgCount> addAndCount =
new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) {
a.total_ += x;
a.num_ += 1;
return a;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine =
new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) {
a.total_ += b.total_;
a.num_ += b.num_;
return a;
}
};
AvgCount initial = new AvgCount(0,0);
JavaPairRDD<String, AvgCount> avgCounts =
nums.combineByKey(createAcc, addAndCount, combine);
Map<String, AvgCount> countMap = avgCounts.collectAsMap();
for (Entry<String, AvgCount> entry : countMap.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue().avg());
}
- 數(shù)據(jù)分組
groupByKey() cogroup() - 連接
join() rightOuterJoin() leftOuterJoin() - 數(shù)據(jù)排序
sortByKey()
3. Pair RDD的行動(dòng)操作
和轉(zhuǎn)化操作一樣,所有基礎(chǔ)RDD支持的行動(dòng)操作也都在pair RDD上可用。

4. 數(shù)據(jù)分區(qū)
4.1 獲取RDD的數(shù)據(jù)分區(qū)
你可以使用RDD 的partitioner 屬性(Java 中使用partitioner() 方法)來獲取RDD 的分區(qū)方式。它會(huì)返回一個(gè)scala.Option 對(duì)象,這是Scala 中用來存放可能存在的對(duì)象的容器類。你可以對(duì)這個(gè)Option 對(duì)象調(diào)用isDefined() 來檢查其中是否有值,調(diào)用get() 來獲取其中的值
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
//創(chuàng)建一個(gè)由(Int, Int) 對(duì)組成的RDD
scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12
//初始時(shí)沒有分區(qū)方式信息(一個(gè)值為None 的Option 對(duì)象)。
scala> pairs.partitioner
res0: Option[spark.Partitioner] = None
//對(duì)第一個(gè)RDD 進(jìn)行哈希分區(qū),創(chuàng)建出了第二個(gè)RDD
scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2)).persist()
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14
//對(duì)RDD 完成哈希分區(qū)操作
scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)
4.2 從分區(qū)中獲益的操作
Spark 的許多操作都引入了將數(shù)據(jù)根據(jù)鍵跨節(jié)點(diǎn)進(jìn)行混洗的過程。所有這些操作都會(huì)從數(shù)據(jù)分區(qū)中獲益。能夠從數(shù)據(jù)分區(qū)中獲益的操作有cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey() 以及l(fā)ookup()。
4.3 影響分區(qū)方式的操作
會(huì)為生成的結(jié)果RDD設(shè)好分區(qū)方式的操作:cogroup()、groupWith()、join()、lef tOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()、partitionBy()、sort()、mapValues()(如果父RDD 有分區(qū)方式的話)、flatMapValues()(如果父RDD 有分區(qū)方式的話),以及filter()(如果父RDD 有分區(qū)方式的話)。其他所有的操作生成的結(jié)果都不會(huì)存在特定的分區(qū)方式。
最后,對(duì)于二元操作,輸出數(shù)據(jù)的分區(qū)方式取決于父RDD 的分區(qū)方式。默認(rèn)情況下,結(jié)果會(huì)采用哈希分區(qū),分區(qū)的數(shù)量和操作的并行度一樣。不過,如果其中的一個(gè)父RDD 已經(jīng)設(shè)置過分區(qū)方式,那么結(jié)果就會(huì)采用那種分區(qū)方式;如果兩個(gè)父RDD 都設(shè)置過分區(qū)方式,結(jié)果RDD 會(huì)采用第一個(gè)父RDD 的分區(qū)方式。
4.4 自定義分區(qū)
要實(shí)現(xiàn)自定義的分區(qū)器,你需要繼承org.apache.spark.Partitioner 類并實(shí)現(xiàn)下面三個(gè)方法。
- numPartitions: Int:返回創(chuàng)建出來的分區(qū)數(shù)。
- getPartition(key: Any): Int:返回給定鍵的分區(qū)編號(hào)(0 到 numPartitions-1)。
- equals():Java 判斷相等性的標(biāo)準(zhǔn)方法。這個(gè)方法的實(shí)現(xiàn)非常重要,Spark 需要用這個(gè)方法來檢查你的分區(qū)器對(duì)象是否和其他分區(qū)器實(shí)例相同,這樣Spark 才可以判斷兩個(gè)RDD 的分區(qū)方式是否相同。
4.4.1 數(shù)據(jù)分區(qū)示例
舉個(gè)例子,假設(shè)我們要在一個(gè)網(wǎng)頁(yè)的集合上運(yùn)行前一節(jié)中的PageRank 算法。在這里,每個(gè)頁(yè)面的ID(RDD 中的鍵)是頁(yè)面的URL。當(dāng)我們使用簡(jiǎn)單的哈希函數(shù)進(jìn)行分區(qū)時(shí),擁有相似的URL 的頁(yè)面(比如http://www.cnn.com/WORLD 和http://www.cnn.com/US)可能會(huì)被分到完全不同的節(jié)點(diǎn)上。然而,我們知道在同一個(gè)域名下的網(wǎng)頁(yè)更有可能相互鏈接。由于PageRank 需要在每次迭代中從每個(gè)頁(yè)面向它所有相鄰的頁(yè)面發(fā)送一條消息,因此把這些頁(yè)面分組到同一個(gè)分區(qū)中會(huì)更好。可以使用自定義的分區(qū)器來實(shí)現(xiàn)僅根據(jù)域名而不是整個(gè)URL 來分區(qū)。
//使用Scala自定義分區(qū)
class DomainNamePartitioner(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int = {
val domain = new Java.net.URL(key.toString).getHost()
val code = (domain.hashCode % numPartitions)
if(code < 0) {
code + numPartitions // 使其非負(fù)
}else{
code
}
}
// 用來讓Spark區(qū)分分區(qū)函數(shù)對(duì)象的Java equals方法
override def equals(other: Any): Boolean = other match {
case dnp: DomainNamePartitioner =>
dnp.numPartitions == numPartitions
case _ =>
false
}
}
注意,在equals() 方法中,使用Scala 的模式匹配操作符(match)來檢查other 是否是DomainNamePartitioner,并在成立時(shí)自動(dòng)進(jìn)行類型轉(zhuǎn)換;這和Java 中的instanceof() 是一樣的。
使用自定義的Partitioner 是很容易的:只要把它傳給partitionBy() 方法即可。Spark 中有許多依賴于數(shù)據(jù)混洗的方法,比如join() 和groupByKey(),它們也可以接收一個(gè)可選的Partitioner 對(duì)象來控制輸出數(shù)據(jù)的分區(qū)方式。