StreamSets Real-Time Data Synchronization

Background

At work, our product lines frequently need real-time data analysis and cross-center queries, but the PostgreSQL databases we use make cross-database queries difficult. We therefore synchronize PostgreSQL data into SnappyData in real time to serve both needs.

One caveat: operating SnappyData is very painful. Its community is not very active, so when you hit a problem you mostly have to solve it yourself, and its SQL dialect is not as rich as those of mainstream databases.

We are currently considering syncing the data in real time into a designated PostgreSQL database instead, using schemas to distinguish the source databases or product lines.

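Pipeline Overview

Our real-time sync mainly moves data from PostgreSQL into SnappyData, but StreamSets can handle other cases just as well, such as MySQL-to-MySQL real-time sync. Its component library is rich enough for most common needs. Here I first describe the first version of the sync pipeline. It has some problems, and this version only delivers the basic real-time sync functionality.
Prerequisite: the wal2json plugin is installed, so that PostgreSQL changes can be captured from the WAL.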

Real-Time Data Synchronization

[Figure: real-time data sync pipeline (version 1)]

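Pipeline components:

1. PostgreSQL CDC Client

Purpose: connects to the database and reads the WAL (write-ahead log).

Configuration: keep the component's default parameters and fill in the required Replication Slot, JDBC connection string, and username/password.

If you want to see data changes immediately, set Max Batch Size to 1 (the default is 100).

[Figure: Max Batch Size, the maximum number of records collected per batch]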

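We found the following bugs and issues in StreamSets 3.9.1:

1. Under high concurrency, data can be lost. I have reported this to StreamSets. They are working on it and expect to fix it in version 3.16. JIRA link: https://issues.streamsets.com/browse/SDC-13269#add-comment

2. In our tests of version 3.9.1, the component's table filter had no effect, so we implemented table filtering in Python code instead.

3. The component cannot capture DDL statements. It only captures insert, update, and delete operations.

4. The Poll Interval setting must be smaller than wal_sender_timeout in the PostgreSQL configuration file (postgresql.conf). Otherwise PostgreSQL may treat the replication connection as inactive and terminate it.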
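The constraint in item 4 can be checked before a pipeline is deployed. Below is a minimal sketch. It assumes you have already converted the stage's Poll Interval to milliseconds, and it handles only the units ms, s, min, h and d. The helpers `pg_duration_ms` and `poll_interval_ok` are hypothetical names, not part of StreamSets or PostgreSQL. As with wal_sender_timeout in PostgreSQL, a bare number is taken as milliseconds and 0 disables the timeout.

```python
import re

# Milliseconds per PostgreSQL time unit (only the common units are handled)
_UNIT_MS = {'ms': 1, 's': 1000, 'min': 60000, 'h': 3600000, 'd': 86400000}


def pg_duration_ms(value):
    """Convert a postgresql.conf duration such as '60s' or '1min' to ms.

    A number without a unit is taken as milliseconds, the base unit of
    wal_sender_timeout. Surrounding quotes are ignored.
    """
    text = value.strip().strip("'\"")
    match = re.match(r"^(\d+)\s*(ms|s|min|h|d)?$", text)
    if match is None:
        raise ValueError("unsupported duration: %r" % value)
    number, unit = match.groups()
    return int(number) * _UNIT_MS[unit or 'ms']


def poll_interval_ok(poll_interval_ms, wal_sender_timeout):
    """True if the CDC poll interval is shorter than wal_sender_timeout.

    A timeout of 0 disables the timeout on the PostgreSQL side.
    """
    timeout_ms = pg_duration_ms(wal_sender_timeout)
    return timeout_ms == 0 or poll_interval_ms < timeout_ms


print(poll_interval_ok(1000, '60s'))     # True
print(poll_interval_ok(90000, '1min'))   # False
```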

2. Expression Evaluator

Purpose: adds the list of tables to filter on.

Note: this component is optional. You can do the filtering in the Python script of step 3 instead.
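Whichever stage does the filtering, the check itself is small. The sketch below is a hypothetical helper: `filter_changes` is our own name, not a StreamSets function. It keeps only the changes in a wal2json record whose table is on a list. It also accepts schema-qualified names such as 'public.student', which avoids matching same-named tables in other schemas. The schema and table names in the example are made up.

```python
def filter_changes(wal_record, tables):
    """Return the changes in a wal2json record whose table is listed.

    Entries in `tables` may be bare ('student') or schema-qualified
    ('public.student'); a bare name matches that table in any schema.
    """
    wanted = set(tables)
    kept = []
    for change in wal_record.get('change', []):
        qualified = change['schema'] + '.' + change['table']
        if change['table'] in wanted or qualified in wanted:
            kept.append(change)
    return kept


record = {'change': [
    {'kind': 'insert', 'schema': 'public', 'table': 'student'},
    {'kind': 'insert', 'schema': 'archive', 'table': 'student'},
    {'kind': 'delete', 'schema': 'public', 'table': 'teacher'},
]}
print([c['schema'] for c in filter_changes(record, ['public.student'])])
# ['public']
```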

3. Jython Evaluator

Purpose: parses the WAL with a Python script and converts each change into a flat JSON record.

Notes:

Raw WAL record (update/insert):

{
    "xid": 1055891831,
    "nextlsn": "130/6FC8A230",
    "timestamp": "2020-01-08 15:00:14.243564+08",
    "change": [{
        "kind": "update",
        "schema": "********",
        "table": "********",
        "columnnames": ["id", "create_user", "modify_user", "create_time", "modify_time", "app_id", "tenant_id", "deleted", "animal_id", "property_id", "property_val"],
        "columntypes": ["bigint", "character varying(500)", "character varying(500)", "timestamp(6) without time zone", "timestamp(6) without time zone", "bigint", "bigint", "boolean", "bigint", "bigint", "character varying(500)"],
        "columnvalues": [664478959718834178, "1182483438204788738", "1182483438204788738", "2020-01-08 14:42:09.419", "2020-01-08 15:00:14.228968", 502, 627092828622684160, false, 664478959546867712, 1158021187370930177, "2020-01-08 15:00:14"],
        "oldkeys": {
            "keynames": ["id"],
            "keytypes": ["bigint"],
            "keyvalues": [664478959718834178]
        }
    }, {
        "kind": "insert",
        "schema": "********",
        "table": "********",
        "columnnames": ["id", "create_user", "modify_user", "create_time", "modify_time", "app_id", "tenant_id", "deleted", "event_code", "event_date", "animal_id", "event_content", "org_id", "total_parity"],
        "columntypes": ["bigint", "character varying(500)", "character varying(500)", "timestamp(6) without time zone", "timestamp(6) without time zone", "bigint", "bigint", "boolean", "character varying(32)", "timestamp(6) without time zone", "bigint", "jsonb", "bigint", "integer"],
        "columnvalues": [664483509762727936, "1182483438204788738", "1182483438204788738", "2020-01-08 15:00:14.234", "2020-01-08 15:00:14.234", 503, 627092828622684160, false, "CR", "2020-01-08 15:00:14", 664478959546867712, "{\"reason\": \"4\", \"remark\": \"App登記\", \"isCancel\": 0}", 642422751306448896, null]
    }]
}

Raw WAL record (delete):

{
    "xid": 1055894606,
    "nextlsn": "130/700E9880",
    "timestamp": "2020-01-08 15:18:53.051471+08",
    "change": [{
        "kind": "delete",
        "schema": "*********",
        "table": "*********",
        "oldkeys": {
            "keynames": ["id"],
            "keytypes": ["bigint"],
            "keyvalues": [631496870811664384]
        }
    }]
}

Python script in the Jython Evaluator:

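# Tables to keep; changes on any other table are dropped
# (the CDC origin's own table filter did not work in our 3.9.1 tests)
tables = ['class', 'student']

for record in records:
  try:
    # 'change' is a list: one WAL record can carry several changes,
    # so the table check must be done per change
    changes = list(record.value['change'])
    for change in changes:
      if change['table'] not in tables:
        continue
      record.value.clear()
      kind = change['kind']
      if kind != 'delete':
        columnnames_list = change['columnnames']
        columnvalues_list = change['columnvalues']
      else:
        # delete events only carry the primary key, under 'oldkeys'
        columnnames_list = change['oldkeys']['keynames']
        columnvalues_list = change['oldkeys']['keyvalues']

      # Flatten the column names/values into top-level fields
      for idx in range(len(columnnames_list)):
        record.value[columnnames_list[idx]] = columnvalues_list[idx]

      record.value['kind'] = kind
      record.value['table'] = change['table']
      record.value['schema'] = change['schema']
      record.value['database'] = '*********'

      # One output record per matching change
      output.write(record)
  except Exception as e:
    # Send the record to the error stream
    error.write(record, str(e))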
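To test the transformation without a running Data Collector, you can write the same logic as a plain Python function. This is a sketch only. `flatten_wal_record` is a hypothetical name, it returns dicts instead of writing SDC records, and the 'public', 'student' and 'demo' values in the example are made up.

```python
import json


def flatten_wal_record(wal_record, tables, database):
    """Turn one wal2json record into flat rows, one per change on `tables`.

    insert/update rows carry every column; delete rows carry only the
    old key columns, as in the Jython script.
    """
    rows = []
    for change in wal_record.get('change', []):
        if change['table'] not in tables:
            continue
        if change['kind'] == 'delete':
            names = change['oldkeys']['keynames']
            values = change['oldkeys']['keyvalues']
        else:
            names = change['columnnames']
            values = change['columnvalues']
        row = dict(zip(names, values))
        row['kind'] = change['kind']
        row['table'] = change['table']
        row['schema'] = change['schema']
        row['database'] = database
        rows.append(row)
    return rows


raw = json.loads('''{"xid": 1, "change": [{"kind": "delete",
    "schema": "public", "table": "student",
    "oldkeys": {"keynames": ["id"], "keytypes": ["bigint"],
                "keyvalues": [631496870811664384]}}]}''')
print(flatten_wal_record(raw, ['class', 'student'], 'demo'))
# [{'id': 631496870811664384, 'kind': 'delete', 'table': 'student', 'schema': 'public', 'database': 'demo'}]
```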

Processed data (these are not the records from the raw WAL above; they only illustrate the output format):

Insert/update record:
{"id":645986949249966087,"create_user":"1191899847279063041","modify_user":"1191899847279063041","create_time":"2019-11-18 14:01:30.558","modify_time":"2020-01-08 13:54:55.789116","app_id":502,"tenant_id":626804036418404352,"org_id":641585797362876416,"deleted":false,"stage":"nursery","start_date":"2019-10-01 14:00:37","end_date":"2020-01-08 13:54:14","qty":1,"wgt":33.33,"start_wgt_date":null,"end_wgt_date":null,"start_wgt":0.0,"end_wgt":0.0,"pre_id":0,"kind":"update","table":"********","schema":"********"}

Delete record:
{"id":618383552928546816,"kind":"delete","table":"********","schema":"********"}

The screenshot below is a data snapshot of the pipeline. The left side shows the raw WAL data, and the right side shows the data output by the Jython stage.

[Figure: pipeline data snapshot]

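4. Stream Selector

Purpose: routes records by operation type.
Note: we configure each JDBC Producer with a single operation (insert, update, or delete), so three JDBC Producers are needed. The Stream Selector sends each record to the matching producer according to its kind field.

[Figure: routing by operation type]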
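The Stream Selector's job is just a branch on the kind field. In StreamSets, the three conditions would be expressions along the lines of ${record:value('/kind') == 'insert'}. The Python below only mimics that routing on flattened rows; `route_by_kind` is a hypothetical helper. Like the selector's default stream, it has a default lane for anything unexpected.

```python
from collections import defaultdict

# One lane per JDBC Producer; anything else goes to the default lane
LANES = ('insert', 'update', 'delete')


def route_by_kind(rows):
    """Group flattened rows by their 'kind' field, like the Stream Selector."""
    lanes = defaultdict(list)
    for row in rows:
        kind = row.get('kind')
        lanes[kind if kind in LANES else 'default'].append(row)
    return dict(lanes)


rows = [{'id': 1, 'kind': 'insert'}, {'id': 1, 'kind': 'update'},
        {'id': 2, 'kind': 'delete'}, {'id': 3, 'kind': 'unknown'}]
for lane, batch in sorted(route_by_kind(rows).items()):
    print(lane, [r['id'] for r in batch])
```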

5. JDBC Producer

Purpose: writes to the target database.
Note: each JDBC Producer performs only one kind of operation.

[Figure: writing to the database]
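Parameterized SQL gives a rough picture of what the three producers do with each flattened row. The example below is a sketch under several assumptions:

- It uses Python's built-in sqlite3 as a stand-in for SnappyData.
- It assumes the primary key column is id, as in the WAL samples above.
- It ignores the schema and database fields.

`apply_row` and the `student` table are hypothetical. Identifiers come from the WAL rather than from user input, but the sketch still quotes them.

```python
import sqlite3

# Fields added by the Jython script that are not table columns
META_FIELDS = ('kind', 'table', 'schema', 'database')


def quote(name):
    """Quote an SQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def apply_row(conn, row, key='id'):
    """Apply one flattened row as INSERT, UPDATE or DELETE, keyed on `key`."""
    table = quote(row['table'])
    cols = [c for c in row if c not in META_FIELDS]
    kind = row['kind']
    if kind == 'insert':
        sql = 'INSERT INTO %s (%s) VALUES (%s)' % (
            table, ', '.join(quote(c) for c in cols), ', '.join('?' for _ in cols))
        params = [row[c] for c in cols]
    elif kind == 'update':
        sets = [c for c in cols if c != key]
        if not sets:
            return  # nothing to change besides the key
        sql = 'UPDATE %s SET %s WHERE %s = ?' % (
            table, ', '.join(quote(c) + ' = ?' for c in sets), quote(key))
        params = [row[c] for c in sets] + [row[key]]
    elif kind == 'delete':
        sql = 'DELETE FROM %s WHERE %s = ?' % (table, quote(key))
        params = [row[key]]
    else:
        raise ValueError('unsupported kind: %r' % kind)
    conn.execute(sql, params)


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE student (id INTEGER PRIMARY KEY, name TEXT)')
apply_row(conn, {'id': 1, 'name': 'a', 'kind': 'insert', 'table': 'student', 'schema': 'public'})
apply_row(conn, {'id': 1, 'name': 'b', 'kind': 'update', 'table': 'student', 'schema': 'public'})
print(conn.execute('SELECT id, name FROM student').fetchall())  # [(1, 'b')]
```

This sketch matches updates on the new value of id. If a primary key itself can change, the old key from oldkeys would be needed instead.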

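With that, you have a simple working real-time sync job. If you would rather not configure it yourself, I provide the pipeline's JSON file. Import it into StreamSets, adjust the parameters, and it is ready to use.

Baidu Cloud link (not uploaded yet)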

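This version has the following problem:
Each pipeline occupies its own PostgreSQL replication slot. If another pipeline needs data from the same database (for example, a real-time wide table), it must start another slot to capture the data, which degrades PostgreSQL performance.
So I capture the WAL into Kafka instead, where multiple pipelines can consume it.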
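Putting Kafka in the middle helps because each consumer group reads a topic independently. The slot is drained once into the topic, and every pipeline keeps its own offset. The toy model below only illustrates that idea in memory. `ToyTopic` and the group names are hypothetical, and this is not the Kafka client API.

```python
class ToyTopic:
    """In-memory stand-in for one Kafka topic partition.

    Messages are appended once; each consumer group tracks its own
    offset, so adding a pipeline does not need another replication slot.
    """

    def __init__(self):
        self._log = []
        self._offsets = {}  # consumer group -> next offset to read

    def produce(self, message):
        self._log.append(message)

    def poll(self, group, max_records=100):
        start = self._offsets.get(group, 0)
        batch = self._log[start:start + max_records]
        self._offsets[group] = start + len(batch)
        return batch


topic = ToyTopic()
for xid in (101, 102, 103):           # WAL records written once
    topic.produce({'xid': xid})
print(topic.poll('snappydata-sync'))  # all three records
print(topic.poll('wide-table', 2))    # first two, read independently
print(topic.poll('snappydata-sync'))  # [] (already consumed by this group)
```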

To be continued.

© Copyright belongs to the author. For reposting or content collaboration, please contact the author.
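Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and only provides information storage services.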
