[Spark] Schema Inference - Spark推斷Schema的實(shí)現(xiàn)

1. 背景

Spark在的Dataframe在使用的過(guò)程中或涉及到schema的問(wèn)題,schema就是這個(gè)Row的數(shù)據(jù)結(jié)構(gòu)(StructType),在代碼中就是這個(gè)類(lèi)的定義。如果你想解析一個(gè)json或者csv文件成dataframe,那么就需要知道他的StructType。

徒手寫(xiě)一個(gè)復(fù)雜類(lèi)的StructType是個(gè)吃力不討好的事情,所以Spark默認(rèn)是支持自動(dòng)推斷schema的。但是如果使用流處理(Streaming)的話,他的支持力度是很受限的,最近在做Streaming處理的時(shí)候,遇到一些schema inference的問(wèn)題,所以借機(jī)學(xué)習(xí)整理下Spark源碼是如何實(shí)現(xiàn)的。

2. Spark版本

以下的代碼基于spark的版本:

項(xiàng)目 version
scala 2.11
spark-core 2.4.0
spark-sql 2.4.0
mongo-spark-connector 2.11

gradle的配置:

providedRuntime group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.4.0'
providedRuntime group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.4.0'
providedRuntime group: 'org.mongodb.spark', name: 'mongo-spark-connector_2.11', version: '2.3.1'

3. Schema inference

3.1 spark的Schema inference

3.1.1 通過(guò)DDL來(lái)解析Schema

DDL的格式類(lèi)似于:"a INT, b STRING, c DOUBLE",

深入學(xué)習(xí)看這里:Open Data Description Language (OpenDDL)

StructType提供了接口直接通過(guò)解析DDL來(lái)識(shí)別StructType

    this.userSpecifiedSchema = Option(StructType.fromDDL(schemaString))

先把DDL string解析成SqlBaseLexer

val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))

然后, 然后...就看的不太懂了...

3.1.2 解析一個(gè)Json的Schema

Spark中Dataframe的文件讀取是通過(guò)DataFrameReader來(lái)完成的.

都是通過(guò)DataSet的ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan)方法轉(zhuǎn)為DataFrame

  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }

schema是由QueryExecution得到的

  def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
    val qe = sparkSession.sessionState.executePlan(logicalPlan)
    qe.assertAnalyzed()
    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
  }

其中的qe.analyzed.schema這句就是QueryExecution先分析生成LogicPlan,分析的源碼在CheckAnalysis.scala中的def checkAnalysis(plan: LogicalPlan): Unit

 def checkAnalysis(plan: LogicalPlan): Unit = {
    // We transform up and order the rules so as to catch the first possible failure instead
    // of the result of cascading resolution failures.
    plan.foreachUp {

      case p if p.analyzed => // Skip already analyzed sub-plans

      case u: UnresolvedRelation =>
        u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")

      case operator: LogicalPlan =>
        // Check argument data types of higher-order functions downwards first.
        // If the arguments of the higher-order functions are resolved but the type check fails,
        // the argument functions will not get resolved, but we should report the argument type
        // check failure instead of claiming the argument functions are unresolved.
        operator transformExpressionsDown {
          case hof: HigherOrderFunction
              if hof.argumentsResolved && hof.checkArgumentDataTypes().isFailure =>
            hof.checkArgumentDataTypes() match {
              case TypeCheckResult.TypeCheckFailure(message) =>
                hof.failAnalysis(
                  s"cannot resolve '${hof.sql}' due to argument data type mismatch: $message")
            }
      
      
      ...
      ...
      
}       

最終由Logic的output: Seq[Attribute]轉(zhuǎn)為StructType:

  lazy val schema: StructType = StructType.fromAttributes(output)

具體每個(gè)Attribute轉(zhuǎn)你為StructType的代碼如下:

  private[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))

3.1.3 Kafka的Schema

在使用Kafka的Streaming的時(shí)候,自動(dòng)推斷只能推斷到固定的幾個(gè)StructField, 如果value是Json的話,也不會(huì)進(jìn)一步解析出來(lái)。
這個(gè)是因?yàn)镵afka和json的dataSource是不一樣的
DataFrame在load的時(shí)候,會(huì)有DataSource基于provider name來(lái)找到這個(gè)provider的data source的類(lèi)定義

// DataSource.scala line 613
def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
    val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
        classOf[OrcFileFormat].getCanonicalName
      case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
        "org.apache.spark.sql.hive.orc.OrcFileFormat"
      case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
        "org.apache.spark.sql.avro.AvroFileFormat"
      case name => name
    }
    val provider2 = s"$provider1.DefaultSource" 
    ...
}
  • 如果輸入provider name是"json"返回的是JsonFileFormat
  • 如果是”kafka“返回的是KafkaSourceProvider
    KafkaSourceProvidersourceSchemakafkaSchema
  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) = {
    validateStreamOptions(parameters)
    require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
    (shortName(), KafkaOffsetReader.kafkaSchema)
  }

