25-SparkStreaming01

Spark Streaming

Stream processing built on top of Spark

Stream: source ==> compute ==> store

Offline (batch) processing is a special case of streaming

Spark Streaming lets you write streaming jobs the same way you write batch jobs.

Out of the box (OOTB): usable as shipped, without extra setup

Programming model:

Streaming: DStream, which represents a continuous stream of data

Core: RDD

SQL: DataFrame/Dataset (DF/DS)

Entry point:

Streaming: StreamingContext

Core: SparkContext

SQL: SparkSession (SQLContext/HiveContext before Spark 2.0)

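import org.apache.spark._
import org.apache.spark.streaming._

// sc is an existing SparkContext (for example the one spark-shell provides); batch interval is 1 second
val ssc = new StreamingContext(sc, Seconds(1))
// Receiver-based source: read lines of text from a TCP socket
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
// Count each word within the current batch only
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

ssc.start()             // start receiving and processing data
ssc.awaitTermination()  // block until the computation is stopped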

"Streaming job running receiver 0": the receiver itself runs as a long-running job (important)

18/09/07 22:42:41 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.

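socket: uses a receiver, and the receiver occupies one core, so local mode needs at least local[2]

Applying an operation to a DStream really means applying the same operation to every RDD underlying that DStream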
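To make the "same operation on every underlying RDD" idea concrete, here is a minimal plain-Scala sketch that needs no Spark dependency. It is only a model: `Batch` stands in for one RDD, a `Seq[Batch]` stands in for a DStream, and the names `DStreamAsBatches`, `wordCount`, and `onEveryBatch` are made up for illustration and are not part of the Spark API.

```scala
object DStreamAsBatches {
  // Stand-in for one RDD[String]: the lines that arrived in one batch interval.
  type Batch = Seq[String]

  // The per-batch logic, i.e. what flatMap/map/reduceByKey do to a single RDD.
  def wordCount(batch: Batch): Map[String, Int] =
    batch
      .flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  // Stand-in for a DStream operation: the same function applied to every batch.
  def onEveryBatch[A](batches: Seq[Batch])(f: Batch => A): Seq[A] =
    batches.map(f)

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("a b", "b c"), Seq("a a"))
    onEveryBatch(batches)(wordCount).zipWithIndex.foreach { case (counts, i) =>
      println(s"batch $i: ${counts.toSeq.sorted.mkString(", ")}")
    }
  }
}
```

Each batch is counted independently here, which matches the behaviour of reduceByKey on a DStream: counts do not carry over from one batch to the next.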

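import org.apache.spark._
import org.apache.spark.streaming._

// sc is an existing SparkContext; batch interval is 10 seconds
val ssc = new StreamingContext(sc, Seconds(10))
// File source: picks up new files that appear in the directory; no receiver is needed
val lines = ssc.textFileStream("/streaming/input/")
// Fields in the input files are tab-separated
val words = lines.flatMap(_.split("\t"))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()

ssc.start()
ssc.awaitTermination()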

Calling updateStateByKey without setting a checkpoint directory fails with:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().

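// currentValues: the values for one key in the current batch
// preValues: the accumulated state for that key from earlier batches, if any
def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
  val curr = currentValues.sum
  val pre = preValues.getOrElse(0)
  Some(curr + pre)
}

val ssc = new StreamingContext(sc, Seconds(10))
// Stateful operations require a checkpoint directory
ssc.checkpoint("/streaming/checkpoint/")
val lines = ssc.socketTextStream("hadoop000", 8888)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
// Running total per word across all batches seen so far
val result = pairs.updateStateByKey(updateFunction)
result.print()

ssc.start()
ssc.awaitTermination()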
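How the state accumulates across batches can be sketched in plain Scala, without Spark. This is only a model under stated assumptions: `UpdateStateModel` and `step` are hypothetical names, a `Map[String, Int]` stands in for the state kept by updateStateByKey, and `updateFunction` repeats the logic from the notes. As in Spark, the update function is applied to every key that has either new values or existing state, and a `None` result would drop the key.

```scala
object UpdateStateModel {
  // Same logic as updateFunction in the notes.
  def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] =
    Some(currentValues.sum + preValues.getOrElse(0))

  // Hypothetical stand-in for one updateStateByKey step over a single batch.
  def step(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
    // Group the batch's values by key, like the Seq[Int] Spark hands to the function.
    val grouped: Map[String, Seq[Int]] =
      batch.groupBy(_._1).map { case (key, kvs) => (key, kvs.map(_._2)) }
    // Keys with old state but no new values are still updated (with an empty Seq).
    val keys = state.keySet ++ grouped.keySet
    keys.flatMap { key =>
      updateFunction(grouped.getOrElse(key, Seq.empty), state.get(key)).map(v => (key, v))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val batch1 = Seq("juren" -> 1, "say" -> 1)
    val batch2 = Seq("juren" -> 1, "say" -> 1)
    val afterBatch1 = step(Map.empty, batch1)
    val afterBatch2 = step(afterBatch1, batch2)
    println(afterBatch1)
    println(afterBatch2)
  }
}
```

Unlike the per-batch reduceByKey examples, the totals here keep growing from batch to batch, which is why Spark needs a checkpoint directory to persist the state.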

./spark-submit \
--master local[2] \
--name StreamingStateApp \
--class com.ruozedata.spark.streaming.day01.StreamingStateApp \
/home/hadoop/lib/g3-spark-1.0.jar

(love,3)

(juren,2)

(you,3)

(ruoze,2)

(say,2)

(i,3)

(zidong,4)

juren say

juren say
