Spark 創(chuàng)建算子源碼解析

Spark創(chuàng)建方式可以通過(guò)集合進(jìn)行創(chuàng)建,或者通過(guò)HDFS等存儲(chǔ)文件創(chuàng)建,還可以基于其他算子進(jìn)行轉(zhuǎn)換操作。

1. 基于集合的創(chuàng)建

parallelize(seq, numSlices)

  • 使用方式

通過(guò)parallelize創(chuàng)建RDD, 可以將driver端的集合創(chuàng)建為RDD。通過(guò)傳入Array或Seq,并設(shè)置其分區(qū)值,創(chuàng)建ParallelCollectionRDD。

val rdd = spark.sparkContext.parallelize(Array(("a", 1), ("b", 2), ("c", 3)), 2)
  • 源碼解析
override def getPartitions: Array[Partition] = {
  // RDD調(diào)用slice方法
  val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
  slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}

parallelize實(shí)質(zhì)是使用ParallelCollectionRDD.slice將數(shù)組中的數(shù)據(jù)進(jìn)行切分,并分配到各個(gè)分區(qū)中。

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
  (0 until numSlices).iterator.map { i =>
    val start = ((i * length) / numSlices).toInt
    val end = (((i + 1) * length) / numSlices).toInt
    (start, end)
  }
}

拆分的規(guī)則,如上所示,將start =(分區(qū)id * 數(shù)據(jù)總長(zhǎng)度)/ 分區(qū)數(shù), end=(分區(qū)id + 1 * 數(shù)據(jù)總長(zhǎng)度)/ 分區(qū)數(shù), 分區(qū)id從0開始。最后調(diào)用Array.slice方法將數(shù)據(jù)進(jìn)行切分。

分區(qū)數(shù)默認(rèn)為:conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)), 機(jī)器總核數(shù)和2的最大值。

  • makeRDD 實(shí)質(zhì)是調(diào)用parallelize(seq, numSlices)算子。不過(guò)其還有另一個(gè)方法,def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]), 可以指定數(shù)據(jù)的優(yōu)先位置。

eg:

val rdd = spark.sparkContext.makeRDD(Seq((1 to 10,Seq("host1", "host2")),
Seq(11 to 20,Seq("host3"))))
println(rdd.preferredLocations(rdd.partitions(0)))

2. 基于存儲(chǔ)的創(chuàng)建

textfile(path, minPartitions): RDD[String]

textfile函數(shù)是用來(lái)讀取hdfs文件系統(tǒng)上的文件,并返回String類型的數(shù)據(jù)。

其是基于HadoopRDD實(shí)現(xiàn)的。

def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable],classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

hadoopRDD的返回值是key-value形式,key為分區(qū)id, 再經(jīng)過(guò)map操作,過(guò)濾為僅僅value數(shù)據(jù)值。

textFile在讀取hdfs上文件前,先從本地獲取hadoopConfiguration配置信息,并將其封裝為廣播變量,broadcast(new SerializableConfiguration(hadoopConfiguration))。

override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  ...
  try {
    // 獲取輸入文件的切分
    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
    val inputSplits = if (ignoreEmptySplits) {
      allInputSplits.filter(_.getLength > 0)
    } else {
      allInputSplits
    }
    // 分區(qū)數(shù)等于inputSplits數(shù)
    val array = new Array[Partition](inputSplits.size)
    for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    }
    ...
}

分區(qū)數(shù)即為獲取輸入文件的切分?jǐn)?shù)。

而切分?jǐn)?shù)和幾個(gè)因素有關(guān):minPartitions, goalSize,blockSize

總結(jié)下HadoopRDD分區(qū)規(guī)則:

1.如果textFile指定分區(qū)數(shù)量為0或者1的話,defaultMinPartitions值為1,則有多少個(gè)文件,就會(huì)有多少個(gè)分區(qū)。

2.如果不指定默認(rèn)分區(qū)數(shù)量,則默認(rèn)分區(qū)數(shù)量為2,則會(huì)根據(jù)所有文件字節(jié)大小totalSize除以分區(qū)數(shù)量,得到的值goalSize,然后比較goalSize和hdfs指定分塊大?。ㄟ@里是128M)作比較,以較小的最為goalSize作為切分大小,對(duì)每個(gè)文件進(jìn)行切分,若文件大于大于goalSize,則會(huì)生成該(文件大小/goalSize)個(gè)分區(qū),如果文件內(nèi)的數(shù)據(jù)不能除盡則分區(qū)數(shù)會(huì)+1,則為(fileSize/goalSize)+1。

3.如果指定分區(qū)數(shù)量大于等于2,則默認(rèn)分區(qū)數(shù)量為指定值,生成實(shí)際分區(qū)數(shù)量規(guī)則任然同2中的規(guī)則一致。

總之:文件總大小除以分區(qū)數(shù),大于分塊大小,則與分塊大小相關(guān),否則以得到的商相關(guān)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容