203、Spark 2.0之Structured Streaming:創(chuàng)建流式的dataset和dataframe

創(chuàng)建流式的dataset和dataframe

流式dataframe可以通過DataStreamReader接口來創(chuàng)建,DataStreamReader對象是通過SparkSession的readStream()方法返回的。與創(chuàng)建靜態(tài)dataframe的read()方法類似,我們可以指定數(shù)據(jù)源的一些配置信息,比如data format、schema、option等。spark 2.0中初步提供了一些內(nèi)置的source支持。

  1. file source
    以數(shù)據(jù)流的方式讀取一個目錄中的文件。支持text、csv、json、parquet等文件類型。文件必須是被移動到目錄中的,比如用mv命令。
  2. socket source
    從socket連接中讀取文本內(nèi)容。driver是負責監(jiān)聽請求的server socket。socket source只能被用來進行測試。

代碼

val socketDF = spark
    .readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()

socketDF.isStreaming    
socketDF.printSchema 

val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
    .readStream
    .option("sep", ";")
    .schema(userSchema)      
    .csv("/path/to/directory")    

上面的例子都是產(chǎn)生untyped類型的dataframe,這就意味著在編譯時是無法檢查其schema的,只有在計算被提交并運行時才會進行檢查。一些操作,比如map、flatMap等,需要在編譯時就知道具體的類型。為了使用一些typed類型的操作,我們可以將dataframe轉(zhuǎn)換為typed類型的dataset,比如df.as[String]。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容