Logstash 簡易教程

建議在使用logstash之前先想清楚自己的需求是什么,從哪種數(shù)據(jù)源同步到哪里,需要經(jīng)過怎么樣的處理。因為logstash版本迭代較快,每個版本的插件都有點區(qū)別,比如filter中的http插件在6.6版本以后才有;output到現(xiàn)在(7.1)都沒有jdbc的插件,然而你如果想使用output的jdbc插件就需要自己去安裝熱心人自己寫的插件(logstash-output-jdbc),不幸的是,該作者指出沒有很多的時間去維護此插件,不能保證6.3以后用戶的正常使用。
也就是說,如果你想用output的jdbc,你就必須使用6.3以下(最好5.x)的版本,如果你想用官方filter的http插件,你就得用6.5以上的版本。話說logstash為什么不把jdbc加入到output中去呢。。。

另外:最好在linux下進行調(diào)試或使用,因為用windows會出現(xiàn)惡心的亂碼問題。。。。

安裝

如果目標(biāo)數(shù)據(jù)源的type為jdbc,則建議安裝logstash-5.4.1或6.x一下的版本,因為logstash-output-jdbc只在6.x以下有效,本教程對應(yīng)的版本為logstash-5.4.1

建議直接下載zip包進行使用,解壓完之后的目錄如下

.
├── bin  # 二進制腳本,包括用于啟動Logstash的logstash和用于安裝插件的logstash-plugin
├── config # Configuration files, including logstash.yml and jvm.options
├── CONTRIBUTORS
├── data
├── Gemfile
├── Gemfile.jruby-2.3.lock
├── lib
├── LICENSE
├── logs # 日志文件
├── logstash-core
├── logstash-core-plugin-api
├── modules
├── NOTICE.TXT
├── tools
└── vendor

https://www.elastic.co/guide/en/logstash/7.1/dir-layout.html

可以在對應(yīng)目錄下,執(zhí)行如下命令來驗證是否安裝成功

bin/logstash -e 'input { stdin { } } output { stdout {} }'

等待啟動成功,直接輸入“hello world”
會有如下顯示

{
      "@version" => "1",
    "@timestamp" => 2019-06-20T06:46:48.310Z,
       "message" => "hello world",
          "host" => "localhost.localdomain"
}

則安裝成功

語法

區(qū)段

Logstash 設(shè)計了自己的 DSL —— 有點像 Puppet 的 DSL,或許因為都是用 Ruby 語言寫的吧 —— 包括有區(qū)域,注釋,數(shù)據(jù)類型(布爾值,字符串,數(shù)值,數(shù)組,哈希),條件判斷,字段引用等。

Logstash 用 {} 來定義區(qū)域,區(qū)域內(nèi)可以包括插件區(qū)域定義,你可以在一個區(qū)域內(nèi)定義多個插件。插件區(qū)域內(nèi)則可以定義鍵值對設(shè)置。示例如下:

input {}
filter {}
output {}

數(shù)據(jù)類型

類型 示例
bool debug => true
bytes my_bytes => "113" # 113 bytes
string host => "hostname"
number port => 214
array match =>[ "/var/log/messages", "/var/log/*.log" ]
hash options => {key1 => "value1",key2 => "value2" }

條件判斷

名稱 符號
等于 ==
不等于 !=
小于 <
大于 >
小于等于 <=
大于等于 >=
匹配正則 =~
不匹配正則 !~
包含 in
不包含 not in
and
or
非與 nand
非或 xor
復(fù)合表達式 ()
取反符合 !()

示例

讀取日志文件

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html#plugins-inputs-file-type
各參數(shù)介紹與賦值

全量同步一個文件的所有內(nèi)容,并且進行監(jiān)聽

input {
    file {
        path => ["/usr/local/logstash/logstash-tutorial-dataset"]
        type => "file_monitor"
        tags => ["有用的","標(biāo)識用的"]
        start_position => "beginning"
    }

}
output {
   stdout {}
}

監(jiān)聽后不能打開編輯再保存必須使用echo >> 對文件進行追加內(nèi)容,可以看到

{
       "message" => "2019年6月20日16:57:52",
    "@timestamp" => 2019-06-20T08:57:55.624Z,
          "tags" => [
        [0] "有用的",
        [1] "標(biāo)識用的"
    ],
          "host" => "localhost.localdomain",
          "path" => "/usr/local/logstash/logstash-tutorial-dataset",
      "@version" => "1",
          "type" => "file_monitor"
}

讀取mysql

input {
    jdbc {
        jdbc_driver_library => "/usr/local/logstash/mysql/mysql-connector-java-5.1.47.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://xxx:xxx/data"
        jdbc_user => "xxx"
        jdbc_password => "xxx"
        parameters => { "id" => 666 }
        statement => "SELECT * FROM xxx_t_job_function_net_bak20140828 WHERE id > :id"
    }
}
output {
    stdout {}
}

寫mysql

