還是看wordcount案例
// Word count, written as one transformation per step.
// NOTE(review): textFile is called without a path here, while the definition quoted
// in these notes declares `path` with no default -- presumably elided in the example.
val lines = sc.textFile()
// Break every line into its space-separated tokens.
val words = lines.flatMap(_.split(" "))
// Pair each token with an initial count of 1.
val pairs = words.map(token => token -> 1)
// Add up the counts that share the same token.
val counts = pairs.reduceByKey((left, right) => left + right)
// Print each entry as "<token>: <count>".
counts.foreach { case (token, total) => println(token + ": " + total) }
首先看textFile()
/**
 * Reads a text file as an RDD of its lines.
 *
 * hadoopFile() is called first and creates a HadoopRDD whose elements are
 * (key, value) pairs: the key is the offset of each line in the HDFS or text
 * file, and the value is the text of that line. map() is then applied to the
 * HadoopRDD to drop the key and keep only the value, which yields a
 * MapPartitionsRDD whose elements are simply the lines of text.
 *
 * @param path location handed to hadoopFile(); also set as the name of the returned RDD
 * @param minPartitions forwarded unchanged to hadoopFile(); defaults to defaultMinPartitions
 * @return an RDD holding the string form of every value produced by hadoopFile()
 */
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
  assertNotStopped()
  // Step 1: the (offset, line) records read through the Hadoop input format.
  val offsetAndLine = hadoopFile(
    path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions)
  // Step 2: keep only the second component of each record, as a String.
  val linesOnly = offsetAndLine.map(record => record._2.toString)
  // Step 3: name the resulting RDD after the path it was read from.
  linesOnly.setName(path)
}
flatMap
/**
 * Builds a MapPartitionsRDD over this RDD that applies `f` to every element
 * of each partition iterator and flattens the results.
 *
 * @param f function producing a TraversableOnce of outputs for each input element
 * @return the new RDD of flattened outputs
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = {
  // Pass the function through sc.clean before the per-partition closure captures it.
  val preparedFn = sc.clean(f)
  // Only the partition iterator is needed; the first two arguments are ignored.
  new MapPartitionsRDD[U, T](this, (_, _, elements) => elements.flatMap(preparedFn))
}
map
/**
 * Builds a MapPartitionsRDD over this RDD that applies `f` to every element
 * of each partition iterator.
 *
 * @param f function turning one input element into one output element
 * @return the new RDD of transformed elements
 */
def map[U: ClassTag](f: T => U): RDD[U] = {
  // Pass the function through sc.clean before the per-partition closure captures it.
  val preparedFn = sc.clean(f)
  // Only the partition iterator is needed; the first two arguments are ignored.
  new MapPartitionsRDD[U, T](this, (_, _, elements) => elements.map(preparedFn))
}
reduceByKey
其實RDD裡是沒有reduceByKey的,因此對RDD調用reduceByKey()方法的時候,會觸發Scala的隱式轉換;此時就會在作用域內,尋找隱式轉換,會在RDD中找到rddToPairRDDFunctions()隱式轉換,然後將RDD轉換為PairRDDFunctions。
/**
 * Implicit conversion that wraps an RDD of (K, V) pairs in PairRDDFunctions.
 *
 * @param rdd the pair RDD to wrap
 * @param kt class tag for the key type
 * @param vt class tag for the value type
 * @param ord ordering for the key type; declared with a default of null
 * @return a PairRDDFunctions built over `rdd`
 */
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])(
    implicit kt: ClassTag[K],
    vt: ClassTag[V],
    ord: Ordering[K] = null): PairRDDFunctions[K, V] =
  new PairRDDFunctions[K, V](rdd)
接著會調用PairRDDFunctions中的reduceByKey()方法
/**
 * Merges the values of each key with `func` by delegating to combineByKey.
 *
 * @param partitioner partitioner handed through to combineByKey
 * @param func function merging two values of type V into one; it is passed to
 *             combineByKey as both its second and its third argument
 * @return the RDD returned by combineByKey
 */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  // The combiner for a key starts out as the value itself, unchanged.
  val valueAsCombiner: V => V = value => value
  combineByKey[V](valueAsCombiner, func, func, partitioner)
}
看foreach
/**
 * Applies `f` to every element of this RDD by submitting a job through `sc.runJob`.
 *
 * @param f action to run on each element
 */
def foreach(f: T => Unit): Unit = {
  // Pass the function through sc.clean before the job closure captures it.
  val cleanF = sc.clean(f)
  // The job function drains the partition iterator, applying `f` to each element;
  // the value returned by runJob is not used.
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
調用了runJob方法,一步步追蹤runJob方法,首先調用SparkContext的runJob
/**
 * Runs `func` over every partition of `rdd` and returns the results as an array.
 *
 * @param rdd the RDD whose partitions are processed
 * @param func function applied to the iterator of each partition
 * @return the array produced by the runJob overload this delegates to
 */
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  // Indices 0 until the number of partitions, i.e. all of them.
  val everyPartition = 0 until rdd.partitions.length
  // The final argument is the allowLocal flag of the overload being called.
  runJob(rdd, func, everyPartition, false)
}
還是SparkContext的runJob
/**
 * Runs `func` over the given partitions of `rdd`, adapting it to the
 * (TaskContext, Iterator) form taken by the runJob overload this delegates to.
 *
 * @param rdd the RDD whose partitions are processed
 * @param func function applied to the iterator of each partition
 * @param partitions indices of the partitions to run on
 * @param allowLocal flag passed through unchanged
 * @return the array produced by the delegated runJob call
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int],
    allowLocal: Boolean
): Array[U] = {
  // Wrap `func` so it accepts a TaskContext argument and ignores it.
  val contextIgnoringFunc = (_: TaskContext, records: Iterator[T]) => func(records)
  runJob(rdd, contextIgnoringFunc, partitions, allowLocal)
}
還是SparkContext的runJob
/**
 * Runs `func` over the given partitions of `rdd` and collects the results
 * into an array with one slot per requested partition.
 *
 * @param rdd the RDD whose partitions are processed
 * @param func function applied to the task context and iterator of each partition
 * @param partitions indices of the partitions to run on
 * @param allowLocal flag passed through unchanged
 * @return the array filled in by the result handler
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean
): Array[U] = {
  val collected = new Array[U](partitions.size)
  // Result handler: store each result at the index it is reported with.
  val storeResult: (Int, U) => Unit = (slot, value) => collected(slot) = value
  runJob[T, U](rdd, func, partitions, allowLocal, storeResult)
  collected
}
還是SparkContext的runJob
/**
 * Runs `func` over the given partitions of `rdd` and hands the results to
 * `resultHandler`, delegating the actual execution to the DAG scheduler.
 *
 * @param rdd the RDD whose partitions are processed
 * @param func function applied to the task context and iterator of each partition
 * @param partitions indices of the partitions to run on
 * @param allowLocal flag passed through to dagScheduler.runJob
 * @param resultHandler callback passed through to dagScheduler.runJob
 * @throws IllegalStateException if `stopped` is true when the method is called
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  // Lineage logging is opt-in through the spark.logLineage setting (default false).
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // Call runJob() on the dagScheduler that was created earlier, when this
  // SparkContext was initialized.
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
    resultHandler, localProperties.get)
  // These two calls run only after dagScheduler.runJob has returned.
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}