如果你對 使用Logstash保持Elasticsearch與數(shù)據(jù)庫同步 方案還不是很熟悉,建議先花點時間精讀它。
上面的文章以單表同步場景為例,清楚講述了如何通過JDBC同步數(shù)據(jù)至ES,而對于實際開發(fā)中經(jīng)常出現(xiàn)的多表關(guān)聯(lián)同步并未提及,以下是我針對多表關(guān)聯(lián)同步的趟坑過程希望對你有所幫助。
數(shù)據(jù)庫表的約定原則
同步單表時我們對于表字段的約定:
- 表中要有主鍵字段(如id),最近變更時間字段(如modification_time),軟刪除標(biāo)記字段(如is_deleted),以便jdbc-input數(shù)據(jù)采集的輪詢Job可以識別出增量變動的數(shù)據(jù)。
- 提示:jdbc input輪詢需要基于modification_time條件查詢,所以給該字段加上索引。
多表關(guān)聯(lián)同步方案
多表關(guān)聯(lián)的情況下我們需要JOIN其他表查詢得到結(jié)果,這個結(jié)果就是ES需要的打平后的寬表。ES新的版本中也增加了join操作,但這事不是ES擅長的,我們選擇交給更擅長的數(shù)據(jù)庫處理,讓ES只存儲打平后的單層索引。
如果你理解單表同步而困惑多表關(guān)聯(lián)同步的話,試著將關(guān)聯(lián)查詢的復(fù)雜SQL想象(定義)為視圖,是不是后續(xù)操作就跟單表沒區(qū)別了!
我們來逐個看下多表關(guān)聯(lián)的同步問題 (假設(shè)表a多對多關(guān)聯(lián)表b):
-
單表的id字段綁定到ES document的_id,可以實現(xiàn)ES索引冪等性,不會出現(xiàn)job原因?qū)е滤饕臋n重復(fù)。那對于多表關(guān)聯(lián)的情況呢,可以使用各表id的組合作為document的_id。如SELECT:
concat(a.id, '_', b.id) AS docid(如果你不關(guān)注冪等,也可以用_id默認(rèn)生成策略。)
-
單表基于modification_time就可以識別出自上次輪詢后新的變化數(shù)據(jù),多表關(guān)聯(lián)的情況呢也類似:
(CASE WHEN a.modification_time > b.modification_time THEN a.modification_time ELSE b.modification_time END) AS modification_time -
同理軟刪除字段is_deleted的處理邏輯:
(CASE WHEN a.is_deleted=0 AND b.is_deleted=0 THEN 0 ELSE 1 END) AS is_deleted這樣無論表a還是表b發(fā)生變更,都可以被logstash識別出來采集到。
如此我們就可以寫出多表關(guān)聯(lián)同步的SQL了,為了方便更新維護SQL及保持logstash-jdbc端conf配置文件的簡潔,你可以把SQL定義成一張視圖,conf文件中的SQL statement可以像寫單表處理一樣了。
示例conf:
input {
jdbc {
jdbc_driver_library => "../drivers/mysql-connector-java-8.0.16.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/es_db?serverTimezone=UTC"
jdbc_user => "usr"
jdbc_password => "pwd"
jdbc_paging_enabled => true
tracking_column => "unix_ts_in_secs"
use_column_value => true
tracking_column_type => "numeric"
schedule => "*/5 * * * * *"
statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM esview WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
}
}
filter {
mutate {
copy => { "docid" => "[@metadata][_id]"}
remove_field => ["docid", "@version", "unix_ts_in_secs"]
}
}
output {
elasticsearch {
index => "test_idx"
document_id => "%{[@metadata][_id]}"
}
}