利用ogg實(shí)現(xiàn)oracle到kafka的增量數(shù)據(jù)實(shí)時(shí)同步

[toc]

前言(為了安裝oraclegoldgate for oracle準(zhǔn)備)

通過X11實(shí)現(xiàn) Linux服務(wù)器圖形化界面顯示

1 背景描述

有些LINUX服務(wù)器出于性能和效率的考慮,通常都是沒有安裝圖形化界面的,那么圖形化程序在服務(wù)器上壓根兒就跑不起來,或者無法直接顯示出來,這就很尷尬了!那么如何解決這個(gè)問題呢?可以基于X11 Forwarding技術(shù) + MobaXterm 工具,就可以輕松搞定,似不似很簡單?

2 原理介紹

2.1 X協(xié)議

Linux 本身是沒有圖形化界面的,所謂的圖形化界面系統(tǒng)只不過中 Linux 下的應(yīng)用程序。這一點(diǎn)和 Windows 不一樣。Windows 從 Windows 95 開始,圖形界面就直接在系統(tǒng)內(nèi)核中實(shí)現(xiàn)了,是操作系統(tǒng)不可或缺的一部分。Linux 的圖形化界面,底層都是基于 X 協(xié)議。
X 協(xié)議由 X server 和 X client 組成:


image.png

l X server 管理主機(jī)上與顯示相關(guān)的硬件設(shè)置(如顯卡、硬盤、鼠標(biāo)等),它負(fù)責(zé)屏幕畫面的繪制與顯示,以及將輸入設(shè)置(如鍵盤、鼠標(biāo))的動作告知 X client。

l X client (即 X 應(yīng)用程序) 則主要負(fù)責(zé)事件的處理(即程序的邏輯)。

舉個(gè)例子,如果用戶點(diǎn)擊了鼠標(biāo)左鍵,因?yàn)槭髽?biāo)歸 X server 管理,于是 X server 就捕捉到了鼠標(biāo)點(diǎn)擊這個(gè)動作,然后它將這個(gè)動作告訴 X client,因?yàn)?X client 負(fù)責(zé)程序邏輯,于是 X client 就根據(jù)程序預(yù)先設(shè)定的邏輯(例如畫一個(gè)圓),告訴 X server 說:“請?jiān)谑髽?biāo)點(diǎn)擊的位置,畫一個(gè)圓”。最后,X server 就響應(yīng) X client 的請求,在鼠標(biāo)點(diǎn)擊的位置,繪制并顯示出一個(gè)圓。

2.2 X11 Forwarding

image.png

這么繞,有啥意義呢?當(dāng)然有!
許多時(shí)候 X server 和 X client 在同一臺主機(jī)上,這看起來沒什么。但是, X server 和 X client 完全可以運(yùn)行在不同的機(jī)器上,只要彼此通過 X 協(xié)議通信即可。于是,我們就可以做一些“神奇”的事情,比如像本文開頭談到的,在本地顯示 (X server),運(yùn)行在服務(wù)器上的 GUI 程序 (X client)。這樣的操作可以通過 SSH X11 Forwarding (轉(zhuǎn)發(fā)) 來實(shí)現(xiàn)。
X11 中的 X 指的就是 X 協(xié)議,11 指的是采用 X 協(xié)議的第 11 個(gè)版本。

2.3 MobaXterm

image.png

那 MobaXterm 又是什么鬼?MobaXterm 是一款開源、免費(fèi)的、全功能終端軟件。它與 PuTTY 類似,但卻比 PuTTY 要強(qiáng)大得多,其中一個(gè)很實(shí)用的功能就是 MobaXterm 自帶 X Server。這樣我們就不用勞神地去想怎么在 Windows 上啟動 X server 了。

小結(jié)一下,整個(gè)實(shí)現(xiàn)邏輯就是:
本地機(jī)器采用 MobaXterm (自帶 X server) 連接遠(yuǎn)程服務(wù)器。然后,在服務(wù)器上運(yùn)行 GUI 程序 (即 X client),通過 SSH X11 Forwarding,轉(zhuǎn)發(fā)到本地 (Windows 機(jī)器上)。

3 實(shí)戰(zhàn)介紹

