本文集主要是總結(jié)自己在項目中使用ES 的經(jīng)驗教訓(xùn),包括各種實戰(zhàn)和調(diào)優(yōu)。
需求
為實現(xiàn)將mysql數(shù)據(jù)導(dǎo)入到elasticsearch中進行調(diào)研,發(fā)現(xiàn)使用插件可以很好地實現(xiàn)該功能。所以進行測試環(huán)境搭建,主要的參考鏈接如下。
環(huán)境搭建鏈接:http://blog.csdn.net/yeyuma/article/details/50240595#quote
input-jdbc官方文檔:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
實現(xiàn)原理
通過執(zhí)行一條sql語句,將每一行查詢結(jié)果生成為一個input events。
Important Fields:
1.statement:任何有效的sql聲明都會執(zhí)行,根據(jù)需求可以寫復(fù)雜的sql語句,如使用INNER JOIN 或UNION ALL等
2.parameters:通過:<filedname>引用語法可以在sql聲明中指定參數(shù)值
3.schedule:在執(zhí)行sql時,可以同時進行有效的調(diào)度(這句話不是很理解)
安裝過程
因為測試機不予許使用install、apt-get等方式,所以我們主要采用wget {url}或者本地下載好傳到測試機的方式進行插件的下載。
需要安裝一下插件:
1.安裝gem:wget https://rubygems.org/rubygems/rubygems-2.6.11.tgz 參照鏈接修改數(shù)據(jù)源地址為https://ruby.taobao.org
2.logstash-input-jdbc下載:wget https://github.com/logstash-plugins/logstash-input-jdbc/archive/v4.1.3.tar.gz
可通過https://github.com/logstash-plugins/logstash-input-jdbc/releases進行版本選擇,然后復(fù)制下載鏈接以后通過wget下載,也可下載以后傳到測試機上。
3.下載mysql-connector-java-5.1.36-bin.jar:直接百度搜索以后從csdn上下載傳到測試機上。
4.jdbc.conf 和jdbc.sql文件,見文檔底部
啟動過程
執(zhí)行如下命令: ./bin/logstash -f /home/appops/logstash-5.3.0/conf/jdbc.conf
同時通過curl命令查詢相關(guān)信息。
logstash-input-jdbc 相關(guān)用法總結(jié)
Example:
input {
jdbc {
jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
jdbc_user => "mysql"
parameters => { "favorite_artist" => "Beethoven" }
schedule => "* * * * *"
statement => "SELECT * from songs where artist = :favorite_artist"
}
}
也可以通過設(shè)置statement_filepath的方式來指定sql語句。
內(nèi)置字段聲明:可參照官方文檔:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#_predefined_parameters
sql_last_value:用來計算數(shù)據(jù)執(zhí)行到哪一行,一般為語句插入es的時間,如果clean_run設(shè)置為true則會忽略sql_last_value的值。
設(shè)置use_column_value為true且tracking_column設(shè)置為數(shù)據(jù)庫中的某一個字段,則可實現(xiàn)自定義判斷條件,默認(rèn)起始為0。
必需配置項:
jdbc_connection_string => ...
jdbc_driver_class => ...
jdbc_user => ...
可選配置項:
add_field : Value type is [hash](https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html#hash) ;Add a field to an event
charset : Value type is string;默認(rèn)utf-8,可以用columns_charset給特殊的列設(shè)置charset。
columns_charset:Value type is [hash](https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html#hash);columns_charset => { "column0" => "ISO-8859-1" } //只對column0修改charset
clean_run:Value type is boolean;Default value is false ;Whether the previous run state should be preserved
codec:Value type is codec;Default value is "plain" The codec used for input data. Input codecs are a convenient method for decoding your data before it enters the input, without needing a separate filter in your Logstash pipeline.
connection_retry_attempts:Default value is 1 Maximum number of times to try connecting to database
connection_retry_attempts_wait_time:Default value is 0.5 Number of seconds to sleep between connection attempts
enable_metric:Value type is [boolean](https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html#boolean);Default value is true
id:暫時未整理,涉及多個相同類型的插件時會用到。
jdbc_default_timezone:設(shè)置默認(rèn)時區(qū),這個很有用,Asia/Shanghai
jdbc_driver_library:設(shè)置jdbc_driver庫,可以使用第三方庫的方法。
jdbc_fetch_size:可能就是限制jdbc讀入數(shù)據(jù)大小
的地方,還沒有測試。如果沒有設(shè)置,則會使用各自驅(qū)動的默認(rèn)值。默認(rèn)值為Integer.MIN_VALUE。
jdbc_page_size:默認(rèn)是100000;This will cause a sql statement to be broken up into multiple queries. Each query will use limits and offsets to collectively retrieve the full result-set. The limit size is set with jdbc_page_size.主要為取數(shù)據(jù)的sql設(shè)置limit和offsets
jdbc_paging_enabled:默認(rèn)是false,設(shè)置為true以后可以自定義jdbc_page_size的大小。
jdbc_password:
jdbc_password_filepath:
jdbc_pool_timeout:后面的就先自己看看,暫不總結(jié)
關(guān)于使用logstash_input_jdbc中的sql_last_value比標(biāo)準(zhǔn)時間提前8小時的解決方案
由于提前8小時,導(dǎo)致logstash_input_jdbc通過時間作為判斷對新增數(shù)據(jù)寫入es時會不停寫入最近8小時的數(shù)據(jù),導(dǎo)致version劇增。通過看文檔從配置文件入手和從sql語句入手兩個思路來解決。建議推薦使用方法2。
方法一:
修改sql語句寫法
select
h.id as id,
h.title as title,
h.update_time as update_time
from
articles h
where
h.update_time >= DATE_ADD(:sql_last_value,INTERVAL 8 HOUR)
借助DATE_ADD方法實現(xiàn)對:sql_last_value的自增八小時。
方法二:
可通過jdbc_default_timezone屬性設(shè)置任意標(biāo)準(zhǔn)時區(qū)。如 Asia/Shanghai
注:添加在jdbc.conf里??蓞⒄兆詈笳淼拇罱ōh(huán)境文檔。
相關(guān)配置和jar包可以從網(wǎng)盤下載。
鏈接: https://pan.baidu.com/s/1HVhuTOvJJ4ux3BL61nBVOA 提取碼: q6cn