如何通過Logstash同步多表關(guān)聯(lián)數(shù)據(jù)至Elasticsearch

如果你對 使用Logstash保持Elasticsearch與數(shù)據(jù)庫同步 方案還不是很熟悉,建議先花點時間精讀它。
上面的文章以單表同步場景為例,清楚講述了如何通過JDBC同步數(shù)據(jù)至ES,而對于實際開發(fā)中經(jīng)常出現(xiàn)的多表關(guān)聯(lián)同步并未提及,以下是我針對多表關(guān)聯(lián)同步的趟坑過程希望對你有所幫助。

數(shù)據(jù)庫表的約定原則

同步單表時我們對于表字段的約定:

  • 表中要有主鍵字段(如id),最近變更時間字段(如modification_time),軟刪除標(biāo)記字段(如is_deleted),以便jdbc-input數(shù)據(jù)采集的輪詢Job可以識別出增量變動的數(shù)據(jù)。
  • 提示:jdbc input輪詢需要基于modification_time條件查詢,所以給該字段加上索引。

多表關(guān)聯(lián)同步方案

多表關(guān)聯(lián)的情況下我們需要JOIN其他表查詢得到結(jié)果,這個結(jié)果就是ES需要的打平后的寬表。ES新的版本中也增加了join操作,但這事不是ES擅長的,我們選擇交給更擅長的數(shù)據(jù)庫處理,讓ES只存儲打平后的單層索引。

如果你理解單表同步而困惑多表關(guān)聯(lián)同步的話,試著將關(guān)聯(lián)查詢的復(fù)雜SQL想象(定義)為視圖,是不是后續(xù)操作就跟單表沒區(qū)別了!

我們來逐個看下多表關(guān)聯(lián)的同步問題 (假設(shè)表a多對多關(guān)聯(lián)表b):

  • 單表的id字段綁定到ES document的_id,可以實現(xiàn)ES索引冪等性,不會出現(xiàn)job原因?qū)е滤饕臋n重復(fù)。那對于多表關(guān)聯(lián)的情況呢,可以使用各表id的組合作為document的_id。如SELECT:

    concat(a.id, '_', b.id) AS docid
    

    (如果你不關(guān)注冪等,也可以用_id默認(rèn)生成策略。)

  • 單表基于modification_time就可以識別出自上次輪詢后新的變化數(shù)據(jù),多表關(guān)聯(lián)的情況呢也類似:

    (CASE WHEN a.modification_time > b.modification_time THEN a.modification_time ELSE b.modification_time END) AS modification_time
    
  • 同理軟刪除字段is_deleted的處理邏輯:

    (CASE WHEN a.is_deleted=0 AND b.is_deleted=0 THEN 0 ELSE 1 END) AS is_deleted
    

    這樣無論表a還是表b發(fā)生變更,都可以被logstash識別出來采集到。

如此我們就可以寫出多表關(guān)聯(lián)同步的SQL了,為了方便更新維護SQL及保持logstash-jdbc端conf配置文件的簡潔,你可以把SQL定義成一張視圖,conf文件中的SQL statement可以像寫單表處理一樣了。

示例conf:

input {
  jdbc {
    jdbc_driver_library => "../drivers/mysql-connector-java-8.0.16.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/es_db?serverTimezone=UTC"
    jdbc_user => "usr"
    jdbc_password => "pwd"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM esview WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
  }
}
filter {
  mutate {
    copy => { "docid" => "[@metadata][_id]"}
    remove_field => ["docid", "@version", "unix_ts_in_secs"]
  }
}
output {
  elasticsearch {
      index => "test_idx"
      document_id => "%{[@metadata][_id]}"
  }
}

diboot 簡單高效的輕代碼開發(fā)框架 (求star)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容