3.1 服務(wù)器端(LINUX7.x)

3.1.1 修改SSH配置
[root@linux-template-7 ~]# cat /etc/ssh/sshd_config

X11Forwarding yes

X11UseLocalhost no -- 禁止將X11轉(zhuǎn)發(fā)請求綁定到本地回環(huán)地址上

AddressFamily inet -- 強(qiáng)制使用IPv4通道

配置不對請修改成上面這樣

3.1.2 重啟SSH服務(wù)
[root@linux-template-7 ~]# systemctl restart sshd.service

3.2 客戶端(WIN7)

3.2.1 安裝MobaXterm

https://mobaxterm.mobatek.net/download.html
image.png

3.2.2 打開本地終端

image.png

3.2.3 SSH連接服務(wù)器

image.png

3.2.4 執(zhí)行xclock程序

已經(jīng)彈出xclock圖形界面了,似不似很簡單,似不似很Easy,那就一起實(shí)踐下吧:)


image.png

正文

ogg即Oracle GoldenGate是Oracle的同步工具,本文講如何配置ogg以實(shí)現(xiàn)Oracle數(shù)據(jù)庫增量數(shù)據(jù)實(shí)時(shí)同步到kafka中,其中同步消息格式為json。
下面是我的源端和目標(biāo)端的一些配置信息:

  • | 版本 | OGG版本 | ip | 別名
    ---|--- |--- |--- |---
    源端| Oracle 12c |Oracle GoldenGate 19.1.0.0 | 192.168.8.22| oracle|
    目標(biāo)端| kafka_2.12-2.3.0 |Oracle GoldenGate for Big Data 19.1.0.0 | 192.168.8.31| bigdata |

1、下載

可在這里舊版本查詢下載
注意:源端和目標(biāo)端的文件不一樣,目標(biāo)端需要下載Oracle GoldenGate for Big Data,源端需要下載Oracle GoldenGate for Oracle具體下載方法見最后的附錄截圖。

2、源端(Oracle)配置

注意:源端是安裝了oracle的機(jī)器,oracle環(huán)境變量之前都配置好了

2.1 解壓

先建立ogg目錄

mkdir -p /opt/ogg
unzip V983320-01.zip

解壓后得到一個(gè)tar包,再解壓這個(gè)tar

tar xf fbo_ggs_Linux_x64_shiphome.tar -C /opt/ogg
chown -R oracle:oinstall /opt/ogg

(使oracle用戶有ogg的權(quán)限,后面有些需要在oracle用戶下執(zhí)行才能成功)

2.2 配置ogg環(huán)境變量

為了簡單方便起見,我在/etc/profile里配置的,建議在生產(chǎn)中配置oracle的環(huán)境變量文件/home/oracle/.bash_profile里配置,為了怕出問題,我把OGG_HOME等環(huán)境變量在/etc/profile配置了一份,不知道這是否是必須的。

vim /etc/profile
export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=$ORACLE_HOME/lib:/usr/lib
export PATH=$OGG_HOME:$PATH

使之生效

source /etc/profile

測試一下ogg命令

cd /opt/ogg
./runInstaller

按照上面得安裝以后會本mobaxtern彈出一個(gè)oracle安裝對話框


image.png

點(diǎn)擊下一步


image.png
image.png
image.png
image.png

如果命令成功即可進(jìn)行下一步,不成功請檢查前面的步驟

2.3 oracle打開歸檔模式

su - oracle
sqlplus / as sysdba

執(zhí)行下面的命令查看當(dāng)前是否為歸檔模式

archive log list
SQL> archive log list 
Database log mode          No Archive Mode
Automatic archival         Disabled
Archive destination        USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence     12
Current log sequence           14

若為Disabled,手動打開即可

conn / as sysdba (以DBA身份連接數(shù)據(jù)庫) 
shutdown immediate (立即關(guān)閉數(shù)據(jù)庫)
startup mount (啟動實(shí)例并加載數(shù)據(jù)庫,但不打開)
alter database archivelog; (更改數(shù)據(jù)庫為歸檔模式)
alter database open; (打開數(shù)據(jù)庫)
alter system archive log start; (啟用自動歸檔)

