RDD算子操作分類
測(cè)試用例說(shuō)明 前置方法
/**
* 提供初始化方法,完成輸出目錄的清理
* 在每個(gè)Test方法之前先運(yùn)行
*/
@Before
def init(): Unit ={
val fileSystem: FileSystem = FileSystem.get(new Configuration())
val path = new Path("output")
// 如果輸出目錄存在,就刪除
if (fileSystem.exists(path)){
fileSystem.delete(path,true)
}
}
后置方法
/**
* 每次測(cè)試完成后運(yùn)行
*/
@After
def stop(): Unit ={
sc.stop()
}
成員變量
val sc = new SparkContext(new SparkConf().setAppName("My app").setMaster("local[*]"))
1.taansformation(轉(zhuǎn)換)
它可以實(shí)現(xiàn)把一個(gè)rdd轉(zhuǎn)換成一個(gè)新的rdd,它是延遲加載,不會(huì)立即觸發(fā)任務(wù)的真正運(yùn)行。
比如 flatMap/map/reduceBykey
1.1map
/**
* map 特點(diǎn): 分區(qū)之間是并行(真正并行取決于cores)運(yùn)算
* *
* 同一個(gè)分區(qū)內(nèi),一個(gè)元素執(zhí)行完所有的轉(zhuǎn)換操作后,才開(kāi)始下一個(gè)元素!
*/
@Test
def testMap(): Unit = {
val list: List[Int] = List(1, 2, 3, 4)
val rdd: RDD[Int] = sc.makeRDD(list, 2)
val rdd2: RDD[Int] = rdd.map(x => {
println(x + "執(zhí)行了第一次Map操作!")
x
})
val rdd3: RDD[Int] = rdd2.map(x => {
println(x+"執(zhí)行了第二次Map操作!")
x
})
rdd3.saveAsTextFile("output")
}

image.png
/*
map : def map[U: ClassTag](f: T => U): RDD[U]
對(duì)當(dāng)前RDD中的每個(gè)元素執(zhí)行map操作,返回一個(gè)新的元素,將元素放入新的MapPartitionsRDD中!
特點(diǎn): ①map操作后,不會(huì)改變分區(qū)數(shù)
②分區(qū)間的數(shù)據(jù)也不會(huì)發(fā)生交換
*/
@Test
def test2(): Unit = {
val list = List(1, 2, 3, 4)
val rdd: RDD[Int] = sc.makeRDD(list, 2)
val rdd1: RDD[Int] = rdd.map(x => x + 1)
rdd1.saveAsTextFile("output")
}

image.png

image.png
1.2 mapPartitions
/*
mapPartitions : 將一個(gè)分區(qū)作為一個(gè)整體,調(diào)用一次map函數(shù),轉(zhuǎn)換后生成新的分區(qū)集合!
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
和map的區(qū)別: ①傳入的函數(shù)不同,map將一個(gè)元素轉(zhuǎn)為另一個(gè)元素, mapPartitions將一個(gè)集合變?yōu)榱硪粋€(gè)集合!
② mapPartiion邏輯: cleanedF(iter) 批處理
map邏輯: iter.map(cleanF) 個(gè)體處理
③map是全量處理: RDD中有x個(gè)元素,返回的集合也有x個(gè)元素
mapPartition只要返回一個(gè)集合,進(jìn)行過(guò)濾或添加操作!
④ 本質(zhì)是mapPartition是一個(gè)集合調(diào)用一次
在特殊場(chǎng)景,節(jié)省性能,例如將一個(gè)分區(qū)的數(shù)據(jù),寫入到數(shù)據(jù)庫(kù)中
⑤ map是將一個(gè)元素的所有轉(zhuǎn)換操作運(yùn)行結(jié)束后,再繼續(xù)開(kāi)始下一個(gè)元素!
mapPartition: 多個(gè)分區(qū)并行開(kāi)始轉(zhuǎn)換操作,一個(gè)分區(qū)的所有數(shù)據(jù)全部運(yùn)行結(jié)束后,mapPartition才結(jié)束!
一旦某個(gè)分區(qū)中的元素沒(méi)有處理完,整個(gè)分區(qū)的數(shù)據(jù)都無(wú)法釋放!需要更大的內(nèi)存!
spark是分布式運(yùn)算: 時(shí)刻分清Driver 和 Executor
Executor執(zhí)行的是Task(封裝了RDD的執(zhí)行邏輯)
*/
@Test
def testMapPartitions(): Unit = {
// 封裝RDD操作的邏輯
val list = List(1, 2, 3, 4)
val rdd: RDD[Int] = sc.makeRDD(list, 2)
// 將分區(qū)中的奇數(shù)提取出來(lái)
val result: RDD[Int] = rdd.mapPartitions(x => {
x.filter(elem => elem % 2 == 1).toIterator
})
result.saveAsTextFile("output")
}

image.png

image.png
1.3 mapPartitionsWithIndex
/*
mapPartitionsWithIndex : 執(zhí)行邏輯 f(index, iter) : index是當(dāng)前分區(qū)的索引
iter是分區(qū)的迭代器
將一個(gè)分區(qū)整體執(zhí)行一次map操作,可以使用分區(qū)的index!
*/
@Test
def test7(): Unit = {
val list = List(1, 2, 3, 4)
val rdd: RDD[Int] = sc.makeRDD(list, 2)
val result: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index, iter) => iter.map(elem => (index, elem)))
result.saveAsTextFile("output")
}

image.png

image.png
1.5 flatMap
/*
flatMap : 先map再扁平化。不會(huì)改變分區(qū)和分區(qū)邏輯!
將List(List(1,2),3,List(4,5))進(jìn)行扁平化操作
*/
@Test
def test() : Unit ={
val list = List(List(1, 2), 3, List(4, 5))
val rdd: RDD[Any] = sc.makeRDD(list, 2)
val result: RDD[Any] = rdd.flatMap {
// 將單個(gè)Int,轉(zhuǎn)成集合
case x: Int => List(x)
case y: List[_] => y
}
result.saveAsTextFile("output")
}

image.png

image.png
1.6 glom
/*
glom(): 將一個(gè)分區(qū)的所有元素合并到一個(gè)Array中
*/
@Test
def test2() : Unit ={
val list = List(1,2,3,4)
val rdd: RDD[Any] = sc.makeRDD(list, 2)
val result: RDD[Array[Any]] = rdd.glom()
// result.collect() => Array [ Array[Any],Array[Any] ]
result.collect().foreach(x=>println(x.mkString(",")))
}

image.png
2.action(動(dòng)作)
它會(huì)觸發(fā)任務(wù)的真正運(yùn)行
比如collect/saveAstextFile