The following code creates a streaming Dataset in Structured Streaming:
val lines = spark.readStream.format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load() // streaming DataFrame with a single string column, "value"
This creates a Source of type socket. The mapping from the name "socket" to an implementation class is done by DataSource.lookupDataSource():
val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
...
// keep only the registered providers whose shortName() matches the requested format
serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList
...
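Presumably this finds every DataSourceRegister implementation on the current classpath and reads its shortName(); the one whose short name is "socket" is then chosen to create the corresponding DataSource. (ServiceLoader discovers implementations through provider-configuration files under META-INF/services, here META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.)
Sure enough, there is a TextSocketSourceProvider: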
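To make the selection step concrete, here is a minimal Spark-free sketch. `ShortNameRegister`, `SocketProvider`, `RateProvider` and `LookupSketch` are hypothetical stand-ins, and a plain `Seq` replaces the `ServiceLoader` iterator so the code runs without any classpath setup. It only reproduces the "filter by shortName(), case-insensitively" logic shown above; as far as I know the real lookupDataSource additionally falls back to treating the string as a class name when no short name matches, which this sketch simply reports as an error.

```scala
// Hypothetical stand-in for Spark's DataSourceRegister; not the real Spark trait.
trait ShortNameRegister {
  def shortName(): String
}

// Hypothetical providers, playing the role of classes listed in META-INF/services.
final class SocketProvider extends ShortNameRegister {
  override def shortName(): String = "socket"
}

final class RateProvider extends ShortNameRegister {
  override def shortName(): String = "rate"
}

object LookupSketch {
  // `candidates` plays the role of serviceLoader.asScala.
  def lookup(provider: String, candidates: Seq[ShortNameRegister]): Either[String, ShortNameRegister] =
    candidates.filter(_.shortName().equalsIgnoreCase(provider)).toList match {
      case single :: Nil => Right(single) // exactly one match: use it
      case Nil           => Left(s"no registered short name matches '$provider'")
      case many          => // two or more providers claim the same name
        Left(s"ambiguous short name '$provider': ${many.map(_.getClass.getName).mkString(", ")}")
    }
}
```

For example, `LookupSketch.lookup("SOCKET", Seq(new SocketProvider, new RateProvider))` yields `Right` holding the `SocketProvider`, because the comparison ignores case.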
class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegister with Logging {
  ...
  // the name users pass to readStream.format(...)
  override def shortName(): String = "socket"

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = {
    val host = parameters("host")        // required option
    val port = parameters("port").toInt  // required option, parsed as an Int
    new TextSocketSource(host, port, parseIncludeTimestamp(parameters), sqlContext)
  }
}
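createSource pulls its settings straight out of the parameters map, so a missing key throws NoSuchElementException and a non-numeric port throws NumberFormatException. The hypothetical sketch below (`SocketOptions` and `SocketOptionsSketch` are made-up names) performs the same parsing but returns explicit errors. The `includeTimestamp` key with a `false` default is an assumption about what parseIncludeTimestamp reads; check it against the Spark version you are using.

```scala
import scala.util.Try

// Hypothetical holder for the three settings TextSocketSource is constructed with.
final case class SocketOptions(host: String, port: Int, includeTimestamp: Boolean)

object SocketOptionsSketch {
  // Validates the options instead of letting Map.apply / toInt throw.
  def parse(parameters: Map[String, String]): Either[String, SocketOptions] =
    for {
      host    <- parameters.get("host").toRight("missing option 'host'")
      portStr <- parameters.get("port").toRight("missing option 'port'")
      port    <- Try(portStr.toInt).toOption.toRight(s"invalid port '$portStr'")
      // Assumed key and default; the real parseIncludeTimestamp may differ.
      includeTs <- Try(parameters.getOrElse("includeTimestamp", "false").toBoolean)
                     .toOption.toRight("includeTimestamp must be true or false")
    } yield SocketOptions(host, port, includeTs)
}
```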
So TextSocketSourceProvider.createSource reads the host and port options and returns a new TextSocketSource.
TextSocketSource is a Source. The Source trait looks like this:
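trait Source {
  // schema of the data this source produces
  def schema: StructType
  // latest available offset, or None if no data has arrived yet
  def getOffset: Option[Offset]
  // data in the range (start, end]; start = None means from the first record
  def getBatch(start: Option[Offset], end: Offset): DataFrame
  // Spark has finished all data up to and including end
  def commit(end: Offset): Unit = {}
  // stop the source and free its resources
  def stop(): Unit
}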
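The contract is easier to see on a toy implementation. The sketch below is not Spark code: `ToySource` and `MemoryLineSource` are hypothetical, offsets are plain `Long`s instead of `Offset`, and a batch is a `Seq[String]` instead of a DataFrame. It follows the semantics described in Source's scaladoc as I understand them: getBatch returns the range (start, end], and after commit(end) the source may discard everything up to end. A line-buffering source such as TextSocketSource plausibly works along similar lines, but this is an illustration, not its implementation.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, Spark-free analogue of the Source contract.
trait ToySource {
  def getOffset: Option[Long]
  def getBatch(start: Option[Long], end: Long): Seq[String]
  def commit(end: Long): Unit = {}
  def stop(): Unit
}

// Each appended line gets the next offset (0, 1, 2, ...); commit(end) drops
// every buffered line whose offset is <= end.
final class MemoryLineSource extends ToySource {
  private val buffer = ArrayBuffer.empty[String] // lines not yet committed
  private var firstBuffered: Long = 0L            // offset of buffer(0)
  private var stopped = false

  def append(line: String): Unit = synchronized {
    require(!stopped, "source is stopped")
    buffer += line
  }

  // Latest available offset, or None when nothing has ever arrived.
  override def getOffset: Option[Long] = synchronized {
    val last = firstBuffered + buffer.size - 1
    if (last < 0) None else Some(last)
  }

  // Lines with offsets in (start, end]; start = None means "from offset 0".
  override def getBatch(start: Option[Long], end: Long): Seq[String] = synchronized {
    val from = start.map(_ + 1).getOrElse(0L)
    require(from >= firstBuffered, s"offset $from was already committed")
    require(end < firstBuffered + buffer.size, s"offset $end is not available yet")
    if (from > end) Seq.empty
    else buffer.slice((from - firstBuffered).toInt, (end - firstBuffered + 1).toInt).toList
  }

  // Offsets <= end will never be requested again, so they can be discarded.
  override def commit(end: Long): Unit = synchronized {
    val n = math.min(end - firstBuffered + 1, buffer.size.toLong).toInt
    if (n > 0) {
      buffer.remove(0, n)
      firstBuffered += n
    }
  }

  override def stop(): Unit = synchronized {
    stopped = true
    buffer.clear()
  }
}
```

A micro-batch driver would, roughly, call getOffset, then getBatch(previousOffset, latestOffset), and commit(latestOffset) once that batch has been processed.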