再執(zhí)行一下

archive log list
Database log mode          Archive Mode
Automatic archival         Enabled
Archive destination        USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence     12
Next log sequence to archive   14
Current log sequence           14

可以看到為Enabled,則成功打開歸檔模式。

2.4 Oracle打開日志相關(guān)

OGG基于輔助日志等進(jìn)行實(shí)時(shí)傳輸,故需要打開相關(guān)日志確保可獲取事務(wù)內(nèi)容,通過下面的命令查看該狀態(tài)

select force_logging, supplemental_log_data_min from v$database;
FORCE_ SUPPLEMENTAL_LOG
------ ----------------
NO     NO

若為NO,則需要通過命令修改

alter database force logging;
alter database add supplemental log data;
ALTER SYSTEM SET ENABLE_GOLDENGATE_REPLICATION = TRUE;#設(shè)置允許GoldDate讀取日志

再查看一下為YES即可

SQL> select force_logging, supplemental_log_data_min from v$database;

FORCE_ SUPPLEMENTAL_LOG
------ ----------------
YES    YES

2.5 oracle創(chuàng)建復(fù)制用戶

首先root用戶建立相關(guān)文件夾,并賦予權(quán)限

mkdir -p /u01/app/oracle/oggdata/orcl
chown -R oracle:oinstall /u01/app/oracle/oggdata/orcl

然后執(zhí)行下面sql

SQL> create tablespace oggtbs datafile '/u01/app/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;

Tablespace created.

SQL>  create user ogg identified by ogg default tablespace oggtbs;

User created.

SQL> grant dba to ogg;

Grant succeeded.

2.6 OGG初始化

ggsci
create subdirs
ggsci
Oracle GoldenGate Command Interpreter for Oracle
Version 19.1.0.0.2 OGGCORE_19.1.0.0.0_PLATFORMS_190823.0013_FBO
Linux, x64, 64bit (optimized), Oracle 12c on Aug 23 2019 20:05:42
Operating system character set identified as UTF-8.

Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.



GGSCI (crmtstmicserv) 1> create subdirs

Creating subdirectories under current directory /root

Parameter files                /root/dirprm: created
Report files                   /root/dirrpt: created
Checkpoint files               /root/dirchk: created
Process status files           /root/dirpcs: created
SQL script files               /root/dirsql: created
Database definitions files     /root/dirdef: created
Extract data files             /root/dirdat: created
Temporary files                /root/dirtmp: created
Stdout files                   /root/dirout: created


GGSCI (crmtstmicserv) 2>

2.7 Oracle創(chuàng)建測試表

創(chuàng)建一個(gè)用戶,在該用戶下新建測試表,用戶名、密碼、表名均為 test_ogg。

create user test_ogg  identified by test_ogg default tablespace users;
grant dba to test_ogg;
conn test_ogg/test_ogg;
create table test_ogg(id int ,name varchar(20),primary key(id));

3 目標(biāo)端(kafka)配置

3.1 解壓

mkdir -p /opt/ogg
unzip 123111_ggs_Adapters_Linux_x64.zip 
tar xf ggs_Adapters_Linux_x64.tar  -C /opt/ogg/

3.2 環(huán)境變量

vim /etc/profile
export OGG_HOME=/opt/ogg
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib
export PATH=$OGG_HOME:$PATH
source /etc/profile

同樣測試一下ogg命令

ggsci

3.3 初始化目錄

create subdirs

4、OGG源端配置

4.1 配置OGG的全局變量

先切換到oracle用戶下

su oracle
cd /opt/ogg
ggsci

GGSCI (crmtstmicserv) 1> dblogin userid ogg password ogg
Successfully logged into database.
GGSCI (crmtstmicserv) 2> edit param ./globals

然后和用vim編輯一樣添加

oggschema ogg

4.2 配置管理器mgr

GGSCI (crmtstmicserv) 3> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

說明:PORT即mgr的默認(rèn)監(jiān)聽端口;DYNAMICPORTLIST動態(tài)端口列表,當(dāng)指定的mgr端口不可用時(shí),會在這個(gè)端口列表中選擇一個(gè),最大指定范圍為256個(gè);AUTORESTART重啟參數(shù)設(shè)置表示重啟所有EXTRACT進(jìn)程,最多5次,每次間隔3分鐘;PURGEOLDEXTRACTS即TRAIL文件的定期清理