官方?jīng)]有提供寫mysql的插件,所以需要自己下載安裝
https://github.com/theangryangel/logstash-output-jdbc

./logstash-plugin install logstash-output-jdbc

可惜的是,目前插件只支持6.3以前,當(dāng)前l(fā)ogstash最新版本為7.1.1,
測試用的是logstash-5.4.1 較穩(wěn)定

input {
    jdbc {
        jdbc_driver_library => "/usr/local/logstash/mysql/mysql-connector-java-5.1.47.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://120.55.168.18:8840/data"
        jdbc_user => "crawler"
        jdbc_password => "crawler9988@ed"
        parameters => { "id" => 666 }
        statement => "SELECT * FROM xxx_t_job_function_net_bak20140828 WHERE id > :id"
    }
}
output {
    jdbc {
        driver_jar_path => "D:\repo\mysql\mysql-connector-java\5.1.40\mysql-connector-java-5.1.40.jar"
        driver_class => "com.mysql.jdbc.Driver"
        connection_string => "jdbc:mysql://sss:8840/testcase"
        username => "sss"
        password => "csssd"
        statement => ["INSERT INTO job_function_20190621 ( code_val, name_val, level_val, source_name, version ) VALUES (?,?,?,?,?)","code","name","level","source_name","current_version"]
    }
    stdout {}
}

input

output

filter

以上三個鏈接為官方文檔,基本上涵蓋了可能會用到的數(shù)據(jù)源頭與目標(biāo),如果沒有的話,可以去github上搜索一下一般都會有人自己開發(fā)插件如logstash-output-jdbc

完整的示例

  • 源數(shù)據(jù)。data數(shù)據(jù)庫的xxx_t_job_function_net_bak20140828表,獲取id>666的數(shù)據(jù)
  • 過濾(清洗)數(shù)據(jù)。利用dict數(shù)據(jù)庫dict_location表替換數(shù)據(jù)源中code,去除對應(yīng)的location_name_cn,如遇到多條數(shù)據(jù)則合并為一條用“;”分開
  • 目標(biāo)數(shù)據(jù)。將整理好的數(shù)據(jù)保存在testcase數(shù)據(jù)庫的·job_function_20190621`表中
input {
    jdbc {
        jdbc_driver_library => "D:\repo\mysql\mysql-connector-java\5.1.40\mysql-connector-java-5.1.40.jar"    
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://xxx:xxx/data"
        jdbc_user => "xxx"
        jdbc_password => "xxxx"
        statement => "SELECT * FROM xxx_t_job_function_net_bak20140828 WHERE id > :id"
        parameters => { "id" => 666 }
    }
}
filter {
    grok {
        match => {"@timestamp" => "%{YEAR:year}-%{MONTHNUM:month}-%{MONTHDAY:day}" }
        add_field => { "current_version" => "%{year}%{month}%{day}"}
    }
    jdbc_streaming {
        # 加了會錯誤,可能引用了上面的input jdbc_driver_library => "D:\repo\mysql\mysql-connector-java\5.1.40\mysql-connector-java-5.1.40.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://xxx:3306/xxx"
        jdbc_user => "xxx"
        jdbc_password => "xxx"
        statement => "SELECT location_name_cn FROM dict_location WHERE location_code = :codeParam"
        parameters => { "codeParam" => "code"}
        target => "code"
    }
    if [code] and [code][0] and ("location_name_cn" in [code][0]) {
        ruby {
            code => "
            r = ''
            event.get('code').each do |variable|
               # puts variable['location_name_cn']
               r = r + variable['location_name_cn'] + ';'
            end 
            event.set('code',r)
            "
        }
    } else {
        mutate {
            replace => { "code" => ""}
        }
    }
}
output {
    stdout {
        codec => rubydebug{}        
    }
    jdbc {
        driver_jar_path => "D:\repo\mysql\mysql-connector-java\5.1.40\mysql-connector-java-5.1.40.jar"
        driver_class => "com.mysql.jdbc.Driver"
        connection_string => "jdbc:mysql://xxx:8840/xxx"
        username => "xxx"
        password => "xxx"
        statement => ["INSERT INTO job_function_20190621 ( code_val, name_val, level_val, source_name, version ) VALUES (?,?,?,?,?)","code","name","level","source_name","current_version"]
    }
}

最佳實踐

1、整理并調(diào)試好讀取數(shù)據(jù)與插入數(shù)據(jù)的腳本或參數(shù);

2、一般字符串的操作官方給的插件中就可以解決如Date、json、mutate等

3、遇到比較復(fù)雜的操作可以借助ruby,在編譯器中調(diào)試好,直接復(fù)制過來,可以實時調(diào)試;

4、grok為提取日志的核心操作組件

5、注意變量的引用,內(nèi)置參數(shù)為@timestamp

字符串中引用 "the longitude is %{[geoip][location][0]}"
作為判斷的時候可以[filed]使用

https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容