Apache Iceberg v1 寫入記錄

一、概述
基于 Iceberg-Flink V1 相關(guān) API 來完成的數(shù)據(jù)寫入


寫入過程

二、實(shí)現(xiàn)

關(guān)鍵源碼---FlinkSink.chainIcebergOperators

private <T> DataStreamSink<T> chainIcebergOperators() {
  // ... some code omitted ...
  // Resolve the Iceberg table lazily through the TableLoader when it was not
  // handed to the builder directly; the loader is closed right after loading.
  if (table == null) {
    tableLoader.open();
    try (TableLoader loader = tableLoader) {
      this.table = loader.loadTable();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
    }
  }

  // Convert the requested flink table schema to flink row type.
  final RowType rowType = toFlinkRowType(table.schema(), tableSchema);

  // Shuffle the incoming records according to write.distribution-mode:
  // NONE passes the stream through, HASH keys by the partition fields,
  // RANGE is not supported yet and falls back to NONE.
  DataStream<RowData> shuffledStream =
      distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), rowType);

  // Parallel writers that append rows to data files. Note: when upsert is
  // enabled, overwrite is rejected downstream and the writers must append.
  SingleOutputStreamOperator<WriteResult> writers = appendWriter(shuffledStream, rowType);

  // Single-parallelism committer that commits the collected data files after
  // a successful checkpoint or at end of input.
  SingleOutputStreamOperator<Void> committers = appendCommitter(writers);

  // Terminate the chain with a dummy discard sink.
  return appendDummySink(committers);
}

操作代碼拆解

- distributeDataStream:輸入流內(nèi)容分組

private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
                                                   Map<String, String> properties,
                                                   PartitionSpec partitionSpec,
                                                   Schema iSchema,
                                                   RowType flinkRowType) {
    // 首先獲取指定的write.distribution-mode                                                   
    DistributionMode writeMode;
    if (distributionMode == null) { // 默認(rèn)使用NONE
      // Fallback to use distribution mode parsed from table properties if don't specify in job level.
      String modeName = PropertyUtil.propertyAsString(properties,
          WRITE_DISTRIBUTION_MODE,
          WRITE_DISTRIBUTION_MODE_DEFAULT);

      writeMode = DistributionMode.fromName(modeName);
    } else {   // HASH和RANGE
      writeMode = distributionMode;
    }

    switch (writeMode) {
      case NONE:  // 不需要做任何處理 直接返回
        return input;

      case HASH:  // 目前只要時按照partition field來進(jìn)行分組的
        if (partitionSpec.isUnpartitioned()) { // 未分區(qū)
          return input;
        } else { // 若是分區(qū),則通過partition field 進(jìn)行keyBy
          return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
        }

      case RANGE: // 目前不支持,直接返回
        LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
            WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
        return input;

      default:
        throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
    }
  }
}

- appendWriter: 并行寫入datafile