4.3 添加復(fù)制表或者SCHEMA

GGSCI (crmtstmicserv) 4> add trandata test_ogg.test_ogg

Logging of supplemental redo data enabled for table TEST_OGG.TEST_OGG.

GGSCI (crmtstmicserv) 5> info trandata test_ogg.test_ogg

Logging of supplemental redo log data is enabled for table TEST_OGG.TEST_OGG.

Columns supplementally logged for table TEST_OGG.TEST_OGG: ID

如果添加按照schema 如下配置:

GGSCI (crmtstmicserv) 4> add schematrandata crm_report ALLCOLS #crm_report為我的schema名

Logging of supplemental redo data enabled for table TEST_OGG.TEST_OGG.

GGSCI (crmtstmicserv) 5> info schematrandata crm_report

4.4 配置extract進(jìn)程

GGSCI (crmtstmicserv) 6> edit param extkafka
extract extkafka
dynamicresolution
SETENV (ORACLE_SID = "orcl")
SETENV (NLS_LANG = "american_america.AL32UTF8")
userid ogg,password ogg
exttrail /opt/ogg/dirdat/to
table test_ogg.test_ogg;

schema的話把table 這一行換成:

table crm_report.*;

說明:第一行指定extract進(jìn)程名稱;dynamicresolution動態(tài)解析;SETENV設(shè)置環(huán)境變量,這里分別設(shè)置了Oracle數(shù)據(jù)庫以及字符集;userid ggs,password ggs即OGG連接Oracle數(shù)據(jù)庫的帳號密碼,這里使用2.5中特意創(chuàng)建的復(fù)制帳號;exttrail定義trail文件的保存位置以及文件名,注意這里文件名只能是2個(gè)字母,其余部分OGG會補(bǔ)齊;table即復(fù)制表的表名,支持*通配,必須以;結(jié)尾

添加extract進(jìn)程:

GGSCI (crmtstmicserv) 16> add extract extkafka,tranlog,begin now
EXTRACT added.

