1.根據(jù)網(wǎng)上文章,客戶端使用flink1.11.4+iceberg-flink-runtime-0.11.1.jar (iceberg0.12新出,使用即報(bào)錯(cuò))版本可正常操作。flink1.12.5 與flink1.13.2? 都嘗試過,皆報(bào)錯(cuò)(可能由于本人原因,尚未排查出錯(cuò)誤原因)。
?2.代碼端 flink cdc使用1.13.2 或者1.12.5 版本皆可,但pom配置某些包需降成1.11.1 不然會(huì)報(bào)缺包等錯(cuò)誤。本次操作為使用flinkcdc(flink-connector-mysql-cdc 2.0.0 jar)與flink 13.2 結(jié)合,實(shí)時(shí)監(jiān)控mysqlbinlog日志(需提前開啟binlog日志功能,此處可自行百度,修改mysql配置文件即可),入庫iceberg。此代碼很多版本問題,版本不一致會(huì)出現(xiàn)各種錯(cuò)誤,下面會(huì)本人使用pom文件和代碼,親測(cè)有效

3.pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
? ? ? ? xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
? ? ? ? xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
? ? <modelVersion>4.0.0
? ? <groupId>org.example
? ? <artifactId>flink-mysql-iceberg
? ? <packaging>pom
? ? <version>1.0-SNAPSHOT
? ? ? ? <module>spark-hudi
? ? ? ? <java.version>1.8
? ? ? ? <maven.compiler.source>1.8
? ? ? ? <maven.compiler.target>1.8
? ? ? ? <spark.version>3.0.0
? ? ? ? <flink.version>1.12.5
? ? ? ? <scala.version>2.12
? ? ? ? <hadoop.version>3.1.3
? ? ? ? ? ? <groupId>org.slf4j
? ? ? ? ? ? <artifactId>slf4j-simple
? ? ? ? ? ? <version>1.7.25
? ? ? ? ? ? <groupId>org.projectlombok
? ? ? ? ? ? <artifactId>lombok
? ? ? ? ? ? <version>1.18.2
? ? ? ? ? ? <groupId>org.apache.flink
? ? ? ? ? ? <artifactId>flink-streaming-java_2.11
? ? ? ? ? ? <version>${flink.version}
? ? ? ? ? ? <groupId>org.apache.flink
? ? ? ? ? ? <artifactId>flink-json
? ? ? ? ? ? <version>${flink.version}
? ? ? ? ? ? <groupId>org.apache.flink
? ? ? ? ? ? <artifactId>flink-table-common
? ? ? ? ? ? <version>1.11.1
? ? ? ? ? ? <groupId>org.apache.flink
? ? ? ? ? ? <artifactId>flink-table-api-java
? ? ? ? ? ? <version>1.11.1
? ? ? ? <!-- 最新加入 -->
? ? ? ? ? ? <groupId>org.apache.flink
? ? ? ? ? ? <artifactId>flink-table-api-java-bridge_2.11
? ? ? ? ? ? <version>1.11.1
? ? ? ? ? ? <groupId>org.apache.flink
? ? ? ? ? ? <artifactId>flink-table-runtime-blink_2.11
? ? ? ? ? ? <version>1.11.1
? ? ? ? ? ? <groupId>com.alibaba
? ? ? ? ? ? <artifactId>fastjson
? ? ? ? ? ? <version>1.2.67
? ? ? ? ? ? <groupId>org.apache.flink
? ? ? ? ? ? <artifactId>flink-connector-jdbc_${scala.version}
? ? ? ? ? ? <version>${flink.version}
? ? ? ? ? ? <groupId>org.apache.flink
? ? ? ? ? ? <artifactId>flink-clients_2.11
? ? ? ? ? ? <version>${flink.version}
? ? ? ? <!--后面追加的jar包-->
? ? ? ? ? ? <groupId>com.ververica
? ? ? ? ? ? <artifactId>flink-connector-mysql-cdc
? ? ? ? ? ? <version>2.0.0
? ? ? ? ? ? <groupId>org.apache.hadoop
? ? ? ? ? ? <artifactId>hadoop-common
? ? ? ? ? ? <version>2.7.7
? ? ? ? ? ? <scope>compile
? ? ? ? ? ? <groupId>org.apache.hadoop
? ? ? ? ? ? <artifactId>hadoop-hdfs
? ? ? ? ? ? <version>2.7.7
? ? ? ? ? ? <groupId>org.apache.iceberg
? ? ? ? ? ? <artifactId>iceberg-flink-runtime
? ? ? ? ? ? <version>0.11.0
? ? ? ? ? ? <groupId>org.apache.iceberg
? ? ? ? ? ? <artifactId>iceberg-flink
? ? ? ? ? ? <version>0.11.0
? ? ? ? ? ? <scope>provided
? ? ? ? ? ? ? ? ? ? <artifactId>slf4j-api
? ? ? ? ? ? ? ? ? ? <groupId>org.slf4j
? ? ? ? ? ? <groupId>org.apache.flink
? ? ? ? ? ? <artifactId>flink-table-planner-blink_2.11
? ? ? ? ? ? <version>${flink.version}
? ? ? ? ? ? <scope>compile
? ? ? ? ? ? ? ? ? ? <artifactId>slf4j-api
? ? ? ? ? ? ? ? ? ? <groupId>org.slf4j
? ? ? ? ? ? <groupId>org.apache.flink
? ? ? ? ? ? <artifactId>flink-table-planner_2.11
? ? ? ? ? ? <version>${flink.version}
? ? ? ? ? ? <scope>compile
? ? ? ? ? ? ? ? ? ? <artifactId>slf4j-api
? ? ? ? ? ? ? ? ? ? <groupId>org.slf4j
? ? ? ? ? ? <groupId>org.apache.flink
? ? ? ? ? ? <artifactId>flink-table
? ? ? ? ? ? <version>${flink.version}
? ? ? ? ? ? <type>pom
? ? ? ? ? ? <scope>provided
? ? ? ? ? ? <groupId>mysql
? ? ? ? ? ? <artifactId>mysql-connector-java
? ? ? ? ? ? <version>8.0.16
? ? ? ? ? ? <scope>${scope}
? ? ? ? ? ? <groupId>org.apache.hadoop
? ? ? ? ? ? <artifactId>hadoop-mapreduce-client-core
? ? ? ? ? ? <version>3.1.3
? ? ? ? <!--新加入spark jar包-->
? ? ? ? ? ? ? ? <groupId>org.apache.maven.plugins
? ? ? ? ? ? ? ? <artifactId>maven-assembly-plugin
? ? ? ? ? ? ? ? <version>3.0.0
? ? ? ? ? ? ? ? ? ? ? ? <descriptorRef>jar-with-dependencies
? ? ? ? ? ? ? ? ? ? ? ? <id>make-assembly
? ? ? ? ? ? ? ? ? ? ? ? <phase>package
? ? ? ? ? ? ? ? ? ? ? ? ? ? <goal>single
</project>

