第二十章 流處理基礎(chǔ)
什么是流處理
流處理是連續(xù)處理新到來的數(shù)據(jù)以更新計算結(jié)果的行為。在流處理中,輸入數(shù)據(jù)是無邊界的,沒有預(yù)定的開始或結(jié)束。
流處理的挑戰(zhàn)
- 基于應(yīng)用程序時間戳處理無序數(shù)據(jù)
- 維持大量的狀態(tài)
- 支持高吞吐
- 即使有機器故障也僅需對事件進行一次處理
- 處理負(fù)載不平衡和拖延者
- 快速響應(yīng)事件
- 與其他存儲中的數(shù)據(jù)進行連接
- 確定新事件到達(dá)時如何更新輸出
流處理設(shè)計要點
記錄級別API和申明式API
- 流處理API最簡單的實現(xiàn)方法就是將每個事件傳遞給應(yīng)用程序,并使用自定義代碼進行響應(yīng)。提供這種記錄級別的API的流處理系統(tǒng)只是給用戶提供一個獲取每條流數(shù)據(jù)記錄的接口,這樣許多復(fù)雜狀態(tài)需要由應(yīng)用程序負(fù)責(zé)。
- 因此后續(xù)的流處理系統(tǒng)提供了聲明式API,應(yīng)用程序為了響應(yīng)每個新事件指定要計算的內(nèi)容,而不是如何計算,也不需要考慮如何從失敗中恢復(fù)。DStream API可以自動追蹤每個操作處理的數(shù)據(jù)量,可靠地保存相關(guān)狀態(tài),并在需要的時候從失敗中恢復(fù)計算。
連續(xù)處理與微批處理
-
連續(xù)處理模式中,每個節(jié)點都不斷偵聽來自其他節(jié)點的消息并將新的更新輸出到其子節(jié)點。map-reduce中,map的每個節(jié)點將從輸入源一個一個地讀取記錄,根據(jù)計算邏輯將它們發(fā)送到相應(yīng)的reducer,reducer獲取新記錄時,將更新狀態(tài)
連續(xù)處理 -
微批處理系統(tǒng)等待積累少量輸入數(shù)據(jù),然后使用分布式任務(wù)集合并行處理每個批次
微批處理
Spark的流處理API
DStream API
- 它完全基于Java/Python對象函數(shù),而不是DataFrame Dataset中的結(jié)構(gòu)化表概念
- 完全基于處理時間
- 僅支持微批處理
結(jié)構(gòu)化流處理
- Spark 2.2僅支持微批處理
- 提供結(jié)構(gòu)化處理流式數(shù)據(jù)的可能
第二十一章 結(jié)構(gòu)化流處理基礎(chǔ)
結(jié)構(gòu)化流處理概述
結(jié)構(gòu)化流處理背后的主要思想是將數(shù)據(jù)流視為連續(xù)追加數(shù)據(jù)的數(shù)據(jù)表
結(jié)構(gòu)化流即是以流處理方式處理的DataFrame

