我們通過Spark WordCount動(dòng)手實(shí)踐,編寫單詞計(jì)數(shù)代碼;在wordcount.scala的基礎(chǔ)上,從數(shù)據(jù)流動(dòng)的視角深入分析Spark RDD的數(shù)據(jù)處理過程。
首先需要建立一個(gè)文本文件helloSpark.txt,helloSpark.txt的文本內(nèi)容如下。
Hello Spark Hello Scala
Hello Hadoop
Hello Flink
Spark is Awesome
然后在Eclipse中編寫wordcount.scala的代碼如下。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object wordcount {
? def main(args: Array[String]): Unit = {
? ? // 第1步:創(chuàng)建Spark的配置對象SparkConf,設(shè)置Spark程序運(yùn)行時(shí)的配置信息,
? ? val conf = new SparkConf().setAppName("My First Spark APP").setMaster("local")
? ? // 第2步:創(chuàng)建SparkContext對象
? ? val sc = new SparkContext(conf)
? ? // 第3步:根據(jù)具體的數(shù)據(jù)來源來創(chuàng)建RDD
? ? val lines = sc.textFile("helloSpark.txt", 1)
? ? // 第4步:對初始的RDD進(jìn)行Transformation級別的處理,如通過map、filter等
? ? val words = lines.flatMap{line=>line.split(" ")}
? ? val pairs = words.map{word=>(word,1)}
? ? val wordCountsOdered = pairs.reduceByKey(_+_).map(
? ? ? pair=>(pair._2,pair._1)? ?
? ? ).sortByKey(false).map(pair=>(pair._2,pair._1))
? ? wordCountsOdered.collect.foreach(wordNumberPair=>println(wordNumberPair._1+" : "+wordNumberPair._2))
? ? sc.stop()
? }
}
在Eclipse中運(yùn)行程序,wordcount.scala的運(yùn)行結(jié)果如下:
二、解析RDD生成的內(nèi)部機(jī)制
下面詳細(xì)解析一下wordcount.scala的運(yùn)行原理。
(1)從數(shù)據(jù)流動(dòng)視角解密WordCount,使用Spark作單詞計(jì)數(shù)統(tǒng)計(jì),搞清楚數(shù)據(jù)到底是怎么流動(dòng)的。
(2)從RDD依賴關(guān)系的視角解密WordCount。Spark中的一切操作都是RDD,后面的RDD對前面的RDD有依賴關(guān)系。
(3)DAG與血統(tǒng)Lineage的思考。
在wordcount.scala的基礎(chǔ)上,我們從數(shù)據(jù)流動(dòng)的視角分析數(shù)據(jù)到底是怎么處理的。下面有一張WordCount數(shù)據(jù)處理過程圖,由于圖片較大,為了方便閱讀,將原圖分成兩張圖,如下面兩張圖所示。
數(shù)據(jù)在生產(chǎn)環(huán)境中默認(rèn)在HDFS中進(jìn)行分布式存儲,如果在分布式集群中,我們的機(jī)器會分成不同的節(jié)點(diǎn)對數(shù)據(jù)進(jìn)行處理,這里我們在本地測試,重點(diǎn)關(guān)注數(shù)據(jù)是怎么流動(dòng)的。處理的第一步是獲取數(shù)據(jù),讀取數(shù)據(jù)會生成HadoopRDD。
在WordCount.scala中,單擊sc.textFile進(jìn)入Spark框架,SparkContext.scala的textFile的源碼如下。
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(
? ? path: String,
? ? minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
? assertNotStopped()
? hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
? ? minPartitions).map(pair => pair._2.toString).setName(path)
}
下面看一下hadoopFile的源碼,HadoopRDD從Hdfs上讀取分布式數(shù)據(jù),并且以數(shù)據(jù)分片的方式存在于集群中。所謂的數(shù)據(jù)分片,就是把我們要處理的數(shù)據(jù)分成不同的部分,例如,在集群中有4個(gè)節(jié)點(diǎn),粗略的劃分可以認(rèn)為將數(shù)據(jù)分成4個(gè)部分,4條語句就分成4個(gè)部分。例如,Hello Spark在第一臺機(jī)器上,Hello Hadoop在第二臺機(jī)器上,Hello Flink在第三臺機(jī)器上,Spark is Awesome在第四臺機(jī)器上。HadoopRDD幫助我們從磁盤上讀取數(shù)據(jù),計(jì)算的時(shí)候會分布式地放入內(nèi)存中,Spark運(yùn)行在Hadoop上,要借助Hadoop來讀取數(shù)據(jù)。
Spark的特點(diǎn)包括:分布式、基于內(nèi)存(部分基于磁盤)、可迭代;默認(rèn)分片策略Block多大,分片就多大。但這種說法不完全準(zhǔn)確,因?yàn)榉制涗浛赡芸鐑蓚€(gè)Block,所以一個(gè)分片不會嚴(yán)格地等于Block的大小。例如,HDFS的Block大小是128MB的話,分片可能多幾個(gè)字節(jié)或少幾個(gè)字節(jié)。分片不一定小于128MB,因?yàn)槿绻詈笠粭l記錄跨兩個(gè)Block,分片會把最后一條記錄放在前一個(gè)分片中。這里,HadoopRDD用了4個(gè)數(shù)據(jù)分片,設(shè)想為128M左右。
hadoopFile的源碼如下。
def hadoopFile[K, V](
? ? path: String,
? ? inputFormatClass: Class[_ <: InputFormat[K, V]],
? ? keyClass: Class[K],
? ? valueClass: Class[V],
? ? minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
? assertNotStopped()
? // This is a hack to enforce loading hdfs-site.xml.
? // See SPARK-11227 for details.
? FileSystem.getLocal(hadoopConfiguration)
? // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
? val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
? val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
? new HadoopRDD(
? ? this,
? ? confBroadcast,
? ? Some(setInputPathsFunc),
? ? inputFormatClass,
? ? keyClass,
? ? valueClass,
? ? minPartitions).setName(path)
}
SparkContext.scala的textFile源碼中,調(diào)用hadoopFile方法后進(jìn)行了map轉(zhuǎn)換操作,map對讀取的每一行數(shù)據(jù)進(jìn)行轉(zhuǎn)換,讀入的數(shù)據(jù)是一個(gè)Tuple,Key值為索引,Value值為每行數(shù)據(jù)的內(nèi)容,生成MapPartitionsRDD。這里,map(pair => pair._2.toString)是基于HadoopRDD產(chǎn)生的Partition去掉的行Key產(chǎn)生的Value,第二個(gè)元素是讀取的每行數(shù)據(jù)內(nèi)容。MapPartitionsRDD是Spark框架產(chǎn)生的,運(yùn)行中可能產(chǎn)生一個(gè)RDD,也可能產(chǎn)生兩個(gè)RDD。例如,textFile中Spark框架就產(chǎn)生了兩個(gè)RDD,即HadoopRDD和MapPartitionsRDD。下面是map的源碼。
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
? val cleanF = sc.clean(f)
? new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
我們再來看一下WordCount業(yè)務(wù)代碼,對讀取的每行數(shù)據(jù)進(jìn)行flatMap轉(zhuǎn)換。這里,flatMap對RDD中的每一個(gè)Partition的每一行數(shù)據(jù)內(nèi)容進(jìn)行單詞切分,如有4個(gè)Partition分別進(jìn)行單詞切分,將“Hello Spark”切分成單詞“Hello”和“Spark”,對每一個(gè)Partition中的每一行進(jìn)行單詞切分并合并成一個(gè)大的單詞實(shí)例的集合。flatMap轉(zhuǎn)換生成的仍然是MapPartitionsRDD:
/**
*? Return a new RDD by first applying a function to all elements of this
*? RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
? val cleanF = sc.clean(f)
? new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
繼續(xù)WordCount業(yè)務(wù)代碼,計(jì)數(shù)之后進(jìn)行一個(gè)關(guān)鍵的reduceByKey操作,對全局的數(shù)據(jù)進(jìn)行計(jì)數(shù)統(tǒng)計(jì)。reduceByKey對相同的Key進(jìn)行Value的累計(jì)(包括Local和Reducer級別,同時(shí)Reduce)。reduceByKey在MapPartitionsRDD之后,在Local reduce級別本地進(jìn)行了統(tǒng)計(jì),這里也是MapPartitionsRDD。例如,在本地將(Hello,1),(Spark,1),(Hello,1),(Scala,1)匯聚成(Hello,2),(Spark,1),(Scala,1)。
Shuffle之前的Local Reduce操作主要負(fù)責(zé)本地局部統(tǒng)計(jì),并且把統(tǒng)計(jì)以后的結(jié)果按照分區(qū)策略放到不同的file。舉一個(gè)簡單的例子,如果下一個(gè)階段Stage是3個(gè)并行度,每個(gè)Partition進(jìn)行l(wèi)ocal reduce以后,將自己的數(shù)據(jù)分成3種類型,最簡單的方式是根據(jù)HashCode按3取模。
PairRDDFunctions.scala的reduceByKey的源碼如下。
/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
* parallelism level.
*/
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
? reduceByKey(defaultPartitioner(self), func)
}
至此,前面所有的操作都是一個(gè)Stage,一個(gè)Stage意味著什么:完全基于內(nèi)存操作。父Stage:Stage內(nèi)部的操作是基于內(nèi)存迭代的,也可以進(jìn)行Cache,這樣速度快很多。不同于Hadoop的Map Redcue,Hadoop Map Redcue每次都要經(jīng)過磁盤。
reduceByKey在Local reduce本地匯聚以后生成的MapPartitionsRDD仍屬于父Stage;然后reduceByKey展開真正的Shuffle操作,Shuffle是Spark甚至整個(gè)分布式系統(tǒng)的性能瓶頸,Shuffle產(chǎn)生ShuffleRDD,ShuffledRDD就變成另一個(gè)Stage,為什么是變成另外一個(gè)Stage?因?yàn)橐W(wǎng)絡(luò)傳輸,網(wǎng)絡(luò)傳輸不能在內(nèi)存中進(jìn)行迭代。
從WordCount業(yè)務(wù)代碼pairs.reduceByKey(_+_)中看一下PairRDDFunctions.scala的reduceByKey的源碼。
1
2
3
4
5
6
7
8
/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
? combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
reduceByKey內(nèi)部調(diào)用了combineByKeyWithClassTag方法。下面看一下PairRDDFunctions. scala的combineByKeyWithClassTag的源碼。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def combineByKeyWithClassTag[C](
? ? createCombiner: V => C,
? ? mergeValue: (C, V) => C,
? ? mergeCombiners: (C, C) => C,
? ? partitioner: Partitioner,
? ? mapSideCombine: Boolean = true,
? ? serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
? require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
? if (keyClass.isArray) {
? ? if (mapSideCombine) {
? ? ? throw new SparkException("Cannot use map-side combining with array keys.")
? ? }
? ? if (partitioner.isInstanceOf[HashPartitioner]) {
? ? ? throw new SparkException("Default partitioner cannot partition array keys.")
? ? }
? }
? val aggregator = new Aggregator[K, V, C](
? ? self.context.clean(createCombiner),
? ? self.context.clean(mergeValue),
? ? self.context.clean(mergeCombiners))
? if (self.partitioner == Some(partitioner)) {
? ? self.mapPartitions(iter => {
? ? ? val context = TaskContext.get()
? ? ? new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
? ? }, preservesPartitioning = true)
? } else {
? ? new ShuffledRDD[K, V, C](self, partitioner)
? ? ? .setSerializer(serializer)
? ? ? .setAggregator(aggregator)
? ? ? .setMapSideCombine(mapSideCombine)
? }
}
在combineByKeyWithClassTag方法中就用new()函數(shù)創(chuàng)建了ShuffledRDD。
前面假設(shè)有4臺機(jī)器并行計(jì)算,每臺機(jī)器在自己的內(nèi)存中進(jìn)行迭代計(jì)算,現(xiàn)在產(chǎn)生Shuffle,數(shù)據(jù)就要進(jìn)行分類,MapPartitionsRDD數(shù)據(jù)根據(jù)Hash已經(jīng)分好類,我們就抓取MapPartitionsRDD中的數(shù)據(jù)。我們從第一臺機(jī)器中獲取的內(nèi)容為(Hello,2),從第二臺機(jī)器中獲取的內(nèi)容為(Hello,1),從第三臺機(jī)器中獲取的內(nèi)容為(Hello,1),把所有的Hello都抓過來。同樣,我們把其他的數(shù)據(jù)(Hadoop,1),(Flink,1)……都抓過來。
這就是Shuffle的過程,根據(jù)數(shù)據(jù)的分類拿到自己需要的數(shù)據(jù)。注意,MapPartitionsRDD屬于第一個(gè)Stage,是父Stage,內(nèi)部基于內(nèi)存進(jìn)行迭代,不需要操作都要讀寫磁盤,所以速度非???;從計(jì)算算子的角度講,reduceByKey發(fā)生在哪里?reduceByKey發(fā)生的計(jì)算過程包括兩個(gè)RDD:一個(gè)是MapPartitionsRDD;一個(gè)是ShuffledRDD。ShuffledRDD要產(chǎn)生網(wǎng)絡(luò)通信。
reduceByKey之后,我們將結(jié)果收集起來,進(jìn)行全局級別的reduce,產(chǎn)生reduceByKey的最后結(jié)果,如將(Hello,2),(Hello,1),(Hello,1)在內(nèi)部變成(Hello,4),其他數(shù)據(jù)也類似統(tǒng)計(jì)。這里reduceByKey之后,如果通過Collect將數(shù)據(jù)收集起來,就會產(chǎn)生MapPartitionsRDD。從Collect的角度講,MapPartitionsRDD的作用是將結(jié)果收集起來發(fā)送給Driver;從saveAsTextFile輸出到Hdfs的角度講,例如輸出(Hello,4),其中Hello是key,4是Value嗎?不是!這里(Hello,4)就是value,這就需要設(shè)計(jì)一個(gè)key出來。
下面是RDD.scala的saveAsTextFile方法。
/**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String): Unit = withScope {
? // https://issues.apache.org/jira/browse/SPARK-2075
? //
? // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
? // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
? // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
? // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
? // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
? //
? // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
? // same bytecodes for `saveAsTextFile`.
? val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
? val textClassTag = implicitly[ClassTag[Text]]
? val r = this.mapPartitions { iter =>
? ? val text = new Text()
? ? iter.map { x =>
? ? ? text.set(x.toString)
? ? ? (NullWritable.get(), text)
? ? }
? }
? RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
? ? .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
深圳網(wǎng)站建設(shè)www.sz886.com