// Builds the parallel IcebergStreamWriter stage that appends rows to data files.
private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
  // Resolve the equality field ids from the user-provided column names;
  // every named column must exist in the table schema.
  List<Integer> equalityIds = Lists.newArrayList();
  if (equalityFieldColumns != null && !equalityFieldColumns.isEmpty()) {
    for (String column : equalityFieldColumns) {
      org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
      Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
          column, table.schema());
      equalityIds.add(field.fieldId());
    }
  }

  // Upsert can be requested at job level or through the table property.
  boolean upsertEnabled = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
      UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);

  // Upsert-mode invariants: overwrite is forbidden, equality fields must be
  // non-empty, and for a partitioned table every partition source field has
  // to be part of the equality fields.
  if (upsertEnabled) {
    Preconditions.checkState(!overwrite,
        "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
    Preconditions.checkState(!equalityIds.isEmpty(),
        "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
    if (!table.spec().isUnpartitioned()) {
      for (PartitionField partitionField : table.spec().fields()) {
        Preconditions.checkState(equalityIds.contains(partitionField.sourceId()),
            "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
            partitionField, equalityFieldColumns);
      }
    }
  }

  // Wire the writer operator into the stream. Writing is parallel; default to
  // the input parallelism when no explicit write parallelism is configured.
  IcebergStreamWriter<RowData> writerOperator =
      createStreamWriter(table, flinkRowType, equalityIds, upsertEnabled);
  int writerParallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
  SingleOutputStreamOperator<WriteResult> result = input
      .transform(operatorName(ICEBERG_STREAM_WRITER_NAME), TypeInformation.of(WriteResult.class), writerOperator)
      .setParallelism(writerParallelism);
  if (uidPrefix != null) {
    result = result.uid(uidPrefix + "-writer");
  }
  return result;
}

// 補(bǔ)充代碼: 創(chuàng)建StreamWriter
// Creates the IcebergStreamWriter operator for the given table, using the
// table properties to pick the target file size and the file format.
static IcebergStreamWriter<RowData> createStreamWriter(Table table,
                                                       RowType flinkRowType,
                                                       List<Integer> equalityFieldIds,
                                                       boolean upsert) {
  Preconditions.checkArgument(table != null, "Iceberg table should't be null");

  // Pull the write configuration off the table properties.
  Map<String, String> props = table.properties();
  long targetFileSize = getTargetFileSizeBytes(props);  // target data-file size in bytes
  FileFormat fileFormat = getFileFormat(props);         // orc / parquet / avro

  // The task writer factory needs a read-only, serializable snapshot of the table.
  Table frozenTable = SerializableTable.copyOf(table);
  TaskWriterFactory<RowData> writerFactory = new RowDataTaskWriterFactory(
      frozenTable, flinkRowType, targetFileSize,
      fileFormat, equalityFieldIds, upsert);

  return new IcebergStreamWriter<>(table.name(), writerFactory);
}

- appendCommitter: 當(dāng)checkpoint成功或輸入流結(jié)束,通過文件提交器完成對應(yīng)的文件commit

// Builds the committer stage that commits the data files produced by the
// writers after a successful checkpoint or at end of input.
private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
  IcebergFilesCommitter committer = new IcebergFilesCommitter(tableLoader, overwrite);
  // Commits must be serialized: force parallelism (and max parallelism) to 1.
  SingleOutputStreamOperator<Void> result = writerStream
      .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, committer)
      .setParallelism(1)
      .setMaxParallelism(1);
  return uidPrefix == null ? result : result.uid(uidPrefix + "-committer");
}

三、實(shí)例:基于HadoopCatalog進(jìn)行iceberg數(shù)據(jù)寫入

// ... some code omitted ...
// Flink DataStream API setup with checkpointing enabled (omitted).
final Configuration config = new Configuration();
Map<String, String> properties =
        new ImmutableMap.Builder<String, String>()
          .put(CatalogProperties.WAREHOUSE_LOCATION, "file:///Users/XXX/tests/iceberg_namespace")
          .put(TableProperties.DEFAULT_FILE_FORMAT, "parquet")
          .build();

// Load the HadoopCatalog through the generic CatalogUtil entry point; the
// commented-out lines below show the equivalent direct construction.
final Catalog catalog = CatalogUtil.loadCatalog("org.apache.iceberg.hadoop.HadoopCatalog", "hadoop", properties, config);
// final HadoopCatalog catalog = new HadoopCatalog();
// catalog.setConf(config);
// catalog.initialize("hadoop", properties);
// Table schema: required string "data", required struct "nested" (f1/f2 strings, f3 long), required long "id".
final Schema schema = new Schema(
        required(1, "data", Types.StringType.get()),
        required(2, "nested", Types.StructType.of(
                Types.NestedField.required(3, "f1", Types.StringType.get()),
                Types.NestedField.required(4, "f2", Types.StringType.get()),
                Types.NestedField.required(5, "f3", Types.LongType.get()))),
        required(6, "id", Types.LongType.get()));
// Identifier of the target table: iceberg_db.iceberg_table.
final TableIdentifier tableIdentifier = TableIdentifier.of("iceberg_db", "iceberg_table");
Table table = null;
// Load the table when it already exists, otherwise create it.
if (catalog.tableExists(tableIdentifier)) {
  table = catalog.loadTable(tableIdentifier);
} else {
  table = catalog.createTable(tableIdentifier, schema);
}
// TableLoader the sink uses to (re)load the table on the task side.
final TableLoader tableLoader = TableLoader.fromCatalog(CatalogLoader.hadoop("hadoop", config, properties), tableIdentifier);
// ... conversion of the source stream into DataStream<RowData> omitted ...
// Attach the Iceberg sink.
// NOTE(review): "DataStream<RowData>" below is a placeholder from the article —
// the actual DataStream<RowData> instance must be passed here for this to compile.
FlinkSink.forRowData(DataStream<RowData>)
        .table(table)
        .tableLoader(tableLoader)
        .writeParallelism(parallelism)
        .append();
// =========  more code omitted  =========== //
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容