1. Overview
Data writing here is implemented on top of the Iceberg-Flink v1 APIs.

Write process
2. Implementation
Key source code: FlinkSink.chainIcebergOperators
private <T> DataStreamSink<T> chainIcebergOperators() {
  // ... some code omitted ...
  // Load the Iceberg table.
  if (table == null) {
    tableLoader.open(); // open the TableLoader
    try (TableLoader loader = tableLoader) {
      this.table = loader.loadTable();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
    }
  }

  // Convert the requested flink table schema to flink row type.
  RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);

  // Distribute the records from the input data stream based on write.distribution-mode:
  // - HASH: keyBy the partition fields so records of the same partition go to the same writer
  // - NONE: return the input stream unchanged
  // - RANGE: not supported in this version, falls back to NONE
  DataStream<RowData> distributeStream = distributeDataStream(
      rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);

  // Add parallel writers that append rows to data files.
  // Note: if upsert is enabled, data files cannot be overwritten, only appended.
  SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType);

  // Add a single-parallelism committer that commits files
  // after a successful checkpoint or at end of input.
  SingleOutputStreamOperator<Void> committerStream = appendCommitter(writerStream);

  // Add a dummy discard sink. If a checkpoint succeeds but the commit fails,
  // the files are committed again after the next successful checkpoint.
  return appendDummySink(committerStream);
}
Step-by-step breakdown
- distributeDataStream: distribute the records of the input stream
private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
                                                 Map<String, String> properties,
                                                 PartitionSpec partitionSpec,
                                                 Schema iSchema,
                                                 RowType flinkRowType) {
  // Resolve the effective write.distribution-mode.
  DistributionMode writeMode;
  if (distributionMode == null) {
    // Fallback to the distribution mode parsed from table properties if it isn't
    // specified at job level (the default is NONE).
    String modeName = PropertyUtil.propertyAsString(properties,
        WRITE_DISTRIBUTION_MODE,
        WRITE_DISTRIBUTION_MODE_DEFAULT);
    writeMode = DistributionMode.fromName(modeName);
  } else {
    // The job-level setting wins (HASH or RANGE).
    writeMode = distributionMode;
  }

  switch (writeMode) {
    case NONE:
      // No redistribution needed, return the input as-is.
      return input;

    case HASH:
      // Currently records are only grouped by the partition fields.
      if (partitionSpec.isUnpartitioned()) {
        // Unpartitioned table: nothing to key by.
        return input;
      } else {
        // Partitioned table: keyBy the partition key.
        return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
      }

    case RANGE:
      // Not supported yet, fall back to NONE.
      LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
          WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
      return input;

    default:
      throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
  }
}
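For reference, a minimal sketch of choosing the distribution mode, assuming an Iceberg release whose FlinkSink builder exposes distributionMode(...); otherwise the write.distribution-mode table property alone has the same effect:

// A minimal sketch (assumed builder API): pick HASH distribution at job level.
FlinkSink.forRowData(rowDataStream)            // rowDataStream: an existing DataStream<RowData>
    .table(table)
    .tableLoader(tableLoader)
    .distributionMode(DistributionMode.HASH)   // job-level setting, overrides the table property
    .append();

// Table-level alternative, used when the builder leaves distributionMode unset:
table.updateProperties()
    .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.HASH.modeName())
    .commit();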
- appendWriter: write data files in parallel
private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
  // 1. Find out the equality field id list based on the user-provided equality field column names.
  List<Integer> equalityFieldIds = Lists.newArrayList();
  if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
    for (String column : equalityFieldColumns) {
      org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
      Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
          column, table.schema());
      equalityFieldIds.add(field.fieldId());
    }
  }

  // 2. Decide whether this write runs in upsert mode (configurable at job level or table level).
  // Fallback to the upsert mode parsed from table properties if it isn't specified at job level.
  boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
      UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);

  // Validate the equality fields and partition fields if upsert mode is enabled:
  // - OVERWRITE must not be combined with upsert
  // - the equality field columns must not be empty
  // - for a partitioned table, every partition source field must be part of the equality fields
  if (upsertMode) {
    Preconditions.checkState(!overwrite,
        "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
    Preconditions.checkState(!equalityFieldIds.isEmpty(),
        "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
    if (!table.spec().isUnpartitioned()) {
      for (PartitionField partitionField : table.spec().fields()) {
        Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
            "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
            partitionField, equalityFieldColumns);
      }
    }
  }

  // 3. Build the IcebergStreamWriter.
  IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds, upsertMode);

  // 3.1 Writing the input stream into data files can run with multiple parallel subtasks.
  int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
  SingleOutputStreamOperator<WriteResult> writerStream = input
      .transform(operatorName(ICEBERG_STREAM_WRITER_NAME), TypeInformation.of(WriteResult.class), streamWriter)
      .setParallelism(parallelism);
  if (uidPrefix != null) {
    writerStream = writerStream.uid(uidPrefix + "-writer");
  }
  return writerStream;
}
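For context, a minimal sketch of turning on upsert at job level, assuming an Iceberg release whose FlinkSink builder exposes upsert(...) and equalityFieldColumns(...); the column name "id" is purely illustrative:

// A minimal sketch (assumed builder API): declare the equality columns and enable upsert.
// "id" is an illustrative column; it must exist in the table schema, and for a partitioned
// table every partition source column must be included as well (see the checks above).
FlinkSink.forRowData(rowDataStream)
    .table(table)
    .tableLoader(tableLoader)
    .equalityFieldColumns(Arrays.asList("id"))   // feeds the equality field id list above
    .upsert(true)                                // job-level upsert; a table property can do the same
    .append();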
// Supplementary code: create the StreamWriter
static IcebergStreamWriter<RowData> createStreamWriter(Table table,
                                                       RowType flinkRowType,
                                                       List<Integer> equalityFieldIds,
                                                       boolean upsert) {
  // 1. Read the relevant table properties.
  Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
  Map<String, String> props = table.properties();
  // 1.1 Target data file size.
  long targetFileSize = getTargetFileSizeBytes(props);
  // 1.2 File format (orc/parquet/avro).
  FileFormat fileFormat = getFileFormat(props);

  // 2. Build a read-only, serializable copy of the table.
  Table serializableTable = SerializableTable.copyOf(table);

  // 3. Build the TaskWriter factory.
  TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
      serializableTable, flinkRowType, targetFileSize,
      fileFormat, equalityFieldIds, upsert);

  // 4. Create the StreamWriter.
  return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
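For reference, the two helpers above read standard Iceberg table properties; a minimal sketch of setting them (the concrete values are only examples):

// A minimal sketch: set the file format and target data file size that createStreamWriter picks up.
// The values (parquet, 128 MB) are illustrative, not recommendations.
table.updateProperties()
    .set(TableProperties.DEFAULT_FILE_FORMAT, "parquet")       // write.format.default
    .set(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
        String.valueOf(128L * 1024 * 1024))                    // write.target-file-size-bytes
    .commit();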
- appendCommitter: when a checkpoint succeeds or the input ends, the files committer commits the written data files
private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
  // 1. Build the FilesCommitter.
  IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
  // 2. The committer always runs with a parallelism of 1.
  SingleOutputStreamOperator<Void> committerStream = writerStream
      .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
      .setParallelism(1)
      .setMaxParallelism(1);
  if (uidPrefix != null) {
    committerStream = committerStream.uid(uidPrefix + "-committer");
  }
  return committerStream;
}
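Because the committer only commits data files on a successful checkpoint (or at end of input), a streaming job must have checkpointing enabled; a minimal sketch, with an illustrative 60-second interval:

// A minimal sketch: without checkpointing, a streaming job would only commit at end of input.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);   // illustrative 60 s interval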
3. Example: writing Iceberg data with a HadoopCatalog
// ... some code omitted ...
// Flink DataStream API job with checkpointing enabled
final Configuration config = new Configuration();
Map<String, String> properties =
    new ImmutableMap.Builder<String, String>()
        .put(CatalogProperties.WAREHOUSE_LOCATION, "file:///Users/XXX/tests/iceberg_namespace")
        .put(TableProperties.DEFAULT_FILE_FORMAT, "parquet")
        .build();
final Catalog catalog = CatalogUtil.loadCatalog("org.apache.iceberg.hadoop.HadoopCatalog", "hadoop", properties, config);
// final HadoopCatalog catalog = new HadoopCatalog();
// catalog.setConf(config);
// catalog.initialize("hadoop", properties);

// Schema
final Schema schema = new Schema(
    required(1, "data", Types.StringType.get()),
    required(2, "nested", Types.StructType.of(
        Types.NestedField.required(3, "f1", Types.StringType.get()),
        Types.NestedField.required(4, "f2", Types.StringType.get()),
        Types.NestedField.required(5, "f3", Types.LongType.get()))),
    required(6, "id", Types.LongType.get()));

// TableIdentifier
final TableIdentifier tableIdentifier = TableIdentifier.of("iceberg_db", "iceberg_table");

// Load the table if it already exists, otherwise create it
Table table;
if (catalog.tableExists(tableIdentifier)) {
  table = catalog.loadTable(tableIdentifier);
} else {
  table = catalog.createTable(tableIdentifier, schema);
}

// TableLoader
final TableLoader tableLoader = TableLoader.fromCatalog(CatalogLoader.hadoop("hadoop", config, properties), tableIdentifier);

// ... code omitted: convert the source stream into DataStream<RowData> (rowDataStream) ...

// FlinkSink
FlinkSink.forRowData(rowDataStream)   // rowDataStream: the DataStream<RowData> produced above
    .table(table)
    .tableLoader(tableLoader)
    .writeParallelism(parallelism)
    .append();
// ========= remaining code omitted =========== //
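The conversion into DataStream<RowData> was omitted above; a minimal sketch of what it could look like for this schema, using Flink's GenericRowData/StringData (the sample values, the map lambda, and the returns(...) hint are assumptions, not part of the original code):

// A minimal sketch of the omitted conversion. Field order mirrors the Iceberg schema:
// data (string), nested (struct<f1, f2, f3>), id (long).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);   // checkpointing enabled so the committer can commit

DataStream<RowData> rowDataStream = env
    .fromElements("a", "b", "c")
    .map(s -> {
      GenericRowData nested = new GenericRowData(3);
      nested.setField(0, StringData.fromString(s + "-f1"));
      nested.setField(1, StringData.fromString(s + "-f2"));
      nested.setField(2, 1L);

      GenericRowData row = new GenericRowData(3);
      row.setField(0, StringData.fromString(s));
      row.setField(1, nested);
      row.setField(2, 1L);
      return (RowData) row;
    })
    .returns(TypeInformation.of(RowData.class));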