Preface
Recently the Apache Spark community has put Structured Streaming on the Production Ready track since 2.2, and with SPIP: Continuous Processing Mode for Structured Streaming promising a true streaming execution engine, Spark finally has real credentials in the streaming world. On top of that, SparkSession is now the single entry point for batch, streaming, machine learning and other computation models. As the saying goes: with an entry point come users, and with users comes a future. Time for a source-level look.
The best way to read source code is with a case as trivially simple as word count. Enough chatter; here is the example code, and the analysis starts from there.
import org.apache.spark.sql.SparkSession
object WordCount extends App {
val spark = SparkSession
.builder()
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
// represents an unbounded table containing the streaming text data
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
}
import
import org.apache.spark.sql.SparkSession
Since SparkSession is the unified entry point, the very first thing to do is import the SparkSession dependency.
Taking a Maven project as an example, add the spark-sql module dependency for your ${spark.version} to the pom file:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
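If you build with sbt instead, a roughly equivalent line (a hedged sketch; sparkVersion is a value you define yourself) would be:
// build.sbt -- hypothetical sbt equivalent of the Maven snippet above
val sparkVersion = "2.2.0"  // whichever 2.x version you are reading the source of
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % Provided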
Creating a SparkSession
val spark = SparkSession
.builder()
.appName("StructuredNetworkWordCount")
.getOrCreate()
This part initializes a SparkSession instance and is no different from any ordinary Spark 2.x program, so I won't dwell on it.
Import, again
import spark.implicits._
This imports a set of implicit conversions, such as the schema-related encoders and the methods for converting between DataFrame and Dataset.
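A quick, hedged illustration of what those implicits buy you (assuming spark is the session created above):
import spark.implicits._
val ds = Seq("apache", "spark").toDS()                    // Seq -> Dataset[String] via an implicit Encoder
val df = Seq((1, "one"), (2, "two")).toDF("id", "name")   // Seq of tuples -> DataFrame with named columns
val typed = df.as[(Int, String)]                          // DataFrame -> Dataset[(Int, String)], needs the implicit encoders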
Creating the input
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
As you can see, everything still starts from the spark variable, our SparkSession instance. Finally time to read the source, how exciting. Before that, run git log and note down which commit the code is at, otherwise nothing will line up once the code changes or you end up looking at an older version.
git log -1
commit 209b9361ac8a4410ff797cff1115e1888e2f7e66
Author: Bryan Cutler <cutlerb@gmail.com>
Date: Mon Nov 13 13:16:01 2017 +0900
Step into SparkSession and see what the readStream method does:
/**
* Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
* {{{
* sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
* sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
* }}}
*
* @since 2.0.0
*/
@InterfaceStability.Evolving
def readStream: DataStreamReader = new DataStreamReader(self)
It returns a DataStreamReader instance used to read streaming data. In batch development we use the read method, which returns a DataFrameReader instance, so this is where streaming and batch begin to part ways.
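For contrast, a hedged side-by-side of the two entry points (the paths and mySchema below are placeholders, not from the walkthrough):
// batch: DataFrameReader, bounded input
val batchDf = spark.read.format("json").load("/path/to/json/files")
// streaming: DataStreamReader, unbounded input; file-based streaming sources require an explicit schema
val streamDf = spark.readStream.format("json").schema(mySchema).load("/path/to/json/dir")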
Next, the DataStreamReader's format method is called:
/**
* Specifies the input data source format.
*
* @since 2.0.0
*/
def format(source: String): DataStreamReader = {
this.source = source
this
}
It specifies the format of the input data. This is bound to be related to the DataSource API used when loading data: the short name should be mapped to the Class of the corresponding data format.
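A toy sketch of that idea (not Spark's actual lookup code; the provider class names below are from my memory of the 2.x code base and may be off):
val providers = Map(
  "socket"  -> "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider",
  "console" -> "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider"
)
val providerClass = Class.forName(providers("socket"))  // Spark resolves short names to provider classes in a similar spirit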
Then the option method is used to specify two parameters, host and port:
/**
* Adds an input option for the underlying data source.
*
* You can set the following option(s):
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
* to be used to parse timestamps in the JSON/CSV datasources or partition values.</li>
* </ul>
*
* @since 2.0.0
*/
def option(key: String, value: String): DataStreamReader = {
this.extraOptions += (key -> value)
this
}
It simply adds entries to the DataStreamReader's extraOptions member. The comment gives a rough idea: these options are passed as input parameters to the DataSource API. At this point we still don't know what the host and port options actually do.
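In other words, the options accumulate in extraOptions and it is up to the chosen source to interpret them: the socket source will read host/port, while, say, the built-in rate source reads rowsPerSecond. A hedged example:
val rateDf = spark.readStream
  .format("rate")                  // another built-in streaming source
  .option("rowsPerSecond", "5")    // lands in extraOptions and is interpreted by the rate source
  .load()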
Next, the load method:
/**
* Loads input data stream in as a `DataFrame`, for data streams that don't require a path
* (e.g. external key-value stores).
*
* @since 2.0.0
*/
def load(): DataFrame = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"read files of Hive data source directly.")
}
val dataSource =
DataSource(
sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = extraOptions.toMap)
Dataset.ofRows(sparkSession, StreamingRelation(dataSource))
}
All the methods so far return the reader itself; this one returns a DataFrame. We know that in batch mode Spark is lazily defined, and none of this "launches a job to do any computation" (see note 1). The same applies in Structured Streaming. Inside load we instantiate a dataSource variable, which is then wrapped by
/**
* Used to link a streaming [[DataSource]] into a
* [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. This is only used for creating
* a streaming [[org.apache.spark.sql.DataFrame]] from [[org.apache.spark.sql.DataFrameReader]].
* It should be used to create [[Source]] and converted to [[StreamingExecutionRelation]] when
* passing to [[StreamExecution]] to run a query.
*/
case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
extends LeafNode {
...
}
into a LogicalPlan, and finally Dataset.ofRows is called:
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
val qe = sparkSession.sessionState.executePlan(logicalPlan)
qe.assertAnalyzed()
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
This completes the initial lazy definition of the DataFrame. At first glance, apart from the input, the processing looks just like batch; but then the data would only be processed once and the query would go blind. Don't panic, let's look inside. First, val qe = sparkSession.sessionState.executePlan(logicalPlan):
def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan)
Here createQueryExecution: LogicalPlan => QueryExecution is a function that turns the logical plan into a QueryExecution. In batch SQL this is what drives the whole execution workflow, so the same should hold for streaming SQL. Since it is a member of sparkSession.sessionState, let's first see how it is defined. First,
/**
* State isolated across sessions, including SQL configurations, temporary tables, registered
* functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]].
* If `parentSessionState` is not null, the `SessionState` will be a copy of the parent.
*
* This is internal to Spark and there is no guarantee on interface stability.
*
* @since 2.2.0
*/
@InterfaceStability.Unstable
@transient
lazy val sessionState: SessionState = {
parentSessionState
.map(_.clone(this))
.getOrElse {
val state = SparkSession.instantiateSessionState(
SparkSession.sessionStateClassName(sparkContext.conf),
self)
initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v) }
state
}
}
That is how sessionState is initialized. Now let's see what SparkSession.instantiateSessionState does:
/**
* Helper method to create an instance of `SessionState` based on `className` from conf.
* The result is either `SessionState` or a Hive based `SessionState`.
*/
private def instantiateSessionState(
className: String,
sparkSession: SparkSession): SessionState = {
try {
// invoke `new [Hive]SessionStateBuilder(SparkSession, Option[SessionState])`
val clazz = Utils.classForName(className)
val ctor = clazz.getConstructors.head
ctor.newInstance(sparkSession, None).asInstanceOf[BaseSessionStateBuilder].build()
} catch {
case NonFatal(e) =>
throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
}
}
It instantiates a subclass of BaseSessionStateBuilder via Java reflection, then calls its build method, which returns a SessionState instance:
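The reflection pattern itself is the standard JVM one; a tiny self-contained sketch with hypothetical classes (nothing to do with Spark's builders):
class GreeterBuilder(name: String) {
  def build(): String = s"hello, $name"
}
object ReflectionSketch extends App {
  val clazz = Class.forName("GreeterBuilder")                         // the class name would come from config
  val ctor  = clazz.getConstructors.head                              // grab the first constructor
  val greeter = ctor.newInstance("structured streaming").asInstanceOf[GreeterBuilder]
  println(greeter.build())
}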
/**
* Build the [[SessionState]].
*/
def build(): SessionState = {
new SessionState(
session.sharedState,
conf,
experimentalMethods,
functionRegistry,
udfRegistration,
() => catalog,
sqlParser,
() => analyzer,
() => optimizer,
planner,
streamingQueryManager,
listenerManager,
() => resourceLoader,
createQueryExecution,
createClone)
}
In the second-to-last line we finally meet the long-awaited createQueryExecution parameter:
/**
* Create a query execution object.
*/
protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
new QueryExecution(session, plan)
}
Looking at the definition of QueryExecution, and then at its subclass IncrementalExecution:
/**
* A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]
* plan incrementally. Possibly preserving state in between each execution.
*/
class IncrementalExecution(
sparkSession: SparkSession,
logicalPlan: LogicalPlan,
val outputMode: OutputMode,
val checkpointLocation: String,
val runId: UUID,
val currentBatchId: Long,
val offsetSeqMetadata: OffsetSeqMetadata)
extends QueryExecution(sparkSession, logicalPlan) with Logging
Suddenly it all makes sense: plain QueryExecution was the wrong turn, and the next goal is to find out where our case triggers the instantiation of this IncrementalExecution.
Back in the load method, we saw
Dataset.ofRows(sparkSession, StreamingRelation(dataSource))
The logical plan we get here is a StreamingRelation, which carries the key flag
override def isStreaming: Boolean = true
My guess is that the choice between streaming and batch execution must live in the planner rules, so let me note that down and come back to it later. Why? Because only one fragment of our logical plan has been built so far, nothing query-execution-related can be involved yet. Once the whole SQL plan has been defined, we can come back to this.
Transformations
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
In our case we apply a series of transformations to the DataFrame, exactly the same as batch Spark SQL, so I won't repeat them. Finally,
/**
* Count the number of rows for each group.
* The resulting `DataFrame` will also contain the grouping columns.
*
* @since 1.3.0
*/
def count(): DataFrame = toDF(Seq(Alias(Count(Literal(1)).toAggregateExpression(), "count")()))
which again returns the familiar DataFrame.
At this point the lazy definition of our plan is complete.
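As a side note, the same aggregation can also be written with the typed API; a hedged equivalent of the two transformation lines, reusing the lines Dataset from the example:
import org.apache.spark.sql.Dataset
import spark.implicits._
val typedCounts: Dataset[(String, Long)] =
  lines.as[String]
    .flatMap(_.split(" "))
    .groupByKey(identity)   // KeyValueGroupedDataset[String, String]
    .count()                // Dataset[(String, Long)]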
Hunting for the last straw of streaming execution
Only a line or two of code left. Spark's big "flaw" is that a distributed app takes just these few lines; if I still can't explain it clearly by the end, I might as well pack up and go home.
Back to the program code:
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
First, wordCounts.writeStream:
/**
* Interface for saving the content of the streaming Dataset out into external storage.
*
* @group basic
* @since 2.0.0
*/
@InterfaceStability.Evolving
def writeStream: DataStreamWriter[T] = {
if (!isStreaming) {
logicalPlan.failAnalysis(
"'writeStream' can be called only on streaming Dataset/DataFrame")
}
new DataStreamWriter[T](this)
}
We're already about to write the dataset out to storage? Did I miss something? Mild panic.
Still, let's keep walking through it step by step. DataStreamWriter's outputMode:
/**
* Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
* - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be
* written to the sink
* - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written
* to the sink every time these is some updates
* - `OutputMode.Update()`: only the rows that were updated in the streaming DataFrame/Dataset
* will be written to the sink every time there are some updates. If
* the query doesn't contain aggregations, it will be equivalent to
* `OutputMode.Append()` mode.
*
* @since 2.0.0
*/
def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
this.outputMode = outputMode
this
}
It sets the sink mode. Currently complete, append and update are supported; they mean exactly what they say, and the corresponding sink semantics are easy to understand.
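The string form maps to the typed OutputMode objects; a hedged illustration (note that for an aggregating query like ours, append is rejected unless a watermark is defined):
import org.apache.spark.sql.streaming.OutputMode
wordCounts.writeStream.outputMode(OutputMode.Complete())  // same as .outputMode("complete")
wordCounts.writeStream.outputMode(OutputMode.Update())    // only changed rows go to the sink
// OutputMode.Append() would fail analysis here because of the aggregation without a watermark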
Next, .format("console") is called:
/**
* Specifies the underlying output data source.
*
* @since 2.0.0
*/
def format(source: String): DataStreamWriter[T] = {
this.source = source
this
}
Just as the source has a data format, the sink has one too. Here we chose console, which simply prints to the terminal.
Finally, .start() is called. There is quite a bit of code in it, so it looks promising:
/**
* Starts the execution of the streaming query, which will continually output results to the given
* path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
* the stream.
*
* @since 2.0.0
*/
def start(): StreamingQuery = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"write files of Hive data source directly.")
}
if (source == "memory") {
assertNotPartitioned("memory")
if (extraOptions.get("queryName").isEmpty) {
throw new AnalysisException("queryName must be specified for memory sink")
}
val sink = new MemorySink(df.schema, outputMode)
val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
val chkpointLoc = extraOptions.get("checkpointLocation")
val recoverFromChkpoint = outputMode == OutputMode.Complete()
val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
extraOptions.get("queryName"),
chkpointLoc,
df,
sink,
outputMode,
useTempCheckpointLocation = true,
recoverFromCheckpointLocation = recoverFromChkpoint,
trigger = trigger)
resultDf.createOrReplaceTempView(query.name)
query
} else if (source == "foreach") {
assertNotPartitioned("foreach")
val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc)
df.sparkSession.sessionState.streamingQueryManager.startQuery(
extraOptions.get("queryName"),
extraOptions.get("checkpointLocation"),
df,
sink,
outputMode,
useTempCheckpointLocation = true,
trigger = trigger)
} else {
val dataSource =
DataSource(
df.sparkSession,
className = source,
options = extraOptions.toMap,
partitionColumns = normalizedParCols.getOrElse(Nil))
df.sparkSession.sessionState.streamingQueryManager.startQuery(
extraOptions.get("queryName"),
extraOptions.get("checkpointLocation"),
df,
dataSource.createSink(outputMode),
outputMode,
useTempCheckpointLocation = source == "console",
recoverFromCheckpointLocation = true,
trigger = trigger)
}
}
Well, and here I thought Scala people hated if/else. The source we specified is console, so we go straight to the else branch.
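For reference, this is roughly what the memory branch looks like from the user's side (a hedged sketch; the query name is made up):
val memQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("word_counts")   // mandatory for the memory sink, as the check above enforces
  .start()
spark.sql("SELECT * FROM word_counts").show()  // the results are exposed as a temp view named after the query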
First, we see another DataSource being instantiated; recall we saw one earlier in the reader, so there's now a head and a tail. Nice.
We also see dataSource.createSink(outputMode) here: the read at the head and the write at the tail are both in place; all that's missing is something to chain the two together in a streaming fashion.
Next, let's look at sparkSession.sessionState.streamingQueryManager. We met this guy briefly when sessionState was instantiated and didn't pay attention; time for another look.
/**
* Interface to start and stop streaming queries.
*/
protected def streamingQueryManager: StreamingQueryManager = new StreamingQueryManager(session)
The comment says it all, so the real logic must live in the startQuery method:
/**
* Start a [[StreamingQuery]].
*
* @param userSpecifiedName Query name optionally specified by the user.
* @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user.
* @param df Streaming DataFrame.
* @param sink Sink to write the streaming outputs.
* @param outputMode Output mode for the sink.
* @param useTempCheckpointLocation Whether to use a temporary checkpoint location when the user
* has not specified one. If false, then error will be thrown.
* @param recoverFromCheckpointLocation Whether to recover query from the checkpoint location.
* If false and the checkpoint location exists, then error
* will be thrown.
* @param trigger [[Trigger]] for the query.
* @param triggerClock [[Clock]] to use for the triggering.
*/
private[sql] def startQuery(
userSpecifiedName: Option[String],
userSpecifiedCheckpointLocation: Option[String],
df: DataFrame,
sink: Sink,
outputMode: OutputMode,
useTempCheckpointLocation: Boolean = false,
recoverFromCheckpointLocation: Boolean = true,
trigger: Trigger = ProcessingTime(0),
triggerClock: Clock = new SystemClock()): StreamingQuery = {
val query = createQuery(
userSpecifiedName,
userSpecifiedCheckpointLocation,
df,
sink,
outputMode,
useTempCheckpointLocation,
recoverFromCheckpointLocation,
trigger,
triggerClock)
activeQueriesLock.synchronized {
// Make sure no other query with same name is active
userSpecifiedName.foreach { name =>
if (activeQueries.values.exists(_.name == name)) {
throw new IllegalArgumentException(
s"Cannot start query with name $name as a query with that name is already active")
}
}
// Make sure no other query with same id is active
if (activeQueries.values.exists(_.id == query.id)) {
throw new IllegalStateException(
s"Cannot start query with id ${query.id} as another query with same id is " +
s"already active. Perhaps you are attempting to restart a query from checkpoint " +
s"that is already active.")
}
activeQueries.put(query.id, query)
}
try {
// When starting a query, it will call `StreamingQueryListener.onQueryStarted` synchronously.
// As it's provided by the user and can run arbitrary codes, we must not hold any lock here.
// Otherwise, it's easy to cause dead-lock, or block too long if the user codes take a long
// time to finish.
query.streamingQuery.start()
} catch {
case e: Throwable =>
activeQueriesLock.synchronized {
activeQueries -= query.id
}
throw e
}
query
}
There are a lot of parameters, and we only use a few of them, so let's not go through them yet.
The method follows the three-paragraph essay structure I used in primary school: 1. create the query, 2. validate the query, 3. start it.
- Create the query
private def createQuery(
userSpecifiedName: Option[String],
userSpecifiedCheckpointLocation: Option[String],
df: DataFrame,
sink: Sink,
outputMode: OutputMode,
useTempCheckpointLocation: Boolean,
recoverFromCheckpointLocation: Boolean,
trigger: Trigger,
triggerClock: Clock): StreamingQueryWrapper = {
var deleteCheckpointOnStop = false
val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
new Path(userSpecified).toUri.toString
}.orElse {
df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
}
}.getOrElse {
if (useTempCheckpointLocation) {
// Delete the temp checkpoint when a query is being stopped without errors.
deleteCheckpointOnStop = true
Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
} else {
throw new AnalysisException(
"checkpointLocation must be specified either " +
"""through option("checkpointLocation", ...) or """ +
s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
}
}
// If offsets have already been created, we trying to resume a query.
if (!recoverFromCheckpointLocation) {
val checkpointPath = new Path(checkpointLocation, "offsets")
val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
if (fs.exists(checkpointPath)) {
throw new AnalysisException(
s"This query does not support recovering from checkpoint location. " +
s"Delete $checkpointPath to start over.")
}
}
val analyzedPlan = df.queryExecution.analyzed
df.queryExecution.assertAnalyzed()
if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
}
if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
"is not supported in streaming DataFrames/Datasets and will be disabled.")
}
new StreamingQueryWrapper(new StreamExecution(
sparkSession,
userSpecifiedName.orNull,
checkpointLocation,
analyzedPlan,
sink,
trigger,
triggerClock,
outputMode,
deleteCheckpointOnStop))
}
Our case is trivial: no checkpoint, no trigger, so everything stays at the defaults. The logic above essentially just builds the analyzedPlan and then instantiates a StreamingQueryWrapper, which of course wraps the all-important StreamExecution. Note, too, that the un-resolved logical plan coming out of the earlier parse stage is analyzed by QueryExecution.
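Had we wanted a durable checkpoint instead of the temporary directory, we would just set the option ourselves; a hedged sketch (the path is only an example):
val durableQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "/tmp/wordcount-checkpoint")  // a user-chosen, durable location
  .start()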
StreamExecution: this should truly be our streaming execution engine. Let's look at its main members:
sources: the streaming data sources, already encountered above
logicalPlan: the chain of DataFrame/Dataset transformations, already encountered above
sink: the output end for the results, already encountered above
There are a few more important member variables (a small sketch of how the offset maps interact follows this list):
currentBatchId: the id of the batch currently being executed; starts at -1, meaning initialization is still needed
offsetLog: the WAL recording the offsets of each batch
batchCommitLog: records the batches that have already been committed
availableOffsets: offsets that can currently be processed but have not yet been committed to the sink
committedOffsets: offsets that have already been processed and committed to the sink or state store
watermarkMsMap: tracks the current watermark per operator; for stateful queries it is the mechanism for handling delayed data.
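A toy model (plain Scala, not Spark code) of how availableOffsets and committedOffsets drive one batch:
var committedOffsets = Map.empty[String, Long]   // source -> offset already committed to the sink/state store
var availableOffsets = Map("socket" -> 42L)      // latest offsets discovered for this trigger
val toProcess = availableOffsets.filter { case (src, avail) =>
  committedOffsets.get(src).forall(_ != avail)   // keep only sources that actually have new data
}
// ... run the batch over `toProcess` ...
committedOffsets ++= availableOffsets            // after the commit, advance the high-water mark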
StreamExecution is what strings the whole pipeline together; later we'll walk the code to analyse how it works. For now, back to the three-paragraph essay.
- Validate the query
- Make sure no other query with same name is active
- Make sure no other query with same id is active
- Start the query
/**
* Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
* has been posted to all the listeners.
*/
def start(): Unit = {
logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store the query checkpoint.")
microBatchThread.setDaemon(true)
microBatchThread.start()
startLatch.await() // Wait until thread started and QueryStart event has been posted
}
Seeing this I was briefly delighted; it felt like I'd found the organization. Then I saw microBatch and my heart sank: so it's still not a pure streaming engine.
/**
* The thread that runs the micro-batches of this stream. Note that this thread must be
* [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: interrupting a
* running `KafkaConsumer` may cause endless loop.
*/
val microBatchThread =
new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
override def run(): Unit = {
// To fix call site like "run at <unknown>:0", we bridge the call site from the caller
// thread to this micro batch thread
sparkSession.sparkContext.setCallSite(callSite)
runBatches()
}
}
The run method calls runBatches. It's a bit long, so I've trimmed parts of it and put the analysis directly into the code as comments.
/**
* Repeatedly attempts to run batches as data arrives
*/
private def runBatches(): Unit = {
try {
// force initialization of the logical plan so that the sources can be created
// touch the lazy logicalPlan so that the sources get created
logicalPlan
// Isolated spark session to run the batches with.
// clone a session to actually run the batches in
val sparkSessionToRunBatches = sparkSession.cloneSession()
// metadata describing the sequence of source offsets
offsetSeqMetadata = OffsetSeqMetadata(
batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionToRunBatches.conf)
if (state.compareAndSet(INITIALIZING, ACTIVE)) {
// the trigger mechanism; in our case it is a `ProcessingTimeExecutor`,
// whose execute method is essentially a while loop repeatedly invoking the function passed to it
triggerExecutor.execute(() => {
startTrigger()
if (isActive) {
reportTimeTaken("triggerExecution") {
// try to fetch new data
if (currentBatchId < 0) {
// We'll do this initialization only once
// first attempt
populateStartOffsets(sparkSessionToRunBatches)
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
logDebug(s"Stream running from $committedOffsets to $availableOffsets")
} else {
// subsequent attempts: fetch the latest offsets from the sources, write the WAL, and add them
constructNextBatch()
}
// runBatch: compute the current batch; we pull it out and look at it below
if (dataAvailable) {
currentStatus = currentStatus.copy(isDataAvailable = true)
updateStatusMessage("Processing new data")
runBatch(sparkSessionToRunBatches)
}
}
// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
// write the commit log right after the computation finishes
if (dataAvailable) {
// Update committed offsets.
batchCommitLog.add(currentBatchId)
committedOffsets ++= availableOffsets
logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
} else {
currentStatus = currentStatus.copy(isDataAvailable = false)
updateStatusMessage("Waiting for data to arrive")
Thread.sleep(pollingDelayMs)
}
}
updateStatusMessage("Waiting for next trigger")
isActive
})
updateStatusMessage("Stopped")
}
}
}
}
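For the default ProcessingTime(0) trigger, triggerExecutor.execute boils down to a loop that keeps calling the batch function until it returns false; a simplified, hypothetical model (not the actual ProcessingTimeExecutor):
def executeLoop(intervalMs: Long)(triggerHandler: () => Boolean): Unit = {
  var keepRunning = true
  while (keepRunning) {
    val start = System.currentTimeMillis()
    keepRunning = triggerHandler()                                // returns isActive, like the closure above
    val elapsed = System.currentTimeMillis() - start
    if (intervalMs > elapsed) Thread.sleep(intervalMs - elapsed)  // sleep out the remainder of the interval
  }
}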
runBatch
/**
* Processes any data available between `availableOffsets` and `committedOffsets`.
* @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
*/
private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
// Request unprocessed data from all sources.
newData = reportTimeTaken("getBatch") {
availableOffsets.flatMap {
case (source, available)
if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
val current = committedOffsets.get(source)
val batch = source.getBatch(current, available)
assert(batch.isStreaming,
s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" +
s"${batch.queryExecution.logical}")
logDebug(s"Retrieving data from $source: $current -> $available")
Some(source -> batch)
case _ => None
}
}
// A list of attributes that will need to be updated.
val replacements = new ArrayBuffer[(Attribute, Attribute)]
// Replace sources in the logical plan with data that has arrived since the last batch.
val withNewSources = logicalPlan transform {
case StreamingExecutionRelation(source, output) =>
newData.get(source).map { data =>
val newPlan = data.logicalPlan
assert(output.size == newPlan.output.size,
s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
s"${Utils.truncatedString(newPlan.output, ",")}")
replacements ++= output.zip(newPlan.output)
newPlan
}.getOrElse {
LocalRelation(output, isStreaming = true)
}
}
// Rewire the plan to use the new attributes that were returned by the source.
val replacementMap = AttributeMap(replacements)
val triggerLogicalPlan = withNewSources transformAllExpressions {
case a: Attribute if replacementMap.contains(a) =>
replacementMap(a).withMetadata(a.metadata)
case ct: CurrentTimestamp =>
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
ct.dataType)
case cd: CurrentDate =>
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
cd.dataType, cd.timeZoneId)
}
reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSessionToRunBatch,
triggerLogicalPlan,
outputMode,
checkpointFile("state"),
runId,
currentBatchId,
offsetSeqMetadata)
lastExecution.executedPlan // Force the lazy generation of execution plan
}
val nextBatch =
new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))
reportTimeTaken("addBatch") {
SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
sink.addBatch(currentBatchId, nextBatch)
}
}
……
}
runBatch has five key steps:
1. Take the latest offsets from availableOffsets as the end point and the previous batch's offsets from committedOffsets as the start point, then call Source.getBatch(start, end) to obtain the data newly received for this run, represented as a Dataset/DataFrame.
2. Using the logicalPlan preset in StreamExecution as the template, transform it into a copy bound to the new data (withNewSources); concretely, pattern-match the StreamingExecutionRelation nodes and replace their source with the DataFrame from step 1.
3. Create an IncrementalExecution from that plan.
4. Calling lastExecution.executedPlan triggers the whole chain of Catalyst reactions that turns the logical plan into an executable physical plan.
5. Wrap that executedPlan into a Dataset and hand it to the Sink, i.e. call Sink.addBatch to trigger execution. Our sink is console; looking at the ConsoleSink implementation, the show call is what sets the whole DAG in motion:
override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
val batchIdStr = if (batchId <= lastBatchId) {
s"Rerun batch: $batchId"
} else {
lastBatchId = batchId
s"Batch: $batchId"
}
// scalastyle:off println
println("-------------------------------------------")
println(batchIdStr)
println("-------------------------------------------")
// scalastyle:off println
data.sparkSession.createDataFrame(
data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
.show(numRowsToShow, isTruncated)
}
Summary
This post walked through the Structured Streaming source code and its execution logic using a simple word count app. It is a record of how a doctor-turned-programmer reads code, so the insights are fairly shallow; corrections are welcome.
There are many more exciting features in Structured Streaming worth digging into, and I hope to share them soon.
Note 1: when the input path contains a large number of directories, Spark does launch jobs to list file statuses in parallel.