Spark has two kinds of commonly used operators:
- transformation: all transformation operators on an RDD are lazily evaluated. Converting one RDD into another does not execute immediately; only the logical operation on the data is recorded, and the computation actually runs when an action requires a result to be returned to the Driver.
- action: the operation is actually executed only when an action is triggered.
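This lazy-then-trigger behavior can be illustrated outside Spark with Scala's own lazy `Iterator` (an analogy only, not Spark code; the variable names are made up for this sketch):

```scala
// Analogy: Iterator.map is lazy, like an RDD transformation.
var evaluated = false
val source = Iterator(1, 2, 3)
val transformed = source.map { x =>
  evaluated = true            // records when evaluation actually happens
  x * 2
}
// Like an RDD transformation, nothing has executed yet:
println(s"evaluated before action: $evaluated")  // false
val result = transformed.toList                  // the "action" forces evaluation
println(s"evaluated after action: $evaluated")   // true
println(result)                                  // List(2, 4, 6)
```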
Commonly used transformation operators:
1.textFile(path: String): RDD[String] reads text data from HDFS and returns an RDD of String elements; each line of the text file becomes one element of the RDD;
val lineRdd: RDD[String] = sc.textFile(file_path, numPartitions)
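The line-per-element behavior resembles reading a plain text file line by line; a minimal non-Spark sketch, with the file contents made up for illustration:

```scala
import java.nio.file.Files
import scala.io.Source

// A small temporary file standing in for an HDFS path.
val tmp = Files.createTempFile("lines", ".txt")
Files.write(tmp, "first line\nsecond line\nthird line".getBytes("UTF-8"))

// Like sc.textFile, each line of the file becomes one element.
val src = Source.fromFile(tmp.toFile, "UTF-8")
val lines: List[String] = src.getLines().toList
src.close()
Files.delete(tmp)

println(lines.size)  // 3
println(lines.head)  // first line
```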
2.reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] aggregates the values of identical keys, first within each partition and then across partitions; the returned value has the same type as the original value;
val rdd2 = rdd1.reduceByKey((pre, after) => pre + after)
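The aggregation semantics can be mimicked on a plain Scala collection (an illustration of what the operator computes, not actual Spark code; the sample pairs are made up):

```scala
// Key-value pairs; reduceByKey would merge the values of equal keys.
val pairs = List(("a", 1), ("b", 2), ("a", 3), ("b", 4))

// Plain-Scala equivalent of pairs.reduceByKey((pre, after) => pre + after):
val reduced: Map[String, Int] =
  pairs.groupBy(_._1)                                  // group pairs by key
       .map { case (k, vs) => k -> vs.map(_._2).sum }  // combine values per key

// reduced == Map("a" -> 4, "b" -> 6)
println(reduced)
```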
3.mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] the function f takes two parameters, the partition index and an iterator over that partition's data, so the partition index can be attached to the data during the transformation; the preservesPartitioning parameter indicates whether to keep the parent RDD's partitioner information.
val rdd = sc.parallelize(List(12, 13, 14, 15, 22, 33, 45, 67, 18, 98), 6)
val partitionIndex = (index: Int, iter: Iterator[Int]) => {
  iter.toList.map(item => "index: " + index + ": value: " + item).iterator
}
rdd.mapPartitionsWithIndex(partitionIndex, true)
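The same pattern can be sketched without Spark by simulating partitions as chunks of a list and tagging each chunk with its index (the chunk boundaries here are an assumption and need not match how Spark actually partitions the data):

```scala
val data = List(12, 13, 14, 15, 22, 33, 45, 67, 18, 98)

// Simulate partitions by chunking; Spark would distribute these itself.
val partitions: List[List[Int]] = data.grouped(5).toList  // 2 "partitions"

// Same shape as the f passed to mapPartitionsWithIndex:
val partitionIndex = (index: Int, iter: Iterator[Int]) =>
  iter.map(item => s"index: $index, value: $item")

val tagged: List[String] =
  partitions.zipWithIndex.flatMap { case (chunk, idx) =>
    partitionIndex(idx, chunk.iterator)
  }

println(tagged.head)  // index: 0, value: 12
println(tagged.last)  // index: 1, value: 98
```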
To be continued.