Three Ways to Implement KeyBy for Flink SQL Dimension-Table Joins

Background

Flink's LookupTableSource uses the values of one or more columns of a stream record to load data from external storage (the dimension table), enriching the stream record with additional fields. When the dimension table changes infrequently, the dimension data is usually cached in TaskManager memory to increase throughput.

Currently, the operator that a Flink SQL dimension-table join generates receives its input via Forward distribution, which means every subtask caches the same data and the cache hit rate is low. If the join key is instead used as the hash condition, each downstream subtask caches a different slice of the dimension table, which effectively improves the cache hit rate.
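Why hashing raises the cache hit rate can be illustrated with a small, self-contained simulation. This is a hypothetical model, not Flink code: subtaskFor stands in for the hash partitioner, and a seeded Random stands in for Forward-style routing where any subtask may receive any key.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

public class CacheLocalitySketch {

    // Hypothetical router: join key -> subtask index, mimicking hash partitioning.
    static int subtaskFor(int key, int parallelism) {
        return Math.floorMod(Integer.hashCode(key), parallelism);
    }

    // Simulate which dimension-table keys each subtask's lookup cache ends up holding.
    // hashed=false models Forward-style distribution where any subtask may see any key.
    static List<Set<Integer>> cacheContents(int[] joinKeys, int parallelism, boolean hashed) {
        List<Set<Integer>> caches = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            caches.add(new HashSet<>());
        }
        Random rnd = new Random(0); // stand-in for arbitrary upstream routing
        for (int key : joinKeys) {
            int subtask = hashed ? subtaskFor(key, parallelism) : rnd.nextInt(parallelism);
            caches.get(subtask).add(key); // the subtask caches the looked-up row
        }
        return caches;
    }

    // Total cache entries across all subtasks: lower means less duplication.
    static int totalCachedKeys(int[] joinKeys, int parallelism, boolean hashed) {
        int total = 0;
        for (Set<Integer> cache : cacheContents(joinKeys, parallelism, hashed)) {
            total += cache.size();
        }
        return total;
    }
}
```

With hash routing every distinct key is cached on exactly one subtask, so the combined caches hold each dimension row once; with Forward-style routing the same row tends to be cached on several subtasks.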

We want to add a property to the DDL statement that controls whether the stream feeding the dimension-table lookup is KeyBy-partitioned. When joining multiple dimension tables, each table's own property decides whether its input is hashed.

AST Transformation Process

FlinkStreamProgram defines a series of optimization rules that are applied at the various stages of the plan. The stages involved in dimension-table joins are mainly temporal_join_rewrite, logical, physical, and physical_rewrite; physical_rewrite mostly attaches traits such as ChangelogMode and MiniBatchInterval to the nodes of the final physical plan. The relational expression trees produced at the different stages:


(Figures: the relational tree at the initial stage, after the temporal_join rewrite, after logical optimization, after physical optimization, and the final execution tree.)

Approach 1

Add an optimization rule in the physical_rewrite stage. The extension below is based on Flink 1.13.1 and implements dimension-table KeyBy for a join against multiple MySQL dimension tables.

  1. Add a LookupJoinHashRule optimization rule and register it in the FlinkStreamRuleSets#PHYSICAL_REWRITE rule set.
     We add it in PHYSICAL_REWRITE because Flink handles the FlinkRelDistribution trait by creating a
     StreamPhysicalExchange physical node; we only need to insert a StreamPhysicalExchange in front of the
     StreamPhysicalLookupJoin node of the finished physical plan.

  2. Add a lookup.enable_hash option to JdbcDynamicTableFactory to control the KeyBy behavior.

public static final ConfigOption<String> LOOKUP_ENABLE_HASH =
        ConfigOptions.key("lookup.enable_hash")
                .stringType()
                .defaultValue("false")
                .withDescription("Whether to enable hash partitioning for the dimension table join.");
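Since the option is declared as a string, the rule later has to interpret it as a boolean. A minimal self-contained sketch of that check (Boolean.parseBoolean stands in for the BooleanUtils.toBoolean call used in the rule below, and the plain Map stands in for CatalogTable#getOptions):

```java
import java.util.Map;

public class LookupHashOptionCheck {
    static final String KEY = "lookup.enable_hash";
    static final String DEFAULT_VALUE = "false";

    // Mirrors the rule's matches() logic: read the table option, falling back to the default.
    static boolean hashEnabled(Map<String, String> tableOptions) {
        return Boolean.parseBoolean(tableOptions.getOrDefault(KEY, DEFAULT_VALUE));
    }
}
```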

  3. Add a method to CommonPhysicalLookupJoin that returns the dimension table's ObjectIdentifier, so the table's metadata can be fetched from the CatalogManager.
CommonPhysicalLookupJoin#getTableIdentifier
def getTableIdentifier(): ObjectIdentifier = {
  val tableIdentifier: ObjectIdentifier = temporalTable match {
    case t: TableSourceTable => t.tableIdentifier
    case t: LegacyTableSourceTable[_] => t.tableIdentifier
  }
  tableIdentifier
}


LookupJoinHashRule code:

public class LookupJoinHashRule extends RelOptRule {
    public static LookupJoinHashRule INSTANCE = new LookupJoinHashRule();

    private LookupJoinHashRule() {
        // note: this rule only matches StreamPhysicalLookupJoin nodes.
        super(operand(StreamPhysicalLookupJoin.class, any()), "LookupJoinHashRule");
    }

    @Override
    public boolean matches(RelOptRuleCall call) {
        ObjectIdentifier tableIdentifier = ((StreamPhysicalLookupJoin) call.rel(0)).getTableIdentifier();
        CatalogManager catalogManager = call.getPlanner().getContext().unwrap(FlinkContext.class).getCatalogManager();
        CatalogManager.TableLookupResult tableLookupResult = catalogManager.getTable(tableIdentifier).get();
        // note: read the dimension table's options
        Map<String, String> options = tableLookupResult.getTable().getOptions();
        String enabledHash = options.getOrDefault(JdbcDynamicTableFactory.LOOKUP_ENABLE_HASH.key(), JdbcDynamicTableFactory.LOOKUP_ENABLE_HASH.defaultValue());
        return BooleanUtils.toBoolean(enabledHash);
    }

    @Override
    public void onMatch(RelOptRuleCall relOptRuleCall) {
        RelNode streamPhysicalLookupJoin = relOptRuleCall.rel(0);
        JoinInfo joinInfo = ((StreamPhysicalLookupJoin) streamPhysicalLookupJoin).joinInfo();
        // note: build the FlinkRelDistribution trait
        FlinkRelDistribution requiredDistribution = FlinkRelDistribution.hash(joinInfo.leftKeys, true);
        // note: add a StreamPhysicalExchange in front of the StreamPhysicalLookupJoin's input
        RelNode hashInput = FlinkExpandConversionRule.satisfyDistribution(
                FlinkConventions.STREAM_PHYSICAL(),
                ((StreamPhysicalLookupJoin) streamPhysicalLookupJoin).getInput(),
                requiredDistribution
               );
        // note: transform to the new physical node
        relOptRuleCall.transformTo(streamPhysicalLookupJoin.copy(streamPhysicalLookupJoin.getTraitSet(), Arrays.asList(hashInput)));
    }
}

Test Run

public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, envSettings);
        tableEnvironment.executeSql("CREATE TABLE kafka_table (\n" +
                "  user_id int,\n" +
                "  order_amount bigint,\n" +
                "  sname String,\n" +
                "  log_ts TIMESTAMP(3),\n" +
                "  proctime as  PROCTIME()" +
                ") WITH (\n" +
                "    'connector' = 'kafka',\n" +
                "    'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "    'properties.max.poll.records' = '1',\n" +
                "    'topic' = 'mqTest02',\n" +
                "    'format' = 'json',\n" +
                "    'scan.startup.mode' = 'latest-offset'\n" +
                ")");
        // note: hash enabled
        tableEnvironment.executeSql("CREATE TABLE jdbc_table2 (\n" +
                "  id int,\n" +
                "  name varchar,\n" +
                "  description STRING,\n" +
                "  catalog STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'scan.partition.column' = 'id',\n" +
                "    'scan.partition.num' = '2',\n" +
                "    'lookup.enable_hash' = 'true',\n" +
                "    'scan.partition.lower-bound' = '1',\n" +
                "    'scan.partition.upper-bound' = '1000',\n" +
                "    'url' = 'jdbc:mysql://localhost:3306/mqTest?useUnicode=true&characterEncoding=utf-8',\n" +
                "    'username' = 'root',\n" +
                "    'password' = '123456',\n" +
                "    'table-name' = 'test1'\n" +
                ")");
        // note: hash disabled
        tableEnvironment.executeSql("CREATE TABLE jdbc_table3 (\n" +
                "  id int,\n" +
                "  name varchar,\n" +
                "  description STRING,\n" +
                "  catalog STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'scan.partition.column' = 'id',\n" +
                "    'scan.partition.num' = '2',\n" +
                "    'lookup.enable_hash' = 'false',\n" +
                "    'scan.partition.lower-bound' = '1',\n" +
                "    'scan.partition.upper-bound' = '1000',\n" +
                "    'url' = 'jdbc:mysql://localhost:3306/mqTest?useUnicode=true&characterEncoding=utf-8',\n" +
                "    'username' = 'root',\n" +
                "    'password' = '123456',\n" +
                "    'table-name' = 'test2'\n" +
                ")");

        tableEnvironment.executeSql("CREATE TABLE fs_table (\n" +
                "  id bigint,\n" +
                "  name STRING,\n" +
                "  s3Name STRING,\n" +
                "  order_amount bigint,\n" +
                "  description STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'print'\n" +
                ")");

        tableEnvironment.executeSql("INSERT INTO fs_table select s1.user_id,s2.name,s3.name,s1.order_amount,s2.description " +
                "  from kafka_table s1 " +
                "  join jdbc_table2 FOR SYSTEM_TIME AS OF s1.proctime AS s2 " +
                "       ON s1.user_id=s2.id " +
                "  join jdbc_table3 FOR SYSTEM_TIME AS OF s1.proctime  AS s3 " +
                "       ON s1.user_id=s3.id");

}

With hash enabled for both dimension tables, the topology when running on YARN:


(Figure: both dimension tables hashed)

With hash enabled for one dimension table and disabled for the other, the topology on YARN:


(Figure: one dimension table hashed)

Approach 2

Extend the translation from ExecNode to Transformation. Modify the exec node CommonExecLookupJoin to insert a PartitionTransformation in translateToPlanInternal. The physical plan tree produced this way has the same shape as the plan without hashing; the repartitioning only shows up at the Transformation level.

public Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
    RelOptTable temporalTable = temporalTableSourceSpec.getTemporalTable(planner);
    // validate whether the node is valid and supported.
    validate(temporalTable);
    final ExecEdge inputEdge = getInputEdges().get(0);
    RowType inputRowType = (RowType) inputEdge.getOutputType();
    RowType tableSourceRowType = FlinkTypeFactory.toLogicalRowType(temporalTable.getRowType());
    RowType resultRowType = (RowType) getOutputType();
    validateLookupKeyType(lookupKeys, inputRowType, tableSourceRowType);

    boolean isAsyncEnabled = false;
    UserDefinedFunction userDefinedFunction =
            LookupJoinUtil.getLookupFunction(temporalTable, lookupKeys.keySet());
    UserDefinedFunctionHelper.prepareInstance(
            planner.getTableConfig().getConfiguration(), userDefinedFunction);

    if (userDefinedFunction instanceof AsyncTableFunction) {
        isAsyncEnabled = true;
    }

    boolean isLeftOuterJoin = joinType == FlinkJoinType.LEFT;
    StreamOperatorFactory<RowData> operatorFactory;
    if (isAsyncEnabled) {
        operatorFactory =
                createAsyncLookupJoin(
                        temporalTable,
                        planner.getTableConfig(),
                        lookupKeys,
                        (AsyncTableFunction<Object>) userDefinedFunction,
                        planner.getRelBuilder(),
                        inputRowType,
                        tableSourceRowType,
                        resultRowType,
                        isLeftOuterJoin);
    } else {
        operatorFactory =
                createSyncLookupJoin(
                        temporalTable,
                        planner.getTableConfig(),
                        lookupKeys,
                        (TableFunction<Object>) userDefinedFunction,
                        planner.getRelBuilder(),
                        inputRowType,
                        tableSourceRowType,
                        resultRowType,
                        isLeftOuterJoin,
                        planner.getExecEnv().getConfig().isObjectReuseEnabled());
    }

    Transformation<RowData> inputTransformation =
            (Transformation<RowData>) inputEdge.translateToPlan(planner);

    // note: add a PartitionTransformation
    int[] hashKeys = lookupKeys.keySet().stream().mapToInt(key -> key).toArray();
    final RowDataKeySelector keySelector =
        KeySelectorUtil.getRowDataSelector(hashKeys, InternalTypeInfo.of(inputRowType));
    final StreamPartitioner<RowData> partitioner =
        new KeyGroupStreamPartitioner<>(
            keySelector, DEFAULT_LOWER_BOUND_MAX_PARALLELISM);

    final Transformation<RowData> partitionTransformation =
        new PartitionTransformation<>(inputTransformation, partitioner);
    // note: set the parallelism two higher than the upstream (for demonstration)
    partitionTransformation.setParallelism(inputTransformation.getParallelism() + 2);

    OneInputTransformation<RowData, RowData> inputTransform = new OneInputTransformation<>(
        partitionTransformation,
        getDescription(),
        operatorFactory,
        InternalTypeInfo.of(resultRowType),
        partitionTransformation.getParallelism());
    inputTransform.setParallelism(partitionTransformation.getParallelism());
    inputTransform.setOutputType(InternalTypeInfo.of(resultRowType));
    return inputTransform;
}
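For reference, the KeyGroupStreamPartitioner used above routes each record in two steps: the key is hashed into one of maxParallelism key groups, and the key group is then mapped to a subtask index. A simplified sketch of that routing (Flink's KeyGroupRangeAssignment additionally applies a murmur hash to the key's hashCode, which this sketch omits):

```java
public class KeyGroupRoutingSketch {

    // Key -> key group (Flink also murmur-hashes the key's hashCode first).
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key group -> subtask, following KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup.
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return keyGroupFor(key, maxParallelism) * parallelism / maxParallelism;
    }
}
```

Because the routing depends only on the key, every record with the same join key reaches the same subtask, which is what makes per-subtask caching of the dimension table effective.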

The resulting topology:


(Figure: dimension-table hash topology)

Approach 3

In the logical stage, attach a FlinkRelDistribution trait to the node; in the physical stage that trait is turned into a StreamPhysicalExchange. In StreamPhysicalLookupJoinRule, replace the FlinkLogicalRel's default FlinkRelDistribution trait with a hash distribution, so that when the physical plan is optimized an Exchange node is generated for the trait.

org.apache.flink.table.planner.plan.rules.physical.stream.StreamPhysicalLookupJoinRule#doTransform
private def doTransform(
  join: FlinkLogicalJoin,
  input: FlinkLogicalRel,
  temporalTable: RelOptTable,
  calcProgram: Option[RexProgram]): StreamPhysicalLookupJoin = {

  val joinInfo = join.analyzeCondition
  val cluster = join.getCluster

  val providedTrait = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
  // note: fetch the dimension table's options here to decide whether to hash
  val options = temporalTable.asInstanceOf[TableSourceTable].catalogTable.getOptions
  // note: build the hash distribution
  val requiredDistribution = FlinkRelDistribution.hash(joinInfo.leftKeys, true)
  val requiredTrait = input.getTraitSet
    .replace(requiredDistribution)   // replace the FlinkRelDistributionTraitDef
    .replace(FlinkConventions.STREAM_PHYSICAL)

  val convInput = RelOptRule.convert(input, requiredTrait)
  new StreamPhysicalLookupJoin(
    cluster,
    providedTrait,
    convInput,
    temporalTable,
    calcProgram,
    joinInfo,
    join.getJoinType)
}

The resulting topology:


(Figure: dimension-table hash topology)