(注:若報(bào)錯(cuò)

ERROR: Could not create checkpoint file /opt/ogg/dirchk/EXTKAFKA.cpe (error 2, No such file or directory).

執(zhí)行下面的命令再重新添加即可。

create subdirs

添加trail文件的定義與extract進(jìn)程綁定:

GGSCI (crmtstmicserv) 17> add exttrail /opt/ogg/dirdat/to,extract extkafka
EXTTRAIL added.

4.5 配置pump進(jìn)程

pump進(jìn)程本質(zhì)上來說也是一個(gè)extract,只不過他的作用僅僅是把trail文件傳遞到目標(biāo)端,配置過程和extract進(jìn)程類似,只是邏輯上稱之為pump進(jìn)程

GGSCI (crmtstmicserv) 18> edit param pukafka
extract pukafka
passthru
dynamicresolution
userid ogg,password ogg
rmthost 192.168.44.129 mgrport 7809
rmttrail /opt/ogg/dirdat/to
table test_ogg.test_ogg;

schema的話把table 這一行換成:

table crm_report.*;

說明:第一行指定extract進(jìn)程名稱;passthru即禁止OGG與Oracle交互,我們這里使用pump邏輯傳輸,故禁止即可;dynamicresolution動態(tài)解析;userid ogg,password ogg即OGG連接Oracle數(shù)據(jù)庫的帳號密碼rmthost和mgrhost即目標(biāo)端(kafka)OGG的mgr服務(wù)的地址以及監(jiān)聽端口;rmttrail即目標(biāo)端trail文件存儲位置以及名稱。

分別將本地trail文件和目標(biāo)端的trail文件綁定到extract進(jìn)程:

GGSCI (crmtstmicserv) 1> add extract pukafka,exttrailsource /opt/ogg/dirdat/to
EXTRACT added.
GGSCI (crmtstmicserv) 2> add rmttrail /opt/ogg/dirdat/to,extract pukafka
RMTTRAIL added.

4.6 配置define文件

Oracle與MySQL,Hadoop集群(HDFS,Hive,kafka等)等之間數(shù)據(jù)傳輸可以定義為異構(gòu)數(shù)據(jù)類型的傳輸,故需要定義表之間的關(guān)系映射,在OGG命令行執(zhí)行:

GGSCI (crmtstmicserv) 3> edit param test_ogg
defsfile /opt/ogg/dirdef/test_ogg.test_ogg
userid ogg,password ogg
table test_ogg.test_ogg;

schema的話把table 這一行換成:

table crm_report.*;

在OGG主目錄下執(zhí)行(oracle用戶):

./defgen paramfile dirprm/crm_report.prm

***********************************************************************
        Oracle GoldenGate Table Definition Generator for Oracle
 Version 11.2.1.0.3 14400833 OGGCORE_11.2.1.0.3_PLATFORMS_120823.1258
   Linux, x64, 64bit (optimized), Oracle 11g on Aug 23 2012 16:58:29
 
Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.


                    Starting at 2018-05-23 05:03:04
***********************************************************************

Operating System Version:
Linux
Version #1 SMP Wed Apr 12 15:04:24 UTC 2017, Release 3.10.0-514.16.1.el7.x86_64
Node: crmtstmicserv
Machine: x86_64
                         soft limit   hard limit
Address Space Size   :    unlimited    unlimited
Heap Size            :    unlimited    unlimited
File Size            :    unlimited    unlimited
CPU Time             :    unlimited    unlimited

Process id: 13126

***********************************************************************
**            Running with the following parameters                  **
***********************************************************************
defsfile /opt/ogg/dirdef/crm_report
userid ogg,password ***
table crm_report.*;
Retrieving definition for TEST_OGG.TEST_OGG



Definitions generated for 1 table in /opt/ogg/dirdef/crm_report

將生成的/opt/ogg/dirdef/crm_report發(fā)送的目標(biāo)端ogg目錄下的dirdef里:

scp -r /opt/ogg/dirdef/crm_report root@slave1:/opt/ogg/dirdef/

5、OGG目標(biāo)端配置

5.1 開啟kafka服務(wù)

cd /opt/kafka_2.11-1.1.0/
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

5.2 配置管理器mgr

GGSCI (bigdata) 1>  edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

5.3 配置checkpoint

checkpoint即復(fù)制可追溯的一個(gè)偏移量記錄,在全局配置里添加checkpoint表即可。

edit  param  ./GLOBALS
CHECKPOINTTABLE crm_report.checkpoint

5.4 配置replicate進(jìn)程

GGSCI (bigdata) 4> edit param rekafka
REPLICAT rekafka
sourcedefs /opt/ogg/dirdef/test_ogg.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE 
GROUPTRANSOPS 10000
MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg;

說明:REPLICATE rekafka定義rep進(jìn)程名稱;sourcedefs即在4.6中在源服務(wù)器上做的表映射文件;TARGETDB LIBFILE即定義kafka一些適配性的庫文件以及配置文件,配置文件位于OGG主目錄下的dirprm/kafka.props;REPORTCOUNT即復(fù)制任務(wù)的報(bào)告生成頻率;GROUPTRANSOPS為以事務(wù)傳輸時(shí),事務(wù)合并的單位,減少IO操作;MAP即源端與目標(biāo)端的映射關(guān)系

5.5 配置kafka.props

cd /opt/ogg/dirprm/
vim kafka.props
gg.handlerlist=kafkahandler //handler類型
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相關(guān)配置
gg.handler.kafkahandler.topicMappingTemplate=crm_report_ogg //kafka的topic名稱,無需手動創(chuàng)建
gg.handler.kafkahandler.format=json //傳輸文件的格式,支持json,xml等
gg.handler.kafkahandler.mode=op  //OGG for Big Data中傳輸模式,即op為一次SQL傳輸一次,tx為一次事務(wù)傳輸一次
gg.classpath=dirprm/:/opt/kafka_2.11-1.1.0/libs/*:/opt/ogg/:/opt/ogg/lib
vim custom_kafka_producer.properties
bootstrap.servers=localhost:9092 //kafkabroker的地址
acks=1
compression.type=gzip //壓縮類型
reconnect.backoff.ms=1000 //重連延時(shí)
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000

其中需要將后面的注釋去掉,ogg不識別注釋,如果不去掉會報(bào)錯(cuò)

5.6 添加trail文件到replicate進(jìn)程

GGSCI (bigdata) 2> add replicat rekafka exttrail /opt/ogg/dirdat/to,checkpointtable test_ogg.checkpoint
REPLICAT added.

6、測試

6.1 啟動所有進(jìn)程

在源端和目標(biāo)端的OGG命令行下使用start [進(jìn)程名]的形式啟動所有進(jìn)程。
啟動順序按照源mgr——目標(biāo)mgr——源extract——源pump——目標(biāo)replicate來完成。
全部需要在ogg目錄下執(zhí)行g(shù)gsci目錄進(jìn)入ogg命令行。
源端依次是

start mgr
start extkafka
start pukafka

目標(biāo)端

start mgr
start rekafka

可以通過info all 或者info [進(jìn)程名] 查看狀態(tài),所有的進(jìn)程都為RUNNING才算成功
源端

GGSCI (crmtstmicserv) 5> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING                                           
EXTRACT     RUNNING     EXTKAFKA    04:50:21      00:00:03    
EXTRACT     RUNNING     PUKAFKA     00:00:00      00:00:03

目標(biāo)端

GGSCI (bigdata) 3> info all

Program     Status      Group       Lag at Chkpt  Time Since Chkpt

MANAGER     RUNNING                                           
REPLICAT    RUNNING     REKAFKA     00:00:00      00:00:01

6.2 異常解決

如果有不是RUNNING可通過查看日志的方法檢查解決問題,具體通過下面兩種方法

vim ggser.log

或者ogg命令行,以rekafka進(jìn)程為例

GGSCI (bigdata) 2> view report rekafka

列舉其中我遇到的一個(gè)問題:
異常信息

SEVERE: Unable to set property on handler 'kafkahandler' (oracle.goldengate.handler.kafka.KafkaHandler). Failed to set property: TopicName:="test_ogg" (class: oracle.goldengate.handler.kafka.KafkaHandler).
oracle.goldengate.util.ConfigException: Failed to set property: TopicName:="test_ogg" (class: oracle.goldengate.handler.kafka.KafkaHandler).
at ......

具體原因是網(wǎng)上的教程是舊版的,設(shè)置topicName的屬性為:

gg.handler.kafkahandler.topicName=test_ogg

新版的這樣設(shè)置

gg.handler.kafkahandler.topicMappingTemplate=test_ogg

大家可根據(jù)自己的版本進(jìn)行設(shè)置,附上stackoverflow原答案

I tried to move data from Oracle Database to Kafka using Golden gate adapter Version 12.3.0.1.0

In new version there is no topicname

The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=test

In previous version we have gg.handler.kafkahandler.topicName=test

6.3 測試同步更新效果

現(xiàn)在源端執(zhí)行sql語句

conn test_ogg/test_ogg
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete test_ogg where id=1;
commit;

查看源端trail文件狀態(tài)

ls -l /opt/ogg/dirdat/to*
-rw-rw-rw- 1 oracle oinstall 1464 May 23 10:31 /opt/ogg/dirdat/to000000

查看目標(biāo)端trail文件狀態(tài)

ls -l /opt/ogg/dirdat/to*
-rw-r----- 1 root root 1504 May 23 10:31 /opt/ogg/dirdat/to000000

查看kafka是否自動建立對應(yīng)的主題

bin/kafka-topics.sh --list --zookeeper localhost:2181

在列表中顯示有test_ogg則表示沒問題
通過消費(fèi)者看是否有同步消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.44.129:9092 --topic test_ogg --from-beginning
{"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2018-05-23 10:31:28.000078","current_ts":"2018-05-23T10:36:48.525000","pos":"00000000000000001093","after":{"ID":1,"NAME":"test"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"U","op_ts":"2018-05-23 10:31:36.000073","current_ts":"2018-05-23T10:36:48.874000","pos":"00000000000000001233","before":{},"after":{"ID":1,"NAME":"zhangsan"}}
{"table":"TEST_OGG.TEST_OGG","op_type":"D","op_ts":"2018-05-23 10:31:43.000107","current_ts":"2018-05-23T10:36:48.875000","pos":"00000000000000001376","before":{"ID":1}}

顯然,Oracle的數(shù)據(jù)已準(zhǔn)實(shí)時(shí)同步到Kafka,格式為json,其中op_type代表操作類型,這個(gè)可配置,我沒有配置則按默認(rèn)的來,默認(rèn)為

gg.handler.kafkahandler.format.insertOpKey = I  
gg.handler.kafkahandler.format.updateOpKey = U  
gg.handler.kafkahandler.format.deleteOpKey = D

before代表操作之前的數(shù)據(jù),after代表操作后的數(shù)據(jù),現(xiàn)在已經(jīng)可以從kafka獲取到同步的json數(shù)據(jù)了,后面可以用SparkStreaming和Storm等解析然后存到hadoop等大數(shù)據(jù)平臺里

6.4 SparkStreaming測試消費(fèi)同步消息
具體代碼可參考Spark Streaming連接Kafka入門教程
下面附上消費(fèi)成功的結(jié)果圖


image.png

7、更新:后續(xù)遇到的問題

在后面的使用過程中發(fā)現(xiàn)上面同步到kafka的json數(shù)據(jù)中少一些我們想要的一些,下面講一下我是如何解決的
首先建表:

CREATE TABLE "TCLOUD"."T_OGG2" 
   (    "ID" NUMBER(*,0), 
    "TEXT_NAME" VARCHAR2(20), 
    "AGE" NUMBER(*,0), 
    "ADD" VARCHAR2(100), 
    "IDD" VARCHAR2(100), 
     CONSTRAINT "T_OGG2_PK" PRIMARY KEY ("ID", "IDD")

   )

為什么不用之前建的表,主要是之前的字段太少,不容易看出問題,現(xiàn)在主要是增加幾個(gè)字段,然后id,idd是聯(lián)合主鍵。
看一下按照之前的配置,同步到kafka的數(shù)據(jù)(截取部分?jǐn)?shù)據(jù))

{"table":"TCLOUD.T_OGG2","op_type":"I","op_ts":"2018-05-31 11:46:09.512672","current_ts":"2018-05-31T11:46:15.292000","pos":"00000000000000001903","after":{"ID":4,"TEXT_NAME":null,"AGE":0,"ADD":null,"IDD":"8"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 11:49:10.514549","current_ts":"2018-05-31T11:49:16.450000","pos":"00000000000000002227","before":{},"after":{"ID":4,"TEXT_NAME":"lisi","IDD":"7"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 11:49:48.514869","current_ts":"2018-05-31T11:49:54.481000","pos":"00000000000000002373","before":{"ID":4,"IDD":"7"},"after":{"ID":1,"IDD":"7"}}

{"table":"TCLOUD.T_OGG2","op_type":"D","op_ts":"2018-05-31 11:52:38.516877","current_ts":"2018-05-31T11:52:45.633000","pos":"00000000000000003161","before":{"ID":1,"IDD":"7"}}

現(xiàn)在只有insert的數(shù)據(jù)是全的,update更新非主鍵字段before是沒有數(shù)據(jù)的,更新主鍵before只有主鍵的數(shù)據(jù),delete只有before的主鍵字段,也就是update和delete的信息是不全的,且沒有主鍵信息(程序里是不能判斷哪一個(gè)是主鍵的),這樣對于程序自動解析同步數(shù)據(jù)是不利的(不同的需求可能不一樣),具體自己可以分析,就不啰嗦了,這里主要解決,有需要before和after全部信息和主鍵信息的需求。

7.1 添加before

在源端extract里添加下面幾行

GGSCI (crmtstmicserv) 33> edit param extkafka
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES

重啟 extkafka

stop extkafka
start extkafka

然后測試

{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:48:55.630340","current_ts":"2018-05-31T14:49:01.709000","pos":"00000000000000003770","before":{"ID":1,"AGE":20,"IDD":"1"},"after":{"ID":1,"AGE":1,"IDD":"1"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:48:55.630340","current_ts":"2018-05-31T14:49:01.714000","pos":"00000000000000004009","before":{"ID":1,"AGE":20,"IDD":"2"},"after":{"ID":1,"AGE":1,"IDD":"2"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:48:55.630340","current_ts":"2018-05-31T14:49:01.715000","pos":"00000000000000004248","before":{"ID":1,"AGE":20,"IDD":"8"},"after":{"ID":1,"AGE":1,"IDD":"8"}}

發(fā)現(xiàn)update之后before里有數(shù)據(jù)即可,但是現(xiàn)在before和after的數(shù)據(jù)都不全(只有部分字段)

網(wǎng)上有的說只添加GETUPDATES即可,但我測試了沒有成功,關(guān)于每個(gè)配置項(xiàng)什么含義可以參考https://blog.csdn.net/linucle/article/details/13505939(有些配置的含義里面也沒有給出)
參考:http://www.itpub.net/thread-2083473-1-1.html

7.2 添加主鍵

在kafka.props添加

gg.handler.kafkahandler.format.includePrimaryKeys=true

重啟 rekafka

stop rekafka
start rekafka

測試:

{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 14:58:57.637035","current_ts":"2018-05-31T14:59:03.401000","pos":"00000000000000004510","primary_keys":["ID","IDD"],"before":{"ID":1,"AGE":1,"IDD":"1"},"after":{"ID":1,"AGE":20,"IDD":"1"}}

發(fā)現(xiàn)有primary_keys,不錯(cuò)~

參考:http://blog.51cto.com/lyzbg/2088409

7.3 補(bǔ)全全部字段

如果字段補(bǔ)全應(yīng)該是Oracle沒有開啟全列補(bǔ)充日志

SQL> select supplemental_log_data_all from v$database;  

SUPPLE
------
NO

通過以下命令開啟

SQL> alter database add supplemental log data(all) columns;

Database altered.

SQL> select supplemental_log_data_all from v$database;

SUPPLE
------
YES

SQL>

測試一下

{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 15:27:45.655518","current_ts":"2018-05-31T15:27:52.891000","pos":"00000000000000006070","primary_keys":["ID","IDD"],"before":{"ID":1,"TEXT_NAME":null,"AGE":1,"ADD":null,"IDD":"1"},"after":{"ID":1,"TEXT_NAME":null,"AGE":20,"ADD":null,"IDD":"1"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 15:27:45.655518","current_ts":"2018-05-31T15:27:52.893000","pos":"00000000000000006341","primary_keys":["ID","IDD"],"before":{"ID":1,"TEXT_NAME":null,"AGE":1,"ADD":null,"IDD":"2"},"after":{"ID":1,"TEXT_NAME":null,"AGE":20,"ADD":null,"IDD":"2"}}
{"table":"TCLOUD.T_OGG2","op_type":"U","op_ts":"2018-05-31 15:27:45.655518","current_ts":"2018-05-31T15:27:52.895000","pos":"00000000000000006612","primary_keys":["ID","IDD"],"before":{"ID":1,"TEXT_NAME":null,"AGE":1,"ADD":null,"IDD":"8"},"after":{"ID":1,"TEXT_NAME":null,"AGE":20,"ADD":null,"IDD":"8"}}

到現(xiàn)在json信息里的內(nèi)容已經(jīng)很全了,基本滿足了我想要的,附圖:

啟發(fā)我發(fā)現(xiàn)和Oracle全列補(bǔ)充日志沒有開啟有關(guān)的博客:https://blog.csdn.net/huoshuyinhua/article/details/79013387
開啟命令參考:https://blog.csdn.net/aaron8219/article/details/16825963

注:博客上講到,開啟全列補(bǔ)充日志會導(dǎo)致磁盤快速增長,LGWR進(jìn)程繁忙,不建議使用。大家可根據(jù)自己的情況使用。

8、關(guān)于通配

如果想通配整個(gè)庫的話,只需要把上面的配置所有表名的改為,如test_ogg.test_ogg改為 test_ogg.,但是kafka的topic不能通配,所以需要把所有的表的數(shù)據(jù)放在一個(gè)topic即可,后面再用程序解析表名即可。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容