一、updateStateByKey
官方原話:
In every batch, Spark will apply the state update function for all existing keys,
regardless of whether they have new data in a batch or not.
If the update function returns None then the key-value pair will be eliminated.
也即是說它會統(tǒng)計(jì)全局的key的狀態(tài),就算沒有數(shù)據(jù)輸入,它也會在每一個批次的時候返回之前的key的狀態(tài)
特點(diǎn):
- 大數(shù)據(jù)量的時候不適合,尤其是key維度比較高,value狀態(tài)有比較大的時候??梢杂胷edis或者alluxio。redis適合要維護(hù)key超時刪除的機(jī)制的時候使用,alluxio適合超大吞吐量。當(dāng)然也可以用hbase。
- key超時刪除。用updatefunc返回None值。updatefanc不管是否有已存在狀態(tài)的key的新數(shù)據(jù)到來,都會被已存在狀態(tài)的key調(diào)用。當(dāng)然,新增的key也會調(diào)用。
- 初始狀態(tài)。
updateStateByKey有個重載的方法可以傳入initialRDD: RDD[(K, S)]
二、mapWithState
mapWithState:也是用于全局統(tǒng)計(jì)key的狀態(tài),但是它如果沒有數(shù)據(jù)輸入,便不會返回之前的key的狀態(tài),有一點(diǎn)增量的感覺。效率更高,生產(chǎn)中建議使用
object StatefulNetworkWordCount {
def main(args: Array[String]): Unit = {
if (args.length < 2) {
System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
ssc.checkpoint(".")
// Initial state RDD for mapWithState operation
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
// Create a ReceiverInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
// Update the cumulative count using mapWithState
// This will give a DStream made of state (which is the cumulative count of the words)
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
val stateDstream = wordDstream.mapWithState(
StateSpec.function(mappingFunc).initialState(initialRDD))
stateDstream.print()
ssc.start()
ssc.awaitTermination()
}
}
特點(diǎn):
- 若要清除某個key的狀態(tài),可在自定義的方法中調(diào)用state.remove()
- 若要設(shè)置狀態(tài)超時時間,可以調(diào)用StateSpec.function(mappingFunc).timeout()方法設(shè)置
- 若要添加初始化的狀態(tài),可以調(diào)用StateSpec.function(mappingFunc).initialState(initialRDD)方法
- 性能比updateStateByKey好
三、源碼分析
updateStateByKey:
- 跟進(jìn) updateStateByKey . 其中updateFunc就是要傳入的參數(shù),他是一個函數(shù),Seq[V]表示當(dāng)前key對應(yīng)的所有值,Option[S] 是當(dāng)前key的歷史狀態(tài),返回的是新的狀態(tài)。
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = ssc.withScope {
updateStateByKey(updateFunc, defaultPartitioner())
}
最終調(diào)用:
def updateStateByKey[S: ClassTag](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean): DStream[(K, S)] = ssc.withScope {
val cleanedFunc = ssc.sc.clean(updateFunc)
val newUpdateFunc = (_: Time, it: Iterator[(K, Seq[V], Option[S])]) => {
cleanedFunc(it)
}
new StateDStream(self, newUpdateFunc, partitioner, rememberPartitioner, None)
}
再跟進(jìn)去 new StateDStream:
在這里面new出了一個StateDStream對象。在其compute方法中,會先獲取上一個batch計(jì)算出的RDD(包含了至程序開始到上一個batch單詞的累計(jì)計(jì)數(shù)),然后在獲取本次batch中StateDStream的父類計(jì)算出的RDD(本次batch的單詞計(jì)數(shù))分別是prevStateRDD和parentRDD,然后在調(diào)用 computeUsingPreviousRDD 方法:
private [this] def computeUsingPreviousRDD(
batchTime: Time,
parentRDD: RDD[(K, V)],
prevStateRDD: RDD[(K, S)]) = {
// Define the function for the mapPartition operation on cogrouped RDD;
// first map the cogrouped tuple to tuples of required type,
// and then apply the update function
val updateFuncLocal = updateFunc
val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
val i = iterator.map { t =>
val itr = t._2._2.iterator
val headOption = if (itr.hasNext) Some(itr.next()) else None
(t._1, t._2._1.toSeq, headOption)
}
updateFuncLocal(batchTime, i)
}
val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
Some(stateRDD)
}
在這里兩個RDD進(jìn)行cogroup然后應(yīng)用updateStateByKey傳入的函數(shù)。我們知道cogroup的性能是比較低下
mapWithState:
@Experimental
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
spec: StateSpec[K, V, StateType, MappedType]
): MapWithStateDStream[K, V, StateType, MappedType] = {
new MapWithStateDStreamImpl[K, V, StateType, MappedType](
self,
spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
)
}
說明:StateSpec 封裝了狀態(tài)管理函數(shù),并在該方法中創(chuàng)建了MapWithStateDStreamImpl對象。
MapWithStateDStreamImpl 中創(chuàng)建了一個InternalMapWithStateDStream類型對象internalStream,在MapWithStateDStreamImpl的compute方法中調(diào)用了internalStream的getOrCompute方法。
MapWithStateDStreamImpl 中創(chuàng)建了一個InternalMapWithStateDStream類型對象internalStream,在MapWithStateDStreamImpl的compute方法中調(diào)用了internalStream的getOrCompute方法。
private[streaming] class MapWithStateDStreamImpl[
KeyType: ClassTag, ValueType: ClassTag, StateType: ClassTag, MappedType: ClassTag](
dataStream: DStream[(KeyType, ValueType)],
spec: StateSpecImpl[KeyType, ValueType, StateType, MappedType])
extends MapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream.context) {
private val internalStream =
new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)
override def slideDuration: Duration = internalStream.slideDuration
override def dependencies: List[DStream[_]] = List(internalStream)
override def compute(validTime: Time): Option[RDD[MappedType]] = {
internalStream.getOrCompute(validTime).map { _.flatMap[MappedType] { _.mappedData } }
}
InternalMapWithStateDStream中沒有g(shù)etOrCompute方法,這里調(diào)用的是其父類 DStream 的getOrCpmpute方法,該方法中最終會調(diào)用InternalMapWithStateDStream的Compute方法:
/** Method that generates an RDD for the given time */
override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {
// Get the previous state or create a new empty state RDD
val prevStateRDD = getOrCompute(validTime - slideDuration) match {
case Some(rdd) =>
if (rdd.partitioner != Some(partitioner)) {
// If the RDD is not partitioned the right way, let us repartition it using the
// partition index as the key. This is to ensure that state RDD is always partitioned
// before creating another state RDD using it
MapWithStateRDD.createFromRDD[K, V, S, E](
rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime)
} else {
rdd
}
case None =>
MapWithStateRDD.createFromPairRDD[K, V, S, E](
spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
partitioner,
validTime
)
}
// Compute the new state RDD with previous state RDD and partitioned data RDD
// Even if there is no data RDD, use an empty one to create a new state RDD
val dataRDD = parent.getOrCompute(validTime).getOrElse {
context.sparkContext.emptyRDD[(K, V)]
}
val partitionedDataRDD = dataRDD.partitionBy(partitioner)
val timeoutThresholdTime = spec.getTimeoutInterval().map { interval =>
(validTime - interval).milliseconds
}
Some(new MapWithStateRDD(
prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))
}
根據(jù)給定的時間生成一個MapWithStateRDD,首先獲取了先前狀態(tài)的RDD:preStateRDD和當(dāng)前時間的RDD:dataRDD,然后對dataRDD基于先前狀態(tài)RDD的分區(qū)器進(jìn)行重新分區(qū)獲取partitionedDataRDD。并且保證這兩個RDD使用的是同樣的partitioner。MapWithStateRDD中的數(shù)據(jù)都是MapWithStateRDDRecord對象,每個分區(qū)對應(yīng)一個對象來保存狀態(tài)(這就是為什么兩個RDD需要用同一個Partitioner)。最后將preStateRDD,partitionedDataRDD和用戶定義的函數(shù)mappingFunction傳給新生成的MapWithStateRDD對象返回。
跟進(jìn)MapWithStateRDD的compute方法:
override def compute(
partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {
val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]
val prevStateRDDIterator = prevStateRDD.iterator(
stateRDDPartition.previousSessionRDDPartition, context)
val dataIterator = partitionedDataRDD.iterator(
stateRDDPartition.partitionedDataRDDPartition, context)
val prevRecord = if (prevStateRDDIterator.hasNext) Some(prevStateRDDIterator.next()) else None
val newRecord = MapWithStateRDDRecord.updateRecordWithData(
prevRecord,
dataIterator,
mappingFunction,
batchTime,
timeoutThresholdTime,
removeTimedoutData = doFullScan // remove timed-out data only when full scan is enabled
)
Iterator(newRecord)
}
首先獲取了先前狀態(tài)的RDD:preStateRDD和當(dāng)前時間的RDD:dataRDD的迭代器,接著獲取了prevStateRDD的一條數(shù)據(jù),這個分區(qū)也只有一條MapWithStateRDDRecord類型的數(shù)據(jù),維護(hù)了對應(yīng)分區(qū)所有數(shù)據(jù)狀態(tài)。
然后調(diào)用MapWithStateRDDRecord的updateRecordWithData方法,用當(dāng)前時間的RDD數(shù)據(jù)更新之前的狀態(tài)RDD數(shù)據(jù).
private[streaming] object MapWithStateRDDRecord {
def updateRecordWithData[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
prevRecord: Option[MapWithStateRDDRecord[K, S, E]],
dataIterator: Iterator[(K, V)],
mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
batchTime: Time,
timeoutThresholdTime: Option[Long],
removeTimedoutData: Boolean
): MapWithStateRDDRecord[K, S, E] = {
// Create a new state map by cloning the previous one (if it exists) or by creating an empty one
val newStateMap = prevRecord.map { _.stateMap.copy() }. getOrElse { new EmptyStateMap[K, S]() }
val mappedData = new ArrayBuffer[E]
val wrappedState = new StateImpl[S]()
// Call the mapping function on each record in the data iterator, and accordingly
// update the states touched, and collect the data returned by the mapping function
dataIterator.foreach { case (key, value) =>
wrappedState.wrap(newStateMap.get(key))
val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
if (wrappedState.isRemoved) {
newStateMap.remove(key)
} else if (wrappedState.isUpdated
|| (wrappedState.exists && timeoutThresholdTime.isDefined)) {
newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
}
mappedData ++= returned
}
// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
wrappedState.wrapTimingOutState(state)
val returned = mappingFunction(batchTime, key, None, wrappedState)
mappedData ++= returned
newStateMap.remove(key)
}
}
MapWithStateRDDRecord(newStateMap, mappedData)
}
}
先copy了原來的狀態(tài),接著定義了兩個變量,mappedData是最終要返回的結(jié)果,wrappedState可以看成是對state的包裝,添加了一些額外的方法。
接著遍歷當(dāng)前批次的數(shù)據(jù),從狀態(tài)中取出key對應(yīng)的原來的state,并根據(jù)自定義的函數(shù)來對state進(jìn)行更新,這里涉及到state的remove&update&timeout來對newStateMap進(jìn)行更新操作,并將有更新的狀態(tài)加入到了mappedData中。
若有設(shè)置超時時間,則還會對超時了的key進(jìn)行移除,也會加入到mappedData中,最終通過新的狀態(tài)對象newStateMap和需返回的mappedData數(shù)組構(gòu)建了MapWithStateRDDRecord對象來返回。
四、總結(jié)
updateStateByKey底層是將preSateRDD和parentRDD進(jìn)行co-group,然后對所有數(shù)據(jù)都將經(jīng)過自定義的mapFun函數(shù)進(jìn)行一次計(jì)算,即使當(dāng)前batch只有一條數(shù)據(jù)也會進(jìn)行這么復(fù)雜的計(jì)算,大大的降低了性能,并且計(jì)算時間會隨著維護(hù)的狀態(tài)的增加而增加。
mapWithstate底層是創(chuàng)建了一個MapWithStateRDD,存的數(shù)據(jù)是MapWithStateRDDRecord對象,一個Partition對應(yīng)一個MapWithStateRDDRecord對象,該對象記錄了對應(yīng)Partition所有的狀態(tài),每次只會對當(dāng)前batch有的數(shù)據(jù)進(jìn)行跟新,而不會像updateStateByKey一樣對所有數(shù)據(jù)計(jì)算。