flink13.2CDC-iceberg結(jié)合

1.根據(jù)網(wǎng)上文章,客戶端使用flink1.11.4+iceberg-flink-runtime-0.11.1.jar (iceberg0.12新出,使用即報(bào)錯(cuò))版本可正常操作。flink1.12.5 與flink1.13.2? 都嘗試過,皆報(bào)錯(cuò)(可能由于本人原因,尚未排查出錯(cuò)誤原因)。

?2.代碼端 flink cdc使用1.13.2 或者1.12.5 版本皆可,但pom配置某些包需降成1.11.1 不然會(huì)報(bào)缺包等錯(cuò)誤。本次操作為使用flinkcdc(flink-connector-mysql-cdc 2.0.0 jar)與flink 13.2 結(jié)合,實(shí)時(shí)監(jiān)控mysqlbinlog日志(需提前開啟binlog日志功能,此處可自行百度,修改mysql配置文件即可),入庫iceberg。此代碼很多版本問題,版本不一致會(huì)出現(xiàn)各種錯(cuò)誤,下面會(huì)本人使用pom文件和代碼,親測(cè)有效


3.pom文件

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"

? ? ? ? xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

? ? ? ? xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

? ? <modelVersion>4.0.0

? ? <groupId>org.example

? ? <artifactId>flink-mysql-iceberg

? ? <packaging>pom

? ? <version>1.0-SNAPSHOT

? ? ? ? <module>spark-hudi

? ? ? ? <java.version>1.8

? ? ? ? <maven.compiler.source>1.8

? ? ? ? <maven.compiler.target>1.8

? ? ? ? <spark.version>3.0.0

? ? ? ? <flink.version>1.12.5

? ? ? ? <scala.version>2.12

? ? ? ? <hadoop.version>3.1.3

? ? ? ? ? ? <groupId>org.slf4j

? ? ? ? ? ? <artifactId>slf4j-simple

? ? ? ? ? ? <version>1.7.25

? ? ? ? ? ? <groupId>org.projectlombok

? ? ? ? ? ? <artifactId>lombok

? ? ? ? ? ? <version>1.18.2

? ? ? ? ? ? <groupId>org.apache.flink

? ? ? ? ? ? <artifactId>flink-streaming-java_2.11

? ? ? ? ? ? <version>${flink.version}

? ? ? ? ? ? <groupId>org.apache.flink

? ? ? ? ? ? <artifactId>flink-json

? ? ? ? ? ? <version>${flink.version}

? ? ? ? ? ? <groupId>org.apache.flink

? ? ? ? ? ? <artifactId>flink-table-common

? ? ? ? ? ? <version>1.11.1

? ? ? ? ? ? <groupId>org.apache.flink

? ? ? ? ? ? <artifactId>flink-table-api-java

? ? ? ? ? ? <version>1.11.1

? ? ? ? <!-- 最新加入 -->

? ? ? ? ? ? <groupId>org.apache.flink

? ? ? ? ? ? <artifactId>flink-table-api-java-bridge_2.11

? ? ? ? ? ? <version>1.11.1

? ? ? ? ? ? <groupId>org.apache.flink

? ? ? ? ? ? <artifactId>flink-table-runtime-blink_2.11

? ? ? ? ? ? <version>1.11.1

? ? ? ? ? ? <groupId>com.alibaba

? ? ? ? ? ? <artifactId>fastjson

? ? ? ? ? ? <version>1.2.67

? ? ? ? ? ? <groupId>org.apache.flink

? ? ? ? ? ? <artifactId>flink-connector-jdbc_${scala.version}

? ? ? ? ? ? <version>${flink.version}

? ? ? ? ? ? <groupId>org.apache.flink

? ? ? ? ? ? <artifactId>flink-clients_2.11

? ? ? ? ? ? <version>${flink.version}

? ? ? ? <!--后面追加的jar包-->

? ? ? ? ? ? <groupId>com.ververica

? ? ? ? ? ? <artifactId>flink-connector-mysql-cdc

? ? ? ? ? ? <version>2.0.0