具體kafkaSchema的定義如下:

  def kafkaSchema: StructType = StructType(Seq(
    StructField("key", BinaryType),
    StructField("value", BinaryType),
    StructField("topic", StringType),
    StructField("partition", IntegerType),
    StructField("offset", LongType),
    StructField("timestamp", TimestampType),
    StructField("timestampType", IntegerType)
  ))

3.2 mongo-spark的Schema inference

3.2.1 MongoInferSchema源碼分析

看mongoSpark源碼的時(shí)候,意外從一個(gè)toDF的方法里發(fā)現(xiàn)了有個(gè)MongoInferSchema實(shí)現(xiàn)了類(lèi)型推斷.

  /**
   * Creates a `DataFrame` based on the schema derived from the optional type.
   *
   * '''Note:''' Prefer [[toDS[T<:Product]()*]] as computations will be more efficient.
   *  The rdd must contain an `_id` for MongoDB versions < 3.2.
   *
   * @tparam T The optional type of the data from MongoDB, if not provided the schema will be inferred from the collection
   * @return a DataFrame
   */
  def toDF[T <: Product: TypeTag](): DataFrame = {
    val schema: StructType = MongoInferSchema.reflectSchema[T]() match {
      case Some(reflectedSchema) => reflectedSchema
      case None                  => MongoInferSchema(toBsonDocumentRDD)
    }
    toDF(schema)
  }

于是研究了下,發(fā)現(xiàn)MongoInferSchema的實(shí)現(xiàn)分兩種情況:

  • 給定了要解析的class類(lèi)型

如果是給定了要解析的class類(lèi)型,那就很好辦,直接基于Spark的ScalaReflectionschemaFor函數(shù)將class轉(zhuǎn)為Schema:

case class Schema(dataType: DataType, nullable: Boolean)

這個(gè)SchemaScalaReflection中定義的一個(gè)case class,本質(zhì)是個(gè)catalyst DataType
所以可以再進(jìn)一步直接轉(zhuǎn)為StructType, 所以代碼實(shí)現(xiàn)很簡(jiǎn)單:

ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]
  • 未給定要解析的class類(lèi)型

如果沒(méi)有給定要解析的class類(lèi)型,那就直接根據(jù)從mongo里讀取的RDD來(lái)推斷Schema. 這個(gè)具體的實(shí)現(xiàn)方式是對(duì)RDD進(jìn)行采樣,采樣數(shù)可以在readConfig中設(shè)置,默認(rèn)值是1000(private val DefaultSampleSize: Int = 1000).

因?yàn)閺膍ongo讀取出來(lái)的格式就是BsonDocument, 所以采樣的過(guò)程就是將每個(gè)BsonDocument轉(zhuǎn)為StructType

  private def getSchemaFromDocument(document: BsonDocument, readConfig: ReadConfig): StructType = {
    val fields = new util.ArrayList[StructField]()
    document.entrySet.asScala.foreach(kv => fields.add(DataTypes.createStructField(kv.getKey, getDataType(kv.getValue, readConfig), true)))
    DataTypes.createStructType(fields)
  }

然后將采樣的1000個(gè)集合進(jìn)行兩兩merge,獲取兼容的類(lèi)型,最終得到RootType,即為所需的Schema:

// perform schema inference on each row and merge afterwards
val rootType: DataType = sampleData
.map(getSchemaFromDocument(_, mongoRDD.readConfig))
.treeAggregate[DataType](StructType(Seq()))(
compatibleType(_, _, mongoRDD.readConfig, nested = false),
compatibleType(_, _, mongoRDD.readConfig, nested = false)
)

3.2.2 MongoInferSchema存在的問(wèn)題

3.2.2.1 Java兼容性問(wèn)題

雖然scala脫胎于java,但是在類(lèi)型和結(jié)構(gòu)上也逐漸出現(xiàn)了很多的不同點(diǎn),包括部分基礎(chǔ)結(jié)構(gòu)和各種各樣的復(fù)雜結(jié)構(gòu)。所以如果要推斷的類(lèi)是java類(lèi),MongoInferSchema 也提供了MongoInferSchemaJava 實(shí)現(xiàn)類(lèi)型反射:

/**
 * A helper for inferring the schema from Java
 *
 * In Spark 2.2.0 calling this method from Scala 2.10 caused compilation errors with the shadowed library in
 * `JavaTypeInference`. Moving it into Java stops Scala falling over and allows it to continue to work.
 *
 * See: SPARK-126
 */
