Structured Streaming同一個進(jìn)程支持多維度的統(tǒng)計輸出

Unsupported Operations

There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. Some of them are as follows.

  • Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.

  • Limit and take first N rows are not supported on streaming Datasets.

  • Distinct operations on streaming Datasets are not supported.

  • Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.

  • Few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section for more details.

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

  • count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count() which returns a streaming Dataset containing a running count.

  • foreach() - Instead use ds.writeStream.foreach(...) (see next section).

  • show() - Instead use the console sink (see next section).

If you try any of these operations, you will see an AnalysisException like “operation XYZ is not supported with streaming DataFrames/Datasets”. While some of them may be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. For example, sorting on the input stream is not supported, as it requires keeping track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently.

由于Structured Streaming不支持兩個流之間的join,但是我們在流計算的業(yè)務(wù)場景中經(jīng)常需要,既要按天統(tǒng)計,也需要按小時來統(tǒng)計,并同時輸出的場景;但是Structured Streaming 可以支持同一個流分成兩個相同的流去分組聚合,跑兩個query;代碼如下:

package com.spark.sunny.structuredstreaming

import com.spark.sunny.util.UdfUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoders

case class DeviceData(deviceId : String, usage : String,duration : String,eventBeginTime : String, serviceType : String)

/**
  * <Description> <br>
  *
  * @author Sunny<br>
  * @taskId: <br>
  * @version 1.0<br>
  * @createDate 2018/06/23 11:01 <br>
  * @see com.spark.sunny.structuredstreaming <br>
  */
object StructuredHdfsDevice {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("StructuredHdfsDevice")
      .master("local")
      .getOrCreate()
    val schema = Encoders.product[DeviceData].schema
    val lines =  spark.readStream
      .format("json")
      .schema(schema)
      .load("C:\\Users\\yaj\\Desktop\\dashboard\\test")
    import spark.implicits._
    val beginTimeDevice = lines
      .withColumn("eventBeginTime", UdfUtil.fmtTimestampUdf($"eventBeginTime", lit("yyyyMMddHHmmss")))
      .withColumn("eventBeginHour", substring($"eventBeginTime", 0, 10))
      .withColumn("eventBeginDay", substring($"eventBeginTime", 0, 8))

    val hourDevice = beginTimeDevice.groupBy($"subsId",$"eventBeginHour",$"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForHour")

    val queryHour = hourDevice.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    val dayDevice = beginTimeDevice.groupBy($"subsId",$"eventBeginDay",$"serviceType")
      .agg("duration" -> "sum").withColumnRenamed("sum(duration)", "durationForDay")

    val queryDay = dayDevice.writeStream
      .outputMode("update")
      .option("truncate", "false")
      .format("console")
      .start()

    queryHour.awaitTermination()
    queryDay.awaitTermination()
  }
}

需要注意的是:不能使用Query start之后,馬上去調(diào)用awaitTermination,因為之會阻塞第二個分支Query的執(zhí)行,而應(yīng)該在所有的Query執(zhí)行完start之后,使用sparkSession.streams.awaitAnyTermination(),只有這樣才能確保兩個分支Query都能啟動。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容