? ? ? ? ? ? <groupId>org.apache.hadoop

? ? ? ? ? ? <artifactId>hadoop-common

? ? ? ? ? ? <version>2.7.7

? ? ? ? ? ? <scope>compile

? ? ? ? ? ? <groupId>org.apache.hadoop

? ? ? ? ? ? <artifactId>hadoop-hdfs

? ? ? ? ? ? <version>2.7.7

? ? ? ? ? ? <groupId>org.apache.iceberg

? ? ? ? ? ? <artifactId>iceberg-flink-runtime

? ? ? ? ? ? <version>0.11.0

? ? ? ? ? ? <groupId>org.apache.iceberg

? ? ? ? ? ? <artifactId>iceberg-flink

? ? ? ? ? ? <version>0.11.0

? ? ? ? ? ? <scope>provided

? ? ? ? ? ? ? ? ? ? <artifactId>slf4j-api

? ? ? ? ? ? ? ? ? ? <groupId>org.slf4j

? ? ? ? ? ? <groupId>org.apache.flink

? ? ? ? ? ? <artifactId>flink-table-planner-blink_2.11

? ? ? ? ? ? <version>${flink.version}

? ? ? ? ? ? <scope>compile

? ? ? ? ? ? ? ? ? ? <artifactId>slf4j-api

? ? ? ? ? ? ? ? ? ? <groupId>org.slf4j

? ? ? ? ? ? <groupId>org.apache.flink

? ? ? ? ? ? <artifactId>flink-table-planner_2.11

? ? ? ? ? ? <version>${flink.version}

? ? ? ? ? ? <scope>compile

? ? ? ? ? ? ? ? ? ? <artifactId>slf4j-api

? ? ? ? ? ? ? ? ? ? <groupId>org.slf4j

? ? ? ? ? ? <groupId>org.apache.flink

? ? ? ? ? ? <artifactId>flink-table

? ? ? ? ? ? <version>${flink.version}

? ? ? ? ? ? <type>pom

? ? ? ? ? ? <scope>provided

? ? ? ? ? ? <groupId>mysql

? ? ? ? ? ? <artifactId>mysql-connector-java

? ? ? ? ? ? <version>8.0.16

? ? ? ? ? ? <scope>${scope}

? ? ? ? ? ? <groupId>org.apache.hadoop

? ? ? ? ? ? <artifactId>hadoop-mapreduce-client-core

? ? ? ? ? ? <version>3.1.3

? ? ? ? <!--新加入spark jar包-->

? ? ? ? ? ? ? ? <groupId>org.apache.maven.plugins

? ? ? ? ? ? ? ? <artifactId>maven-assembly-plugin

? ? ? ? ? ? ? ? <version>3.0.0

? ? ? ? ? ? ? ? ? ? ? ? <descriptorRef>jar-with-dependencies

? ? ? ? ? ? ? ? ? ? ? ? <id>make-assembly

? ? ? ? ? ? ? ? ? ? ? ? <phase>package

? ? ? ? ? ? ? ? ? ? ? ? ? ? <goal>single

</project>

以上4個(gè)類需降低到1.11.1版本

5.代碼:

import com.ververica.cdc.connectors.mysql.MySqlSource;

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema;

import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.CheckpointConfig;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import org.apache.flink.table.api.DataTypes;

import org.apache.flink.table.api.TableColumn;

import org.apache.flink.table.api.TableSchema;

import org.apache.flink.table.data.RowData;

import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;

import org.apache.flink.table.types.DataType;

import org.apache.flink.table.types.inference.TypeTransformations;

import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.table.types.utils.DataTypeUtils;

import org.apache.hadoop.conf.Configuration;

import org.apache.iceberg.*;

import org.apache.iceberg.catalog.Catalog;

import org.apache.iceberg.catalog.Namespace;

import org.apache.iceberg.catalog.TableIdentifier;

import org.apache.iceberg.flink.CatalogLoader;

import org.apache.iceberg.flink.TableLoader;

import org.apache.iceberg.flink.sink.FlinkSink;

import org.apache.iceberg.types.Types;

