-
示例代碼
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Structured Streaming demo: listens for text arriving on a network socket
 * and maintains a running WordCount over everything received so far.
 */
object StructuredStreamingDemo {

  def main(args: Array[String]): Unit = {
    // Quiet Spark's internal logging down to warnings.
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setIfMissing("spark.master", "local[4]")
      .setAppName("Structured Network Count")
      // Workaround for running on Windows while core-site.xml points
      // fs.defaultFS at HDFS: force a local default filesystem so the
      // streaming offset/checkpoint directory resolves to a local path
      // instead of failing with "is not a valid DFS filename".
      .set("fs.defaultFS", "file://D:/temp/defaultFS/")

    // Step 1: create the SparkSession entry point. Importing spark.implicits
    // enables the implicit conversions needed below (e.g. as[String]).
    val spark: SparkSession = SparkSession.builder.config(conf).getOrCreate()
    import spark.implicits._

    // Step 2: create the input stream — read lines from a TCP socket.
    // NOTE: the socket source is for testing only; it has no recovery support.
    val lines: DataFrame = spark.readStream.format("socket")
      .option("host", "192.168.1.101")
      .option("port", "9999")
      .load()

    // Step 3: word count. `lines` is an untyped DataFrame; as[String] turns it
    // into a Dataset[String] so we can flatMap each line into words, then
    // group by the word value and count occurrences.
    val words: Dataset[String] = lines.as[String].flatMap(_.split(" "))
    val wordcount: DataFrame = words.groupBy("value").count()

    // Step 4: start the query. Complete output mode prints the entire updated
    // result table to the console after every micro-batch.
    val query: StreamingQuery = wordcount.writeStream
      .outputMode(OutputMode.Complete)
      .format("console")
      .start()

    // Block the driver thread so the streaming job keeps running.
    query.awaitTermination()
  }
}
運行結果
... Connected to the target VM, address: '127.0.0.1:64497', transport: 'socket' Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 19/12/06 10:41:31 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery. ------------------------------------------- Batch: 0 ------------------------------------------- +-----+-----+ |value|count| +-----+-----+ | dog| 3| | cat| 1| +-----+-----+ ------------------------------------------- Batch: 1 ------------------------------------------- +-----+-----+ |value|count| +-----+-----+ | dog| 3| | cat| 2| | owl| 1| +-----+-----+ ------------------------------------------- Batch: 2 ------------------------------------------- +-----+-----+ |value|count| +-----+-----+ | dog| 4| | cat| 2| | owl| 2| +-----+-----+ ... -
遇到錯誤及解決
錯誤日志:
Connected to the target VM, address: '127.0.0.1:64189', transport: 'socket' Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 19/12/06 10:36:54 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery. Exception in thread "main" java.lang.IllegalArgumentException: Pathname /C:/Users/admin/AppData/Local/Temp/temporary-58e0d2c8-c72e-4f8d-8670-c0931c2f5bfe/offsets from C:/Users/admin/AppData/Local/Temp/temporary-58e0d2c8-c72e-4f8d-8670-c0931c2f5bfe/offsets is not a valid DFS filename. at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:196) at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105) at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118) at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400) at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:221) at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278) at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282) at com.cloudera.StructuredStreamingDemo$.main(StructuredStreamingDemo.scala:40) at com.cloudera.StructuredStreamingDemo.main(StructuredStreamingDemo.scala) Disconnected from the target VM, address: '127.0.0.1:64189', transport: 'socket' Process finished with exit code 1

解決辦法:
以下兩種辦法任選其一:
- 去掉 core-site.xml 配置文件,或注釋掉該文件中的 fs.defaultFS 配置:
  <property> <name>fs.defaultFS</name> <value>hdfs://cdh01:8020</value> </property>
- 或在代碼中添加:
  set("fs.defaultFS","file://D:/temp/defaultFS/")
Structured Streaming 官方示例運行及問題解決
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。
相關閱讀更多精彩內(nèi)容
- > `HDFS_DELEGATION_TOKEN` 這個BUG在很多文章中都出現(xiàn)著,講了很多原理,但是只給出了官方...
- 問題分析: 問題一 org.apache.hadoop.ipc.RemoteException(org.apach...
- 1. Overview: Structured Streaming是基于Spark SQL引擎的可擴展、具有容錯性...