Walking Through the Spark Structured Streaming Source Code with WordCount

Preface

Watching the Apache Spark community lately: since 2.2, Structured Streaming has been put on the Production Ready track, and with SPIP: Continuous Processing Mode for Structured Streaming promising a genuinely streaming execution engine, Spark finally has real credentials in the streaming space. On top of that, SparkSession is now the unified entry point for batch, streaming, and machine-learning workloads. As the saying goes: where there is an entry point there are users, and where there are users there is a future. Time for a source-level look.

The best way to read source code is with a case as dead simple as word count. Enough chatter; here is the example code, and then the analysis begins.

import org.apache.spark.sql.SparkSession
object WordCount extends App {
  val spark = SparkSession
    .builder()
    .appName("StructuredNetworkWordCount")
    .getOrCreate()
  import spark.implicits._
  // represents an unbounded table containing the streaming text data
  val lines = spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
  val words = lines.as[String].flatMap(_.split(" "))
  val wordCounts = words.groupBy("value").count()
  val query = wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .start()
  query.awaitTermination()
}
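To actually run this, something has to be listening on the socket first. Following the official quick-start recipe, a netcat session in one terminal and a submit in another will do (the class and jar names below are placeholders for however you package the app):

# terminal 1: a toy socket server to type words into
nc -lk 9999

# terminal 2: submit the app
spark-submit --class WordCount --master local[2] target/wordcount.jar

Type words into the netcat terminal and the query prints updated counts to the console.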

import

import org.apache.spark.sql.SparkSession

Since SparkSession is the unified entry point, the first thing to do is pull in the SparkSession dependency.

Taking a Maven project as an example, add the spark-sql module dependency for the matching ${spark.version} to the pom file:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
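If you build with sbt instead, the equivalent would be along these lines (sparkVersion being whatever version you pin):

val sparkVersion = "2.2.0" // match your cluster
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"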

Creating the SparkSession

val spark = SparkSession
    .builder()
    .appName("StructuredNetworkWordCount")
    .getOrCreate()

This initializes a SparkSession instance, no different from any ordinary Spark 2.x program, so we won't dwell on it.

Import, Again

  import spark.implicits._

This imports a set of implicit conversions, such as the schema-related encoders and the methods for converting between DataFrames and Datasets.
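A quick sketch of what the import buys us: without it, the lines.as[String] call below would not even compile, since no Encoder[String] would be in implicit scope.

// Both of these compile only because of spark.implicits._:
val ds = lines.as[String]                 // needs the implicit Encoder[String] (newStringEncoder)
val local = Seq("spark", "flink").toDS()  // toDS/toDF on local collections also come from here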

Creating the Input

val lines = spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()

Everything still starts from the spark variable, our SparkSession instance. Finally time to read some source, exciting! Before that, run git log and record which commit we are on, so that if the code changes later (or we end up reading an older checkout) we don't talk past it.

git log -1

commit 209b9361ac8a4410ff797cff1115e1888e2f7e66
Author: Bryan Cutler <cutlerb@gmail.com>
Date:   Mon Nov 13 13:16:01 2017 +0900

Step into SparkSession and see what the readStream method does:

  /**
   * Returns a `DataStreamReader` that can be used to read streaming data in as a `DataFrame`.
   * {{{
   *   sparkSession.readStream.parquet("/path/to/directory/of/parquet/files")
   *   sparkSession.readStream.schema(schema).json("/path/to/directory/of/json/files")
   * }}}
   *
   * @since 2.0.0
   */
  @InterfaceStability.Evolving
  def readStream: DataStreamReader = new DataStreamReader(self)

It returns a DataStreamReader instance for reading streaming data. In batch development we call the read method, which returns a DataFrameReader instance instead; streaming and batch start to part ways right here.

Next comes DataStreamReader's format method:

/**
   * Specifies the input data source format.
   *
   * @since 2.0.0
   */
  def format(source: String): DataStreamReader = {
    this.source = source
    this
  }

It specifies the input data source format. This must tie into the DataSource API used at load time; presumably the name gets mapped to the Class object of the corresponding data source implementation.

Then the option method is used to set two parameters, host and port:

 /**
   * Adds an input option for the underlying data source.
   *
   * You can set the following option(s):
   * <ul>
   * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
   * to be used to parse timestamps in the JSON/CSV datasources or partition values.</li>
   * </ul>
   *
   * @since 2.0.0
   */
  def option(key: String, value: String): DataStreamReader = {
    this.extraOptions += (key -> value)
    this
  }

In effect it just adds entries to the DataStreamReader's extraOptions member. The comment gives a rough idea: these become input parameters for the DataSource interface. At this point we still don't know what host and port will actually be used for.
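To see the pattern at a glance, here is a toy imitation of this builder (illustrative only; the real class does much more): each call mutates some state and returns this, so calls chain, and everything lands in one options map. Jumping ahead, the socket provider will eventually read "host" and "port" back out of exactly this map to open its connection.

import scala.collection.mutable

// Toy imitation of DataStreamReader's builder pattern (a sketch, not Spark's code)
class ToyStreamReader {
  private var source: String = _
  private val extraOptions = mutable.HashMap[String, String]()

  def format(s: String): ToyStreamReader = { this.source = s; this }
  def option(key: String, value: String): ToyStreamReader = {
    this.extraOptions += (key -> value); this
  }
  def describe(): String = s"$source -> ${extraOptions.toMap}"
}

val r = new ToyStreamReader()
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
// r.describe() == "socket -> Map(host -> localhost, port -> 9999)"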

Next, the load method:

  /**
   * Loads input data stream in as a `DataFrame`, for data streams that don't require a path
   * (e.g. external key-value stores).
   *
   * @since 2.0.0
   */
  def load(): DataFrame = {
    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
        "read files of Hive data source directly.")
    }

    val dataSource =
      DataSource(
        sparkSession,
        userSpecifiedSchema = userSpecifiedSchema,
        className = source,
        options = extraOptions.toMap)
    Dataset.ofRows(sparkSession, StreamingRelation(dataSource))
  }

All the methods so far returned the reader itself; this one returns a DataFrame. We know that in batch mode Spark only pre-defines the computation at this stage; none of this "kicks off a job to compute" (note 1). The same holds in Structured Streaming. Inside load we instantiate a dataSource variable, which is then wrapped by

/**
 * Used to link a streaming [[DataSource]] into a
 * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]]. This is only used for creating
 * a streaming [[org.apache.spark.sql.DataFrame]] from [[org.apache.spark.sql.DataFrameReader]].
 * It should be used to create [[Source]] and converted to [[StreamingExecutionRelation]] when
 * passing to [[StreamExecution]] to run a query.
 */
case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
  extends LeafNode {
...
}

into a LogicalPlan. Finally the Dataset.ofRows method is called:


  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }

That completes the initial pre-definition of the DataFrame. At first glance, apart from the input, the processing looks just like batch. Wouldn't that mean we process once and then go blind? Patience. Let's peek inside, starting with val qe = sparkSession.sessionState.executePlan(logicalPlan), where we find

def executePlan(plan: LogicalPlan): QueryExecution = createQueryExecution(plan)

Here createQueryExecution: LogicalPlan => QueryExecution is a function that turns a LogicalPlan into a QueryExecution. In batch SQL this object drives the entire execution workflow, so the same should hold for streaming SQL. Since it lives as a member of sparkSession.sessionState, let's first see how it gets defined:

/**
   * State isolated across sessions, including SQL configurations, temporary tables, registered
   * functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]].
   * If `parentSessionState` is not null, the `SessionState` will be a copy of the parent.
   *
   * This is internal to Spark and there is no guarantee on interface stability.
   *
   * @since 2.2.0
   */
  @InterfaceStability.Unstable
  @transient
  lazy val sessionState: SessionState = {
    parentSessionState
      .map(_.clone(this))
      .getOrElse {
        val state = SparkSession.instantiateSessionState(
          SparkSession.sessionStateClassName(sparkContext.conf),
          self)
        initialSessionOptions.foreach { case (k, v) => state.conf.setConfString(k, v) }
        state
      }
  }

That is how sessionState gets initialized. Now let's see what SparkSession.instantiateSessionState does:

 /**
   * Helper method to create an instance of `SessionState` based on `className` from conf.
   * The result is either `SessionState` or a Hive based `SessionState`.
   */
  private def instantiateSessionState(
      className: String,
      sparkSession: SparkSession): SessionState = {
    try {
      // invoke `new [Hive]SessionStateBuilder(SparkSession, Option[SessionState])`
      val clazz = Utils.classForName(className)
      val ctor = clazz.getConstructors.head
      ctor.newInstance(sparkSession, None).asInstanceOf[BaseSessionStateBuilder].build()
    } catch {
      case NonFatal(e) =>
        throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
    }
  }

It instantiates a subclass of BaseSessionStateBuilder via Java reflection and calls its build method, which returns a SessionState instance:

 /**
   * Build the [[SessionState]].
   */
  def build(): SessionState = {
    new SessionState(
      session.sharedState,
      conf,
      experimentalMethods,
      functionRegistry,
      udfRegistration,
      () => catalog,
      sqlParser,
      () => analyzer,
      () => optimizer,
      planner,
      streamingQueryManager,
      listenerManager,
      () => resourceLoader,
      createQueryExecution,
      createClone)
  }

On the second-to-last line we finally meet the long-awaited createQueryExecution parameter:

 /**
   * Create a query execution object.
   */
  protected def createQueryExecution: LogicalPlan => QueryExecution = { plan =>
    new QueryExecution(session, plan)
  }

Looking at the definition of QueryExecution and its subclass IncrementalExecution:

/**
 * A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]
 * plan incrementally. Possibly preserving state in between each execution.
 */
class IncrementalExecution(
    sparkSession: SparkSession,
    logicalPlan: LogicalPlan,
    val outputMode: OutputMode,
    val checkpointLocation: String,
    val runId: UUID,
    val currentBatchId: Long,
    val offsetSeqMetadata: OffsetSeqMetadata)
  extends QueryExecution(sparkSession, logicalPlan) with Logging

Suddenly things click, and it also looks like we took a wrong turn: the default path builds a plain QueryExecution, while streaming needs this IncrementalExecution. The next goal is to find out how our case ends up instantiating this class.

Back in the load method, we saw

Dataset.ofRows(sparkSession, StreamingRelation(dataSource))

The logical plan we hand over here is a StreamingRelation, which carries one crucial flag:

override def isStreaming: Boolean = true

Presumably the streaming-versus-batch decision lives somewhere in the planner rules. Let's just note that and come back later. Why? Because only one piece of our logical plan has been generated so far, so nothing query-execution related is involved yet. Once the whole SQL plan is defined, we can return to it.
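Incidentally, this flag is observable from user code, since Dataset exposes it:

println(lines.isStreaming)            // true: the plan contains a StreamingRelation
println(spark.range(10).isStreaming)  // false: an ordinary batch Dataset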

Transformations

val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

In our case we apply a series of transformations to the DataFrame, exactly the same as in batch Spark SQL, so no need to repeat that. At the end:

  /**
   * Count the number of rows for each group.
   * The resulting `DataFrame` will also contain the grouping columns.
   *
   * @since 1.3.0
   */
  def count(): DataFrame = toDF(Seq(Alias(Count(Literal(1)).toAggregateExpression(), "count")()))

It still returns the familiar DataFrame.

At this point, the pre-definition of our plan is complete.
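If you want to convince yourself of that, explain works on a streaming Dataset even before anything is started, and the analyzed plan bottoms out at the StreamingRelation leaf we built in load:

wordCounts.explain(true)                     // parsed / analyzed / optimized / physical plans
println(wordCounts.queryExecution.analyzed)  // the leaf should be our StreamingRelation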

Grasping at the Last Straw of Streaming Execution

Only a line or two of code left. That is Spark's big "flaw": a distributed app in just these few lines. If I still can't explain it clearly, I might as well pack up and leave.

Back to the program code:

val query = wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .start()

First, wordCounts.writeStream:


  /**
   * Interface for saving the content of the streaming Dataset out into external storage.
   *
   * @group basic
   * @since 2.0.0
   */
  @InterfaceStability.Evolving
  def writeStream: DataStreamWriter[T] = {
    if (!isStreaming) {
      logicalPlan.failAnalysis(
        "'writeStream' can be called only on streaming Dataset/DataFrame")
    }
    new DataStreamWriter[T](this)
  }

We're already about to write the Dataset out to storage? Did I miss something? Mild panic!

Better come back and keep walking step by step. DataStreamWriter's outputMode:

 /**
   * Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
   *   - `OutputMode.Append()`: only the new rows in the streaming DataFrame/Dataset will be
   *                            written to the sink
   *   - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written
   *                              to the sink every time these is some updates
   *   - `OutputMode.Update()`: only the rows that were updated in the streaming DataFrame/Dataset
   *                            will be written to the sink every time there are some updates. If
   *                            the query doesn't contain aggregations, it will be equivalent to
   *                            `OutputMode.Append()` mode.
   *
   * @since 2.0.0
   */
  def outputMode(outputMode: OutputMode): DataStreamWriter[T] = {
    this.outputMode = outputMode
    this
  }

This sets the sink's output mode. Currently complete, append, and update are supported; they mean exactly what they say, and their semantics at the sink are easy to follow.
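Note that outputMode also has a typed overload, and that the modes are not freely interchangeable with the query shape. A hedged aside: since our query aggregates without a watermark, append should be rejected later, at start(), by the UnsupportedOperationChecker we bump into below.

import org.apache.spark.sql.streaming.OutputMode

// Equivalent to .outputMode("complete"):
val w = wordCounts.writeStream.outputMode(OutputMode.Complete())

// Expected to throw an AnalysisException at start(), since append mode cannot
// emit rows for an aggregation that may still be updated:
//   wordCounts.writeStream.outputMode("append").format("console").start()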

Then .format("console") is called:

 /**
   * Specifies the underlying output data source.
   *
   * @since 2.0.0
   */
  def format(source: String): DataStreamWriter[T] = {
    this.source = source
    this
  }

Just as the source side has a data format, the sink side has one too. Here we pick console, which prints straight to the terminal.

Finally, .start() is called. There is quite a bit of code here, which looks promising:

/**
   * Starts the execution of the streaming query, which will continually output results to the given
   * path as new data arrives. The returned [[StreamingQuery]] object can be used to interact with
   * the stream.
   *
   * @since 2.0.0
   */
  def start(): StreamingQuery = {
    if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
        "write files of Hive data source directly.")
    }

    if (source == "memory") {
      assertNotPartitioned("memory")
      if (extraOptions.get("queryName").isEmpty) {
        throw new AnalysisException("queryName must be specified for memory sink")
      }
      val sink = new MemorySink(df.schema, outputMode)
      val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
      val chkpointLoc = extraOptions.get("checkpointLocation")
      val recoverFromChkpoint = outputMode == OutputMode.Complete()
      val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
        extraOptions.get("queryName"),
        chkpointLoc,
        df,
        sink,
        outputMode,
        useTempCheckpointLocation = true,
        recoverFromCheckpointLocation = recoverFromChkpoint,
        trigger = trigger)
      resultDf.createOrReplaceTempView(query.name)
      query
    } else if (source == "foreach") {
      assertNotPartitioned("foreach")
      val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc)
      df.sparkSession.sessionState.streamingQueryManager.startQuery(
        extraOptions.get("queryName"),
        extraOptions.get("checkpointLocation"),
        df,
        sink,
        outputMode,
        useTempCheckpointLocation = true,
        trigger = trigger)
    } else {
      val dataSource =
        DataSource(
          df.sparkSession,
          className = source,
          options = extraOptions.toMap,
          partitionColumns = normalizedParCols.getOrElse(Nil))
      df.sparkSession.sessionState.streamingQueryManager.startQuery(
        extraOptions.get("queryName"),
        extraOptions.get("checkpointLocation"),
        df,
        dataSource.createSink(outputMode),
        outputMode,
        useTempCheckpointLocation = source == "console",
        recoverFromCheckpointLocation = true,
        trigger = trigger)
    }
  }

Well, and here I thought Scala folks all hated if/else. The source we specified is console, so we go straight into the else branch.

First, a DataSource is instantiated. Recall we saw one earlier in the reader; a head and a tail, nice and symmetric.

We also see dataSource.createSink(outputMode) here. With the read at the head and the write at the tail both in place, all that is missing is the thing that strings the two together in a streaming fashion.

Next up, sparkSession.sessionState.streamingQueryManager. We met this briefly while sessionState was being instantiated but paid it no attention; let's take another look:

  /**
   * Interface to start and stop streaming queries.
   */
  protected def streamingQueryManager: StreamingQueryManager = new StreamingQueryManager(session)

The comment says it all, so the logic naturally lives in the startQuery method:

  /**
   * Start a [[StreamingQuery]].
   *
   * @param userSpecifiedName Query name optionally specified by the user.
   * @param userSpecifiedCheckpointLocation  Checkpoint location optionally specified by the user.
   * @param df Streaming DataFrame.
   * @param sink  Sink to write the streaming outputs.
   * @param outputMode  Output mode for the sink.
   * @param useTempCheckpointLocation  Whether to use a temporary checkpoint location when the user
   *                                   has not specified one. If false, then error will be thrown.
   * @param recoverFromCheckpointLocation  Whether to recover query from the checkpoint location.
   *                                       If false and the checkpoint location exists, then error
   *                                       will be thrown.
   * @param trigger [[Trigger]] for the query.
   * @param triggerClock [[Clock]] to use for the triggering.
   */
  private[sql] def startQuery(
      userSpecifiedName: Option[String],
      userSpecifiedCheckpointLocation: Option[String],
      df: DataFrame,
      sink: Sink,
      outputMode: OutputMode,
      useTempCheckpointLocation: Boolean = false,
      recoverFromCheckpointLocation: Boolean = true,
      trigger: Trigger = ProcessingTime(0),
      triggerClock: Clock = new SystemClock()): StreamingQuery = {
    val query = createQuery(
      userSpecifiedName,
      userSpecifiedCheckpointLocation,
      df,
      sink,
      outputMode,
      useTempCheckpointLocation,
      recoverFromCheckpointLocation,
      trigger,
      triggerClock)

    activeQueriesLock.synchronized {
      // Make sure no other query with same name is active
      userSpecifiedName.foreach { name =>
        if (activeQueries.values.exists(_.name == name)) {
          throw new IllegalArgumentException(
            s"Cannot start query with name $name as a query with that name is already active")
        }
      }

      // Make sure no other query with same id is active
      if (activeQueries.values.exists(_.id == query.id)) {
        throw new IllegalStateException(
          s"Cannot start query with id ${query.id} as another query with same id is " +
            s"already active. Perhaps you are attempting to restart a query from checkpoint " +
            s"that is already active.")
      }

      activeQueries.put(query.id, query)
    }
    try {
      // When starting a query, it will call `StreamingQueryListener.onQueryStarted` synchronously.
      // As it's provided by the user and can run arbitrary codes, we must not hold any lock here.
      // Otherwise, it's easy to cause dead-lock, or block too long if the user codes take a long
      // time to finish.
      query.streamingQuery.start()
    } catch {
      case e: Throwable =>
        activeQueriesLock.synchronized {
          activeQueries -= query.id
        }
        throw e
    }
    query
  }

Lots of parameters, few of which we actually use; let's skip them for now.
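One parameter worth calling out from the signature, though: the default trigger is ProcessingTime(0), i.e., start the next micro-batch as soon as the previous one finishes. Had we wanted a fixed cadence, we would set it on the writer; for example (using the Trigger factory available since 2.2):

import org.apache.spark.sql.streaming.Trigger

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds")) // fire a micro-batch every 10s
  .start()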

The method's logic follows the three-part essay structure I used in primary school: 1. create the query, 2. validate the query, 3. start it.

  • Create the query
 private def createQuery(
      userSpecifiedName: Option[String],
      userSpecifiedCheckpointLocation: Option[String],
      df: DataFrame,
      sink: Sink,
      outputMode: OutputMode,
      useTempCheckpointLocation: Boolean,
      recoverFromCheckpointLocation: Boolean,
      trigger: Trigger,
      triggerClock: Clock): StreamingQueryWrapper = {
    var deleteCheckpointOnStop = false
    val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
      new Path(userSpecified).toUri.toString
    }.orElse {
      df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
        new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
      }
    }.getOrElse {
      if (useTempCheckpointLocation) {
        // Delete the temp checkpoint when a query is being stopped without errors.
        deleteCheckpointOnStop = true
        Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
      } else {
        throw new AnalysisException(
          "checkpointLocation must be specified either " +
            """through option("checkpointLocation", ...) or """ +
            s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
      }
    }

    // If offsets have already been created, we trying to resume a query.
    if (!recoverFromCheckpointLocation) {
      val checkpointPath = new Path(checkpointLocation, "offsets")
      val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
      if (fs.exists(checkpointPath)) {
        throw new AnalysisException(
          s"This query does not support recovering from checkpoint location. " +
            s"Delete $checkpointPath to start over.")
      }
    }

    val analyzedPlan = df.queryExecution.analyzed
    df.queryExecution.assertAnalyzed()

    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
      UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
    }

    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
      logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
          "is not supported in streaming DataFrames/Datasets and will be disabled.")
    }

    new StreamingQueryWrapper(new StreamExecution(
      sparkSession,
      userSpecifiedName.orNull,
      checkpointLocation,
      analyzedPlan,
      sink,
      trigger,
      triggerClock,
      outputMode,
      deleteCheckpointOnStop))
  }

Our case is too simple for any of that: no checkpoint, no trigger, so it is essentially all defaults. Of the logic above, basically only the creation of analyzedPlan matters before a StreamingQueryWrapper gets instantiated and we are done. The important piece, of course, is the StreamExecution inside it. Note in passing that the unresolved logical plan parsed earlier is analyzed by QueryExecution (the df.queryExecution.analyzed above).

StreamExecution: this really should be our streaming execution engine. Its main members:

  • sources: the streaming data sources; already hit above
  • logicalPlan: the chain of DataFrame/Dataset transformations; already hit
  • sink: the output end for results; already hit

A few more important member variables:

  • currentBatchId: id of the batch currently executing; starts at -1, meaning initialization is still needed
  • offsetLog: a WAL recording each batch's offsets
  • batchCommitLog: records the batches that have already been committed
  • availableOffsets: offsets currently eligible for execution, not yet committed to the sink
  • committedOffsets: offsets already processed and committed to the sink or state store
  • watermarkMsMap: tracks the current operators' watermarks; for stateful queries, a means of handling delayed data

StreamExecution is what strings the whole pipeline together; we will analyze how it works in the code walkthrough below. For now, back to our three-part essay.
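For orientation, these logs all materialize under the checkpoint location. With an explicit checkpointLocation set, the directory typically looks roughly like this (a sketch; exact contents vary by version and query):

checkpoint/
  metadata     -- the query's id
  offsets/     -- the offsetLog WAL: one file per batch, written before execution
  commits/     -- the batchCommitLog: one file per successfully completed batch
  state/       -- state store files for stateful operators (our aggregation)
  sources/     -- per-source bookkeeping (e.g. the file source's seen-files log)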

  • Validate the query

    • Make sure no other query with the same name is active
    • Make sure no other query with the same id is active
  • Start the query

  /**
   * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
   * has been posted to all the listeners.
   */
  def start(): Unit = {
    logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store the query checkpoint.")
    microBatchThread.setDaemon(true)
    microBatchThread.start()
    startLatch.await()  // Wait until thread started and QueryStart event has been posted
  }

Seeing this, my heart leapt: we've found the mothership. Then I saw microBatch and it sank again; apparently this is still not a purebred streaming framework.

 /**
   * The thread that runs the micro-batches of this stream. Note that this thread must be
   * [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: interrupting a
   * running `KafkaConsumer` may cause endless loop.
   */
  val microBatchThread =
    new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
      override def run(): Unit = {
        // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
        // thread to this micro batch thread
        sparkSession.sparkContext.setCallSite(callSite)
        runBatches()
      }
    }

The run method calls runBatches. It is a bit long, so I have trimmed it and put the analysis directly into the code as comments:

 /**
   * Repeatedly attempts to run batches as data arrives
   */
  private def runBatches(): Unit = {
    try {
      // force initialization of the logical plan so that the sources can be created
      // (the val is lazy, so this bare reference is what triggers it)
      logicalPlan

      // Isolated spark session to run the batches with.
      // (a cloned session is what actually executes the batches)
      val sparkSessionToRunBatches = sparkSession.cloneSession()
      // metadata describing the sequence of source offsets
      offsetSeqMetadata = OffsetSeqMetadata(
        batchWatermarkMs = 0, batchTimestampMs = 0, sparkSessionToRunBatches.conf)

      if (state.compareAndSet(INITIALIZING, ACTIVE)) {
        // The trigger mechanism: ours is in fact a `ProcessingTimeExecutor`,
        // whose execute method is essentially a while loop that keeps
        // invoking the function passed to it.
        triggerExecutor.execute(() => {
          startTrigger()

          if (isActive) {
            reportTimeTaken("triggerExecution") {
              // try to fetch new data
              if (currentBatchId < 0) {
                // We'll do this initialization only once
                // (the very first attempt)
                populateStartOffsets(sparkSessionToRunBatches)
                sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
                logDebug(s"Stream running from $committedOffsets to $availableOffsets")
              } else {
                // every later attempt goes here: fetch the sources' latest
                // offsets, write the WAL, and record them as available
                constructNextBatch()
              }
              // runBatch computes the current batch; we'll pull it out and look at it below
              if (dataAvailable) {
                currentStatus = currentStatus.copy(isDataAvailable = true)
                updateStatusMessage("Processing new data")
                runBatch(sparkSessionToRunBatches)
              }
            }
            // Report trigger as finished and construct progress object.
            finishTrigger(dataAvailable)
            // write the commit log as soon as the computation is done
            if (dataAvailable) {
              // Update committed offsets.
              batchCommitLog.add(currentBatchId)
              committedOffsets ++= availableOffsets
              logDebug(s"batch ${currentBatchId} committed")
              // We'll increase currentBatchId after we complete processing current batch's data
              currentBatchId += 1
              sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
            } else {
              currentStatus = currentStatus.copy(isDataAvailable = false)
              updateStatusMessage("Waiting for data to arrive")
              Thread.sleep(pollingDelayMs)
            }
          }
          updateStatusMessage("Waiting for next trigger")
          isActive
        })
        updateStatusMessage("Stopped")
      }
    } finally {
      // ... (error handling and cleanup elided from this excerpt) ...
    }
  }
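Before pulling out runBatch, one word on the triggerExecutor mentioned in the comments above. With our default ProcessingTime(0) trigger it is a ProcessingTimeExecutor; a toy version of its loop, under that assumption, looks like:

// Toy imitation of ProcessingTimeExecutor.execute (illustrative only):
// keep invoking the handler; if a batch finishes early, sleep out the rest
// of the interval; stop once the handler reports the query is inactive.
def runLoop(intervalMs: Long)(triggerHandler: () => Boolean): Unit = {
  var active = true
  while (active) {
    val start = System.currentTimeMillis()
    active = triggerHandler() // one micro-batch attempt; returns isActive
    val elapsed = System.currentTimeMillis() - start
    if (active && elapsed < intervalMs) Thread.sleep(intervalMs - elapsed)
  }
}

With intervalMs = 0, as in our case, the loop simply fires one micro-batch after another.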

runBatch

/**
   * Processes any data available between `availableOffsets` and `committedOffsets`.
   * @param sparkSessionToRunBatch Isolated [[SparkSession]] to run this batch with.
   */
  private def runBatch(sparkSessionToRunBatch: SparkSession): Unit = {
    // Request unprocessed data from all sources.
    newData = reportTimeTaken("getBatch") {
      availableOffsets.flatMap {
        case (source, available)
          if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
          val current = committedOffsets.get(source)
          val batch = source.getBatch(current, available)
          assert(batch.isStreaming,
            s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" +
              s"${batch.queryExecution.logical}")
          logDebug(s"Retrieving data from $source: $current -> $available")
          Some(source -> batch)
        case _ => None
      }
    }

    // A list of attributes that will need to be updated.
    val replacements = new ArrayBuffer[(Attribute, Attribute)]
    // Replace sources in the logical plan with data that has arrived since the last batch.
    val withNewSources = logicalPlan transform {
      case StreamingExecutionRelation(source, output) =>
        newData.get(source).map { data =>
          val newPlan = data.logicalPlan
          assert(output.size == newPlan.output.size,
            s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
            s"${Utils.truncatedString(newPlan.output, ",")}")
          replacements ++= output.zip(newPlan.output)
          newPlan
        }.getOrElse {
          LocalRelation(output, isStreaming = true)
        }
    }

    // Rewire the plan to use the new attributes that were returned by the source.
    val replacementMap = AttributeMap(replacements)
    val triggerLogicalPlan = withNewSources transformAllExpressions {
      case a: Attribute if replacementMap.contains(a) =>
        replacementMap(a).withMetadata(a.metadata)
      case ct: CurrentTimestamp =>
        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
          ct.dataType)
      case cd: CurrentDate =>
        CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
          cd.dataType, cd.timeZoneId)
    }

    reportTimeTaken("queryPlanning") {
      lastExecution = new IncrementalExecution(
        sparkSessionToRunBatch,
        triggerLogicalPlan,
        outputMode,
        checkpointFile("state"),
        runId,
        currentBatchId,
        offsetSeqMetadata)
      lastExecution.executedPlan // Force the lazy generation of execution plan
    }

    val nextBatch =
      new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))

    reportTimeTaken("addBatch") {
      SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
        sink.addBatch(currentBatchId, nextBatch)
      }
    }
    ……
  }

runBatch has five key steps:

1. Take the latest offsets from availableOffsets as the end point and the previous batch's offsets from committedOffsets as the start point; Source.getBatch(start, end) fetches the data newly received for this run, represented as a Dataset/DataFrame.
2. Using the logicalPlan preset in StreamExecution as the blueprint, transform it into a copy bound to the new data (withNewSources); concretely, pattern-match and swap each StreamingExecutionRelation node's source for the latest DataFrame from step 1.
3. Create an IncrementalExecution from that plan.
4. Calling lastExecution.executedPlan sets off the usual chain of Catalyst reactions, turning the logical plan into an executable physical plan.
5. Wrap that executedPlan in a Dataset and hand it to the Sink, i.e., call Sink.addBatch to trigger execution. Our sink is console; looking at ConsoleSink's implementation, the show call is what sets the whole DAG in motion:

 override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {
    val batchIdStr = if (batchId <= lastBatchId) {
      s"Rerun batch: $batchId"
    } else {
      lastBatchId = batchId
      s"Batch: $batchId"
    }

    // scalastyle:off println
    println("-------------------------------------------")
    println(batchIdStr)
    println("-------------------------------------------")
    // scalastyle:off println
    data.sparkSession.createDataFrame(
      data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
      .show(numRowsToShow, isTruncated)
  }
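The numRowsToShow and isTruncated above come straight from writer options, so the console output can presumably be tuned when starting the query:

val tuned = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .option("numRows", 50)      // default 20
  .option("truncate", false)  // default true
  .start()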

Summary

This article used the wordcount app for a simple walkthrough of Structured Streaming's source code and execution logic, recorded as the process of a doctor-turned-programmer reading code. The observations are fairly shallow; corrections are welcome.
Structured Streaming has plenty more exciting features worth analyzing and digging into, and I hope to share them soon.

Note 1: when the input spans many path directories, Spark does launch a job to list file statuses in parallel.