結(jié)構(gòu)化流處理的輸入
核心概念
轉(zhuǎn)換和動作
同樣適用這兩個概念,只是略微有一些限制
輸入源
- Kafka
- HDFS
- 用于測試的socket源
接收器
- 需要指定數(shù)據(jù)源來讀取數(shù)據(jù)流,接收器(sink)和執(zhí)行引擎還負(fù)責(zé)可靠地跟蹤號數(shù)據(jù)處理的進度
- 支持的接收器:kafka、文件、用于測試的控制臺接收器、用于調(diào)試的內(nèi)存接收器、用于在輸出記錄上運行任意計算的foreach接收器
輸出模式
- append (向輸出接收器中添加新記錄)
- update (更新有變化的記錄)
- complete (重寫所有的輸出)
觸發(fā)器
- 定義了數(shù)據(jù)何時被輸出
事件時間處理
- watermarks 允許你制定在事件時間內(nèi)查看數(shù)據(jù)的
- 支持事件時間的系統(tǒng)通常允許設(shè)置watermarks來限制它們記住舊數(shù)據(jù)的時長
結(jié)構(gòu)化流處理實例
val static = spark.read.json("/data/activity-data/")
val dataSchema = static.schema
val streaming = spark.readStream.schema(dataSchema)
.option("maxFilesPerTrigger", 1).json("/data/activity-data")
val activityCounts = streaming.groupBy("gt").count()
spark.conf.set("spark.sql.shuffle.partitions", 5)
val activityQuery = activityCounts.writeStream.queryName("activity_counts")
.format("memory").outputMode("complete")
.start()
// 啟動流式計算
activityQuery.awaitTermination()
// 查詢流數(shù)據(jù)
for( i <- 1 to 5 ) {
spark.sql("SELECT * FROM activity_counts").show()
Thread.sleep(1000)
}
數(shù)據(jù)輸入和輸出
// 讀取kafka
val ds3 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
// 寫入kafka
ds1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream.format("kafka")
.option("checkpointLocation", "/to/HDFS-compatible/dir")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
// foreach接收器
datasetOfString.write.foreach(new ForeachWriter[String] {
def open(partitionId: Long, version: Long): Boolean = {
// open a database connection
}
def process(record: String) = {
// write string to connection
}
def close(errorOrNull: Throwable): Unit = {
// close the connection
}
})
// 觸發(fā)器
import org.apache.spark.sql.streaming.Trigger
activityCounts.writeStream.trigger(Trigger.ProcessingTime("100 seconds"))
.format("console").outputMode("complete").start()
第二十二章 事件時間和有狀態(tài)處理
有狀態(tài)處理
- 當(dāng)你執(zhí)行有狀態(tài)操作時,Spark會為你處理所有復(fù)雜的事情。例如,在實現(xiàn)分組操作時,結(jié)構(gòu)化流處理會為你維護并更新信息,你只需指定處理邏輯。在執(zhí)行有狀態(tài)操作時,Spark會將中間結(jié)果信息存儲在狀態(tài)存儲中。Spark當(dāng)前的狀態(tài)存儲實現(xiàn)是一個內(nèi)存狀態(tài)存儲,它通過將中間狀態(tài)存儲到檢查點目錄來實現(xiàn)容錯。
滾動窗口
- 窗口不會發(fā)生重疊,指定窗口的間隔
-
時間窗口實際上是一個結(jié)構(gòu)體
滾動窗口
import org.apache.spark.sql.functions.{window, col}
withEventTime.groupBy(window(col("event_time"), "10 minutes")).count()
.writeStream
.queryName("events_per_window")
.format("memory")
.outputMode("complete")
.start()
滑動窗口
-
窗口可以重疊
滑動窗口
import org.apache.spark.sql.functions.{window, col}
withEventTime.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
.count()
.writeStream
.queryName("events_per_window")
.format("memory")
.outputMode("complete")
.start()
使用水位處理延遲數(shù)據(jù)
- 指定水位可以確定過期數(shù)據(jù)
- 指定水位的方式
import org.apache.spark.sql.functions.{window, col}
withEventTime
.withWatermark("event_time", "5 hours")
.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
.count()
.writeStream
.queryName("events_per_window")
.format("memory")
.outputMode("complete")
.start()
在流中刪除重復(fù)項
dropDuplicates
import org.apache.spark.sql.functions.expr
withEventTime
.withWatermark("event_time", "5 seconds")
.dropDuplicates("User", "event_time")
.groupBy("User")
.count()
.writeStream
.queryName("deduplicated")
.format("memory")
.outputMode("complete")
.start()
任意有狀態(tài)處理
- 可以根據(jù)給定鍵的計數(shù)創(chuàng)建窗口
- 如果特定時間范圍發(fā)生多個特定事件則報警
- 如果不確定時間內(nèi)維護用戶會話,保存這些會話一遍稍后進行分析
執(zhí)行這類處理時需要做以下兩件事
- 映射數(shù)據(jù)中的組,對每組數(shù)據(jù)進行操作,并為每個組生成至多一行(mapGroups WithState)
- 映射數(shù)據(jù)中的組,對每個組生成一行或多行(flatMapGroups WithState)
超時
可以通過GroupState.setTimeoutTimes tamp(...) API設(shè)置超時時間戳
輸出模式
- mapGroupWithState僅支持update更新
- flatMapGroupsWithState支持append追加輸出和update更新輸出
mapGroupsWithState
需要給出如下定義
- 三個類定義:輸入定義、狀態(tài)定義、可選的輸出定義
- 基于鍵、事件迭代器和先前狀態(tài)的一個更新狀態(tài)函數(shù)
- 超時時間函數(shù)
// 類定義
case class InputRow(user:String, timestamp:java.sql.Timestamp, activity:String)
case class UserState(user:String,
var activity:String,
var start:java.sql.Timestamp,
var end:java.sql.Timestamp)
// 事件迭代器、狀態(tài)更新函數(shù)
def updateUserStateWithEvent(state:UserState, input:InputRow):UserState = {
if (Option(input.timestamp).isEmpty) {
return state
}
if (state.activity == input.activity) {
if (input.timestamp.after(state.end)) {
state.end = input.timestamp
}
if (input.timestamp.before(state.start)) {
state.start = input.timestamp
}
} else {
if (input.timestamp.after(state.end)) {
state.start = input.timestamp
state.end = input.timestamp
state.activity = input.activity
}
}
state
}
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, GroupState}
def updateAcrossEvents(user:String,
inputs: Iterator[InputRow],
oldState: GroupState[UserState]):UserState = {
var state:UserState = if (oldState.exists) oldState.get else UserState(user,
"",
new java.sql.Timestamp(6284160000000L),
new java.sql.Timestamp(6284160L)
)
// we simply specify an old date that we can compare against and
// immediately update based on the values in our data
for (input <- inputs) {
state = updateUserStateWithEvent(state, input)
oldState.update(state)
}
state
}
// 啟動
import org.apache.spark.sql.streaming.GroupStateTimeout
withEventTime
.selectExpr("User as user",
"cast(Creation_Time/1000000000 as timestamp) as timestamp", "gt as activity")
.as[InputRow]
.groupByKey(_.user)
.mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateAcrossEvents)
.writeStream
.queryName("events_per_window")
.format("memory")
.outputMode("update")
.start()
flatMapGroupsWithState
需要定義以下內(nèi)容
- 三個類定義:輸入定義、狀態(tài)定義、可選的輸出定義
- 一個函數(shù),輸入?yún)?shù)為一個鍵、一個多事件迭代器和先前狀態(tài)
- 超時時間函數(shù)
case class InputRow(uid:String, timestamp:java.sql.Timestamp, x:Double,
activity:String)
case class UserSession(val uid:String, var timestamp:java.sql.Timestamp,
var activities: Array[String], var values: Array[Double])
case class UserSessionOutput(val uid:String, var activities: Array[String],
var xAvg:Double)
def updateWithEvent(state:UserSession, input:InputRow):UserSession = {
// handle malformed dates
if (Option(input.timestamp).isEmpty) {
return state
}
state.timestamp = input.timestamp
state.values = state.values ++ Array(input.x)
if (!state.activities.contains(input.activity)) {
state.activities = state.activities ++ Array(input.activity)
}
state
}
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode,
GroupState}
def updateAcrossEvents(uid:String,
inputs: Iterator[InputRow],
oldState: GroupState[UserSession]):Iterator[UserSessionOutput] = {
inputs.toSeq.sortBy(_.timestamp.getTime).toIterator.flatMap { input =>
val state = if (oldState.exists) oldState.get else UserSession(
uid,
new java.sql.Timestamp(6284160000000L),
Array(),
Array())
val newState = updateWithEvent(state, input)
if (oldState.hasTimedOut) {
val state = oldState.get
oldState.remove()
Iterator(UserSessionOutput(uid,
state.activities,
newState.values.sum / newState.values.length.toDouble))
} else if (state.values.length > 1000) {
val state = oldState.get
oldState.remove()
Iterator(UserSessionOutput(uid,
state.activities,
newState.values.sum / newState.values.length.toDouble))
} else {
oldState.update(newState)
oldState.setTimeoutTimestamp(newState.timestamp.getTime(), "5 seconds")
Iterator()
}
}
}
import org.apache.spark.sql.streaming.GroupStateTimeout
withEventTime.where("x is not null")
.selectExpr("user as uid",
"cast(Creation_Time/1000000000 as timestamp) as timestamp",
"x", "gt as activity")
.as[InputRow]
.withWatermark("timestamp", "5 seconds")
.groupByKey(_.uid)
.flatMapGroupsWithState(OutputMode.Append,
GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
.writeStream
.queryName("count_based_device")
.format("memory")
.start()



