Structured Streaming
structured streaming是一種可伸縮的、容錯的、基于Spark SQL引擎的流式計算引擎。你可以使用,與針對靜態(tài)數(shù)據(jù)的批處理計算操作一樣的方式來編寫流式計算操作。隨著數(shù)據(jù)不斷地到達,Spark SQL引擎會以一種增量的方式來執(zhí)行這些操作,并且持續(xù)更新結(jié)算結(jié)果??梢允褂胘ava、scala等編程語言,以及dataset/dataframe api來編寫計算操作,執(zhí)行數(shù)據(jù)流的聚合、基于event的滑動窗口、流式數(shù)據(jù)與離線數(shù)據(jù)的join等操作。所有這些操作都與Spark SQL使用一套引擎來執(zhí)行。此外,structured streaming會通過checkpoint和預(yù)寫日志等機制來實現(xiàn)一次且僅一次的語義。簡單來說,對于開發(fā)人員來說,根本不用去考慮是流式計算,還是批處理,只要使用同樣的方式來編寫計算操作即可,structured streaming在底層會自動去實現(xiàn)快速、可伸縮、容錯、一次且僅一次語義。
spark 2.0僅僅是提供beta版本的structured streaming,所有的相關(guān)api都是實驗性質(zhì)的。
WordCount入門案例
object StructuredNetworkWordCount {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
.format("socket")
.option("host", "spark-project-1")
.option("port", 9999)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
}
}