Mysql數(shù)據(jù)準(zhǔn)實(shí)時(shí)同步到Elasticsearch

一. 摘要

Elasticsearch作為大數(shù)據(jù)場(chǎng)景下的搜索和分析的引擎,廣泛的應(yīng)用于實(shí)時(shí)數(shù)據(jù)的分析場(chǎng)景。本文介紹如何通過logstash_input_jdbc插件將mysql數(shù)據(jù)準(zhǔn)實(shí)時(shí)的同步于Elasticsearch。

二. 前期準(zhǔn)備

  • 安裝Elasticsearch(本文采用的版本為2.4.6)
  • 安裝Mysql
  • 安裝Logstash(本文采用的版本為6.1.1)

由于本文主要介紹如何Mysql數(shù)據(jù)向Elasticsearch同步實(shí)操步驟,前期準(zhǔn)備工作請(qǐng)參閱其他文章。

三. 實(shí)驗(yàn)步驟

3.1 安裝logstash_input_jdbc插件

進(jìn)入logstash bin目錄,執(zhí)行./logstash-plugin list --group input,查看是否已安裝logstash_input_jdbc插件

chenyaleideMacBook-Pro:bin lay$ ./logstash-plugin list --group input
logstash-input-beats
logstash-input-elasticsearch
logstash-input-exec
logstash-input-file
logstash-input-ganglia
logstash-input-gelf
logstash-input-generator
logstash-input-graphite
logstash-input-heartbeat
logstash-input-http
logstash-input-http_poller
logstash-input-imap
logstash-input-jdbc
logstash-input-kafka
logstash-input-pipe
logstash-input-rabbitmq
logstash-input-redis
logstash-input-s3
logstash-input-snmptrap
logstash-input-sqs
logstash-input-stdin
logstash-input-syslog
logstash-input-tcp
logstash-input-twitter
logstash-input-udp
logstash-input-unix

未安裝插件的情況下執(zhí)行./logstash-plugin install logstash-input-jdbc

3.2 安裝logstash-output-elasticsearch插件

進(jìn)入logstash bin目錄,執(zhí)行./logstash-plugin list --group output,查看是否已安裝logstash_output_elasticsearch插件

chenyaleideMacBook-Pro:bin lay$ ./logstash-plugin list --group output
logstash-output-cloudwatch
logstash-output-csv
logstash-output-elasticsearch
logstash-output-email
logstash-output-file
logstash-output-graphite
logstash-output-http
logstash-output-lumberjack
logstash-output-nagios
logstash-output-null
logstash-output-pagerduty
logstash-output-pipe
logstash-output-rabbitmq
logstash-output-redis
logstash-output-s3
logstash-output-sns
logstash-output-sqs
logstash-output-stdout
logstash-output-tcp
logstash-output-udp
logstash-output-webhdfs

未安裝插件的情況下執(zhí)行./logstash-plugin install logstash_output_elasticsearch

3.3 創(chuàng)建mysql測(cè)試數(shù)據(jù)

本文為方便起見,創(chuàng)建teacher與student兩張表,旨在展示如何同時(shí)同步多張表數(shù)據(jù)。

create table teacher (         
id varchar(10),     
first_name varchar(20),     
last_name varchar(20),     
age int(10),     
about varchar(100),     
interests varchar(100),     
updatetime timestamp null default current_timestamp on update current_timestamp );

create table student (         
id varchar(10),     
first_name varchar(20),     
last_name varchar(20),     
age int(10),     
about varchar(100),     
interests varchar(100),     
updatetime timestamp null default current_timestamp on update current_timestamp );

3.4 創(chuàng)建logstash作業(yè)文件

在/bin目錄下新建jdbc.conf文件,當(dāng)然你可以在其他目錄創(chuàng)建。
輸入以下內(nèi)容:

