創(chuàng)建流式的dataset和dataframe
流式dataframe可以通過DataStreamReader接口來創(chuàng)建,DataStreamReader對象是通過SparkSession的readStream()方法返回的。與創(chuàng)建靜態(tài)dataframe的read()方法類似,我們可以指定數(shù)據(jù)源的一些配置信息,比如data format、schema、option等。spark 2.0中初步提供了一些內(nèi)置的source支持。
- file source
以數(shù)據(jù)流的方式讀取一個目錄中的文件。支持text、csv、json、parquet等文件類型。文件必須是被移動到目錄中的,比如用mv命令。 - socket source
從socket連接中讀取文本內(nèi)容。driver是負責監(jiān)聽請求的server socket。socket source只能被用來進行測試。
代碼
val socketDF = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
socketDF.isStreaming
socketDF.printSchema
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema)
.csv("/path/to/directory")
上面的例子都是產(chǎn)生untyped類型的dataframe,這就意味著在編譯時是無法檢查其schema的,只有在計算被提交并運行時才會進行檢查。一些操作,比如map、flatMap等,需要在編譯時就知道具體的類型。為了使用一些typed類型的操作,我們可以將dataframe轉(zhuǎn)換為typed類型的dataset,比如df.as[String]。