Spark RDD
In Spark, an RDD has five main properties:
- A list of partitions
- A function (compute) for computing each split (partition)
- A list of dependencies on other RDDs
- Optionally, a partitioner for key-value RDDs
- Optionally, a list of preferred locations to compute each split on
A Partition is the basic unit of the dataset: each partition is processed by one compute task, so the number of partitions determines the granularity of parallelism. The number of partitions can be specified when the RDD is created; otherwise the default given by spark.default.parallelism is used. In local mode the default is the n in local[n], in Mesos mode it is 8, and in other modes it is max(coreNumber, 2), where coreNumber is the number of CPU cores allocated to the application (virtual cores, not necessarily physical ones).
compute is the function that computes each Partition. It composes iterators, so the result of each intermediate computation does not need to be materialized.
Dependencies describe the relationships between RDDs; in effect they record the information of all parent RDDs. These dependencies form the lineage of an RDD, and Spark uses them to divide a job into stages.
Partitioner is the partitioning function of an RDD. Spark currently provides two implementations: the hash-based HashPartitioner and the range-based RangePartitioner. Only key-value RDDs have a Partitioner; for non key-value RDDs its value is None. The Partitioner determines both the number of partitions of the RDD itself and the number of partitions of the parent RDD's shuffle output.
Preferred locations store the preferred location of each Partition; for an HDFS file, this list holds the locations of the blocks that each Partition corresponds to. Following the principle that moving computation is cheaper than moving data, Spark schedules each task as close as possible to the data blocks it needs to process.
The RDD class exposes these five properties through the following five members:
/**
 * Compute the data represented by the given Partition
 */
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
/**
 * The logic of how the data is split into partitions
 */
protected def getPartitions: Array[Partition]
/**
 * The dependencies of this RDD, i.e. its parent RDDs
 */
protected def getDependencies: Seq[Dependency[_]] = deps
/**
 * The preferred locations of each partition of this RDD
 */
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/**
 * Partitioner for key-value RDDs
 */
@transient val partitioner: Option[Partitioner] = None
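To see how these members fit together, here is a minimal, hypothetical RDD subclass (not taken from the Spark source; all names are invented for illustration) that only implements compute and getPartitions:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type: it only needs to carry its index.
case class TenIntsPartition(override val index: Int) extends Partition

// Hypothetical RDD with no parents (deps = Nil): partition i yields the integers [i*10, (i+1)*10).
class TenIntsRDD(sc: SparkContext, numParts: Int) extends RDD[Int](sc, Nil) {
  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numParts)(i => TenIntsPartition(i))

  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val i = split.asInstanceOf[TenIntsPartition].index
    (i * 10 until (i + 1) * 10).iterator
  }
}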
Compute
The abstract RDD class requires every subclass to implement compute. The method takes a Partition and a TaskContext and computes the data of that partition.
Below is the compute implementation of BlockRDD; intuitively, it simply fetches the data represented by the block from the blockManager.
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
assertValid()
val blockManager = SparkEnv.get.blockManager
val blockId = split.asInstanceOf[BlockRDDPartition].blockId
blockManager.get[T](blockId) match {
case Some(block) => block.data.asInstanceOf[Iterator[T]]
case None =>
throw new Exception(s"Could not compute split, block $blockId of RDD $id not found")
}
}
The compute implementation of MapPartitionsRDD is as follows:
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
override def clearDependencies() {
super.clearDependencies()
prev = null
}
MapPartitionsRDD.compute calls the iterator method of the RDD's first parent (iterator pulls the data of the corresponding parent partition and returns an Iterator whose elements are the records of that partition). The function f is applied to each partition as a whole (not record by record), and the result is a new iterator containing all transformed records, i.e. the new partition.
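A minimal sketch of the same idea from the user side, assuming a SparkContext named sc is available: mapPartitions hands f the whole partition iterator, which is exactly what MapPartitionsRDD.compute wraps.

// f receives the Iterator of an entire parent partition and returns a new Iterator
val doubled = sc.parallelize(1 to 10, 2)
  .mapPartitions(iter => iter.map(_ * 2))
doubled.collect()   // Array(2, 4, ..., 20)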
Partition
A partition(split) is a logical chunk of large distributed data set.
The raw data represented by an RDD is split into N parts according to some logic, and each part corresponds to one Partition of the RDD. The number of partitions determines the number of tasks and therefore the parallelism of the program. Managing data by Partition makes it easy for Spark to process data in parallel and to reduce network traffic between executors, since Spark usually reads data from nearby nodes.
The partitions of an RDD are obtained via def getPartitions: Array[Partition], and their count can be checked with someRDD.partitions.size.
Spark runs one concurrent task per partition of an RDD, up to the number of cores in the cluster. For example, with a cluster of 50 cores, the RDD should have at least 50 partitions (and preferably 2-3 times that, i.e. 100-150).
The number of partitions also determines how many files are created when the RDD is saved, and the size of each partition is bounded by the executor's memory.
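A quick way to check the partition count in the spark-shell (a sketch, assuming a SparkContext named sc):

val rdd = sc.parallelize(1 to 1000, numSlices = 8)
rdd.partitions.length   // 8
rdd.getNumPartitions    // 8, convenience accessor for the same value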
Tips: when sc.textFile reads a compressed file (file.txt.gz, demo.gz, etc.), the resulting RDD has only 1 partition; in that case you need to call repartition explicitly.
val rdd = sc.textFile("demo.gz")
val nrdd = rdd.repartition(100)
Definition of Partition
Partition is defined in the Spark source as follows:
/**
* An identifier for a partition in an RDD.
*/
trait Partition extends Serializable {
/**
* Index, starting at 0 and increasing consecutively.
* Get the partition's index within its parent RDD
*/
def index: Int
// A better default implementation of HashCode
override def hashCode(): Int = index
override def equals(other: Any): Boolean = super.equals(other)
}
Partition and RDD go hand in hand: each RDD type has its own Partition implementation, so analyzing an RDD mostly means analyzing its subclasses. For example, HadoopPartition is implemented as follows:
/**
* A Spark split class that wraps around a Hadoop InputSplit.
*/
private[spark] class HadoopPartition(rddId: Int, override val index: Int, s: InputSplit)
extends Partition {
val inputSplit = new SerializableWritable[InputSplit](s)
override def hashCode(): Int = 31 * (31 + rddId) + index
override def equals(other: Any): Boolean = super.equals(other)
/**
* Get any environment variables that should be added to the users environment when running pipes
* @return a Map with the environment variables and corresponding values, it could be empty
*/
def getPipeEnvVars(): Map[String, String] = {
...
}
}
Impact of the number of partitions on performance
- Too few partitions: the cluster's resources cannot be fully utilized.
- Too many partitions: too many tasks are created, increasing serialization and scheduling/transfer overhead.
The Spark documentation recommends: typically you want 2-4 partitions for each CPU in your cluster.
Adjusting the number of partitions
- repartition
repartition is coalesce(numPartitions, shuffle = true); it not only changes the number of partitions but also replaces the Partitioner with a HashPartitioner, which triggers a shuffle.
/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism of this RDD; internally this uses a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions, consider using coalesce, which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
- coalesce
coalesce lets you control whether a shuffle happens, but with shuffle = false it can only decrease the number of partitions, not increase it.
/**
 * Return a new RDD that is reduced into numPartitions partitions.
 * Decreasing the number of partitions does not trigger a shuffle.
 * When increasing numPartitions you should set shuffle = true.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](
mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
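A small usage sketch (assuming a SparkContext named sc) that contrasts the two calls:

val rdd = sc.parallelize(1 to 100, 10)
rdd.coalesce(2).getNumPartitions                    // 2: partitions merged without a shuffle
rdd.coalesce(20).getNumPartitions                   // still 10: cannot grow without a shuffle
rdd.coalesce(20, shuffle = true).getNumPartitions   // 20
rdd.repartition(20).getNumPartitions                // 20: same as coalesce(20, shuffle = true)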
Dependency
A Dependency records how partitions evolve across a transformation; from the dependency's type Spark can tell how the data should be processed, i.e. pipelined or shuffled.
Dependencies in Spark are divided into narrow dependencies and wide dependencies. In a narrow dependency, a partition of the parent RDD is used by at most one partition of the child RDD, so no shuffle is needed and the work can be pipelined within the same task. In a wide dependency, a partition of the parent RDD may be used by multiple partitions of the child RDD, so a shuffle is required; wide dependencies are what Spark's DAGScheduler uses to split a job into stages.
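The dependency type of an RDD can be inspected directly; a sketch assuming a SparkContext named sc:

val pairs = sc.parallelize(1 to 10).map(x => (x % 3, x))
pairs.dependencies               // List(OneToOneDependency): narrow, pipelined in one stage
val reduced = pairs.reduceByKey(_ + _)
reduced.dependencies             // List(ShuffleDependency): wide, introduces a stage boundary
println(reduced.toDebugString)   // the lineage shows the shuffle between the two stages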
Definition of Dependency
The rdd field refers to the parent RDD. A Dependency is essentially a wrapper around a parent RDD, and its type tells how the current transformation processes the data.
@DeveloperApi
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
Narrow Dependency
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Return the parent RDD's partition IDs for the given child partition ID
* @param partitionId a partition ID of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
- OneToOneDependency
OneToOneDependency means the partitions of the child RDD and the parent RDD are in a one-to-one relationship: the child partition ID is identical to the parent partition ID.
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
- RangeDependency
RangeDependency means the partitions of the child RDD map one-to-one to the partitions of the parent RDD within a range.
/**
 * :: DeveloperApi ::
 * For example, if the child RDD's partition indexes are 3 4 5 6
 * and the parent RDD's partition indexes are 9 10 11 12,
 * then the parent partition index corresponding to child partition index 4 is
 * 4 - 3 + 9 = 10,
 * which matches partitionId - outStart + inStart in the code.
 *
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}
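A typical place where RangeDependency appears is union: each input RDD's partitions map onto a contiguous range of the output RDD's partitions. A sketch assuming a SparkContext named sc:

val a = sc.parallelize(1 to 10, 3)
val b = sc.parallelize(11 to 20, 4)
val u = a.union(b)
u.getNumPartitions   // 7 = 3 + 4
u.dependencies       // two RangeDependency instances, one per parent RDD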
ShuffleDependency
The implementation of ShuffleDependency is more involved. A shuffle moves data over the network, so a Serializer is needed to reduce the transferred volume; map-side aggregation can be enabled, controlled by mapSideCombine and aggregator; keyOrdering governs how keys are sorted; and the Partitioner decides how the shuffle output is partitioned. It also carries the class information of the key, value and combiner, plus a shuffleId. The partition-to-partition relationship is cut off at the shuffle, which is why shuffles are the basis for dividing stages.
/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since it is not needed on the executor side.
 *
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use; can be set via spark.serializer
 * @param keyOrdering key ordering for the shuffle output
 * @param aggregator map/reduce-side aggregator for the RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (map-side combine)
 */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Serializer = SparkEnv.get.serializer,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
// Note: It's possible that the combiner class tag is null, if the combineByKey
// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName)
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( shuffleId, _rdd.partitions.length, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
Partitioner
The Partitioner comes into play during the shuffle: when the map-side output is written, the records are routed into different buckets according to the RDD's Partitioner, and each bucket is later consumed by a different reducer.
Definition of Partitioner
The abstract class Partitioner has two methods: numPartitions returns how many partitions the RDD has, and getPartition returns the partition ID that an element should be written to based on its key.
/**
 * An object that defines how the elements in a key-value pair RDD are partitioned by key.
 * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
 *
 * Note that the partitioner must be deterministic, i.e. it must return the same partition id
 * given the same partition key.
 */
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
The companion object of Partitioner defines the defaultPartitioner method, which decides how operations like cogroup pick a Partitioner from the parent RDDs.
The default selection strategy is: if any of the input RDDs already has a Partitioner, take the one with the largest number of partitions; if that partitioner is eligible (its partition count is within one order of magnitude of the maximum number of upstream partitions, whether above or below) or its partition count is larger than the default number of partitions, use it; otherwise create a HashPartitioner with the default number of partitions.
object Partitioner {
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val rdds = (Seq(rdd) ++ others)
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
Some(hasPartitioner.maxBy(_.partitions.length))
} else {
None
}
val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
rdd.context.defaultParallelism
} else {
rdds.map(_.partitions.length).max
}
// If the existing max partitioner is an eligible one, or its partitions number is larger
// than the default number of partitions, use the existing partitioner.
if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
hasMaxPartitioner.get.partitioner.get
} else {
new HashPartitioner(defaultNumPartitions)
}
}
/**
* Returns true if the number of partitions of the RDD is either greater than or is less than and
* within a single order of magnitude of the max number of upstream partitions, otherwise returns
* false.
*/
private def isEligiblePartitioner(
hasMaxPartitioner: RDD[_],
rdds: Seq[RDD[_]]): Boolean = {
val maxPartitions = rdds.map(_.partitions.length).max
log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
}
}
Spark ships two concrete Partitioner implementations: HashPartitioner and RangePartitioner.
HashPartitioner
numPartitions returns the number of partitions passed in, and getPartition takes the key's hashCode modulo numPartitions to obtain the partition ID.
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
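A quick sketch of HashPartitioner's behaviour:

import org.apache.spark.HashPartitioner

val hp = new HashPartitioner(4)
hp.numPartitions           // 4
hp.getPartition("spark")   // nonNegativeMod("spark".hashCode, 4)
hp.getPartition(null)      // 0: null keys always go to partition 0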
RangePartitioner
The working of RangePartitioner comes down to how the partition boundaries (range separators) are chosen.
Spark uses these boundaries to decide which partition a key belongs to; data is unordered within a partition but ordered across partitions.
- Sample each partition with reservoir sampling
- Compute weights, and re-sample partitions that hold more data than sampleSizePerPartition
- Compute the partition boundaries rangeBounds from the weighted samples
- Use rangeBounds to derive the number of partitions and the partition each key belongs to
A short usage sketch follows this list, before the RangePartitioner source excerpt.
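A minimal usage sketch (assuming a SparkContext named sc); sortByKey builds a RangePartitioner internally, and one can also apply it explicitly with partitionBy:

import org.apache.spark.RangePartitioner

val pairs = sc.parallelize(Seq(5, 1, 9, 3, 7, 2, 8).map(x => (x, x)), 2)
val byRange = pairs.partitionBy(new RangePartitioner(3, pairs))
byRange.glom().map(_.map(_._1).toList).collect()
// e.g. Array(List(1, 2, 3), List(5, 7), List(8, 9)): keys are ordered across partitions,
// not necessarily ordered within a partition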
/**
 * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
 * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
 *
 * Note that the actual number of partitions created by the RangePartitioner might not be the
 * same as the `partitions` parameter, in the case where the number of sampled records is less
 * than the value of `partitions`.
 *
 * @param partitions the number of partitions
 * @param rdd the RDD to sample
 * @param ascending sort ascending or descending
 * @param samplePointsPerPartitionHint number of sample points per partition
 */
class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
private var ascending: Boolean = true,
val samplePointsPerPartitionHint: Int = 20)
extends Partitioner {
def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
}
// We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
require(samplePointsPerPartitionHint > 0,
s"Sample points per partition must be greater than 0 but found $samplePointsPerPartitionHint")
private var ordering = implicitly[Ordering[K]]
RangePartitioner's numPartitions and getPartition are implemented as follows. getPartition locates the key's partition with either a linear scan or a binary search; rangeBounds stores the upper bound of each partition, i.e. the separators.
// Return the number of partitions; `partitions` and rangeBounds.length + 1 may differ
def numPartitions: Int = rangeBounds.length + 1
// Binary search function over the bounds array
private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
// Return the partition index for the given key
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length <= 128) {
// If there are no more than 128 bounds, use a linear scan; otherwise use binary search
while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
}
} else {
// Determine which binary search method to use only once.
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = -partition-1
}
if (partition > rangeBounds.length) {
partition = rangeBounds.length
}
}
// Return the partition ID, taking ascending/descending order into account
if (ascending) {
partition
} else {
rangeBounds.length - partition
}
}
The partition boundaries are computed by the following method:
// An array of upper bounds for the first (partitions - 1) partitions
private var rangeBounds: Array[K] = {
if (partitions <= 1) {
Array.empty
} else {
// Decide the total sample size, capped at 1 million
// Cast to double to avoid overflowing ints or longs
val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little, so enough samples are collected even with few partitions
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
// Sample each partition of the RDD
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) {
Array.empty
} else {
// If a partition contains far more than the average number of items (records * fraction > sampleSizePerPartition),
// it will be re-sampled to ensure enough samples are collected from that partition.
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
val candidates = ArrayBuffer.empty[(K, Float)]
// Record such partitions in imbalancedPartitions and re-sample them later, so data-heavy partitions contribute enough samples
val imbalancedPartitions = mutable.Set.empty[Int]
sketched.foreach { case (idx, n, sample) =>
if (fraction * n > sampleSizePerPartition) {
imbalancedPartitions += idx
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.length).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
}
}
if (imbalancedPartitions.nonEmpty) {
// Re-sample imbalanced partitions with the desired sampling probability and adjust their weights
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
// Adjust the weight
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x => (x, weight))
}
// Select the bounds from the (key, weight) candidates
RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
}
}
}
The sketch method derives a sampling seed from the RDD's id (used to generate random numbers for reservoir sampling) and then calls SamplingUtils.reservoirSampleAndCount on each partition.
private[spark] object RangePartitioner {
/**
* Core sampling algorithm: reservoir sampling.
* Sketches the input RDD via reservoir sampling on each partition.
*
* @param rdd the input RDD to sketch
* @param sampleSizePerPartition max sample size per partition
* @return (total number of items, an array of (partitionId, number of items, sample))
*/
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K]
// to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect()
val numItems = sketched.map(_._2).sum
(numItems, sketched)
}
Reservoir sampling
- First fill a reservoir array with k samples; if the number of records is less than k, return the samples directly.
- Otherwise create a random number generator from the seed and keep iterating over the data; for each record, generate a random index in [0, l); if it is less than k, replace the element at that index in the reservoir from step 1.
/**
 * Reservoir sampling implementation that also returns the input size.
 *
 * @param input the input iterator
 * @param k reservoir size, i.e. the number of samples per partition
 * @param seed random seed
 * @return (samples, input size)
 */
def reservoirSampleAndCount[T: ClassTag](
input: Iterator[T],
k: Int,
seed: Long = Random.nextLong())
: (Array[T], Long) = {
val reservoir = new Array[T](k)
// Put the first k records into the reservoir array; k is the sample size per partition
var i = 0
while (i < k && input.hasNext) {
val item = input.next()
reservoir(i) = item
i += 1
}
// If the partition has fewer than k records, return them directly; otherwise continue with random replacement
if (i < k) {
// If input size < k, trim the array to return only an array of input size.
val trimReservoir = new Array[T](i)
System.arraycopy(reservoir, 0, trimReservoir, 0, i)
(trimReservoir, i)
} else {
// If input size > k, continue the sampling process.
var l = i.toLong
val rand = new XORShiftRandom(seed)
while (input.hasNext) {
val item = input.next()
l += 1
// There are k elements in the reservoir, and the l-th element has been
// consumed. It should be chosen with probability k/l. The expression
// below is a random long chosen uniformly from [0,l)
val replacementIndex = (rand.nextDouble() * l).toLong
if (replacementIndex < k) {
reservoir(replacementIndex.toInt) = item
}
}
(reservoir, l)
}
}
determineBounds returns the partition separators; its logic is:
- Sort the candidates (Array[(key, weight)]) by key.
- Compute the total weight sumWeights.
- Divide the total weight by the number of partitions to get the average weight per partition, step.
- Loop over the sorted candidates, accumulating their weights into cumWeight; every time the accumulated weight reaches another step, take the current key as a partition separator.
- Finally return all collected separators; when determineBounds finishes, its result becomes rangeBounds (the partition separators).
/**
* Compute the partition separators.
* Determines the bounds for range partitioning from candidates with weights indicating how many
* items each represents. Usually this is 1 over the probability used to sample this candidate.
*
* @param candidates unordered candidates with weights
* @param partitions number of partitions
* @return selected bounds
*/
def determineBounds[K : Ordering : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int): Array[K] = {
val ordering = implicitly[Ordering[K]]
// Sort the candidates by key
val ordered = candidates.sortBy(_._1)
val numCandidates = ordered.size
// Total weight of all candidates
val sumWeights = ordered.map(_._2.toDouble).sum
// Average weight per partition
val step = sumWeights / partitions
var cumWeight = 0.0
var target = step
val bounds = ArrayBuffer.empty[K]
var i = 0
var j = 0
var previousBound = Option.empty[K]
// Walk through the sorted candidates, accumulating cumWeight; each time the accumulated weight
// reaches another step, take the current key as a separator; return all separators at the end
while ((i < numCandidates) && (j < partitions - 1)) {
val (key, weight) = ordered(i)
cumWeight += weight
if (cumWeight >= target) {
// Skip duplicate values.
if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
bounds += key
target += step
j += 1
previousBound = Some(key)
}
}
i += 1
}
bounds.toArray
}
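A hypothetical walk-through of the loop above with concrete numbers (plain Scala, not calling the package-private Spark method):

// Four sorted candidates, each with weight 1.0, to be split into 2 partitions.
val ordered = Seq(("a", 1.0f), ("b", 1.0f), ("c", 1.0f), ("d", 1.0f))
val step = ordered.map(_._2.toDouble).sum / 2   // 2.0: average weight per partition
// cumWeight reaches 2.0 at "b", so "b" becomes the only bound:
// keys <= "b" go to partition 0 and keys > "b" go to partition 1,
// i.e. rangeBounds = Array("b") and numPartitions = rangeBounds.length + 1 = 2.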
Custom Partitioner
The main job of a Partitioner is to re-partition the data during the shuffle; the two functions to implement are:
- numPartitions, which returns the number of partitions after re-partitioning
- getPartition, which maps a key-value pair to a partition according to its key and some rule
class MyPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[String]
    k.length % partitions
  }
}
It can be used like this:
val data = sc.textFile("demo.txt")
val dataAry = data.flatMap(_.split(",")).map((_,1))
.partitionBy(new MyPartitioner(10)).reduceByKey(_ + _)
.collect()
PreferredLocations
Since each partition of an RDD is computed by a single task, the partition's preferred location becomes the task's preferred location. This is data-locality-aware scheduling, following the principle that moving computation is cheaper than moving data.
Below is the concrete getPreferredLocations implementation of HadoopRDD: the preferred locations of a partition are taken from the Hadoop InputSplit's location info, falling back to hsplit.getLocations with "localhost" filtered out.
override def getPreferredLocations(split: Partition): Seq[String] = {
val hsplit = split.asInstanceOf[HadoopPartition].inputSplit.value
val locs = hsplit match {
case lsplit: InputSplitWithLocationInfo =>
HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
case _ => None
}
locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}
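The preferred locations of any RDD partition can be queried through the public preferredLocations method; a sketch assuming a SparkContext named sc and a hypothetical HDFS path:

val rdd = sc.textFile("hdfs:///data/demo.txt")   // hypothetical path
rdd.partitions.foreach { p =>
  // For HDFS-backed RDDs these are the hosts holding the partition's block
  println(s"partition ${p.index} -> ${rdd.preferredLocations(p).mkString(", ")}")
}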