5.代碼:
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeTransformations;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.types.Types;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
public class rowData2iceberg {
private static final StringHADOOP_CATALOG ="iceberg_hadoop_catalog";
? ? //定義iceberg schema
? ? private static final SchemaSCHEMA =
new Schema(
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
? ? ? ? ? ? ? ? ? ? Types.NestedField.optional(2, "image", Types.BinaryType.get())
//Types.NestedField.optional(3, "age", Types.IntegerType.get()),
//Types.NestedField.optional(4, "address", Types.StringType.get()),
//Types.NestedField.optional(5, "score1", Types.IntegerType.get()));
// Types.NestedField.optional(6, "school", Types.StringType.get());
//? Types.NestedField.optional(7, "class", Types.StringType.get())
? ? ? ? ? ? ? ? ? ? );
? ? public static void main(String[] args)throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
? ? ? ? System.setProperty("HADOOP_USER_NAME","daizhihao");
? ? ? ? CheckpointConfig checkpointConfig = env.getCheckpointConfig();
? ? ? ? checkpointConfig.setCheckpointInterval(60 *1000L);
? ? ? ? checkpointConfig.setMinPauseBetweenCheckpoints(60 *1000L);
? ? ? ? checkpointConfig.setTolerableCheckpointFailureNumber(10);
? ? ? ? checkpointConfig.setCheckpointTimeout(12 *1000L);
? ? ? ? checkpointConfig.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
? ? ? ? //定義mysql的監(jiān)控字段
? ? ? ? TableSchema schema =
TableSchema.builder()
.add(TableColumn.of("id", DataTypes.INT()))
.add(TableColumn.of("image", DataTypes.BINARY(2)))
//? ? ? ? ? ? ? ? ? ? ? ? .add(TableColumn.of("age", DataTypes.INT()))
//? ? ? ? ? ? ? ? ? ? ? ? .add(TableColumn.of("address", DataTypes.STRING()))
//? ? ? ? ? ? ? ? ? ? ? ? .add(TableColumn.of("score1", DataTypes.INT()))
//.add(TableColumn.of("school", DataTypes.STRING()))
//.add(TableColumn.of("class", DataTypes.STRING()))
? ? ? ? ? ? ? ? ? ? ? ? .build();
? ? ? ? RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
? ? ? ? DebeziumDeserializationSchema deserialer =
new RowDataDebeziumDeserializeSchema(
rowType,
? ? ? ? ? ? ? ? ? ? ? ? createTypeInfo(schema.toRowDataType()),
? ? ? ? ? ? ? ? ? ? ? ? (rowData, rowKind) -> {},
? ? ? ? ? ? ? ? ? ? ? ? ZoneId.of("Asia/Shanghai"));
? ? ? ? SourceFunction sourceFunction = MySqlSource.builder()
.hostname("localhost")
.serverTimeZone("UTC")
.port(3306)
.databaseList("demo")// monitor all tables under inventory database
? ? ? ? ? ? ? ? .tableList("demo.picture")
.username("root")
.password("root")
//? ? ? ? ? ? ? ? .hostname("Tgz3-eip-gzjfzxgputest1")
//? ? ? ? ? ? ? ? .port(23308)
//? ? ? ? ? ? ? ? .databaseList("eip_cs")
? ? ? ? ? ? ? ? .deserializer(deserialer)
// .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
? ? ? ? ? ? ? ? .build();
? ? ? ? DataStreamSource src = env.addSource(sourceFunction);
//設(shè)置Checkpoint的模式:精準(zhǔn)一次
? ? ? ? env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
? ? ? ? icebergSink_hadoop(src);
? ? ? ? env
//.addSource(sourceFunction)
//.print()
? ? ? ? ? ? ? ? .setParallelism(1); // use parallelism 1 for sink to keep message ordering
? ? ? ? ? ? env.execute();
? ? }
private static TypeInformationcreateTypeInfo(DataType producedDataType) {
final DataType internalDataType =
DataTypeUtils.transform(producedDataType, TypeTransformations.TO_INTERNAL_CLASS);
? ? ? ? return (TypeInformation)
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(internalDataType);
? ? }
private static void icebergSink_hadoop(DataStream src) {
Map properties =new HashMap<>();
? ? ? ? properties.put("type", "iceberg");
? ? ? ? properties.put("catalog-type", "hadoop");
? ? ? ? properties.put("property-version", "1");
? ? ? ? properties.put("warehouse", "hdfs://192.168.163.101:9000/user/hive/warehouse");
? ? ? ? CatalogLoader catalogLoader =
CatalogLoader.hadoop(HADOOP_CATALOG, new Configuration(), properties);
? ? ? ? icebergSink(src, catalogLoader);
? ? }
private static void icebergSink(DataStream input,? CatalogLoader loader) {
Catalog catalog = loader.loadCatalog();
? ? ? ? //iceberg 命名
? ? ? ? TableIdentifier identifier =
TableIdentifier.of(Namespace.of("iceberg_db"), "image5");
? ? ? ? Table table;
? ? ? ? if (catalog.tableExists(identifier)) {
table = catalog.loadTable(identifier);
? ? ? ? }else {
table =
catalog.buildTable(identifier, SCHEMA)
.withPartitionSpec(PartitionSpec.unpartitioned())
.create();
? ? ? ? }
// need to upgrade version to 2,otherwise 'java.lang.IllegalArgumentException: Cannot write
// delete files in a v1 table'
? ? ? ? TableOperations operations = ((BaseTable) table).operations();
? ? ? ? TableMetadata metadata = operations.current();
? ? ? ? operations.commit(metadata, metadata.upgradeToFormatVersion(2));
? ? ? ? TableLoader tableLoader = TableLoader.fromCatalog(loader, identifier);
? ? ? ? FlinkSink.forRowData(input)
.table(table)
.tableLoader(tableLoader)
.equalityFieldColumns(Arrays.asList("id"))
.writeParallelism(1)
.build();
? ? }
}
5.1代碼主要修改點(diǎn):
1.

