Background
At work, our product lines often need real-time data analysis and cross-center queries, but cross-database queries are hard to do with the PostgreSQL databases we use. So we sync PostgreSQL data to SnappyData in real time and serve cross-center queries and real-time analysis from there.
One caveat: operating SnappyData is painful. Its community is not very active, so you mostly have to solve problems on your own, and its SQL dialect is less feature-rich than those of mainstream databases.
We are currently considering syncing the data in real time into a designated PostgreSQL database instead, using schemas to separate the source databases or product lines.
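Under that plan, each source database (or product line) maps to its own schema in the target PostgreSQL database. Below is a minimal sketch of that naming rule. It assumes the schema is named after the source database value that the Jython step attaches to every record. The helpers target_table and quote_ident are hypothetical. Identifiers are double-quoted with embedded double quotes doubled, which is how PostgreSQL quoted identifiers work.

```python
def quote_ident(name):
    """Quote a PostgreSQL identifier: wrap in double quotes, doubling embedded ones."""
    return '"' + name.replace('"', '""') + '"'


def target_table(source_db, table):
    """Schema-qualified target name, using the source database as the schema (assumption)."""
    return quote_ident(source_db) + '.' + quote_ident(table)


if __name__ == '__main__':
    # 'farm_db' and 'student' are hypothetical names
    print(target_table('farm_db', 'student'))  # "farm_db"."student"
```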
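Pipeline overview
Our real-time sync mainly moves PostgreSQL data into SnappyData, but StreamSets can also handle other cases such as MySQL-to-MySQL real-time sync; its component library is rich enough for most common needs. Below I introduce the first version of the real-time sync pipeline. It still has some problems: this version only delivers basic real-time data sync.
Prerequisite: a WAL logical-decoding plugin is installed so that PostgreSQL changes can be captured (the PostgreSQL CDC Client origin reads wal2json output).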
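Before building the pipeline, it can help to confirm that the server is configured for logical decoding. The sketch below is minimal and assumes you can read postgresql.conf as text. It checks that wal_level is logical and that neither max_replication_slots nor max_wal_senders is set below 1. When those two are absent, PostgreSQL 10+ defaults both to 10. The helper names are hypothetical. The parser has several limits: it ignores include directives, does not handle '#' inside quoted values, and does not look at postgresql.auto.conf (ALTER SYSTEM). It also cannot verify that the wal2json library itself is installed.

```python
def parse_conf(text):
    """Parse postgresql.conf-style 'name = value' or 'name value' lines (last one wins)."""
    settings = {}
    for raw in text.splitlines():
        line = raw.split('#', 1)[0].strip()  # drop comments (simplified: ignores quoting)
        if not line:
            continue
        if '=' in line:
            name, value = line.split('=', 1)
        else:
            parts = line.split(None, 1)
            if len(parts) != 2:
                continue
            name, value = parts
        settings[name.strip().lower()] = value.strip().strip("'")
    return settings


def check_logical_decoding(settings):
    """Return a list of problems that would block logical decoding (empty if none found)."""
    problems = []
    if settings.get('wal_level') != 'logical':
        problems.append("wal_level must be 'logical'")
    for name in ('max_replication_slots', 'max_wal_senders'):
        value = settings.get(name)  # absent: server default applies (10 since PostgreSQL 10)
        if value is not None and int(value) < 1:
            problems.append(f'{name} must be at least 1')
    return problems


if __name__ == '__main__':
    conf = "wal_level = logical   # required by wal2json\nmax_replication_slots = 4\n"
    print(check_logical_decoding(parse_conf(conf)))
```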

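Real-time data sync pipeline (version 1)
Pipeline components:
1. PostgreSQL CDC Client
Purpose: connects to the database and reads the WAL (write-ahead log).
Configuration: keep the component defaults and fill in the required Replication Slot, JDBC connection string, and username/password.
Note: if you want to see data changes immediately, set Max Batch Size to 1 (the default is 100).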

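We found the following bugs and limitations in this component in StreamSets 3.9.1:
1) Under high concurrency, data can be lost. I have reported this to StreamSets; they are working on it, and the fix is expected in version 3.16. JIRA: https://issues.streamsets.com/browse/SDC-13269#add-comment
2) In our tests, the component's table-filter setting in 3.9.1 had no effect, so we implemented table filtering in Python code instead.
3) The component cannot capture DDL statements; it only captures insert, update, and delete operations.
4) The Poll Interval setting must be smaller than wal_sender_timeout in the PostgreSQL configuration file (postgresql.conf); otherwise PostgreSQL may terminate the replication connection as inactive.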
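Item 4 is easy to get wrong because wal_sender_timeout accepts PostgreSQL time units (ms, s, min, h, d) and is read as milliseconds when no unit is given. Its default is 60s, and 0 disables the timeout. Below is a small sketch of the comparison. It assumes you have already converted the StreamSets Poll Interval to milliseconds. The function names are hypothetical.

```python
import re

# PostgreSQL time units and their size in milliseconds
UNITS_MS = {'ms': 1, 's': 1000, 'min': 60_000, 'h': 3_600_000, 'd': 86_400_000}


def to_millis(value, default_unit='ms'):
    """Convert a postgresql.conf duration such as '60s', '1min' or '60000' to milliseconds."""
    text = value.strip().strip("'").strip()
    match = re.fullmatch(r'(\d+)\s*([a-z]*)', text)
    if not match:
        raise ValueError(f'unrecognised duration: {value!r}')
    number, unit = match.groups()
    unit = unit or default_unit  # wal_sender_timeout without a unit means milliseconds
    if unit not in UNITS_MS:
        raise ValueError(f'unknown time unit: {unit!r}')
    return int(number) * UNITS_MS[unit]


def poll_interval_ok(poll_interval_ms, wal_sender_timeout='60s'):
    """True if the poll interval is below wal_sender_timeout (0 disables the timeout)."""
    timeout_ms = to_millis(wal_sender_timeout)
    return timeout_ms == 0 or poll_interval_ms < timeout_ms


if __name__ == '__main__':
    print(poll_interval_ok(1000, '60s'))
```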
2. Expression Evaluator
Purpose: adds the list of tables to filter on.
Note: this component is optional; the filtering can be done in the Python script of component 3 (Jython) instead.
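Whichever component does the filtering, the rule itself is simple: keep only the changes whose table is on the list. Below is a hypothetical plain-Python version that works on a parsed wal2json record. As an assumption beyond the original script, it also accepts schema-qualified names, which helps when the same table name exists in several schemas.

```python
def filter_changes(wal_record, allowed):
    """Return a copy of a parsed wal2json record keeping only changes for allowed tables.

    allowed may contain bare table names ('student') or schema-qualified
    names ('public.student'). The input record is not modified.
    """
    kept = [
        change for change in wal_record.get('change', [])
        if change['table'] in allowed
        or change['schema'] + '.' + change['table'] in allowed
    ]
    return {**wal_record, 'change': kept}


if __name__ == '__main__':
    record = {'xid': 1, 'change': [
        {'kind': 'insert', 'schema': 'public', 'table': 'student'},
        {'kind': 'delete', 'schema': 'public', 'table': 'teacher'},
    ]}
    print(filter_changes(record, {'class', 'student'}))
```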
3. Jython
Purpose: Python code that parses the WAL records and converts each change into a flat JSON-style record.
Details:
Raw WAL record for update/insert:
{
"xid": 1055891831,
"nextlsn": "130/6FC8A230",
"timestamp": "2020-01-08 15:00:14.243564+08",
"change": [{
"kind": "update",
"schema": "********",
"table": "********",
"columnnames": ["id", "create_user", "modify_user", "create_time", "modify_time", "app_id", "tenant_id", "deleted", "animal_id", "property_id", "property_val"],
"columntypes": ["bigint", "character varying(500)", "character varying(500)", "timestamp(6) without time zone", "timestamp(6) without time zone", "bigint", "bigint", "boolean", "bigint", "bigint", "character varying(500)"],
"columnvalues": [664478959718834178, "1182483438204788738", "1182483438204788738", "2020-01-08 14:42:09.419", "2020-01-08 15:00:14.228968", 502, 627092828622684160, false, 664478959546867712, 1158021187370930177, "2020-01-08 15:00:14"],
"oldkeys": {
"keynames": ["id"],
"keytypes": ["bigint"],
"keyvalues": [664478959718834178]
}
}, {
"kind": "insert",
"schema": "********",
"table": "********",
"columnnames": ["id", "create_user", "modify_user", "create_time", "modify_time", "app_id", "tenant_id", "deleted", "event_code", "event_date", "animal_id", "event_content", "org_id", "total_parity"],
"columntypes": ["bigint", "character varying(500)", "character varying(500)", "timestamp(6) without time zone", "timestamp(6) without time zone", "bigint", "bigint", "boolean", "character varying(32)", "timestamp(6) without time zone", "bigint", "jsonb", "bigint", "integer"],
"columnvalues": [664483509762727936, "1182483438204788738", "1182483438204788738", "2020-01-08 15:00:14.234", "2020-01-08 15:00:14.234", 503, 627092828622684160, false, "CR", "2020-01-08 15:00:14", 664478959546867712, "{\"reason\": \"4\", \"remark\": \"App登記\", \"isCancel\": 0}", 642422751306448896, null]
}]
}
Raw WAL record for delete:
{
"xid": 1055894606,
"nextlsn": "130/700E9880",
"timestamp": "2020-01-08 15:18:53.051471+08",
"change": [{
"kind": "delete",
"schema": "*********",
"table": "*********",
"oldkeys": {
"keynames": ["id"],
"keytypes": ["bigint"],
"keyvalues": [631496870811664384]
}
}]
}
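The two samples above differ mainly in where the row data lives. Insert/update changes carry columnnames/columnvalues, while deletes carry only oldkeys. Below is a plain-Python sketch that turns either kind into one flat record per change and runs outside StreamSets. It is not the Jython component itself. The name flatten_wal2json, the database argument, and the sample schema/table names are hypothetical. Like the script, it lets the metadata keys (kind, table, schema, database) overwrite any column with the same name.

```python
import json


def flatten_wal2json(payload, database):
    """Turn one wal2json JSON document into a list of flat dicts, one per row change."""
    record = json.loads(payload)
    rows = []
    for change in record.get('change', []):
        if change['kind'] == 'delete':
            # deletes only carry the replica identity (key) columns
            names = change['oldkeys']['keynames']
            values = change['oldkeys']['keyvalues']
        else:
            names = change['columnnames']
            values = change['columnvalues']
        row = dict(zip(names, values))
        row.update(kind=change['kind'], table=change['table'],
                   schema=change['schema'], database=database)
        rows.append(row)
    return rows


if __name__ == '__main__':
    sample = ('{"xid": 1055894606, "change": [{"kind": "delete", "schema": "public", '
              '"table": "student", "oldkeys": {"keynames": ["id"], "keytypes": ["bigint"], '
              '"keyvalues": [631496870811664384]}}]}')
    print(flatten_wal2json(sample, 'school_db'))
```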
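Python script used in the Jython component:
# Keep only changes for the tables class and student
tables = ['class', 'student']

for record in records:
    try:
        # One wal2json record holds the list of row changes from one transaction
        changes = record.value['change']
        for change in changes:
            # 'change' is a list, so the table name must be checked on each element
            if change['table'] not in tables:
                continue
            kind = change['kind']
            if kind != 'delete':
                # insert/update: the full row is in columnnames/columnvalues
                columnnames_list = change['columnnames']
                columnvalues_list = change['columnvalues']
            else:
                # delete: only the key columns are available, under oldkeys
                columnnames_list = change['oldkeys']['keynames']
                columnvalues_list = change['oldkeys']['keyvalues']
            # Replace the record body with a flat column -> value map
            record.value.clear()
            for name, value in zip(columnnames_list, columnvalues_list):
                record.value[name] = value
            record.value['kind'] = kind
            record.value['table'] = change['table']
            record.value['schema'] = change['schema']
            record.value['database'] = '*********'
            # Emit one output record per row change
            output.write(record)
    except Exception as e:
        # Send record to error
        error.write(record, str(e))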
Processed data (these are not the same rows as the raw WAL samples above; they only illustrate the output format):
Insert/update record:
{"id":645986949249966087,"create_user":"1191899847279063041","modify_user":"1191899847279063041","create_time":"2019-11-18 14:01:30.558","modify_time":"2020-01-08 13:54:55.789116","app_id":502,"tenant_id":626804036418404352,"org_id":641585797362876416,"deleted":false,"stage":"nursery","start_date":"2019-10-01 14:00:37","end_date":"2020-01-08 13:54:14","qty":1,"wgt":33.33,"start_wgt_date":null,"end_wgt_date":null,"start_wgt":0.0,"end_wgt":0.0,"pre_id":0,"kind":"update","table":"********","schema":"********"}
Delete record:
{"id":618383552928546816,"kind":"delete","table":"********","schema":"********"}
The screenshot below shows a data snapshot of the pipeline: the raw WAL data on the left and the records output by the Jython component on the right.

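4. Stream Selector
Purpose: splits the records into separate streams by operation type.
Note: because each StreamSets JDBC Producer in this pipeline handles only one kind of operation (insert, update, or delete), you need three JDBC Producers to write to the database.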
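The routing can be pictured in plain Python. In the pipeline itself, each Stream Selector condition would be an expression such as ${record:value('/kind') == 'insert'}, and records that match no condition go to the default stream. The helper route_by_kind is hypothetical and only mimics that behavior outside StreamSets.

```python
def route_by_kind(rows):
    """Split flattened records into insert/update/delete lanes plus a default lane."""
    lanes = {'insert': [], 'update': [], 'delete': []}
    default = []  # records matching no condition, like the Stream Selector default stream
    for row in rows:
        lanes.get(row.get('kind'), default).append(row)
    return lanes, default


if __name__ == '__main__':
    lanes, default = route_by_kind([
        {'id': 1, 'kind': 'insert'},
        {'id': 2, 'kind': 'delete'},
    ])
    print({kind: len(rows) for kind, rows in lanes.items()}, len(default))
```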

5. JDBC Producer
Purpose: writes the changes to the target database.
Note: each instance performs only one type of operation.
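Taken together, the three JDBC Producers turn each flat record into one SQL statement keyed on the primary key. The sketch below illustrates this with the standard-library sqlite3 module standing in for SnappyData. It is not how JDBC Producer is implemented. The name apply_row, the key column id (matching the samples above), and the student table are assumptions.

```python
import sqlite3

# Keys added by the Jython step; everything else is a column value
META_KEYS = ('kind', 'table', 'schema', 'database')


def quote(name):
    """Quote an SQL identifier, doubling embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def apply_row(conn, row, key='id'):
    """Apply one flattened change record: insert, update by key, or delete by key."""
    table = quote(row['table'])
    data = {k: v for k, v in row.items() if k not in META_KEYS}
    kind = row['kind']
    if kind == 'insert':
        cols = ', '.join(quote(c) for c in data)
        marks = ', '.join('?' for _ in data)
        sql = f'INSERT INTO {table} ({cols}) VALUES ({marks})'
        params = list(data.values())
    elif kind == 'update':
        others = [c for c in data if c != key]
        if not others:
            return  # nothing to change besides the key
        sets = ', '.join(f'{quote(c)} = ?' for c in others)
        sql = f'UPDATE {table} SET {sets} WHERE {quote(key)} = ?'
        params = [data[c] for c in others] + [data[key]]
    elif kind == 'delete':
        sql = f'DELETE FROM {table} WHERE {quote(key)} = ?'
        params = [data[key]]
    else:
        raise ValueError(f'unsupported kind: {kind!r}')
    conn.execute(sql, params)


if __name__ == '__main__':
    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE student (id INTEGER PRIMARY KEY, name TEXT)')
    for change in [
        {'id': 1, 'name': 'Ann', 'kind': 'insert', 'table': 'student', 'schema': 'public'},
        {'id': 1, 'name': 'Amy', 'kind': 'update', 'table': 'student', 'schema': 'public'},
    ]:
        apply_row(conn, change)
    print(conn.execute('SELECT id, name FROM student').fetchall())
```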

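With that, you have a basic real-time data sync pipeline. If you would rather not configure it yourself, I provide the pipeline's JSON file: import it into StreamSets, adjust the parameters, and it is ready to use.
Baidu Cloud link (not yet uploaded)
This version has the following problem:
Each pipeline occupies its own PostgreSQL replication slot. If another pipeline needs data from the same database (for example, a real-time wide table), it has to start another slot to collect the data, which degrades PostgreSQL performance.
So I collect the WAL data into Kafka instead, where multiple pipelines can consume it.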
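The benefit of the Kafka stage is that the WAL changes are read once from a single replication slot and retained. Each downstream pipeline then keeps its own read position, which Kafka calls a consumer group offset. The toy in-memory model below only illustrates that fan-out. It is not Kafka client code, and the Topic class and the group names are hypothetical.

```python
class Topic:
    """Toy append-only log: every consumer group tracks its own offset."""

    def __init__(self):
        self.messages = []
        self.offsets = {}  # consumer group -> index of the next unread message

    def publish(self, message):
        self.messages.append(message)

    def poll(self, group, max_records=100):
        """Return up to max_records unread messages for this group and advance its offset."""
        start = self.offsets.get(group, 0)
        batch = self.messages[start:start + max_records]
        self.offsets[group] = start + len(batch)
        return batch


if __name__ == '__main__':
    wal_topic = Topic()
    # One CDC pipeline (one replication slot) publishes each WAL record once
    for xid in (101, 102, 103):
        wal_topic.publish({'xid': xid})
    # Independent consumers each read the full stream
    print(wal_topic.poll('snappydata-sync'))
    print(wal_topic.poll('wide-table'))
```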
To be continued.