final class MongoInferSchemaJava {

    @SuppressWarnings("unchecked")
    public static <T> StructType reflectSchema(final Class<T> beanClass) {
        return (StructType) JavaTypeInference.inferDataType(beanClass)._1();
    }

}

具體的推斷實(shí)現(xiàn)在def inferDataType(typeToken: TypeToken[_], seenTypeSet: Set[Class[_]]函數(shù)中,代碼如下,這里就不詳細(xì)展開(kāi)了。

 /**
   * Infers the corresponding SQL data type of a Java type.
   * @param typeToken Java type
   * @return (SQL data type, nullable)
   */
  private def inferDataType(typeToken: TypeToken[_], seenTypeSet: Set[Class[_]] = Set.empty)
    : (DataType, Boolean) = {
    typeToken.getRawType match {
      case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
        (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)

      case c: Class[_] if UDTRegistration.exists(c.getName) =>
        val udt = UDTRegistration.getUDTFor(c.getName).get.newInstance()
          .asInstanceOf[UserDefinedType[_ >: Null]]
        (udt, true)
       ...
       ...
       ...
        val properties = getJavaBeanReadableProperties(other)
        val fields = properties.map { property =>
          val returnType = typeToken.method(property.getReadMethod).getReturnType
          val (dataType, nullable) = inferDataType(returnType, seenTypeSet + other)
          new StructField(property.getName, dataType, nullable)
        }
        (new StructType(fields), true)
    }
  }

所以如果大家要使用mongo-spark的類(lèi)型推斷,那么可以基于scala和java封裝2個(gè)接口函數(shù)用于Schema Infer, 下面是我自己封裝的2個(gè)函數(shù):

  /**
    * @see [[MongoInferSchema.apply]]
    */
  protected def inferSchemaScala[T <: Product : TypeTag](): StructType = {
    MongoInferSchema.reflectSchema[T]() match {
      case Some(reflectedSchema) => reflectedSchema
      // canonicalizeType erases all empty structs, including the only one we want to keep
      case None => StructType(Seq())
    }
  }

  /**
    * @see [[MongoInferSchema.apply]]
    */
  protected def inferSchemaJava[T](beanClass: Class[T]): StructType = {
    MongoInferSchema.reflectSchema(beanClass)
  }

3.2.2.2 采樣推斷不準(zhǔn)確問(wèn)題

產(chǎn)生不準(zhǔn)確的原因在于:

  • 采用點(diǎn)不完整
    畢竟是采樣,如果某個(gè)字段在采樣點(diǎn)沒(méi)出現(xiàn),則會(huì)導(dǎo)致最終推斷的不準(zhǔn)確
  • 集合類(lèi)結(jié)構(gòu)泛型推斷錯(cuò)誤
    另外一個(gè)問(wèn)題是,比如字段里有個(gè)Map[String , String]類(lèi)型,可能會(huì)把其中的key推斷成不同的StrutType,而不是統(tǒng)一推斷成String。我自己做過(guò)測(cè)試,會(huì)一定程度上依賴(lài)某些key是否會(huì)高頻出現(xiàn),所以說(shuō)這種infer schema具有不確定性。

解決方案:

  • 使用的數(shù)據(jù)結(jié)構(gòu)盡量簡(jiǎn)單,不要有嵌套或者復(fù)雜結(jié)構(gòu)
    但這種情況,真正的生產(chǎn)環(huán)境很難,大部分公司的代碼結(jié)構(gòu),迭代了那么久,怎么會(huì)那么簡(jiǎn)單呢,對(duì)吧?
  • 給定要解析的class類(lèi)型
    這個(gè)是個(gè)很好的方案,可以確保沒(méi)有錯(cuò)誤,同時(shí),如果類(lèi)的字段或者結(jié)構(gòu)發(fā)生變化了,可以確保無(wú)縫兼容,不用重新修改代碼。

4. 總結(jié)

以上介紹了幾種spark內(nèi)部實(shí)現(xiàn) schema inference 源碼和使用方式。在日常大部分工作中這些東西都是被spark隱藏的,而且如果沒(méi)有特殊場(chǎng)景,也是不需要涉及到這里的東西。我是因?yàn)閯偤糜龅絊park Streaming讀寫(xiě)Kafka的Topic,但發(fā)現(xiàn)讀到的RDD/DataFrame沒(méi)有很好的解析Schema,于是研究了下相關(guān)的實(shí)現(xiàn)。
最終基于項(xiàng)目選擇了MongoInferSchema的實(shí)現(xiàn)方式,友好的解決了問(wèn)題。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容