input {
    jdbc {
      # mysql jdbc connection string to our backup databse
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/estest"
      # the user we wish to excute our statement as
      jdbc_user => "root"
      jdbc_password => "123456"
      # the path to our downloaded jdbc driver
      jdbc_driver_library => "/Users/lay/.m2/repository/mysql/mysql-connector-java/5.1.37/mysql-connector-java-5.1.37.jar"
      # the name of the driver class for mysql
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      jdbc_default_timezone => "Asia/Shanghai"
      #可以將sql單獨(dú)提成一個(gè)文件,寫法如下
      #statement_filepath => "jdbc.sql"
      statement => "select * from teacher where update_time > :sql_last_value"
      use_column_value => false
      clean_run => true
      last_run_metadata_path => "./teacher_last_run"
      schedule => "* * * * *"
      type => "jdbc"
      #通用屬性tags
      tags => ["teacher"]
   }
   jdbc {
      # mysql jdbc connection string to our backup databse
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/estest"
      # the user we wish to excute our statement as
      jdbc_user => "root"
      jdbc_password => "123456"
      # the path to our downloaded jdbc driver
      jdbc_driver_library => "/Users/lay/.m2/repository/mysql/mysql-connector-java/5.1.37/mysql-connector-java-5.1.37.jar"
      # the name of the driver class for mysql
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      jdbc_default_timezone => "Asia/Shanghai"
      #statement_filepath => "jdbc.sql"
      statement => "select * from student where update_time > :sql_last_value"
      use_column_value => false
      clean_run => true
      last_run_metadata_path => "./student_last_run"
      schedule => "* * * * *"
      type => "jdbc"
      tags => ["student"]
   }
}

output {
    if "teacher" in [tags]{

      elasticsearch{
        hosts => "127.0.0.1:9200"
        index => "teacher"
        document_id => "%{id}"
      }
    }else if "student" in [tags]{
      elasticsearch{
        hosts => "127.0.0.1:9200"
        index => "student"
        document_id => "%{id}"
      }
    }
    stdout {
        codec => json_lines
    }
}

3.5 同步數(shù)據(jù)

啟動(dòng)logstash服務(wù),執(zhí)行如下命令:

logstash -f jdbc.conf

后臺(tái)啟動(dòng)命令:

nohup ./logstash -f jdbc.conf  > logstash.log &

數(shù)據(jù)庫(kù)插入數(shù)據(jù):

INSERT INTO student(id,first_name,last_name,age,about,interests) VALUES('001','John','Smith', 25, 'I love to go rock climbing','[ "sports", "music" ]'); 
INSERT INTO student(id,first_name,last_name,age,about,interests) VALUES('002','Jane','Smith', 32, 'I like to collect rock albums','[ "music" ]'); 
INSERT INTO student(id,first_name,last_name,age,about,interests) VALUES('003','Douglas','Fir', 35, 'I like to build cabinets','[ "forestry" ]'); 

INSERT INTO teacher(id,first_name,last_name,age,about,interests) VALUES('001','John','Smith', 25, 'I love to go rock climbing','[ "sports", "music" ]'); 
INSERT INTO teacher(id,first_name,last_name,age,about,interests) VALUES('002','Jane','Smith', 32, 'I like to collect rock albums','[ "music" ]'); 
INSERT INTO teacher(id,first_name,last_name,age,about,interests) VALUES('003','Douglas','Fir', 35, 'I like to build cabinets','[ "forestry" ]'); 

3.6 結(jié)果驗(yàn)證

查詢Elasticsearch中是否存在mysql的插入數(shù)據(jù),至此數(shù)據(jù)同步完成。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,534評(píng)論 19 139
  • 搜索引擎介紹Elasticsearch的使用Logstash的使用Filebeat的使用Kibana的使用Elas...
    哈嘍別樣閱讀 1,015評(píng)論 1 4
  • 1.where、if 3.foreach
    Stringer閱讀 274評(píng)論 0 0
  • 現(xiàn)在,我以長(zhǎng)大,可以翻過那道高高墻,但我舍不得,舍不得這個(gè)我生活了很久的地方,但是不可以,我要去看看這個(gè)世界有多大...
    魚肉卷閱讀 205評(píng)論 0 0
  • 鷓鴣天(詞林正韻)/觀山海關(guān)圖隨感 作者:心博、圖片:網(wǎng)絡(luò) 血跡狼煙始未干,秦風(fēng)漢雨兩千年。雄關(guān)鐵桶臨天險(xiǎn),古道金...
    心博1閱讀 250評(píng)論 0 0

友情鏈接更多精彩內(nèi)容