Spark權(quán)威指南讀書筆記(五):流處理

第二十章 流處理基礎(chǔ)

什么是流處理

流處理是連續(xù)處理新到來的數(shù)據(jù)以更新計算結(jié)果的行為。在流處理中,輸入數(shù)據(jù)是無邊界的,沒有預(yù)定的開始或結(jié)束。

流處理的挑戰(zhàn)
  • 基于應(yīng)用程序時間戳處理無序數(shù)據(jù)
  • 維持大量的狀態(tài)
  • 支持高吞吐
  • 即使有機器故障也僅需對事件進行一次處理
  • 處理負(fù)載不平衡和拖延者
  • 快速響應(yīng)事件
  • 與其他存儲中的數(shù)據(jù)進行連接
  • 確定新事件到達(dá)時如何更新輸出

流處理設(shè)計要點

記錄級別API和申明式API
  • 流處理API最簡單的實現(xiàn)方法就是將每個事件傳遞給應(yīng)用程序,并使用自定義代碼進行響應(yīng)。提供這種記錄級別的API的流處理系統(tǒng)只是給用戶提供一個獲取每條流數(shù)據(jù)記錄的接口,這樣許多復(fù)雜狀態(tài)需要由應(yīng)用程序負(fù)責(zé)。
  • 因此后續(xù)的流處理系統(tǒng)提供了聲明式API,應(yīng)用程序為了響應(yīng)每個新事件指定要計算的內(nèi)容,而不是如何計算,也不需要考慮如何從失敗中恢復(fù)。DStream API可以自動追蹤每個操作處理的數(shù)據(jù)量,可靠地保存相關(guān)狀態(tài),并在需要的時候從失敗中恢復(fù)計算。
連續(xù)處理與微批處理
  • 連續(xù)處理模式中,每個節(jié)點都不斷偵聽來自其他節(jié)點的消息并將新的更新輸出到其子節(jié)點。map-reduce中,map的每個節(jié)點將從輸入源一個一個地讀取記錄,根據(jù)計算邏輯將它們發(fā)送到相應(yīng)的reducer,reducer獲取新記錄時,將更新狀態(tài)


    連續(xù)處理
  • 微批處理系統(tǒng)等待積累少量輸入數(shù)據(jù),然后使用分布式任務(wù)集合并行處理每個批次


    微批處理

Spark的流處理API

DStream API
  • 它完全基于Java/Python對象函數(shù),而不是DataFrame Dataset中的結(jié)構(gòu)化表概念
  • 完全基于處理時間
  • 僅支持微批處理
結(jié)構(gòu)化流處理
  • Spark 2.2僅支持微批處理
  • 提供結(jié)構(gòu)化處理流式數(shù)據(jù)的可能

第二十一章 結(jié)構(gòu)化流處理基礎(chǔ)

結(jié)構(gòu)化流處理概述

結(jié)構(gòu)化流處理背后的主要思想是將數(shù)據(jù)流視為連續(xù)追加數(shù)據(jù)的數(shù)據(jù)表
結(jié)構(gòu)化流即是以流處理方式處理的DataFrame

結(jié)構(gòu)化流處理的輸入

核心概念

轉(zhuǎn)換和動作

同樣適用這兩個概念,只是略微有一些限制

輸入源
  • Kafka
  • HDFS
  • 用于測試的socket源
接收器
  • 需要指定數(shù)據(jù)源來讀取數(shù)據(jù)流,接收器(sink)和執(zhí)行引擎還負(fù)責(zé)可靠地跟蹤號數(shù)據(jù)處理的進度
  • 支持的接收器:kafka、文件、用于測試的控制臺接收器、用于調(diào)試的內(nèi)存接收器、用于在輸出記錄上運行任意計算的foreach接收器
輸出模式
  • append (向輸出接收器中添加新記錄)
  • update (更新有變化的記錄)
  • complete (重寫所有的輸出)
觸發(fā)器
  • 定義了數(shù)據(jù)何時被輸出
事件時間處理
  • watermarks 允許你制定在事件時間內(nèi)查看數(shù)據(jù)的
  • 支持事件時間的系統(tǒng)通常允許設(shè)置watermarks來限制它們記住舊數(shù)據(jù)的時長

結(jié)構(gòu)化流處理實例

val static = spark.read.json("/data/activity-data/")
val dataSchema = static.schema

