Window Operation
Spark Streaming 也可以提供基于窗口的計(jì)算,這樣允許你操作一個(gè)滑動(dòng)窗口時(shí)間內(nèi)的數(shù)據(jù)。下圖展示了滑動(dòng)窗口

如圖所示,
每當(dāng)窗口在輸入數(shù)據(jù)流上滑動(dòng)一次,在這個(gè)窗口內(nèi)的源RDDs 就會(huì)被聚合和操作然后產(chǎn)生 基于窗口流的RDDs。在這個(gè)例子中,過(guò)去三個(gè)時(shí)間單元的數(shù)據(jù)會(huì)被操作一次,然后每次滑動(dòng)兩個(gè)時(shí)間單元。這就是說(shuō) 任何窗口操作都需要指定兩個(gè)參數(shù):
- 窗口長(zhǎng)度:窗口持續(xù)時(shí)間(圖中是值3)
- 滑動(dòng)間隔:每個(gè)窗口操作的時(shí)間間隔(圖中是值2)
這兩個(gè)參數(shù)必須是輸入源數(shù)據(jù)流間隔時(shí)間的倍數(shù),(圖中是值 1)
讓我們來(lái)用例子演示一下。比方說(shuō),你想要擴(kuò)展一下之前的例子,要求能夠每隔10s中,計(jì)算出過(guò)去30s的單詞的統(tǒng)計(jì)值。為了做到這一點(diǎn),我們需要在過(guò)去30s的鍵值對(duì)(word,1)的數(shù)據(jù)流(DataStream)留上使用 reduceByKey 這個(gè)操作.使用reduceByKeyAndWindow這個(gè)可以實(shí)現(xiàn)這個(gè)功能。
//Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a+b), Seconds(30), Seconds(10))
一些窗口的操作展示如下。所有的操作都需要?jiǎng)倓傉f(shuō)的兩個(gè)參數(shù)- 窗口長(zhǎng)度和滑動(dòng)間隔
| 轉(zhuǎn)化 | 簡(jiǎn)述 |
|---|---|
| window(windowLength,slideInterval) | 返回一個(gè)新的DStream ,它是基于窗口的源Dstream的batches 集合 |
| countByWindow(windowLength,slideInterval) | 返回?cái)?shù)據(jù)流的滑動(dòng)窗口中的元素的數(shù)量 |
| reduceByWindow(func,windowLength,slideInterval) | 在一個(gè)滑動(dòng)間隔內(nèi),使用函數(shù)func 聚合元素,產(chǎn)生新的氮元素的流。這個(gè)func必須是聯(lián)合交替的,才能正確的并行處理數(shù)據(jù)。 |
| reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks]) | 當(dāng)kv 鍵值對(duì)的數(shù)據(jù)流,返回一個(gè)新的kv鍵值對(duì)的新數(shù)據(jù)流,新數(shù)據(jù)流每個(gè)key通過(guò) 給定的reduce 函數(shù)func 在一個(gè)窗口內(nèi)進(jìn)行值得聚合。需要注意的: 這個(gè)使用spark默認(rèn)并行數(shù)量(local模式的話是2 ,cluster模式的話取決于 配置參數(shù) spark.default.parallelism)進(jìn)行分組。你可以傳入一個(gè)可選的參數(shù) numTasks 參數(shù)設(shè)置一個(gè)不同的task的數(shù)量 |
| reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval、[numTasks]) | 比上面的reduceByKeyAndWindow 更有效的一個(gè)版本,能夠在之前window的reduce 值 加上當(dāng)前窗口計(jì)算reduce的值 。這個(gè)實(shí)現(xiàn)是通過(guò)reducing 新進(jìn)入到窗口的數(shù)據(jù),反向reducing 離開(kāi)窗口的老數(shù)據(jù)。舉個(gè)例子,隨著窗口的滑動(dòng),對(duì)key的統(tǒng)計(jì)值進(jìn)行加減。然后這個(gè)只適用于可以逆轉(zhuǎn)的函數(shù)。也就是說(shuō),這些reduce的函數(shù),有一個(gè)相關(guān)的逆向的函數(shù)。注意: 這個(gè)操作必須設(shè)置 checkpointing。 |
| countByValueAndWindow (windowLength, slideInterval, [numTasks]) | 當(dāng)kv鍵值對(duì)的數(shù)據(jù)流被調(diào)用的時(shí)候,返回一個(gè)新的kv鍵值對(duì)的數(shù)據(jù)流。就像 reduceByKeyAndWindow ,reduce的task的數(shù)量是可以通過(guò)配置修改的。 |
Join操作
最后,你如何在Spark Streaming中輕松的使用不同類(lèi)型的join操作 是很值得強(qiáng)調(diào)的額。
Stream-stream joins
數(shù)據(jù)流可以輕松的和其他的流進(jìn)行join。
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)