ELK —— Logstash 將 MySQL 數據同步至 ElasticSearch

目錄預覽

一、搭建環(huán)境

????1.0 環(huán)境依賴

????1.1 搭建 ElasticSearch 環(huán)境

????????1.1.1 ElasticSearch 簡介

????????1.1.2 啟動 ElasticSearch

????1.2 Logstash(多數據源采集系統(tǒng))

????1.3 Kibana(可視化面板)

二、Logstash 配置

????2.1 配置數據庫連接

????2.2 配置同步 ES

????2.3 重新啟動

三、下一步更新計劃

Author:Gorit?

Date:2021/4/7?

Refer:各種同類文章參考融合 + 自己的思考總結?

2021年發(fā)表博文:?16/50

一、搭建環(huán)境

官網介紹[1]?下載地址[2]

1.0 環(huán)境依賴

1.windows 10 開發(fā)環(huán)境2.jdk1.8 環(huán)境(最低版本要求,我看到目前最新)3.配置好 JAVA_HOME,以及 classpath4.相同版本的 ELK (我目前用的是 7.10.0,最新版本的已經更新到了 7.12.0。一定要下載相同版本的,不然會出現莫名其妙的?BUG)5.mysql-connector-java.jar (8.0 或者 5.5 都可以,這個從maven 倉庫里面找,因為同步數據用的是 jdbc)6.ELK 三個下載好的軟件放在一起,目錄中不要出現 空格,中文什么的,也會出現莫名其妙的?BUG

1.1 搭建 ElasticSearch 環(huán)境

1.1.1 ElasticSearch 簡介

ElasticSearch?是基于?Lucence?的分布式搜索引擎,也可以作為“數據庫”存儲一些數據,同類產品還有一個叫做?solr?的,這里就不做描述

1.1.2 啟動 ElasticSearch

1.

不了解 ES 的可以先看這篇?文章[3],畢竟ES 的概念還是挺多的

PS:近期我會整理出 ES 的基本操作的文檔,

2.

ES 的項目結構

3.

ES 啟動很簡單,一鍵啟動即可?bin/elasticsearch.bat

4.

ES 會占用?9200?和?9300?端口

[2021-04-07T15:02:36,121][INFO ][o.e.t.TransportService? ] [DESKTOP-8HFODO1] publish_address {127.0.0.1:9300}, bound_addresses {127.0.0.1:9300}, {[::1]:9300}

[2021-04-07T15:02:39,181][INFO ][o.e.h.AbstractHttpServerTransport] [DESKTOP-8HFODO1] publish_address {127.0.0.1:9200}, bound_addresses {127.0.0.1:9200}, {[::1]:9200}

1.打開游覽器訪問?http://localhost:9200,可以看到你的 ES 基本信息,說明你就搭建完成了

1.2 Logstash(多數據源采集系統(tǒng))

1.項目結構

2.這里需要配置一些東西才能啟動,并且需要啟動參數才能解決。啟動 Logstash logstash -f ../config/logstash-sample.conf 即可3.但是看不到效果,因為要和 ES 配合使用才行

1.3 Kibana(可視化面板)

1.是一個純前端項目,下載好后,項目結構如下

2.啟動方式同 ES,在?bin/kibana.bat?,雙擊即可啟動3.輸入 http://localhost:5601 即可看到 Kibana 的控制面板,但是發(fā)現頁面全是英文的,但是 kibana 也是支持中文的。4.進入 config/kibana.yml ,的最后一行

5.然后重新啟動即可

6.進入工作頁

二、Logstash 配置

2.1 配置數據庫連接

7.將下載好的 mysql-connector-java.8.22.jar 拷貝到?lib/mysql/?下

8.進入 config 目錄,拷貝?logstash-sample.conf?并重命名為?logstash.conf9.查看?logstash.conf?的內容

# Sample Logstash configuration for creating a simple

# Beats -> Logstash -> Elasticsearch pipeline.

logstash 收集模塊,從日志,數據庫中采集數據

