Data Lineage
Data lineage is an important part of data governance, and a powerful tool for metadata management and data quality management. Informally, data lineage is the layered, traceable web of relationships that data forms as it is produced, transformed, moved around, and finally consumed. A mature data lineage system helps developers locate problems quickly, track changes to data, and determine the impact on upstream and downstream systems.
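Concretely, column-level lineage can be modeled as a directed graph whose nodes are (table, column) pairs and whose edges point from each source column to the column derived from it. Here is a minimal, framework-independent sketch in Python (all table and column names are invented for illustration) showing how transitive upstream tracing works on such a graph:

```python
from collections import defaultdict

# Lineage edges: derived (table, column) -> set of source (table, column) pairs.
# All names below are hypothetical, purely for illustration.
edges = defaultdict(set)

def add_edge(src, dst):
    """Record that column `dst` is derived from column `src`."""
    edges[dst].add(src)

def upstream(col):
    """Trace all transitive source columns of `col` (depth-first)."""
    seen = set()
    stack = [col]
    while stack:
        cur = stack.pop()
        for src in edges[cur]:
            if src not in seen:
                seen.add(src)
                stack.append(src)
    return seen

# ods.log.user_id -> dwd.events.user_id -> ads.report.uv
add_edge(("ods.log", "user_id"), ("dwd.events", "user_id"))
add_edge(("dwd.events", "user_id"), ("ads.report", "uv"))

print(sorted(upstream(("ads.report", "uv"))))
# -> [('dwd.events', 'user_id'), ('ods.log', 'user_id')]
```

The same traversal answers both "where did this field come from?" and, run over reversed edges, "what does changing this field affect?".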
In a data warehouse, data lives in database tables and columns (fields), so lineage comes in two granularities: coarse-grained table-level lineage and fine-grained column-level (field-level) lineage. For offline warehouses, lineage extraction already has mature solutions, such as the LineageLogger and Execution Hooks mechanisms provided by Hive. This article briefly introduces a way to extract column-level lineage from Flink SQL in a real-time warehouse, based on Calcite. Before diving in, a few words about Calcite's relational metadata system.
Calcite Relational Metadata
Inside Calcite, database and table metadata is handled by the Catalog; only relational metadata carries the [Rel]Metadata name. Relational metadata is associated with RelNodes, and the related Calcite components are:
- RelMetadataQuery: the unified access interface for relational metadata;
- RelMetadataProvider: the intermediate layer that supplies implementations for the RelMetadataQuery interfaces;
- MetadataFactory: a factory that produces and maintains RelMetadataProviders;
- MetadataHandler: the concrete implementation logic for relational metadata; all handlers live in the org.apache.calcite.rel.metadata package, and their class names carry the RelMd prefix.
Calcite ships with many default relational metadata implementations, maintained as interfaces inside the BuiltInMetadata abstract class, as shown in the figure below. The names are fairly self-explanatory (e.g. RowCount is the number of rows in the result of that RelNode).

Among them, ColumnOrigin.Handler is the MetadataHandler responsible for resolving column-level lineage; it defines a method for finding origin columns for each kind of RelNode, with the structure shown in the figure below. Its source code deserves an article of its own, so I won't go into it here.

Note that the vast majority of MetadataHandlers, ColumnOrigin.Handler included, do their work through ReflectiveRelMetadataProvider. As the name suggests, ReflectiveRelMetadataProvider obtains the methods of each MetadataHandler via reflection and internally maintains a mapping from concrete RelNode types to Metadata proxy objects (containing the handler methods) generated with Java Proxy. When relational metadata is requested through RelMetadataQuery, the call is thus dispatched to the right method according to the RelNode type.
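The dispatch idea can be illustrated outside Calcite with a toy example. The sketch below (the class and handler names are invented, not Calcite API) keeps a handler table keyed by node type and routes each query to the most specific registered handler, roughly mirroring what ReflectiveRelMetadataProvider does for RelNode subclasses:

```python
class RelNode: ...
class Project(RelNode): ...
class Filter(RelNode): ...

# Handler table keyed by node type, mimicking the RelNode-class -> method map
# that ReflectiveRelMetadataProvider builds via reflection.
handlers = {
    Project: lambda rel: "origins-for-project",
    Filter: lambda rel: "origins-for-filter",
    RelNode: lambda rel: None,  # fallback for node types without a handler
}

def get_column_origins(rel):
    # Walk the MRO so a subclass falls back to its closest registered ancestor,
    # just as Calcite falls back to a handler for a RelNode superclass.
    for cls in type(rel).__mro__:
        if cls in handlers:
            return handlers[cls](rel)
    raise TypeError(type(rel))

print(get_column_origins(Project()))  # -> origins-for-project
print(get_column_origins(RelNode()))  # -> None
```

The `RelNode: ... -> None` fallback is exactly why the Snapshot problem described later arises: a node type with no dedicated handler silently yields no origins.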
In addition, a few MetadataHandlers (such as the handlers for CumulativeCost/NonCumulativeCost) have no concrete implementation anywhere in the Calcite code base: their code is generated at runtime and compiled on the fly by JaninoRelMetadataProvider. Code generation and Janino are also on my writing list, so I'll skip the details here.
In practice we don't need to know any of these internals; we only deal with RelMetadataQuery. Let's see how to obtain Flink SQL column lineage through it.
Parsing Flink SQL Column-Level Lineage
Take the most common shape of a Flink SQL job, a single INSERT INTO ... SELECT ... statement. First we need the RelNode generated from the SQL, i.e. the logical plan tree.
For ease of demonstration, I crudely added a getInsertOperation() method to the o.a.f.table.api.internal.TableEnvironmentImpl class. It parses and validates the SQL statement, produces the CatalogSinkModifyOperation, and retrieves its PlannerQueryOperation child (i.e. the SELECT operation). The code is as follows.
public Tuple3<String, Map<String, String>, QueryOperation> getInsertOperation(String insertStmt) {
    List<Operation> operations = getParser().parse(insertStmt);
    if (operations.size() != 1) {
        throw new TableException(
            "Unsupported SQL query! getInsertOperation() only accepts a single INSERT statement.");
    }
    Operation operation = operations.get(0);
    if (operation instanceof CatalogSinkModifyOperation) {
        CatalogSinkModifyOperation sinkOperation = (CatalogSinkModifyOperation) operation;
        QueryOperation queryOperation = sinkOperation.getChild();
        return new Tuple3<>(
            sinkOperation.getTableIdentifier().asSummaryString(),
            sinkOperation.getDynamicOptions(),
            queryOperation);
    } else {
        throw new TableException("Only INSERT is supported now.");
    }
}
With that we can obtain the sink table name and the corresponding RelNode root. The example SQL comes from my earlier <<From Calcite to Tampering with Flink SQL>> slides.
val tableEnv = StreamTableEnvironment.create(streamEnv, EnvironmentSettings.newInstance().build())
val sql = /* language=SQL */
s"""
|INSERT INTO tmp.print_joined_result
|SELECT FROM_UNIXTIME(a.ts / 1000, 'yyyy-MM-dd HH:mm:ss') AS tss, a.userId, a.eventType, a.siteId, b.site_name AS siteName
|FROM rtdw_ods.kafka_analytics_access_log_app /*+ OPTIONS('scan.startup.mode'='latest-offset','properties.group.id'='DiveIntoBlinkExp') */ a
|LEFT JOIN rtdw_dim.mysql_site_war_zone_mapping_relation FOR SYSTEM_TIME AS OF a.procTime AS b ON CAST(a.siteId AS INT) = b.main_site_id
|WHERE a.userId > 7
|""".stripMargin
val insertOp = tableEnv.asInstanceOf[TableEnvironmentImpl].getInsertOperation(sql)
val tableName = insertOp.f0
val relNode = insertOp.f2.asInstanceOf[PlannerQueryOperation].getCalciteTree
Next, apply logical optimization to the RelNode we obtained, i.e. run the FlinkStreamProgram discussed previously, but stop after the LOGICAL_REWRITE phase. We make a local copy of FlinkStreamProgram and delete the PHYSICAL and PHYSICAL_REWRITE phases:
object FlinkStreamProgramLogicalOnly {

  val SUBQUERY_REWRITE = "subquery_rewrite"
  val TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite"
  val DECORRELATE = "decorrelate"
  val TIME_INDICATOR = "time_indicator"
  val DEFAULT_REWRITE = "default_rewrite"
  val PREDICATE_PUSHDOWN = "predicate_pushdown"
  val JOIN_REORDER = "join_reorder"
  val PROJECT_REWRITE = "project_rewrite"
  val LOGICAL = "logical"
  val LOGICAL_REWRITE = "logical_rewrite"

  def buildProgram(config: Configuration): FlinkChainedProgram[StreamOptimizeContext] = {
    val chainedProgram = new FlinkChainedProgram[StreamOptimizeContext]()

    // rewrite sub-queries to joins
    chainedProgram.addLast(
      SUBQUERY_REWRITE,
      FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext]
        // rewrite QueryOperationCatalogViewTable before rewriting sub-queries
        .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
          .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
          .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
          .add(FlinkStreamRuleSets.TABLE_REF_RULES)
          .build(), "convert table references before rewriting sub-queries to semi-join")
        .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
          .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
          .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
          .add(FlinkStreamRuleSets.SEMI_JOIN_RULES)
          .build(), "rewrite sub-queries to semi-join")
        .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
          .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
          .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
          .add(FlinkStreamRuleSets.TABLE_SUBQUERY_RULES)
          .build(), "sub-queries remove")
        // convert RelOptTableImpl (which exists in SubQuery before) to FlinkRelOptTable
        .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder
          .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
          .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
          .add(FlinkStreamRuleSets.TABLE_REF_RULES)
          .build(), "convert table references after sub-queries removed")
        .build())

    // rewrite special temporal join plan
    // ...
    // query decorrelation
    // ...
    // convert time indicators
    // ...
    // default rewrite, includes: predicate simplification, expression reduction, window
    // properties rewrite, etc.
    // ...
    // rule based optimization: push down predicate(s) in where clause, so it only needs to read
    // the required data
    // ...
    // join reorder
    // ...
    // project rewrite
    // ...

    // optimize the logical plan
    chainedProgram.addLast(
      LOGICAL,
      FlinkVolcanoProgramBuilder.newBuilder
        .add(FlinkStreamRuleSets.LOGICAL_OPT_RULES)
        .setRequiredOutputTraits(Array(FlinkConventions.LOGICAL))
        .build())

    // logical rewrite
    chainedProgram.addLast(
      LOGICAL_REWRITE,
      FlinkHepRuleSetProgramBuilder.newBuilder
        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
        .add(FlinkStreamRuleSets.LOGICAL_REWRITE)
        .build())

    chainedProgram
  }
}
Then just run FlinkStreamProgramLogicalOnly. Note the context information that StreamOptimizeContext requires; it can be obtained through various workarounds (for example, the FunctionCatalog can be exposed by adding a getter to TableEnvironmentImpl).
val logicalProgram = FlinkStreamProgramLogicalOnly.buildProgram(tableEnvConfig)

val optRelNode = logicalProgram.optimize(relNode, new StreamOptimizeContext {
  override def getTableConfig: TableConfig = tableEnv.getConfig

  override def getFunctionCatalog: FunctionCatalog =
    tableEnv.asInstanceOf[TableEnvironmentImpl].getFunctionCatalog

  override def getCatalogManager: CatalogManager =
    tableEnv.asInstanceOf[TableEnvironmentImpl].getCatalogManager

  override def getRexBuilder: RexBuilder = relNode.getCluster.getRexBuilder

  override def getSqlExprToRexConverterFactory: SqlExprToRexConverterFactory =
    relNode.getCluster.getPlanner.getContext.unwrap(classOf[FlinkContext]).getSqlExprToRexConverterFactory

  override def isUpdateBeforeRequired: Boolean = false

  override def needFinalTimeIndicatorConversion: Boolean = true

  override def getMiniBatchInterval: MiniBatchInterval = MiniBatchInterval.NONE
})
Compare the RelNode before and after optimization:
--- Original RelNode ---
LogicalProject(tss=[FROM_UNIXTIME(/($0, 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss')], userId=[$3], eventType=[$4], siteId=[$8], siteName=[$46])
  LogicalFilter(condition=[>($3, 7)])
    LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{8, 44}])
      LogicalProject(ts=[$0], tss=[$1], tssDay=[$2], userId=[$3], eventType=[$4], columnType=[$5], fromType=[$6], grouponId=[$7], /* ... */, procTime=[PROCTIME()])
        LogicalTableScan(table=[[hive, rtdw_ods, kafka_analytics_access_log_app]], hints=[[[OPTIONS inheritPath:[] options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])
      LogicalFilter(condition=[=(CAST($cor0.siteId):INTEGER, $8)])
        LogicalSnapshot(period=[$cor0.procTime])
          LogicalTableScan(table=[[hive, rtdw_dim, mysql_site_war_zone_mapping_relation]])

--- Optimized RelNode ---
FlinkLogicalCalc(select=[FROM_UNIXTIME(/(ts, 1000), _UTF-16LE'yyyy-MM-dd HH:mm:ss') AS tss, userId, eventType, siteId, site_name AS siteName])
  FlinkLogicalJoin(condition=[=($4, $6)], joinType=[left])
    FlinkLogicalCalc(select=[ts, userId, eventType, siteId, CAST(siteId) AS siteId0], where=[>(userId, 7)])
      FlinkLogicalTableSourceScan(table=[[hive, rtdw_ods, kafka_analytics_access_log_app]], fields=[ts, tss, tssDay, userId, eventType, columnType, fromType, grouponId, /* ... */, latitude, longitude], hints=[[[OPTIONS options:{properties.group.id=DiveIntoBlinkExp, scan.startup.mode=latest-offset}]]])
    FlinkLogicalSnapshot(period=[$cor0.procTime])
      FlinkLogicalCalc(select=[site_name, main_site_id])
        FlinkLogicalTableSourceScan(table=[[hive, rtdw_dim, mysql_site_war_zone_mapping_relation]], fields=[site_id, site_name, site_city_id, /* ... */])
Two caveats here.
First, Calcite's RelMdColumnOrigins handler class has no method for Snapshot RelNodes. The fallback logic returns null for any unhandled non-leaf RelNode, so by default you cannot get lineage for columns produced by a lookup join. We have to patch its source so that the search continues through a Snapshot:
public Set<RelColumnOrigin> getColumnOrigins(Snapshot rel,
        RelMetadataQuery mq, int iOutputColumn) {
    return mq.getColumnOrigins(rel.getInput(), iOutputColumn);
}
Second, the Calcite version used by Flink is 1.26, which does not track lineage through derived columns (isDerived == true, e.g. SUM(col)). Calcite 1.27 fixes this; to avoid an incompatible version bump, you can cherry-pick the corresponding issue, CALCITE-4251, onto your internal Calcite 1.26 branch. Don't forget to rebuild Calcite core and the Flink Table module afterwards.
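To picture what the isDerived flag means, here is a toy model of an origin record (a deliberate simplification, not Calcite's RelColumnOrigin class): an origin traced through an aggregate call keeps pointing at the source column, but is marked as derived rather than copied verbatim.

```python
from typing import NamedTuple

class ColumnOrigin(NamedTuple):
    """Simplified stand-in for Calcite's RelColumnOrigin."""
    table: str
    ordinal: int
    is_derived: bool

def through_aggregate(origins):
    """Origins traced through an aggregate such as SUM(col) keep the source
    column but are flagged as derived -- the tracking that CALCITE-4251
    restores for the 1.26 line."""
    return {ColumnOrigin(o.table, o.ordinal, True) for o in origins}

direct = {ColumnOrigin("ods.orders", 2, False)}
print(through_aggregate(direct))
```

Without the fix, this step simply returns an empty result for derived columns, which is why the cherry-pick matters for any SQL with aggregations.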
Finally, we can query the origin columns of the sink table's fields via RelMetadataQuery. So easy.
import scala.collection.JavaConverters._

val metadataQuery = optRelNode.getCluster.getMetadataQuery

for (i <- 0 to 4) {
  val origins = metadataQuery.getColumnOrigins(optRelNode, i)
  if (origins != null) {
    // getColumnOrigins returns a java.util.Set, hence the .asScala conversion
    for (rco <- origins.asScala) {
      val table = rco.getOriginTable
      val tableName = table.getQualifiedName.mkString(".")
      val ordinal = rco.getOriginColumnOrdinal
      val fields = table.getRowType.getFieldNames
      println(Seq(tableName, ordinal, fields.get(ordinal)).mkString("\t"))
    }
  } else {
    println("NULL")
  }
}
/* Outputs:
hive.rtdw_ods.kafka_analytics_access_log_app 0 ts
hive.rtdw_ods.kafka_analytics_access_log_app 3 userId
hive.rtdw_ods.kafka_analytics_access_log_app 4 eventType
hive.rtdw_ods.kafka_analytics_access_log_app 8 siteId
hive.rtdw_dim.mysql_site_war_zone_mapping_relation 1 site_name
*/
The SQL in this example is simple, so each output field has a single ColumnOrigin. Readers can try SQL with multiple joins or aggregation logic: the approach works just as well when a field has multiple ColumnOrigins, and saves you the trouble of hand-rolling a RelVisitor or RelShuttle.
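The multi-origin case follows naturally: an output column computed from several input columns has the union of their origin sets. A toy sketch (all names hypothetical) of that union rule:

```python
# Toy model: an output column defined by an expression over several input
# columns inherits the union of their origins. Names are hypothetical.
def expr_origins(input_cols, origins_of):
    """Union the origin sets of every input column of an expression."""
    result = set()
    for col in input_cols:
        result |= origins_of(col)
    return result

base = {
    "a.amount": {("ods.orders", "amount")},
    "b.rate":   {("dim.currency", "rate")},
}

# e.g. SELECT a.amount * b.rate AS gmv ... : gmv has two origin columns
gmv_origins = expr_origins(["a.amount", "b.rate"], lambda c: base[c])
print(sorted(gmv_origins))
# -> [('dim.currency', 'rate'), ('ods.orders', 'amount')]
```

This is exactly the shape of the Set returned by getColumnOrigins for such fields: one RelColumnOrigin per contributing source column.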
For the final step, lineage visualization, graph databases such as Neo4j or JanusGraph are commonly used to store and display column lineage. I am also exploring how to integrate Flink SQL column-level lineage into Atlas, but progress is slow, so please keep your expectations modest.
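As a small taste of the graph-loading step, the snippet below turns a lineage tuple into an idempotent Cypher MERGE statement that could be fed to Neo4j. The node label and property names are invented for illustration; adapt them to your own graph model (and use parameterized queries in real code rather than string interpolation):

```python
def to_cypher(src_table, src_col, dst_table, dst_col):
    """Build an idempotent Cypher statement linking two column nodes.
    Label (:Column) and properties (table, name) are hypothetical."""
    return (
        f"MERGE (s:Column {{table: '{src_table}', name: '{src_col}'}}) "
        f"MERGE (d:Column {{table: '{dst_table}', name: '{dst_col}'}}) "
        f"MERGE (s)-[:FLOWS_TO]->(d)"
    )

stmt = to_cypher("hive.rtdw_ods.kafka_analytics_access_log_app", "userId",
                 "tmp.print_joined_result", "userId")
print(stmt)
```

MERGE rather than CREATE keeps repeated lineage extraction runs from duplicating nodes and edges.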
The End
This blog has been neglected for quite a while — long enough that a big name stepped in to demand an update. My apologies.
Due to the pandemic, FFA 2021 has moved online. A pity we can't meet in person (
Huge thanks to the organizing committee for the care package~

And you're all welcome to drop by my humble presentation~
Good night, everyone.