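Background
Flink's LookupTableSource uses the values of one or more columns of a stream record to load data from external storage (dimension table data), and in this way enriches the stream with additional fields. When the dimension data changes infrequently, the dimension table data is usually cached in TaskManager (TM) memory to raise throughput.
Currently, the operator generated for a Flink SQL dimension-table join receives its input with the Forward strategy. Records are therefore not routed by join key, every subtask can end up caching the same dimension rows, and the cache hit rate is low. If the join key of the dimension-table join is used as the hash key instead, each downstream subtask caches a different slice of the dimension table, which effectively raises the cache hit rate.
We would like to add a property to the DDL statement that controls whether KeyBy is applied before the dimension table is loaded. When several dimension tables are joined, each table's own property decides whether KeyBy is applied for that table.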
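The hit-rate argument can be checked with a small simulation. The following is a minimal sketch in plain Java, not Flink code: the class name, the LRU cache, the round-robin stand-in for Forward routing, and all sizes (4 subtasks, 250 cached rows each, 1000 distinct keys) are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

final class LookupCacheSimulation {

    /** Fixed-capacity LRU cache, standing in for the per-subtask lookup cache. */
    static final class LruCache extends LinkedHashMap<Integer, String> {
        private final int capacity;

        LruCache(int capacity) {
            super(16, 0.75f, true); // access order gives LRU eviction
            this.capacity = capacity;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
            return size() > capacity;
        }
    }

    /** Returns the overall cache hit rate for the chosen routing strategy. */
    static double hitRate(boolean hashByKey, int subtasks, int capacity,
                          int distinctKeys, int records) {
        List<LruCache> caches = new ArrayList<>();
        for (int i = 0; i < subtasks; i++) {
            caches.add(new LruCache(capacity));
        }
        Random random = new Random(42); // fixed seed keeps the run repeatable
        long hits = 0;
        for (int i = 0; i < records; i++) {
            int key = random.nextInt(distinctKeys);
            // hash: a key always reaches the same subtask;
            // otherwise routing ignores the key (modeled here as round-robin)
            int target = hashByKey ? key % subtasks : i % subtasks;
            LruCache cache = caches.get(target);
            if (cache.get(key) != null) {
                hits++;
            } else {
                cache.put(key, "row-" + key); // simulate a load from external storage
            }
        }
        return (double) hits / records;
    }

    public static void main(String[] args) {
        System.out.printf("key-agnostic routing: %.3f%n", hitRate(false, 4, 250, 1000, 100_000));
        System.out.printf("hash by join key:     %.3f%n", hitRate(true, 4, 250, 1000, 100_000));
    }
}
```

With these sizes, hashing lets each subtask's key slice fit entirely in its cache, while key-agnostic routing forces every subtask to compete for the full key space with a cache that holds only a quarter of it.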
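AST transformation process
FlinkStreamProgram defines a series of optimization rules that are applied at the different stages of the execution tree. The stages mainly involved in a dimension-table join are temporal_join_rewrite, logical, physical, and physical_rewrite. The physical_rewrite stage mainly adds traits to the nodes of the final physical execution tree, for example ChangelogMode and MiniBatchInterval. The relational expression trees produced at the different stages: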





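Implementation method one
Add an optimization rule in the physical_rewrite stage. This extends Flink 1.13.1 and uses a join against several MySQL dimension tables as the example for implementing KeyBy on dimension tables.
Add a new optimization rule, LookupJoinHashRule, to FlinkStreamRuleSets#PHYSICAL_REWRITE.
The rule goes into the PHYSICAL_REWRITE stage because Flink handles the FlinkRelDistribution trait by creating a StreamPhysicalExchange physical node, so all we need to do is add a StreamPhysicalExchange in front of the StreamPhysicalLookupJoin node of the finished physical plan. To control KeyBy, add a lookup.enable_hash property to JdbcDynamicTableFactory.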
public static final ConfigOption<String> LOOKUP_ENABLE_HASH =
        ConfigOptions.key("lookup.enable_hash")
                .stringType()
                .defaultValue("false")
                .withDescription("Dimension table join enable hash.");
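How the option drives the per-table decision can be sketched without any Flink dependency. This is a minimal illustration in plain Java: the class and method names and the table jdbc_table4 are hypothetical, and Boolean.parseBoolean only accepts "true" (case-insensitive), which is narrower than the BooleanUtils.toBoolean call used in the rule further down.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LookupHashOption {
    static final String KEY = "lookup.enable_hash";
    static final String DEFAULT_VALUE = "false";

    /** True when the table's WITH options switch the hash shuffle on. */
    static boolean isHashEnabled(Map<String, String> tableOptions) {
        return Boolean.parseBoolean(tableOptions.getOrDefault(KEY, DEFAULT_VALUE));
    }

    /** Names of the tables whose lookup join should be preceded by a KeyBy. */
    static List<String> tablesToHash(Map<String, Map<String, String>> catalog) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Map<String, String>> entry : catalog.entrySet()) {
            if (isHashEnabled(entry.getValue())) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> catalog = new LinkedHashMap<>();
        catalog.put("jdbc_table2", Map.of("connector", "jdbc", KEY, "true"));
        catalog.put("jdbc_table3", Map.of("connector", "jdbc", KEY, "false"));
        // hypothetical table that does not set the option and falls back to the default
        catalog.put("jdbc_table4", Map.of("connector", "jdbc"));
        System.out.println(tablesToHash(catalog));
    }
}
```

Because the default is "false", existing tables keep the Forward behavior unless their DDL opts in.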
- Add a method to CommonPhysicalLookupJoin that returns the dimension table's identifier (an ObjectIdentifier). The identifier is needed to fetch the table's metadata from the CatalogManager.
CommonPhysicalLookupJoin#getTableIdentifier
def getTableIdentifier(): ObjectIdentifier = {
  val tableIdentifier: ObjectIdentifier = temporalTable match {
    case t: TableSourceTable => t.tableIdentifier
    case t: LegacyTableSourceTable[_] => t.tableIdentifier
  }
  tableIdentifier
}
LookupJoinHashRule code:
public class LookupJoinHashRule extends RelOptRule {
    public static final LookupJoinHashRule INSTANCE = new LookupJoinHashRule();

    private LookupJoinHashRule() {
        // note: this rule only applies to StreamPhysicalLookupJoin nodes.
        super(operand(StreamPhysicalLookupJoin.class, any()), "LookupJoinHashRule");
    }

    @Override
    public boolean matches(RelOptRuleCall call) {
        ObjectIdentifier tableIdentifier = ((StreamPhysicalLookupJoin) call.rel(0)).getTableIdentifier();
        CatalogManager catalogManager = call.getPlanner().getContext().unwrap(FlinkContext.class).getCatalogManager();
        CatalogManager.TableLookupResult tableLookupResult = catalogManager.getTable(tableIdentifier).get();
        // note: read the dimension table's options
        Map<String, String> options = tableLookupResult.getTable().getOptions();
        String enabledHash = options.getOrDefault(JdbcDynamicTableFactory.LOOKUP_ENABLE_HASH.key(), JdbcDynamicTableFactory.LOOKUP_ENABLE_HASH.defaultValue());
        return BooleanUtils.toBoolean(enabledHash);
    }

    @Override
    public void onMatch(RelOptRuleCall relOptRuleCall) {
        RelNode streamPhysicalLookupJoin = relOptRuleCall.rel(0);
        JoinInfo joinInfo = ((StreamPhysicalLookupJoin) streamPhysicalLookupJoin).joinInfo();
        // note: build the FlinkRelDistribution trait from the left-side join keys
        FlinkRelDistribution requiredDistribution = FlinkRelDistribution.hash(joinInfo.leftKeys, true);
        // note: add a StreamPhysicalExchange as the input of the StreamPhysicalLookupJoin
        RelNode hashInput = FlinkExpandConversionRule.satisfyDistribution(
                FlinkConventions.STREAM_PHYSICAL(),
                ((StreamPhysicalLookupJoin) streamPhysicalLookupJoin).getInput(),
                requiredDistribution
        );
        // note: replace the node with a copy that reads from the new input
        relOptRuleCall.transformTo(streamPhysicalLookupJoin.copy(streamPhysicalLookupJoin.getTraitSet(), Arrays.asList(hashInput)));
    }
}
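The effect of the rule on the plan can be pictured with a toy model. The sketch below is plain Java and does not use Calcite or Flink classes: the Node type, the "LookupJoin:" naming convention, and the rewrite method are all hypothetical stand-ins for StreamPhysicalLookupJoin, StreamPhysicalExchange, and the matches/onMatch pair.

```java
import java.util.Set;

final class ExchangeInsertionSketch {

    /** Minimal plan node: a name plus at most one input (null for a leaf). */
    static final class Node {
        final String name;
        final Node input;

        Node(String name, Node input) {
            this.name = name;
            this.input = input;
        }

        @Override
        public String toString() {
            return input == null ? name : name + " <- " + input;
        }
    }

    static final String JOIN_PREFIX = "LookupJoin:";

    /** Rebuilds the chain, placing an Exchange under every lookup join whose table opted in. */
    static Node rewrite(Node node, Set<String> hashEnabledTables) {
        if (node == null) {
            return null;
        }
        Node newInput = rewrite(node.input, hashEnabledTables);
        boolean matches = node.name.startsWith(JOIN_PREFIX)
                && hashEnabledTables.contains(node.name.substring(JOIN_PREFIX.length()));
        if (matches) {
            newInput = new Node("Exchange(hash)", newInput);
        }
        return new Node(node.name, newInput);
    }

    public static void main(String[] args) {
        Node plan = new Node("Sink",
                new Node("LookupJoin:jdbc_table3",
                        new Node("LookupJoin:jdbc_table2",
                                new Node("Source", null))));
        // only jdbc_table2 has hashing enabled, so only its join gets an Exchange input
        System.out.println(rewrite(plan, Set.of("jdbc_table2")));
    }
}
```

Each arrow reads "takes input from", so the Exchange appears directly upstream of the join that asked for it and the other join is left untouched.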
Running the test
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);
tableEnvironment.executeSql("CREATE TABLE kafka_table (\n" +
        "  user_id int,\n" +
        "  order_amount bigint,\n" +
        "  sname String,\n" +
        "  log_ts TIMESTAMP(3),\n" +
        "  proctime as PROCTIME()" +
        ") WITH (\n" +
        " 'connector' = 'kafka',\n" +
        " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
        " 'properties.kafka.max.poll.records' = '1',\n" +
        " 'properties.max.poll.records' = '1',\n" +
        " 'topic' = 'mqTest02',\n" +
        " 'format' = 'json',\n" +
        " 'scan.startup.mode' = 'latest-offset'\n" +
        ")");
// note: hash enabled
tableEnvironment.executeSql("CREATE TABLE jdbc_table2 (\n" +
" id int,\n" +
" name varchar,\n" +
" description STRING,\n" +
" catalog STRING\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'scan.partition.column' = 'id',\n" +
" 'scan.partition.num' = '2',\n" +
" 'lookup.enable_hash' = 'true',\n" +
" 'scan.partition.lower-bound' = '1',\n" +
" 'scan.partition.upper-bound' = '1000',\n" +
" 'url' = 'jdbc:mysql://localhost:3306/mqTest?useUnicode=true&characterEncoding=utf-8',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
" 'table-name' = 'test1'\n" +
")");
// note: hash disabled
tableEnvironment.executeSql("CREATE TABLE jdbc_table3 (\n" +
" id int,\n" +
" name varchar,\n" +
" description STRING,\n" +
" catalog STRING\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'scan.partition.column' = 'id',\n" +
" 'scan.partition.num' = '2',\n" +
" 'lookup.enable_hash' = 'false',\n" +
" 'scan.partition.lower-bound' = '1',\n" +
" 'scan.partition.upper-bound' = '1000',\n" +
" 'url' = 'jdbc:mysql://localhost:3306/mqTest?useUnicode=true&characterEncoding=utf-8',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
" 'table-name' = 'test2'\n" +
")");
tableEnvironment.executeSql("CREATE TABLE fs_table (\n" +
" id bigint,\n" +
" name STRING,\n" +
" s3Name STRING,\n" +
" order_amount bigint,\n" +
" description STRING\n" +
") WITH (\n" +
"'connector' = 'print'" +
")");
tableEnvironment.executeSql("INSERT INTO fs_table select s1.user_id,s2.name,s3.name,s1.order_amount,s2.description " +
" from kafka_table s1 " +
" join jdbc_table2 FOR SYSTEM_TIME AS OF s1.proctime AS s2 " +
" ON s1.user_id=s2.id " +
" join jdbc_table3 FOR SYSTEM_TIME AS OF s1.proctime AS s3 " +
" ON s1.user_id=s3.id" +
"");
}
Topology on YARN with hash enabled for both dimension tables:

Topology on YARN with hash enabled for one dimension table and disabled for the other:

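Implementation method two
Extend the step where the ExecNode is translated into a Transformation. Modify the exec node CommonExecLookupJoin so that translateToPlanInternal adds a PartitionTransformation. With this approach, the physical execution plan tree has the same structure as the tree generated without hashing.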
public Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
RelOptTable temporalTable = temporalTableSourceSpec.getTemporalTable(planner);
// validate whether the node is valid and supported.
validate(temporalTable);
final ExecEdge inputEdge = getInputEdges().get(0);
RowType inputRowType = (RowType) inputEdge.getOutputType();
RowType tableSourceRowType = FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType());
RowType resultRowType = (RowType) getOutputType();
validateLookupKeyType(lookupKeys, inputRowType, tableSourceRowType);
boolean isAsyncEnabled = false;
UserDefinedFunction userDefinedFunction =
LookupJoinUtil.getLookupFunction(temporalTable, lookupKeys.keySet());
UserDefinedFunctionHelper.prepareInstance(
planner.getTableConfig().getConfiguration(), userDefinedFunction);
if (userDefinedFunction instanceof AsyncTableFunction) {
isAsyncEnabled = true;
}
boolean isLeftOuterJoin = joinType == FlinkJoinType.LEFT;
StreamOperatorFactory<RowData> operatorFactory;
if (isAsyncEnabled) {
operatorFactory =
createAsyncLookupJoin(
temporalTable,
planner.getTableConfig(),
lookupKeys,
(AsyncTableFunction<Object>) userDefinedFunction,
planner.getRelBuilder(),
inputRowType,
tableSourceRowType,
resultRowType,
isLeftOuterJoin);
} else {
operatorFactory =
createSyncLookupJoin(
temporalTable,
planner.getTableConfig(),
lookupKeys,
(TableFunction<Object>) userDefinedFunction,
planner.getRelBuilder(),
inputRowType,
tableSourceRowType,
resultRowType,
isLeftOuterJoin,
planner.getExecEnv().getConfig().isObjectReuseEnabled());
}
Transformation<RowData> inputTransformation =
(Transformation<RowData>) inputEdge.translateToPlan(planner);
// note: add a PartitionTransformation keyed on the lookup keys
int[] hashKeys = lookupKeys.keySet().stream().mapToInt(key -> key).toArray();
final RowDataKeySelector keySelector =
KeySelectorUtil.getRowDataSelector(hashKeys, InternalTypeInfo.of(inputRowType));
final StreamPartitioner<RowData> partitioner =
new KeyGroupStreamPartitioner<>(
keySelector, DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
final Transformation<RowData> partitionTransformation =
new PartitionTransformation<>(inputTransformation, partitioner);
// note: set the parallelism to the input's parallelism plus 2
partitionTransformation.setParallelism(inputTransformation.getParallelism() + 2);
OneInputTransformation<RowData, RowData> inputTransform = new OneInputTransformation<>(
partitionTransformation,
getDescription(),
operatorFactory,
InternalTypeInfo.of(resultRowType),
partitionTransformation.getParallelism());
inputTransform.setParallelism(partitionTransformation.getParallelism());
inputTransform.setOutputType(InternalTypeInfo.of(resultRowType));
return inputTransform;
}
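The KeyGroupStreamPartitioner added above routes a record in two steps: the key is mapped to a key group, and the key group is mapped to a downstream channel. The sketch below illustrates that arithmetic in plain Java under stated assumptions: the class is hypothetical, the mix function is a simple stand-in and not the murmur hash Flink applies, and 128 is used as the max parallelism because that is, to my knowledge, the value of DEFAULT_LOWER_BOUND_MAX_PARALLELISM.

```java
import java.util.Arrays;

final class KeyGroupRoutingSketch {
    // assumed to equal DEFAULT_LOWER_BOUND_MAX_PARALLELISM (1 << 7)
    static final int MAX_PARALLELISM = 128;

    /** Stand-in bit mixer; not Flink's actual hash function. */
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h & Integer.MAX_VALUE; // clear the sign bit so the modulo stays non-negative
    }

    /** Step 1: key -> key group in [0, MAX_PARALLELISM). */
    static int keyGroup(Object key) {
        return mix(key.hashCode()) % MAX_PARALLELISM;
    }

    /** Step 2: key group -> downstream channel in [0, parallelism). */
    static int channel(Object key, int parallelism) {
        return keyGroup(key) * parallelism / MAX_PARALLELISM;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        int[] counts = new int[parallelism];
        for (int userId = 0; userId < 1000; userId++) {
            counts[channel(userId, parallelism)]++;
        }
        // number of distinct user ids routed to each lookup-join subtask
        System.out.println(Arrays.toString(counts));
    }
}
```

Because the channel depends only on the key, every record with the same join key reaches the same lookup-join subtask, which is what lets each subtask cache a disjoint slice of the dimension table.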
Generated topology:

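Implementation method three
Attach a FlinkRelDistribution trait to the node coming out of the logical stage; in the physical stage this trait produces a StreamPhysicalExchange. In StreamPhysicalLookupJoinRule, replace the default FlinkRelDistribution trait of the FlinkLogicalRel input with a hash distribution. When the physical nodes are then optimized, an Exchange node is generated for this trait.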
org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalLookupJoinRule#doTransform
private def doTransform(
join: FlinkLogicalJoin,
input: FlinkLogicalRel,
temporalTable: RelOptTable,
calcProgram: Option[RexProgram]): StreamPhysicalLookupJoin = {
val joinInfo = join.analyzeCondition
val cluster = join.getCluster
val providedTrait = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
// note: read the dimension table's options this way to decide whether to hash (the check itself is not shown here)
val options = temporalTable.asInstanceOf[TableSourceTable].catalogTable.getOptions
// note: build the hash distribution
val requiredDistribution = FlinkRelDistribution.hash(joinInfo.leftKeys, true)
val requiredTrait = input.getTraitSet
.replace(requiredDistribution) // replaces the FlinkRelDistributionTraitDef trait
.replace(FlinkConventions.STREAM_PHYSICAL)
val convInput = RelOptRule.convert(input, requiredTrait)
new StreamPhysicalLookupJoin(
cluster,
providedTrait,
convInput,
temporalTable,
calcProgram,
joinInfo,
join.getJoinType)
}
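The trait-based approach differs from inserting the Exchange by hand: the join only declares the distribution it requires, and a shuffle is added only when the input does not already provide it. The following plain-Java sketch models that idea. The Distribution class and the enforce method are hypothetical and far simpler than FlinkRelDistribution and the planner's trait conversion.

```java
import java.util.Arrays;

final class DistributionTraitSketch {

    /** Toy distribution trait: ANY (no keys) or hash on the given key indices. */
    static final class Distribution {
        final int[] hashKeys; // null means "any distribution"

        private Distribution(int[] hashKeys) {
            this.hashKeys = hashKeys;
        }

        static Distribution any() {
            return new Distribution(null);
        }

        static Distribution hash(int... keys) {
            return new Distribution(keys);
        }

        /** A provided trait satisfies ANY, or a hash requirement on exactly the same keys. */
        boolean satisfies(Distribution required) {
            return required.hashKeys == null || Arrays.equals(hashKeys, required.hashKeys);
        }
    }

    /** Returns the input plan, preceded by an Exchange only if the requirement is not yet met. */
    static String enforce(String input, Distribution provided, Distribution required) {
        if (provided.satisfies(required)) {
            return input;
        }
        return "Exchange(hash" + Arrays.toString(required.hashKeys) + ") <- " + input;
    }

    public static void main(String[] args) {
        Distribution required = Distribution.hash(0); // join key is input column 0
        System.out.println("LookupJoin <- " + enforce("Source", Distribution.any(), required));
        System.out.println("LookupJoin <- " + enforce("Source", Distribution.hash(0), required));
    }
}
```

In this model an input that is already hashed on the join key passes through unchanged, so no redundant shuffle is added.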
Generated topology:
