Flink 使用之 Oracle CDC

Flink 使用介紹相關(guān)文檔目錄

Flink 使用介紹相關(guān)文檔目錄

準(zhǔn)備工作

在這一步需要配置Oracle。主要包含。

  1. 開啟Archive log
  2. 開啟數(shù)據(jù)庫和數(shù)據(jù)表的supplemental log
  3. 創(chuàng)建CDC用戶并賦予權(quán)限

注意:不要使用Oracle的SYS和SYSTEM用戶做為CDC用戶。因為這兩個用戶能夠捕獲到大量Oracle數(shù)據(jù)庫內(nèi)部的變更信息,對于業(yè)務(wù)數(shù)據(jù)來說是不必要的。Debezium會過濾掉這兩個用戶捕獲到的變更內(nèi)容。

下面開始配置步驟。在安裝Oracle的機器上執(zhí)行:

su - oracle
sqlplus / as sysdba

進入Sqlplus。然后開啟Archive log。

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
# 檢查Archive log是否成功開啟
archive log list;

注意:

  1. 本步驟需要重啟數(shù)據(jù)庫,請選擇在合適的時間操作。
  2. 例子中的/opt/oracle/oradata/recovery_area目錄oracle用戶需要有讀寫權(quán)限。
  3. 如果執(zhí)行alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;的時候報ORA-32001: write to SPFILE requested but no SPFILE is in use。需要檢查spfile文件。
show parameter spfile;
# 如果輸出value為空,說明沒有創(chuàng)建spfile,執(zhí)行下面SQL創(chuàng)建
create spfile from pfile;
# 關(guān)閉并重啟
shutdown immediate;
startup;
# 檢查spfile是否成功創(chuàng)建
show parameter spfile;

開啟數(shù)據(jù)庫和需要CDC的表的supplemental log:

ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

其中inventory.customers需要CDC的目標(biāo)表,格式為schema.table_name。

最后,我們需要創(chuàng)建CDC專用用戶,以及為它賦予權(quán)限。

# 示例路徑/opt/oracle/oradata/SID/,需要提前創(chuàng)建好并賦予權(quán)限
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;

GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;

GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;

GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

注意:如果使用的是Oracle 11g,執(zhí)行GRANT LOGMINING TO flinkuser;會報沒有LOGMINING這個role,可忽略這個錯誤,不影響使用。如果使用12c版本賦權(quán)語句有所不同,可參考Debezium Connector for Oracle :: Debezium Documentation。

最后需要強調(diào)下,我們的Oracle CDC程序運行的時候可能會報出如下錯誤。

ORA-12514, TNS:listener does not currently know of service requested in connect descriptor

編輯listener.ora文件(不知道路徑的可以find一下),添加:

SID_LIST_LISTENER =
  (SID_LIST =
    (SID_DESC =
      (SID_NAME = ora11g)
      (ORACLE_HOME = /data/oracle/product/11.2.0/dbhome_1)
    )
  )

SID_NAMEORACLE_HOME改為真實的值,ORACLE_HOME可通過環(huán)境變量查看。

修改后別忘了執(zhí)行:

lsnrctl reload

重啟監(jiān)聽器。

到此為止,Oracle數(shù)據(jù)庫環(huán)境配置完畢。

項目依賴

pom.xml中添加如下依賴:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oracle-cdc</artifactId>
    <!-- the dependency is available only for stable releases. -->
    <version>2.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

Oracle CDC SQL方式

直接上示例程序:

val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnvironment = TableEnvironment.create(bsSettings)

val sql =
    """
        |CREATE TABLE test (
        |     ID INT,
        |     NAME STRING,
        |     AGE INT
        |     ) WITH (
        |     'connector' = 'oracle-cdc',
        |     'hostname' = 'orcl11g.us.oracle.com',
        |     'port' = '1521',
        |     'username' = 'flinkuser',
        |     'password' = 'flinkpw',
        |     'database-name' = 'ora11g',
        |     'schema-name' = 'INVENTORY',
        |     'table-name' = 'CUSTOMERS'
        |     )
        |""".stripMargin

tableEnvironment.executeSql(sql)
// 如下兩種print數(shù)據(jù)方式都可以使用
// 方法 1
//    val result = tableEnvironment.executeSql("select * from test")
//    result.print()

// 方法 2
tableEnvironment.executeSql("CREATE TABLE sink_table (ID INT, NAME STRING, AGE INT) WITH (    'connector' = 'print')")
tableEnvironment.executeSql("INSERT INTO sink_table SELECT ID, NAME, AGE FROM test")

注意:Oracle字段默認會轉(zhuǎn)化為大寫。如果create table的時候沒有使用引號引住字段名,則字段名會被轉(zhuǎn)換為大寫。那么在Flink create table的時候字段也必須使用大寫。否則對應(yīng)字段的內(nèi)容會變成null,無法正常獲取到數(shù)據(jù)!Oracle中查看建表語句的方法為SELECT DBMS_METADATA.GET_DDL('TABLE','表名稱') FROM DUAL;

Oracle CDC API方式

除了使用SQL方式外,我們還可以使用DataStream API方式。

val sourceFunction: SourceFunction[String] = OracleSource
    .builder[String]
    .hostname("orcl11g.us.oracle.com")
    .port(1521)
    .database("ora11g")
    .schemaList("INVENTORY")
    .tableList("INVENTORY.CUSTOMERS")
    .username("flinkuser")
    .password("flinkpw")
    .deserializer(new JsonDebeziumDeserializationSchema)
    .build

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.addSource(sourceFunction).print.setParallelism(1) // use parallelism 1 for sink to keep message ordering
    env.execute()

注意:tableList參數(shù)有一個坑,必須配置為schema-name.table-name格式,否則會找不到數(shù)據(jù)表。和SQL中的table-name配置方式不同!

參考文獻

Oracle CDC Connector — Flink CDC 2.0.0 documentation (ververica.github.io)

Debezium Connector for Oracle :: Debezium Documentation

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容