1.14 transformation和action
Spark支持兩種RDD操作:transformation和action。transformation操作會針對已有的RDD創(chuàng)建一個(gè)新的RDD;而action則主要是對RDD進(jìn)行最后的操作,比如遍歷、reduce、保存到文件等,并可以返回結(jié)果給Driver程序。
例如,map就是一種transformation操作,它用于將已有RDD的每個(gè)元素傳入一個(gè)自定義的函數(shù),并獲取一個(gè)新的元素,然后將所有的新元素組成一個(gè)新的RDD。而reduce就是一種action操作,它用于對RDD中的所有元素進(jìn)行聚合操作,并獲取一個(gè)最終的結(jié)果,然后返回給Driver程序。
transformation的特點(diǎn)就是lazy特性。lazy特性指的是,如果一個(gè)spark應(yīng)用中只定義了transformation操作,那么即使你執(zhí)行該應(yīng)用,這些操作也不會執(zhí)行。也就是說,transformation是不會觸發(fā)spark程序的執(zhí)行的,它們只是記錄了對RDD所做的操作,但是不會自發(fā)的執(zhí)行。只有當(dāng)transformation之后,接著執(zhí)行了一個(gè)action操作,那么所有的transformation才會執(zhí)行。Spark通過這種lazy特性,來進(jìn)行底層的spark應(yīng)用執(zhí)行的優(yōu)化,避免產(chǎn)生過多中間結(jié)果。
action操作執(zhí)行,會觸發(fā)一個(gè)spark job的運(yùn)行,從而觸發(fā)這個(gè)action之前所有的transformation的執(zhí)行。這是action的特性。
transformation和action原理剖析

val lines = sc.textFile("spark.txt")
通過textFile()方法,針對外部文件創(chuàng)建了一個(gè)RDD——lines,但是實(shí)際上,程序執(zhí)行到這里為止,spark.txt文件的數(shù)據(jù)是不會加載到內(nèi)存中的。lines只是代表了一個(gè)指向spark.txt文件的引用。
val lineLengths = lines.map(line => line.length)
這里對lines RDD進(jìn)行了map算子,獲取了一個(gè)轉(zhuǎn)換后的lineLengths RDD。但是這里連數(shù)據(jù)都沒有,當(dāng)然也不會做任何操作。lineLengths RDD也只是一個(gè)概念上的東西而已。
val totalLength = lineLengths.reduce(_ + _)
之后,執(zhí)行了一個(gè)action操作——reduce。此時(shí)就會觸發(fā)之前所有transformation操作的執(zhí)行,Spark會將操作拆分成多個(gè)task到多個(gè)機(jī)器上并行執(zhí)行,每個(gè)task會在本地執(zhí)行map操作,并且進(jìn)行本地的reduce聚合。最后會進(jìn)行一個(gè)全局的reduce聚合,然后將結(jié)果返回給Driver程序。
注意,Spark有些特殊的算子,也就是特殊的transformation操作。比如groupByKey、sortByKey、reduceByKey等,其實(shí)只是針對特殊的RDD的。即包含key-value對的RDD。而這種RDD中的元素,實(shí)際上是scala中的一種類型,即Tuple2,也就是包含兩個(gè)值的Tuple。
在scala中,需要手動導(dǎo)入Spark的相關(guān)隱式轉(zhuǎn)換,import org.apache.spark.SparkContext._。然后,對應(yīng)包含Tuple2的RDD,會自動隱式轉(zhuǎn)換為PairRDDFunction,并提供reduceByKey等方法。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
* 統(tǒng)計(jì)每行出現(xiàn)的次數(shù)
* @author Administrator
*
*/
public class LineCount {
public static void main(String[] args) {
// 創(chuàng)建SparkConf
SparkConf conf = new SparkConf()
.setAppName("LineCount")
.setMaster("local");
// 創(chuàng)建JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 創(chuàng)建初始RDD,lines,每個(gè)元素是一行文本
JavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//hello.txt");
// 對lines RDD執(zhí)行mapToPair算子,將每一行映射為(line, 1)的這種key-value對的格式
// 然后后面才能統(tǒng)計(jì)每一行出現(xiàn)的次數(shù)
JavaPairRDD<String, Integer> pairs = lines.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
return new Tuple2<String, Integer>(t, 1); // key-value對里面的元素是Tuple2
}
});
// 對pairs RDD執(zhí)行reduceByKey算子,統(tǒng)計(jì)出每一行出現(xiàn)的總次數(shù)
JavaPairRDD<String, Integer> lineCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 執(zhí)行一個(gè)action操作,foreach,打印出每一行出現(xiàn)的次數(shù)
lineCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {
private static final long serialVersionUID = 1L;
@Override
// lineCounts里面的元素是Tuple2
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._1 + " appears " + t._2 + " times.");
}
});
// 關(guān)閉JavaSparkContext
sc.close();
}
}
// 統(tǒng)計(jì)文件每一行出現(xiàn)的次數(shù)
val lines = sc.textFile("hello.txt")
val linePairs = lines.map(line => (line, 1))
val lineCounts = linePairs.reduceByKey(_ + _)
lineCounts.foreach(lineCount => println(lineCount._1 + " appears " + llineCount._2 + " times."))
常用transformation和action

