Structured Streaming 官方示例運行及問題解決

  1. 示例代碼
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
      * 監(jiān)聽網(wǎng)絡端口發(fā)來的內(nèi)容,然后進行 WordCount
      */
    object StructuredStreamingDemo {
    
      def main(args: Array[String]): Unit = {
    
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        val conf = new SparkConf()
          .setIfMissing("spark.master", "local[4]")
          .setAppName("Structured Network Count")
          .set("fs.defaultFS","file://D:/temp/defaultFS/")
    
        // 創(chuàng)建程序入口 SparkSession,并引入 spark.implicits 來允許 Scalaobject 隱式轉(zhuǎn)換為 DataFrame
        val spark: SparkSession = SparkSession.builder.config(conf).getOrCreate()
        import spark.implicits._
        
        // 第二步: 創(chuàng)建流。配置從 socket 讀取流數(shù)據(jù),地址和端口為 localhost: 9999
        val lines: DataFrame = spark.readStream.format("socket")
        .option("host", "192.168.1.101")
        .option("port", "9999")
        .load()
    
        // 第三步: 進行單詞統(tǒng)計。這里 lines 是 DataFrame ,使用 as[String]給它定義類型轉(zhuǎn)換為 Dataset, 之后在 Dataset 里進行單詞統(tǒng)計。
        val words: Dataset[String] = lines.as[String].flatMap(_.split(" "))
        val wordcount: DataFrame = words.groupBy("value").count()
    
        // 第四步: 創(chuàng)建查詢句柄,定義打印結果方式并啟動程序 這里使用 writeStream 方法, 輸出模式為全部輸出到控制臺。
        val query: StreamingQuery = wordcount.writeStream
          .outputMode(OutputMode.Complete)
          .format("console")
          .start()
        // 調(diào)用 awaitTermination 方法來防止程序在處理數(shù)據(jù)時停止
        query.awaitTermination()
      }
    }
    
  2. 運行結果
    ...
    Connected to the target VM, address: '127.0.0.1:64497', transport: 'socket'
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    19/12/06 10:41:31 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +-----+-----+
    |value|count|
    +-----+-----+
    |  dog|    3|
    |  cat|    1|
    +-----+-----+
    
    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +-----+-----+
    |value|count|
    +-----+-----+
    |  dog|    3|
    |  cat|    2|
    |  owl|    1|
    +-----+-----+
    
    -------------------------------------------
    Batch: 2
    -------------------------------------------
    +-----+-----+
    |value|count|
    +-----+-----+
    |  dog|    4|
    |  cat|    2|
    |  owl|    2|
    +-----+-----+
    ...
    
  3. 遇到錯誤及解決

    錯誤日志:

    Connected to the target VM, address: '127.0.0.1:64189', transport: 'socket'
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    19/12/06 10:36:54 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
    Exception in thread "main" java.lang.IllegalArgumentException: Pathname /C:/Users/admin/AppData/Local/Temp/temporary-58e0d2c8-c72e-4f8d-8670-c0931c2f5bfe/offsets from C:/Users/admin/AppData/Local/Temp/temporary-58e0d2c8-c72e-4f8d-8670-c0931c2f5bfe/offsets is not a valid DFS filename.
     at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:196)
     at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
     at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
     at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
     at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:221)
     at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
     at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
     at com.cloudera.StructuredStreamingDemo$.main(StructuredStreamingDemo.scala:40)
     at com.cloudera.StructuredStreamingDemo.main(StructuredStreamingDemo.scala)
    Disconnected from the target VM, address: '127.0.0.1:64189', transport: 'socket'
    
    Process finished with exit code 1
    

    解決辦法:

    1. 去掉 core-site.xml 配置文件或注釋掉該文件中的 fs.defaultFS 配置
      <property>
        <name>fs.defaultFS</name>
        <value>hdfs://cdh01:8020</value>
      </property>
      
    2. 代碼中添加 set("fs.defaultFS","file://D:/temp/defaultFS/")
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容