val streaming = spark.readStream.schema(dataSchema)
  .option("maxFilesPerTrigger", 1).json("/data/activity-data")

val activityCounts = streaming.groupBy("gt").count()

spark.conf.set("spark.sql.shuffle.partitions", 5)

val activityQuery = activityCounts.writeStream.queryName("activity_counts")
  .format("memory").outputMode("complete")
  .start()
// 啟動流式計算
activityQuery.awaitTermination()
// 查詢流數(shù)據(jù)
for( i <- 1 to 5 ) {
    spark.sql("SELECT * FROM activity_counts").show()
    Thread.sleep(1000)
}
數(shù)據(jù)輸入和輸出
// 讀取kafka
val ds3 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
// 寫入kafka
ds1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream.format("kafka")
  .option("checkpointLocation", "/to/HDFS-compatible/dir")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

// foreach接收器
datasetOfString.write.foreach(new ForeachWriter[String] {
  def open(partitionId: Long, version: Long): Boolean = {
    // open a database connection
  }
  def process(record: String) = {
    // write string to connection
  }
  def close(errorOrNull: Throwable): Unit = {
    // close the connection
  }
})

// 觸發(fā)器
import org.apache.spark.sql.streaming.Trigger

activityCounts.writeStream.trigger(Trigger.ProcessingTime("100 seconds"))
  .format("console").outputMode("complete").start()

第二十二章 事件時間和有狀態(tài)處理

有狀態(tài)處理

  • 當(dāng)你執(zhí)行有狀態(tài)操作時,Spark會為你處理所有復(fù)雜的事情。例如,在實現(xiàn)分組操作時,結(jié)構(gòu)化流處理會為你維護并更新信息,你只需指定處理邏輯。在執(zhí)行有狀態(tài)操作時,Spark會將中間結(jié)果信息存儲在狀態(tài)存儲中。Spark當(dāng)前的狀態(tài)存儲實現(xiàn)是一個內(nèi)存狀態(tài)存儲,它通過將中間狀態(tài)存儲到檢查點目錄來實現(xiàn)容錯。

滾動窗口

  • 窗口不會發(fā)生重疊,指定窗口的間隔
  • 時間窗口實際上是一個結(jié)構(gòu)體


    滾動窗口
import org.apache.spark.sql.functions.{window, col}
withEventTime.groupBy(window(col("event_time"), "10 minutes")).count()
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("complete")
  .start()

滑動窗口

  • 窗口可以重疊


    滑動窗口
import org.apache.spark.sql.functions.{window, col}
withEventTime.groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
  .count()
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("complete")
  .start()

使用水位處理延遲數(shù)據(jù)

  • 指定水位可以確定過期數(shù)據(jù)
  • 指定水位的方式
import org.apache.spark.sql.functions.{window, col}
withEventTime
  .withWatermark("event_time", "5 hours")
  .groupBy(window(col("event_time"), "10 minutes", "5 minutes"))
  .count()
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("complete")
  .start()

在流中刪除重復(fù)項

dropDuplicates

import org.apache.spark.sql.functions.expr

withEventTime
  .withWatermark("event_time", "5 seconds")
  .dropDuplicates("User", "event_time")
  .groupBy("User")
  .count()
  .writeStream
  .queryName("deduplicated")
  .format("memory")
  .outputMode("complete")
  .start()

任意有狀態(tài)處理

  • 可以根據(jù)給定鍵的計數(shù)創(chuàng)建窗口
  • 如果特定時間范圍發(fā)生多個特定事件則報警
  • 如果不確定時間內(nèi)維護用戶會話,保存這些會話一遍稍后進行分析

執(zhí)行這類處理時需要做以下兩件事

  • 映射數(shù)據(jù)中的組,對每組數(shù)據(jù)進行操作,并為每個組生成至多一行(mapGroups WithState)
  • 映射數(shù)據(jù)中的組,對每個組生成一行或多行(flatMapGroups WithState)
超時

可以通過GroupState.setTimeoutTimes tamp(...) API設(shè)置超時時間戳

輸出模式
  • mapGroupWithState僅支持update更新
  • flatMapGroupsWithState支持append追加輸出和update更新輸出
mapGroupsWithState

需要給出如下定義

  • 三個類定義:輸入定義、狀態(tài)定義、可選的輸出定義
  • 基于鍵、事件迭代器和先前狀態(tài)的一個更新狀態(tài)函數(shù)
  • 超時時間函數(shù)