input { beats { port => 5044 } }

logstash 輸出模塊,將采集好的數據同步至 ES

output { elasticsearch { hosts => ["http://localhost:9200"] index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" #user => "elastic" #password => "changeme" } }

4. 因此我們需要修改 input 中的內容

```js

input{

beats{

port =>5044

}

# 可以在 logstash 控制臺輸入相對應的參數,來改變 output 的行為

stdin{}

jdbc{

type=>"jdbc"

# 數據庫連接地址,我的是 MySQL 8.0 的,所以連接必須帶上時區(qū)

jdbc_connection_string =>"jdbc:mysql://連接地址:3306/數據庫名稱?characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai"

# 數據庫連接賬號密碼

jdbc_user =>"root"

jdbc_password =>"root"

# MySQL依賴包路徑,名稱一定要對應;

jdbc_driver_library =>"../lib/mysql/mysql-connector-java-8.0.22.jar"

# the name of the driver class for mysql

jdbc_driver_class =>"com.mysql.cj.jdbc.Driver"

# 數據庫重連嘗試次數

connection_retry_attempts =>"3"

# 判斷數據庫連接是否可用,默認false不開啟

jdbc_validate_connection =>"true"

# 數據庫連接可用校驗超時時間,默認3600S

jdbc_validation_timeout =>"3600"

# 開啟分頁查詢(默認false不開啟);

jdbc_paging_enabled =>"true"

# 單次分頁查詢條數(默認100000,若字段較多且更新頻率較高,建議調低此值);

jdbc_page_size =>"500"

# statement為查詢數據sql,如果sql較復雜,建議配通過statement_filepath配置sql文件的存放路徑;

# sql_last_value為內置的變量,存放上次查詢結果中最后一條數據tracking_column的值,此處即為ModifyTime;

# 這個你需要自己多嘗試,執(zhí)行 sql 文件

#statement_filepath =>"../lib/mysql/jdbc.sql"

# 查詢語句,高級一點的就是增加查詢條件

statement =>"select * from `xxx`"

# 是否將字段名轉換為小寫,默認true(如果有數據序列化、反序列化需求,建議改為false);

lowercase_column_names =>false

# Value can be any of: fatal,error,warn,info,debug,默認info;

sql_log_level => warn

# 是否記錄上次執(zhí)行結果,true表示會將上次執(zhí)行結果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;

record_last_run =>true

# 需要記錄查詢結果某字段的值時,此字段為true,否則默認tracking_column為timestamp的值;

use_column_value =>true

# 需要記錄的字段,用于增量同步,需是數據庫字段

tracking_column =>"ModifyTime"

# Value can be any of: numeric,timestamp,Default value is"numeric"

tracking_column_type=> timestamp

# record_last_run上次數據存放位置;

last_run_metadata_path =>"mysql/last_id.txt"

# 是否清除last_run_metadata_path的記錄,需要增量同步時此字段必須為false;

clean_run =>false

# 同步頻率(分 時 天 月 年),默認每分鐘同步一次; 定時任務中的 corn 表達式

schedule =>"* * * * *"

}

}

2.2 配置同步 ES

output{

elasticsearch{

# 作為數組可以存儲集群數據

hosts => ["http://localhost:9200"]

# 索引名字

index =>"blog"

# index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"

# 數據的唯一索引,就是你查詢的表的主鍵或者一個唯一 ID,自動替換為 ES 的 _id 字段

document_id =>"%{blog_id}"

}

stdout{

codec => json_lines

}

}

2.3 重新啟動

可以看到 MySQL 數據庫中的內容已經同步過來了

三、下一步更新計劃

可能考慮做一下 ELK 做日志系統(tǒng)吧

References

[1]?官網介紹:?https://www.elastic.co/cn/what-is/elk-stack

[2]?下載地址:?https://www.elastic.co/cn/start

[3]?文章:?https://blog.csdn.net/caidewei121/article/details/109192680

?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容