import java.time.ZoneId;

import java.util.Arrays;

import java.util.HashMap;

import java.util.Map;

public class rowData2iceberg {

private static final StringHADOOP_CATALOG ="iceberg_hadoop_catalog";

? ? //定義iceberg schema

? ? private static final SchemaSCHEMA =

new Schema(

Types.NestedField.optional(1, "id", Types.IntegerType.get()),

? ? ? ? ? ? ? ? ? ? Types.NestedField.optional(2, "image", Types.BinaryType.get())

//Types.NestedField.optional(3, "age", Types.IntegerType.get()),

//Types.NestedField.optional(4, "address", Types.StringType.get()),

//Types.NestedField.optional(5, "score1", Types.IntegerType.get()));

// Types.NestedField.optional(6, "school", Types.StringType.get());

//? Types.NestedField.optional(7, "class", Types.StringType.get())

? ? ? ? ? ? ? ? ? ? );

? ? public static void main(String[] args)throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

? ? ? ? System.setProperty("HADOOP_USER_NAME","daizhihao");

? ? ? ? CheckpointConfig checkpointConfig = env.getCheckpointConfig();

? ? ? ? checkpointConfig.setCheckpointInterval(60 *1000L);

? ? ? ? checkpointConfig.setMinPauseBetweenCheckpoints(60 *1000L);

? ? ? ? checkpointConfig.setTolerableCheckpointFailureNumber(10);

? ? ? ? checkpointConfig.setCheckpointTimeout(12 *1000L);

? ? ? ? checkpointConfig.enableExternalizedCheckpoints(

CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

? ? ? ? //定義mysql的監(jiān)控字段

? ? ? ? TableSchema schema =

TableSchema.builder()

.add(TableColumn.of("id", DataTypes.INT()))

.add(TableColumn.of("image", DataTypes.BINARY(2)))

//? ? ? ? ? ? ? ? ? ? ? ? .add(TableColumn.of("age", DataTypes.INT()))

//? ? ? ? ? ? ? ? ? ? ? ? .add(TableColumn.of("address", DataTypes.STRING()))

//? ? ? ? ? ? ? ? ? ? ? ? .add(TableColumn.of("score1", DataTypes.INT()))

//.add(TableColumn.of("school", DataTypes.STRING()))

//.add(TableColumn.of("class", DataTypes.STRING()))

? ? ? ? ? ? ? ? ? ? ? ? .build();

? ? ? ? RowType rowType = (RowType) schema.toRowDataType().getLogicalType();

? ? ? ? DebeziumDeserializationSchema deserialer =

new RowDataDebeziumDeserializeSchema(

rowType,

? ? ? ? ? ? ? ? ? ? ? ? createTypeInfo(schema.toRowDataType()),

? ? ? ? ? ? ? ? ? ? ? ? (rowData, rowKind) -> {},

? ? ? ? ? ? ? ? ? ? ? ? ZoneId.of("Asia/Shanghai"));

? ? ? ? SourceFunction sourceFunction = MySqlSource.builder()

.hostname("localhost")

.serverTimeZone("UTC")

.port(3306)

.databaseList("demo")// monitor all tables under inventory database

? ? ? ? ? ? ? ? .tableList("demo.picture")

.username("root")

.password("root")

//? ? ? ? ? ? ? ? .hostname("Tgz3-eip-gzjfzxgputest1")

//? ? ? ? ? ? ? ? .port(23308)

//? ? ? ? ? ? ? ? .databaseList("eip_cs")

? ? ? ? ? ? ? ? .deserializer(deserialer)

// .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String

? ? ? ? ? ? ? ? .build();

? ? ? ? DataStreamSource src = env.addSource(sourceFunction);

//設(shè)置Checkpoint的模式:精準(zhǔn)一次

? ? ? ? env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

? ? ? ? icebergSink_hadoop(src);

? ? ? ? env

//.addSource(sourceFunction)

//.print()

? ? ? ? ? ? ? ? .setParallelism(1); // use parallelism 1 for sink to keep message ordering

? ? ? ? ? ? env.execute();

? ? }

private static TypeInformationcreateTypeInfo(DataType producedDataType) {

final DataType internalDataType =

DataTypeUtils.transform(producedDataType, TypeTransformations.TO_INTERNAL_CLASS);

? ? ? ? return (TypeInformation)

TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(internalDataType);

? ? }

private static void icebergSink_hadoop(DataStream src) {

Map properties =new HashMap<>();

? ? ? ? properties.put("type", "iceberg");

? ? ? ? properties.put("catalog-type", "hadoop");

? ? ? ? properties.put("property-version", "1");

? ? ? ? properties.put("warehouse", "hdfs://192.168.163.101:9000/user/hive/warehouse");

? ? ? ? CatalogLoader catalogLoader =

CatalogLoader.hadoop(HADOOP_CATALOG, new Configuration(), properties);

? ? ? ? icebergSink(src, catalogLoader);

? ? }

private static void icebergSink(DataStream input,? CatalogLoader loader) {

Catalog catalog = loader.loadCatalog();

? ? ? ? //iceberg 命名

? ? ? ? TableIdentifier identifier =

TableIdentifier.of(Namespace.of("iceberg_db"), "image5");

? ? ? ? Table table;

? ? ? ? if (catalog.tableExists(identifier)) {

table = catalog.loadTable(identifier);

? ? ? ? }else {

table =

catalog.buildTable(identifier, SCHEMA)

.withPartitionSpec(PartitionSpec.unpartitioned())

.create();

? ? ? ? }

// need to upgrade version to 2,otherwise 'java.lang.IllegalArgumentException: Cannot write

// delete files in a v1 table'

? ? ? ? TableOperations operations = ((BaseTable) table).operations();

? ? ? ? TableMetadata metadata = operations.current();

? ? ? ? operations.commit(metadata, metadata.upgradeToFormatVersion(2));

? ? ? ? TableLoader tableLoader = TableLoader.fromCatalog(loader, identifier);

? ? ? ? FlinkSink.forRowData(input)

.table(table)

.tableLoader(tableLoader)

.equalityFieldColumns(Arrays.asList("id"))

.writeParallelism(1)

.build();

? ? }

}

