Spark創(chuàng)建方式可以通過(guò)集合進(jìn)行創(chuàng)建,或者通過(guò)HDFS等存儲(chǔ)文件創(chuàng)建,還可以基于其他算子進(jìn)行轉(zhuǎn)換操作。
1. 基于集合的創(chuàng)建
parallelize(seq, numSlices)
- 使用方式
通過(guò)parallelize創(chuàng)建RDD, 可以將driver端的集合創(chuàng)建為RDD。通過(guò)傳入Array或Seq,并設(shè)置其分區(qū)值,創(chuàng)建ParallelCollectionRDD。
val rdd = spark.sparkContext.parallelize(Array(("a", 1), ("b", 2), ("c", 3)), 2)
- 源碼解析
override def getPartitions: Array[Partition] = {
// RDD調(diào)用slice方法
val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
parallelize實(shí)質(zhì)是使用ParallelCollectionRDD.slice將數(shù)組中的數(shù)據(jù)進(jìn)行切分,并分配到各個(gè)分區(qū)中。
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
拆分的規(guī)則,如上所示,將start =(分區(qū)id * 數(shù)據(jù)總長(zhǎng)度)/ 分區(qū)數(shù), end=(分區(qū)id + 1 * 數(shù)據(jù)總長(zhǎng)度)/ 分區(qū)數(shù), 分區(qū)id從0開始。最后調(diào)用Array.slice方法將數(shù)據(jù)進(jìn)行切分。
分區(qū)數(shù)默認(rèn)為:conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)), 機(jī)器總核數(shù)和2的最大值。
- makeRDD 實(shí)質(zhì)是調(diào)用parallelize(seq, numSlices)算子。不過(guò)其還有另一個(gè)方法,def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]), 可以指定數(shù)據(jù)的優(yōu)先位置。
eg:
val rdd = spark.sparkContext.makeRDD(Seq((1 to 10,Seq("host1", "host2")),
Seq(11 to 20,Seq("host3"))))
println(rdd.preferredLocations(rdd.partitions(0)))
2. 基于存儲(chǔ)的創(chuàng)建
textfile(path, minPartitions): RDD[String]
textfile函數(shù)是用來(lái)讀取hdfs文件系統(tǒng)上的文件,并返回String類型的數(shù)據(jù)。
其是基于HadoopRDD實(shí)現(xiàn)的。
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable],classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
hadoopRDD的返回值是key-value形式,key為分區(qū)id, 再經(jīng)過(guò)map操作,過(guò)濾為僅僅value數(shù)據(jù)值。
textFile在讀取hdfs上文件前,先從本地獲取hadoopConfiguration配置信息,并將其封裝為廣播變量,broadcast(new SerializableConfiguration(hadoopConfiguration))。
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
...
try {
// 獲取輸入文件的切分
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
val inputSplits = if (ignoreEmptySplits) {
allInputSplits.filter(_.getLength > 0)
} else {
allInputSplits
}
// 分區(qū)數(shù)等于inputSplits數(shù)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
...
}
分區(qū)數(shù)即為獲取輸入文件的切分?jǐn)?shù)。
而切分?jǐn)?shù)和幾個(gè)因素有關(guān):minPartitions, goalSize,blockSize
總結(jié)下HadoopRDD分區(qū)規(guī)則:
1.如果textFile指定分區(qū)數(shù)量為0或者1的話,defaultMinPartitions值為1,則有多少個(gè)文件,就會(huì)有多少個(gè)分區(qū)。
2.如果不指定默認(rèn)分區(qū)數(shù)量,則默認(rèn)分區(qū)數(shù)量為2,則會(huì)根據(jù)所有文件字節(jié)大小totalSize除以分區(qū)數(shù)量,得到的值goalSize,然后比較goalSize和hdfs指定分塊大?。ㄟ@里是128M)作比較,以較小的最為goalSize作為切分大小,對(duì)每個(gè)文件進(jìn)行切分,若文件大于大于goalSize,則會(huì)生成該(文件大小/goalSize)個(gè)分區(qū),如果文件內(nèi)的數(shù)據(jù)不能除盡則分區(qū)數(shù)會(huì)+1,則為(fileSize/goalSize)+1。
3.如果指定分區(qū)數(shù)量大于等于2,則默認(rèn)分區(qū)數(shù)量為指定值,生成實(shí)際分區(qū)數(shù)量規(guī)則任然同2中的規(guī)則一致。
總之:文件總大小除以分區(qū)數(shù),大于分塊大小,則與分塊大小相關(guān),否則以得到的商相關(guān)。