spark structured streaming的source解析與自定義

下面是一段創(chuàng)建structured streaming的Dataset的代碼:

val lines = spark.readStream.format("socket")
    .option("host", "localhost").option("port", 9999).load();

會(huì)創(chuàng)建一個(gè)socket類(lèi)型的Source,該name2class的映射由DataSource.lookupDataSource()完成

val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
...
serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList
...

應(yīng)該是從當(dāng)前類(lèi)路徑中查找所有的DataSourceRegister,并讀取它的shortName(),如果是"socket"就確定了由該DataSourceRegister來(lái)創(chuàng)建對(duì)應(yīng)的DataSource

果然,有一個(gè)TextSocketSourceProvider

class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
...
override def shortName(): String = "socket"

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = {
    val host = parameters("host")
    val port = parameters("port").toInt
    new TextSocketSource(host, port, parseIncludeTimestamp(parameters), sqlContext)
  }
}

TextSocketSourceProvider的createSource創(chuàng)建一個(gè)TextSocketSource

TextSocketSource是一個(gè)Source,Source接口如下:

trait Source  {
  def schema: StructType
  def getOffset: Option[Offset]
  def getBatch(start: Option[Offset], end: Offset): DataFrame
  def commit(end: Offset) : Unit = {}
  def stop(): Unit
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容