// 類定義
case class InputRow(user:String, timestamp:java.sql.Timestamp, activity:String)
case class UserState(user:String,
  var activity:String,
  var start:java.sql.Timestamp,
  var end:java.sql.Timestamp)

// 事件迭代器、狀態(tài)更新函數(shù)
def updateUserStateWithEvent(state:UserState, input:InputRow):UserState = {
  if (Option(input.timestamp).isEmpty) {
    return state
  }
  if (state.activity == input.activity) {

    if (input.timestamp.after(state.end)) {
      state.end = input.timestamp
    }
    if (input.timestamp.before(state.start)) {
      state.start = input.timestamp
    }
  } else {
    if (input.timestamp.after(state.end)) {
      state.start = input.timestamp
      state.end = input.timestamp
      state.activity = input.activity
    }
  }

  state
}

import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, GroupState}
def updateAcrossEvents(user:String,
  inputs: Iterator[InputRow],
  oldState: GroupState[UserState]):UserState = {
  var state:UserState = if (oldState.exists) oldState.get else UserState(user,
        "",
        new java.sql.Timestamp(6284160000000L),
        new java.sql.Timestamp(6284160L)
    )
  // we simply specify an old date that we can compare against and
  // immediately update based on the values in our data

  for (input <- inputs) {
    state = updateUserStateWithEvent(state, input)
    oldState.update(state)
  }
  state
}

// 啟動
import org.apache.spark.sql.streaming.GroupStateTimeout
withEventTime
  .selectExpr("User as user",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp", "gt as activity")
  .as[InputRow]
  .groupByKey(_.user)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateAcrossEvents)
  .writeStream
  .queryName("events_per_window")
  .format("memory")
  .outputMode("update")
  .start()
flatMapGroupsWithState

需要定義以下內(nèi)容

  • 三個類定義:輸入定義、狀態(tài)定義、可選的輸出定義
  • 一個函數(shù),輸入?yún)?shù)為一個鍵、一個多事件迭代器和先前狀態(tài)
  • 超時時間函數(shù)
case class InputRow(uid:String, timestamp:java.sql.Timestamp, x:Double,
  activity:String)
case class UserSession(val uid:String, var timestamp:java.sql.Timestamp,
  var activities: Array[String], var values: Array[Double])
case class UserSessionOutput(val uid:String, var activities: Array[String],
  var xAvg:Double)

def updateWithEvent(state:UserSession, input:InputRow):UserSession = {
  // handle malformed dates
  if (Option(input.timestamp).isEmpty) {
    return state
  }

  state.timestamp = input.timestamp
  state.values = state.values ++ Array(input.x)
  if (!state.activities.contains(input.activity)) {
    state.activities = state.activities ++ Array(input.activity)
  }
  state
}

import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode,
  GroupState}

def updateAcrossEvents(uid:String,
  inputs: Iterator[InputRow],
  oldState: GroupState[UserSession]):Iterator[UserSessionOutput] = {

  inputs.toSeq.sortBy(_.timestamp.getTime).toIterator.flatMap { input =>
    val state = if (oldState.exists) oldState.get else UserSession(
    uid,
    new java.sql.Timestamp(6284160000000L),
    Array(),
    Array())
    val newState = updateWithEvent(state, input)

    if (oldState.hasTimedOut) {
      val state = oldState.get
      oldState.remove()
      Iterator(UserSessionOutput(uid,
      state.activities,
      newState.values.sum / newState.values.length.toDouble))
    } else if (state.values.length > 1000) {
      val state = oldState.get
      oldState.remove()
      Iterator(UserSessionOutput(uid,
      state.activities,
      newState.values.sum / newState.values.length.toDouble))
    } else {
      oldState.update(newState)
      oldState.setTimeoutTimestamp(newState.timestamp.getTime(), "5 seconds")
      Iterator()
    }

  }
}

import org.apache.spark.sql.streaming.GroupStateTimeout

withEventTime.where("x is not null")
  .selectExpr("user as uid",
    "cast(Creation_Time/1000000000 as timestamp) as timestamp",
    "x", "gt as activity")
  .as[InputRow]
  .withWatermark("timestamp", "5 seconds")
  .groupByKey(_.uid)
  .flatMapGroupsWithState(OutputMode.Append,
    GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .queryName("count_based_device")
  .format("memory")
  .start()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容