2.

3.

4.

5.

6.flink11.1客戶端查看入庫信息。
6.1啟動(dòng)flink 集群 :./start-cluster.sh。同時(shí)需自行啟動(dòng)hdfs集群。

6.2 查看當(dāng)前進(jìn)程,若出現(xiàn)StandaloneSessionClusterEntrypoint,TaskManagerRunner即代表成功。

6.3 進(jìn)去客戶端后,創(chuàng)建hadoop_catalog(注意:本人當(dāng)使用hive_catalog時(shí),插入數(shù)據(jù)時(shí)會(huì)報(bào)錯(cuò)。尚不清楚什么原因)
CREATE CATALOG hadoop_catalog WITH (
? 'type'='iceberg',
? 'catalog-type'='hadoop',
? 'warehouse'='hdfs://192.168.163.101:9000/user/hive/warehouse',
? 'property-version'='1'
);

6.4 使用hadoop_catalog及使用iceberg_db數(shù)據(jù)庫(若不存在,個(gè)人創(chuàng)建一個(gè) create database iceberg_db)

6.5 查看表數(shù)據(jù),若發(fā)現(xiàn)表中有數(shù)據(jù),及監(jiān)控日志成功。
select * from image5.

本人參考以上鏈接,綜合個(gè)人版本修改,得出以上結(jié)果
https://www.136.la/jingpin/show-126501.html