spark streaming 窗口操作 和join 操作

Window Operation

Spark Streaming 也可以提供基于窗口的計(jì)算,這樣允許你操作一個(gè)滑動(dòng)窗口時(shí)間內(nèi)的數(shù)據(jù)。下圖展示了滑動(dòng)窗口


image.png

如圖所示,
每當(dāng)窗口在輸入數(shù)據(jù)流上滑動(dòng)一次,在這個(gè)窗口內(nèi)的源RDDs 就會(huì)被聚合和操作然后產(chǎn)生 基于窗口流的RDDs。在這個(gè)例子中,過(guò)去三個(gè)時(shí)間單元的數(shù)據(jù)會(huì)被操作一次,然后每次滑動(dòng)兩個(gè)時(shí)間單元。這就是說(shuō) 任何窗口操作都需要指定兩個(gè)參數(shù):

  • 窗口長(zhǎng)度:窗口持續(xù)時(shí)間(圖中是值3)
  • 滑動(dòng)間隔:每個(gè)窗口操作的時(shí)間間隔(圖中是值2)

這兩個(gè)參數(shù)必須是輸入源數(shù)據(jù)流間隔時(shí)間的倍數(shù),(圖中是值 1)
讓我們來(lái)用例子演示一下。比方說(shuō),你想要擴(kuò)展一下之前的例子,要求能夠每隔10s中,計(jì)算出過(guò)去30s的單詞的統(tǒng)計(jì)值。為了做到這一點(diǎn),我們需要在過(guò)去30s的鍵值對(duì)(word,1)的數(shù)據(jù)流(DataStream)留上使用 reduceByKey 這個(gè)操作.使用reduceByKeyAndWindow這個(gè)可以實(shí)現(xiàn)這個(gè)功能。

//Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a+b), Seconds(30), Seconds(10))

一些窗口的操作展示如下。所有的操作都需要?jiǎng)倓傉f(shuō)的兩個(gè)參數(shù)- 窗口長(zhǎng)度和滑動(dòng)間隔

轉(zhuǎn)化 簡(jiǎn)述
window(windowLength,slideInterval) 返回一個(gè)新的DStream ,它是基于窗口的源Dstream的batches 集合
countByWindow(windowLength,slideInterval) 返回?cái)?shù)據(jù)流的滑動(dòng)窗口中的元素的數(shù)量
reduceByWindow(func,windowLength,slideInterval) 在一個(gè)滑動(dòng)間隔內(nèi),使用函數(shù)func 聚合元素,產(chǎn)生新的氮元素的流。這個(gè)func必須是聯(lián)合交替的,才能正確的并行處理數(shù)據(jù)。
reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks]) 當(dāng)kv 鍵值對(duì)的數(shù)據(jù)流,返回一個(gè)新的kv鍵值對(duì)的新數(shù)據(jù)流,新數(shù)據(jù)流每個(gè)key通過(guò) 給定的reduce 函數(shù)func 在一個(gè)窗口內(nèi)進(jìn)行值得聚合。需要注意的: 這個(gè)使用spark默認(rèn)并行數(shù)量(local模式的話是2 ,cluster模式的話取決于 配置參數(shù) spark.default.parallelism)進(jìn)行分組。你可以傳入一個(gè)可選的參數(shù) numTasks 參數(shù)設(shè)置一個(gè)不同的task的數(shù)量
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval、[numTasks]) 比上面的reduceByKeyAndWindow 更有效的一個(gè)版本,能夠在之前window的reduce 值 加上當(dāng)前窗口計(jì)算reduce的值 。這個(gè)實(shí)現(xiàn)是通過(guò)reducing 新進(jìn)入到窗口的數(shù)據(jù),反向reducing 離開(kāi)窗口的老數(shù)據(jù)。舉個(gè)例子,隨著窗口的滑動(dòng),對(duì)key的統(tǒng)計(jì)值進(jìn)行加減。然后這個(gè)只適用于可以逆轉(zhuǎn)的函數(shù)。也就是說(shuō),這些reduce的函數(shù),有一個(gè)相關(guān)的逆向的函數(shù)。注意: 這個(gè)操作必須設(shè)置 checkpointing。
countByValueAndWindow (windowLength, slideInterval, [numTasks]) 當(dāng)kv鍵值對(duì)的數(shù)據(jù)流被調(diào)用的時(shí)候,返回一個(gè)新的kv鍵值對(duì)的數(shù)據(jù)流。就像 reduceByKeyAndWindow ,reduce的task的數(shù)量是可以通過(guò)配置修改的。

Join操作

最后,你如何在Spark Streaming中輕松的使用不同類(lèi)型的join操作 是很值得強(qiáng)調(diào)的額。

Stream-stream joins

數(shù)據(jù)流可以輕松的和其他的流進(jìn)行join。

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容