5.1代碼主要修改點(diǎn):

1.


該段代碼為iceberg建表字段(順序和mysql的一模一樣,名稱可以不一致,因?yàn)槿霂鞎r(shí),iceberg不會(huì)查看字段名稱,只會(huì)按照順序入庫)

2.

該段代碼是監(jiān)控mysql字段,名稱類型需一模一樣

3.

數(shù)據(jù)庫信息

4.

該處為iceberg配置信息

5.

iceberg_db 為數(shù)據(jù)庫,image5為表名

6.flink11.1客戶端查看入庫信息。

6.1啟動(dòng)flink 集群 :./start-cluster.sh。同時(shí)需自行啟動(dòng)hdfs集群。


6.2 查看當(dāng)前進(jìn)程,若出現(xiàn)StandaloneSessionClusterEntrypoint,TaskManagerRunner即代表成功。


6.3 進(jìn)去客戶端后,創(chuàng)建hadoop_catalog(注意:本人當(dāng)使用hive_catalog時(shí),插入數(shù)據(jù)時(shí)會(huì)報(bào)錯(cuò)。尚不清楚什么原因)

CREATE CATALOG hadoop_catalog WITH (

? 'type'='iceberg',

? 'catalog-type'='hadoop',

? 'warehouse'='hdfs://192.168.163.101:9000/user/hive/warehouse',

? 'property-version'='1'

);


6.4 使用hadoop_catalog及使用iceberg_db數(shù)據(jù)庫(若不存在,個(gè)人創(chuàng)建一個(gè) create database iceberg_db)


6.5 查看表數(shù)據(jù),若發(fā)現(xiàn)表中有數(shù)據(jù),及監(jiān)控日志成功。

select * from image5.


本人參考以上鏈接,綜合個(gè)人版本修改,得出以上結(jié)果

https://www.136.la/jingpin/show-126501.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容