[2] 如何快速將存量mysql 數(shù)據(jù)導(dǎo)入到ES

本文集主要是總結(jié)自己在項目中使用ES 的經(jīng)驗教訓(xùn),包括各種實戰(zhàn)和調(diào)優(yōu)。

需求

為實現(xiàn)將mysql數(shù)據(jù)導(dǎo)入到elasticsearch中進行調(diào)研,發(fā)現(xiàn)使用插件可以很好地實現(xiàn)該功能。所以進行測試環(huán)境搭建,主要的參考鏈接如下。

環(huán)境搭建鏈接:http://blog.csdn.net/yeyuma/article/details/50240595#quote

input-jdbc官方文檔:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html


實現(xiàn)原理

通過執(zhí)行一條sql語句,將每一行查詢結(jié)果生成為一個input events。


Important Fields:

1.statement:任何有效的sql聲明都會執(zhí)行,根據(jù)需求可以寫復(fù)雜的sql語句,如使用INNER JOIN 或UNION ALL等
2.parameters:通過:<filedname>引用語法可以在sql聲明中指定參數(shù)值
3.schedule:在執(zhí)行sql時,可以同時進行有效的調(diào)度(這句話不是很理解)


安裝過程

因為測試機不予許使用install、apt-get等方式,所以我們主要采用wget {url}或者本地下載好傳到測試機的方式進行插件的下載。

需要安裝一下插件:

1.安裝gem:wget https://rubygems.org/rubygems/rubygems-2.6.11.tgz 參照鏈接修改數(shù)據(jù)源地址為https://ruby.taobao.org

2.logstash-input-jdbc下載:wget https://github.com/logstash-plugins/logstash-input-jdbc/archive/v4.1.3.tar.gz

可通過https://github.com/logstash-plugins/logstash-input-jdbc/releases進行版本選擇,然后復(fù)制下載鏈接以后通過wget下載,也可下載以后傳到測試機上。

3.下載mysql-connector-java-5.1.36-bin.jar:直接百度搜索以后從csdn上下載傳到測試機上。

4.jdbc.conf 和jdbc.sql文件,見文檔底部


啟動過程

執(zhí)行如下命令: ./bin/logstash -f /home/appops/logstash-5.3.0/conf/jdbc.conf

同時通過curl命令查詢相關(guān)信息。


logstash-input-jdbc 相關(guān)用法總結(jié)

Example:

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
    parameters => { "favorite_artist" => "Beethoven" }
    schedule => "* * * * *"
    statement => "SELECT * from songs where artist = :favorite_artist"
  }
}

也可以通過設(shè)置statement_filepath的方式來指定sql語句。

內(nèi)置字段聲明:可參照官方文檔:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#_predefined_parameters

sql_last_value:用來計算數(shù)據(jù)執(zhí)行到哪一行,一般為語句插入es的時間,如果clean_run設(shè)置為true則會忽略sql_last_value的值。

設(shè)置use_column_value為true且tracking_column設(shè)置為數(shù)據(jù)庫中的某一個字段,則可實現(xiàn)自定義判斷條件,默認(rèn)起始為0。


必需配置項:

jdbc_connection_string => ... 
jdbc_driver_class => ... 
jdbc_user => ...

可選配置項:

add_field : Value type is [hash](https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html#hash) ;Add a field to an event

charset : Value type is string;默認(rèn)utf-8,可以用columns_charset給特殊的列設(shè)置charset。

columns_charset:Value type is [hash](https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html#hash);columns_charset => { "column0" => "ISO-8859-1" } //只對column0修改charset

clean_run:Value type is boolean;Default value is false ;Whether the previous run state should be preserved

codec:Value type is codec;Default value is "plain" The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.

connection_retry_attempts:Default value is 1 Maximum number of times to try connecting to database

connection_retry_attempts_wait_time:Default value is 0.5 Number of seconds to sleep between connection attempts

enable_metric:Value type is [boolean](https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html#boolean);Default value is true

id:暫時未整理,涉及多個相同類型的插件時會用到。

jdbc_default_timezone:設(shè)置默認(rèn)時區(qū),這個很有用,Asia/Shanghai

jdbc_driver_library:設(shè)置jdbc_driver庫,可以使用第三方庫的方法。

jdbc_fetch_size:可能就是限制jdbc讀入數(shù)據(jù)大小

的地方,還沒有測試。如果沒有設(shè)置,則會使用各自驅(qū)動的默認(rèn)值。默認(rèn)值為Integer.MIN_VALUE。

jdbc_page_size:默認(rèn)是100000;This will cause a sql statement to be broken up into multiple queries. Each query will use limits and offsets to collectively retrieve the full result-set. The limit size is set with jdbc_page_size.主要為取數(shù)據(jù)的sql設(shè)置limit和offsets

jdbc_paging_enabled:默認(rèn)是false,設(shè)置為true以后可以自定義jdbc_page_size的大小。

jdbc_password:

jdbc_password_filepath:

jdbc_pool_timeout:后面的就先自己看看,暫不總結(jié)


關(guān)于使用logstash_input_jdbc中的sql_last_value比標(biāo)準(zhǔn)時間提前8小時的解決方案

由于提前8小時,導(dǎo)致logstash_input_jdbc通過時間作為判斷對新增數(shù)據(jù)寫入es時會不停寫入最近8小時的數(shù)據(jù),導(dǎo)致version劇增。通過看文檔從配置文件入手和從sql語句入手兩個思路來解決。建議推薦使用方法2。

方法一:
修改sql語句寫法
select 
    h.id as id, 
    h.title as title, 
    h.update_time as update_time
from 
    articles h
where 
    h.update_time >= DATE_ADD(:sql_last_value,INTERVAL 8 HOUR)
借助DATE_ADD方法實現(xiàn)對:sql_last_value的自增八小時。

方法二:
可通過jdbc_default_timezone屬性設(shè)置任意標(biāo)準(zhǔn)時區(qū)。如 Asia/Shanghai
注:添加在jdbc.conf里??蓞⒄兆詈笳淼拇罱ōh(huán)境文檔。

相關(guān)配置和jar包可以從網(wǎng)盤下載。
鏈接: https://pan.baidu.com/s/1HVhuTOvJJ4ux3BL61nBVOA 提取碼: q6cn

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容