Common Spark Operators and Code Examples

sc.parallelize and sc.makeRDD

parallelize() source code

def parallelize[T: ClassTag](  
      seq: Seq[T],  
      numSlices: Int = defaultParallelism): RDD[T] = withScope {  
    assertNotStopped()  
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())  
  }  

makeRDD() has two overloads

/** Distribute a local Scala collection to form an RDD.  
   *  
   * This method is identical to `parallelize`.  
   */  
  def makeRDD[T: ClassTag](  
      seq: Seq[T],  
      numSlices: Int = defaultParallelism): RDD[T] = withScope {  
    parallelize(seq, numSlices)  
  }  
/**  
   * Distribute a local Scala collection to form an RDD, with one or more  
   * location preferences (hostnames of Spark nodes) for each object.  
   * Create a new partition for each collection item.  
   */  
  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {  
    assertNotStopped()  
    val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap  
    new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)  
  }  

The doc comment means: distribute a local Scala collection to form an RDD, with one or more location preferences (hostnames of Spark nodes) for each object, and create a new partition for each collection item.

Test usage

object MyTask2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("rdd maker").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(("A",List("a1","a2","a3")),("B",List("b1","b2","b3"),("C",List("c1","c2","c3"))))
    val rddmaker = sc.makeRDD(list)
    val rddP = sc.parallelize(list)

    println("rddmaker partitions size:",rddmaker.partitions.size)
    println("rddP partitions size:",rddP.partitions.size)
  }

}
//(rddmaker partitions size:,3)
//(rddP partitions size:,1)
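
With the list literal fixed, makeRDD(list) resolves to the (T, Seq[String]) overload, so each element gets its own partition (3 in total) and its Seq[String] is recorded as that partition's preferred hosts, while parallelize treats each tuple as an ordinary element and falls back to defaultParallelism (1 under "local"). A minimal sketch to inspect the preferences, using the public preferredLocations method on RDD:

// Print each partition's index together with its preferred locations.
rddmaker.partitions.foreach { p =>
  println((p.index, rddmaker.preferredLocations(p)))
}
// e.g. (0,List(a1, a2, a3)) -- each partition holds one element, with its Seq[String] interpreted as hostnames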

distinct

Code

object MyTask3 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("task3"))
    println("rdd partitions size is ",rdd.partitions.size)
    val rdd = sc.parallelize(List("a","b","c","b","b","a"))
    rdd.distinct().collect().foreach(print(_))
  }
}
//(rdd partitions size is ,1)
//abc
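
distinct deduplicates by shuffling the data, so it also accepts a target number of result partitions. A minimal sketch (the names and partition counts are illustrative):

val rdd3 = sc.parallelize(List("a","b","c","b","b","a"), 3)
println(rdd3.distinct(2).partitions.size)
// 2 -- the shuffle repartitions the result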

union

Code

object MyTask4 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task4"))
    val rddLeft = sc.parallelize(List("2","3","4","5"))
    val rddRight = sc.parallelize(List("1","3","5","7"))
    val rddUnion = rddLeft.union(rddRight)
    rddUnion.collect().foreach(item => print(item + ","))
  }
}
//2,3,4,5,1,3,5,7,
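
Note that union simply concatenates the two RDDs and keeps duplicates (3 and 5 appear twice above). To get set-style union semantics, chain a distinct; a minimal sketch:

rddLeft.union(rddRight).distinct().collect().foreach(item => print(item + ","))
// e.g. 4,1,5,2,7,3, -- order is not guaranteed after the shuffle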

intersection: returns the intersection of two RDDs

object MyTask5 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task4"))
    val rddLeft = sc.parallelize(List("2","3","4","5"))
    val rddRight = sc.parallelize(List("1","3","5","7"))
    val rddIntersec = rddLeft.intersection(rddRight)
    rddIntersec.collect().foreach(item => print(item + ","))
  }
}
//5,3,
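
intersection shuffles both RDDs and also removes duplicates, even when an element appears several times on both sides. A minimal sketch (dupLeft and dupRight are illustrative names):

val dupLeft = sc.parallelize(List("3","3","5"))
val dupRight = sc.parallelize(List("3","5","5"))
dupLeft.intersection(dupRight).collect().foreach(item => print(item + ","))
// e.g. 3,5, -- each common element appears exactly once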

subtract: removes from one RDD the elements that also appear in another RDD

Code

object MyTask6 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task6"))
    val rddLeft = sc.parallelize(List("2","3","4","5"))
    val rddRight = sc.parallelize(List("1","3","5","7"))
    val rddSubtract = rddLeft.subtract(rddRight)
    rddSubtract.collect().foreach(item => print(item + ","))
  }
}
//2,4,
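
Unlike intersection, subtract does not deduplicate: elements of the left RDD that survive keep their multiplicity. A minimal sketch (illustrative names):

val dupLeft = sc.parallelize(List("2","2","3"))
val dupRight = sc.parallelize(List("3"))
dupLeft.subtract(dupRight).collect().foreach(item => print(item + ","))
// e.g. 2,2, -- duplicates on the left are preserved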

cartesian: Cartesian product

Code

object MyTask7 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task7"))
    val rddLeft = sc.parallelize(List("2","3","4","5"))
    val rddRight = sc.parallelize(List("1","3","5","7"))
    val rddCartesian = rddLeft.cartesian(rddRight)
    rddCartesian.collect().foreach(item => print(item + ","))
  }
}
//(2,1),(2,3),(2,5),(2,7),(3,1),(3,3),(3,5),(3,7),(4,1),(4,3),(4,5),(4,7),(5,1),(5,3),(5,5),(5,7),
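
The result size is |left| × |right|, so cartesian grows quadratically and gets expensive quickly on large RDDs. A quick size check on the example above:

println(rddCartesian.count())
// 16 = 4 * 4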

countByValue: counts how many times each value occurs

Code

object MyTask8 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task8"))
    val rddLeft = sc.parallelize(List("2","3","4","5"))
    val rddRight = sc.parallelize(List("1","3","5","7"))
    val rddUnion = rddLeft.union(rddRight)
    val rddCountByValue:scala.collection.Map[String, scala.Long] = rddUnion.countByValue
    rddCountByValue.foreach(item => println(item._1 + "," + item._2))
  }
}
/*
4,1
5,2
1,1
2,1
7,1
3,2
*/
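
countByValue is an action: it collects the full result as a Map on the driver, which is only safe when the set of distinct values is small. For large key spaces, an equivalent that keeps the counts distributed is map + reduceByKey; a minimal sketch on the same rddUnion:

val counts = rddUnion.map(v => (v, 1L)).reduceByKey(_ + _)  // RDD[(String, Long)], stays on the cluster
counts.collect().foreach(item => println(item._1 + "," + item._2))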

reduce: aggregates the elements of the RDD in parallel using the given function

Code

object MyTask9 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task9"))
    val rdd = sc.parallelize(1 to 11)
    val result = rdd.reduce((x,y) => x+y)
    println(result)
  }
}
//66
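
The function passed to reduce should be commutative and associative, since Spark applies it within each partition first and then across partitions. Any such binary operator works; for example, on the same rdd:

println(rdd.reduce((x, y) => math.max(x, y)))
// 11 -- the maximum of 1 to 11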

fold

Code

object MyTask10 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task10"))
    val rdd = sc.parallelize(1 to 11,2)
    val result = rdd.fold(10)(_+_)
    println(result)
  }
}
//96

Explanation: similar to reduce, but with an initial value. Note that the zero value is applied once in each partition and once more when the per-partition results are merged, so with 2 partitions the result is 66 + (2 + 1) * 10 = 96 rather than 76.
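
A minimal sketch confirming the partition-count dependence (same data, different numSlices; assuming the sc from the example above):

println(sc.parallelize(1 to 11, 1).fold(10)(_ + _))
// 86 = 66 + (1 + 1) * 10
println(sc.parallelize(1 to 11, 4).fold(10)(_ + _))
// 116 = 66 + (4 + 1) * 10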

aggregate

Function signature

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

Explanation:
aggregate first aggregates the elements within each partition, then aggregates the per-partition results, using the given aggregation functions and the initial "zero value". It can return a result type U different from the RDD's element type T, so it needs one function to merge a T into a U and another to merge two Us. Both functions may modify and return their first argument instead of allocating a new U, to avoid reallocating memory.
zeroValue: the initial value of the accumulated result within each partition (for the seqOp operator) and also the initial value for combining the per-partition results (for the combOp operator); this is typically the identity element (e.g. Nil for list concatenation or 0 for summation).
seqOp: the function that accumulates results within each partition.
combOp: an associative operator that combines results across partitions.

Code

object MyTask11 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task11"))
    val rdd = sc.parallelize(1 to 4,3)
    val result = rdd.aggregate((0,0,0))(
      (acc,number) => (acc._1+number,acc._1,acc._3+1),
      (x,y) => (x._1 + y._1,x._2 + y._2,x._3+y._3)
    )
    println(result)
  }
}
//(10,3,4)
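
To see where (10,3,4) comes from: with 3 slices, 1 to 4 is split roughly evenly as [1], [2], [3,4]. seqOp produces (1,0,1), (2,0,1) and (7,3,2) (the second field is set to the previous running sum at each step, which is why it ends up as 3), and combOp sums the three tuples into (10,3,4). A more typical use of aggregate is computing sum and count in one pass to get the mean; a minimal sketch on the same rdd:

val (sum, cnt) = rdd.aggregate((0, 0))(
  (acc, n) => (acc._1 + n, acc._2 + 1),   // fold each element into (sum, count)
  (a, b) => (a._1 + b._1, a._2 + b._2)    // merge per-partition results
)
println(sum.toDouble / cnt)
// 2.5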
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。
禁止轉載,如需轉載請通過簡信或評論聯(lián)系作者。

相關閱讀更多精彩內容

友情鏈接更多精彩內容