ELK快速入門(03)Logstash高級配置

前面我們搭建了一個簡單的ELK日志收集系統(tǒng),可以看到其中的Logstash起的作用。Logstash 是一個實時數(shù)據(jù)收集引擎,可收集各類型數(shù)據(jù)并對其進(jìn)行分析,過濾和歸納。按照自己條件分析過濾出符合數(shù)據(jù)導(dǎo)入到可視化界面。Logstash的功能很強(qiáng)大,遠(yuǎn)不止一個input和一個output那幾行配置起的作用。下面介紹Logstash在經(jīng)典場景中的用法。










簡單模式:以logstash作為日志搜索器

架構(gòu):logstash采集、處理、轉(zhuǎn)發(fā)到elasticsearch存儲,在kibana進(jìn)行展示

特點:這種結(jié)構(gòu)因為需要在各個服務(wù)器上部署 Logstash,而它比較消耗 CPU 和內(nèi)存資源,所以比較適合計算資源豐富的服務(wù)器,否則容易造成服務(wù)器性能下降,甚至可能導(dǎo)致無法正常工作(因為Logstash有點重)。

input和output的配置是比較靈活的,首先看最簡單的一種,從控制臺輸入日志,然后輸出到控制臺:

input { stdin { } }

output{ stdout { } }

前面的內(nèi)容介紹了從文件讀取,輸出到es當(dāng)中,輸出也可以到文件中:

output?{

? ? #輸出到文件

? ? file {

? ? ? ? path => "/logs/app/logstash/all.log" #指定寫入文件路徑

? ? ? ? flush_interval => 0????????????????? # 指定刷新間隔,0代表實時寫入

? ? ? ? codec => json

? ? ?}

}

簡單模式基本上能滿足百分之八九十的公司的日志業(yè)務(wù)了,下面介紹的幾種模式適用于日志量十分龐大的系統(tǒng)。











安全模式:beats(FilebeatMetricbeat、Packetbeat、Winlogbeat等)作為日志搜集器

因為Logstash有點重,所以就有了beats。Beats 是一個面向輕量型采集器的平臺,這些采集器可從邊緣機(jī)器發(fā)送數(shù)據(jù)。我們從文件收集日志常用的是filebeat。下面是幾種beats:

Packetbeat(搜集網(wǎng)絡(luò)流量數(shù)據(jù));

Topbeat(搜集系統(tǒng)、進(jìn)程和文件系統(tǒng)級別的 CPU 和內(nèi)存使用情況等數(shù)據(jù));

Filebeat(搜集文件數(shù)據(jù))-------最常用

Winlogbeat(搜集 Windows 事件日志數(shù)據(jù))。

從架構(gòu)圖可以看到,我們的應(yīng)用服務(wù)器收集日志的不再是Logstash,而且對應(yīng)的beats,Beats 將搜集到的數(shù)據(jù)發(fā)送到 Logstash,經(jīng) Logstash 解析、過濾后,將其發(fā)送到 Elasticsearch 存儲,并由 Kibana 呈現(xiàn)給用戶。

這種架構(gòu)解決了 Logstash 在各服務(wù)器節(jié)點上占用系統(tǒng)資源高的問題。相比 Logstash,Beats 所占系統(tǒng)的 CPU 和內(nèi)存幾乎可以忽略不計。另外,Beats 和 Logstash 之間支持 SSL/TLS 加密傳輸,客戶端和服務(wù)器雙向認(rèn)證,保證了通信安全。因此這種架構(gòu)適合對數(shù)據(jù)安全性要求較高,同時各服務(wù)器性能比較敏感的場景。?

來看一個filebeat的配置:

#=========== Filebeat prospectors ===========

filebeat.prospectors:?

- input_type: log

?paths:

??? - /home/admin/helloworld/logs/*.log

#--------------------------- Logstash output --------------------------------

output.logstash:

?hosts: ["192.168.80.34:5044"]

Logstash的input配置:

input?{

? ? ?beats {

? ? ? ? port => 5044

? ? ? ? codec => "json"

? ? }

}

Logstash的output配置:

output?{

?#?輸出到控制臺

??? # stdout { }

?#?輸出到redis

??? redis {

??????? host => "192.168.80.32"?? # redis主機(jī)地址

??????? port => 6379????????????? # redis端口號

?? ?????password => "123456"????????? # redis 密碼

?????? ?#db => 8?????????????????? # redis數(shù)據(jù)庫編號

??????? data_type => "channel"??? # 使用發(fā)布/訂閱模式

??????? key => "logstash_list_0"? # 發(fā)布通道名稱

}

#輸出到kafka

??? kafka {

??????? bootstrap_servers => "192.168.80.42:9092"

??????? topic_id???????? => "test"?

?????? }

#輸出到es

elasticsearch {

??????? hosts => "node18:9200"

??????? codec => json

??????? }

}

從上面可以看出,Logstash不僅可以輸出到es和控制臺,還可以輸出到redis緩存中和kafka中,通過配置可以實現(xiàn)強(qiáng)大的數(shù)據(jù)傳輸功能。










消息模式:Beats 還不支持輸出到消息隊列新版本除外:5.0版本及以上

在消息隊列前后兩端只能是 Logstash 實例。logstash從各個數(shù)據(jù)源搜集數(shù)據(jù),不經(jīng)過任何處理轉(zhuǎn)換僅轉(zhuǎn)發(fā)出到消息隊列(kafka、redis、rabbitMQ等),后logstash從消息隊列取數(shù)據(jù)進(jìn)行轉(zhuǎn)換分析過濾,輸出到elasticsearch,并在kibana進(jìn)行圖形化展示。

架構(gòu)(Logstash進(jìn)行日志解析所在服務(wù)器性能各方面必須要足夠好):

模式特點這種架構(gòu)適合于日志規(guī)模比較龐大的情況。但由于 Logstash 日志解析節(jié)點和 Elasticsearch 的負(fù)荷比較重,可將他們配置為集群模式,以分擔(dān)負(fù)荷。引入消息隊列,均衡了網(wǎng)絡(luò)傳輸,從而降低了網(wǎng)絡(luò)閉塞,尤其是丟失數(shù)據(jù)的可能性,但依然存在 Logstash 占用系統(tǒng)資源過多的問題

工作流程:Filebeat采集—>? logstash轉(zhuǎn)發(fā)到kafka—>? logstash處理從kafka緩存的數(shù)據(jù)進(jìn)行分析—>? 輸出到es—>? 顯示在kibana

從服務(wù)器接收日志的Logstash的配置:

input?{

??? beats {

??? port => 5044

??? codec => "json"

?????? }

??? syslog{

?????? }

}

output?{

??? # 輸出到控制臺

??? # stdout { }

??? # 輸出到redis

??? redis {

??????? host => "192.168.80.32"?? # redis主機(jī)地址

??????? port => 6379????????????? # redis端口號

??????? password => "123456"????????? # redis 密碼

?????? #db => 8?????????????????? # redis數(shù)據(jù)庫編號

??????? data_type => "channel"??? # 使用發(fā)布/訂閱模式

??????? key => "logstash_list_0"? # 發(fā)布通道名稱

??? }

??#輸出到kafka

??? kafka {

??????? bootstrap_servers => "192.168.80.42:9092"

??????? topic_id????????? => "test"?

?????? }?????

}

從kafka接收日志到es的Logstash的配置:

input{

??? kafka {

? ? ? ? ? ?bootstrap_servers => "192.168.80.42:9092"

?????? ??? topics????????? => ["test"]

?????? ??? #decroate_events?? => true

? ? ? ? ? ?group_id????????? => "consumer-test"(消費(fèi)組)

?????? ??? #decroate_events? => true

? ? ? ? ? ??auto_offset_reset => "earliest"(初始消費(fèi),相當(dāng)于from beginning,不設(shè)置,相當(dāng)于是監(jiān)控啟動后的kafka的消息生產(chǎn))

? ? ? ? }

}

output?{

?elasticsearch {

?????? hosts => "192.168.80.18:9200"???

?????? codec => json

?????? }

}











消息模式:logstash從kafka消息隊列直接讀取數(shù)據(jù)并處理、輸出到es(因為從kafka內(nèi)部直接讀取,相當(dāng)于是已經(jīng)在緩存內(nèi)部,直接logstash處理后就可以進(jìn)行輸出,輸出到文件、es等)

工作模式:【數(shù)據(jù)已存在kafka對應(yīng)主題內(nèi)】單獨(dú)的logstash,kafka讀取,經(jīng)過處理輸出到es并在kibana進(jìn)行展示

Logstash配置如下:

input{

??? kafka {

? ? ? ? ? bootstrap_servers => "192.168.80.42:9092"

?????? ???? topics??? ??????=> ["test"]

? ? ? ? ? ?group_id????? ?=> "consumer-test"

? ? ? ? ? ?#decroate_events? => true

? ? ? ? ? ? auto_offset_reset => "earliest"

? ? ?}

}?

output?{

?????? elasticsearch {

?????? hosts => "192.168.80.18:9200"

?????? codec => json

?????? }

}













filebeat新版本(5.0以上)支持直接支持輸出到kafka,而無需經(jīng)過logstash接收轉(zhuǎn)發(fā)到kafka

Filebeat采集完畢直接入到kafka消息隊列,進(jìn)而logstash取出數(shù)據(jù),進(jìn)行處理分析輸出到es,并在kibana進(jìn)行展示。

filebeat配置:?

#======== Filebeat prospectors=================

filebeat.prospectors:?

- input_type: log

?paths:

??? - /home/admin/helloworld/logs/*.log?

#-----------------------------kafka? output-----------------------------------

output.kafka:

? hosts: ["192.168.80.42:9092"]

? topic: test

? required_acks: 1

Logstash的配置:

input{

??? kafka {

??????? bootstrap_servers => "192.168.80.42:9092"

?????? ???? topics????????? => ["test"]

???????? group_id?????? => "consumer-test"

?????? ? #decroate_events? => true

?????? auto_offset_reset => "earliest"

? ?}

}

output?{

?????? elasticsearch {

?????? hosts => "192.168.80.18:9200"

?????? codec => json

?????? }

}











SSL加密傳輸(增強(qiáng)安全性,僅配置了秘鑰和證書的filebeat服務(wù)器和logstash服務(wù)器才能進(jìn)行日志文件數(shù)據(jù)的傳輸)

Logstash的配置文件:

ssl_certificate_authorities :filebeat端傳來的證書所在位置

ssl_certificate?=>?本端生成的證書所在的位置

ssl_key?=>?/本端生成的密鑰所在的位置

ssl_verify_mode?=> "force_peer"

input {

??? beats {

??? port => 5044

??? codec => "json"

ssl => true

?? ssl_certificate_authorities => ["/usr/local/logstash-5.6.10/pki/tls/certs/filebeat.crt"]

?? ssl_certificate => "/usr/local/logstash-5.6.10/pki/tls/certs/logstash.crt"

?? ssl_key => "/usr/local/logstash-5.6.10/pki/tls/private/logstash.key"

ssl_verify_mode => "force_peer"#(需與ssl_certificate_authorities一起使用

?????? }

??? syslog{

?????? }

}

?

output {

??? # 輸出到控制臺

??? # stdout { }

??? # 輸出到redis

??? redis {

??????? host => "192.168.80.32"?? # redis主機(jī)地址

??????? port => 6379????????????? # redis端口號

??????? password => "123456"????????? # redis 密碼

?????? #db => 8?????????????????? # redis數(shù)據(jù)庫編號

??????? data_type => "channel"??? # 使用發(fā)布/訂閱模式

??????? key => "logstash_list_0" ?# 發(fā)布通道名稱

??? }

??? #輸出到kafka

??? kafka {

??????? bootstrap_servers => "192.168.80.42:9092"

??????? topic_id????????? => "test"?

?????? }?????

??? #輸出到es

??? elasticsearch {

?????? hosts => "node18:9200"

?????? codec => json

?????? }

}

filebeat的配置文件:?

#=================== Filebeat prospectors ========================

filebeat.prospectors:?

- input_type: log

?paths:

??? - /home/admin/helloworld/logs/*.log

#----------------------------- Logstash output --------------------------------

output.logstash:

# The Logstash hosts

?hosts: ["192.168.80.18:5044"]

#加密傳輸

?ssl.certificate_authorities: ["/usr/local/filebeat-5.6.10/pki/tls/certs/logstash.crt"]

? ssl.certificate: "/usr/local/filebeat-5.6.10/pki/tls/certs/filebeat.crt"

? ssl.key: "/usr/local/filebeat-5.6.10/pki/tls/private/filebeat.key"?













logstash(非filebeat)進(jìn)行文件采集,輸出到kafka緩存,讀取kafka數(shù)據(jù)并處理輸出到文件或es

從文件采集到kafka:

input?{

?file?{

??????? path => [

??????????? # 這里填寫需要監(jiān)控的文件

??????????? "/home/admin/helloworld/logs/catalina.out"

??????? ]

??? }

}

output?{

?kafka?{

??? # 輸出到控制臺

??? # stdout { }

?#?輸出到kafka

??bootstrap_servers => "192.168.80.42:9092"

??? topic_id????????? => "test"

??? }

}

從kafka或者redis讀取數(shù)據(jù)輸出到es或者文件:

input{

#從redis讀取

?redis {

??????? host => "192.168.80.32"?? # redis主機(jī)地址

??????? port => 6379????????????? # redis端口號

?????? password? => "123456"??? ? # redis 密碼

??????? #db => 8?????????????????? # redis數(shù)據(jù)庫編號

??????? data_type => "channel"??? # 使用發(fā)布/訂閱模式

??????? key => "logstash_list_0"? # 發(fā)布通道名稱

}

#從kafka讀取

kafka {

??????? bootstrap_servers => "192.168.80.42:9092"

?????? ??? topics????????? => ["test"]

??????? auto_offset_reset => "earliest"

?????? }

}

?

output?{

??#輸出到文件

??? file {

??????? path => "/usr/local/logstash-5.6.10/data/log/logstash/all1.log" # 指定寫入文件路徑

#?????? message_format => "%{host} %{message}"???????? # 指定寫入格式

??????? flush_interval => 0???????????????????????????? # 指定刷新間隔,0代表實時寫入

?? ? codec => json

?????? }

?#輸出到es

?? elasticsearch {

?????? hosts => "node18:9200"

?????? codec => json

?????? }

}











logstash同步mysql數(shù)據(jù)庫數(shù)據(jù)到es(logstash5版本以上已集成jdbc插件,無需下載安裝,直接使用)

從mysql讀取數(shù)據(jù)到es:

input {

?stdin { }

??? jdbc {

??jdbc_connection_string => "jdbc:mysql://192.168.80.18:3306/fyyq-mysql"

??????? jdbc_user => "fyyq"

??????? jdbc_password => "fyyq@2017"

jdbc_driver_library => "/usr/local/logstash-5.6.10/mysql-connector-java-5.1.46.jar"

???? ???jdbc_driver_class => "com.mysql.jdbc.Driver"

??????? jdbc_paging_enabled => "true"

??????? statement_filepath => "/usr/local/logstash-5.6.10/mysql2es.sql"

??????? #schedule => "* * * * *"

??? }

?}

?

?output {

???? stdout {

??????? codec => json_lines

??? }

??elasticsearch {

??????? hosts => "node18:9200"

??????? #index => "mainIndex"

??????? #document_type => "user"

??????? #document_id => "%{id}"

??? }

}










Logstash-input插件及插件參數(shù)概覽

所有輸入插件都支持以下配置選項:

codec:可選

json?(json格式編解碼器)

msgpack?(msgpack格式編解碼器)

plain(文本格式編解碼器)

multiline(將多行文本event合并成一個event,eg:將java中的異常跟蹤日志合并成一條消)]

常用輸入插件:

1、beat-input:Receives events from the Elastic Beats framework,從框架接收事件? ? Settings:

2、file-input:來自文件的Streams事件(path字段必填項)

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html


3、stdin-input:從標(biāo)準(zhǔn)輸入讀取事件

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-stdin.html


4、syslog-input:將syslog消息作為事件讀取

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-syslog.html


5、tcp-input:從TCP讀取事件(port字段必填項)

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html


6、udp-input:通過UDP讀取事件(port字段必填項)

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-udp.html


7、twitter-input:從Twitter Streaming API讀取事件(相對常用場景)

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-twitter.html

consumer_keyconsumer_secret、oauth_tokenoauth_token_secret必填項)


8、redis-input:從Redis實例讀取事件

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-redis.html

data_type["list", "channel", "pattern_channel"]、key必填項,)


9、kafka-input:從Kafka主題中讀取事件

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

(參數(shù)過多,自行查看)


10、jdbc-input:從JDBC數(shù)據(jù)創(chuàng)建事件

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html

jdbc_connection_string、jdbc_driver_class、jdbc_user必填項)


11、http-input:通過HTTP或HTTPS接收事件

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-http.html


12、elasticsearch-input:從Elasticsearch集群讀取查詢結(jié)果

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-elasticsearch.html


13、exec-input:將shell命令的輸出捕獲為事件(command字段必填項)

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-exec.html


非 常用輸入插件:

自行進(jìn)入logstash的插件中心進(jìn)行查看,有需要自行配置

總:https://www.elastic.co/guide/en/logstash/current/input-plugins.html











Logstash-filter插件及插件參數(shù)概覽

所有處理插件均支持的配置:

常用處理插件:

1、 grok-filter:可以將非結(jié)構(gòu)化日志數(shù)據(jù)解析為結(jié)構(gòu)化和可查詢的內(nèi)容

https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html#_grok_basics

grok模式的語法是%{SYNTAX:SEMANTIC}

SYNTAX是與您的文本匹配的模式的名稱

SEMANTIC是您為匹配的文本提供的標(biāo)識符

grok是通過系統(tǒng)預(yù)定義的正則表達(dá)式或者通過自己定義正則表達(dá)式來匹配日志中的各個值

正則解析式比較容易出錯,建議先調(diào)試(地址):

grok debugger調(diào)試:http://grokdebug.herokuapp.com/


grok事先已經(jīng)預(yù)定義好了許多正則表達(dá)式規(guī)則,該規(guī)則文件存放路徑:

/usr/local/logstash-5.6.10/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.2/patterns

等等,可自行進(jìn)入查看。

示例一:

初始輸入的message是:

經(jīng)過grok的正則分析后:

示例二:

COMBINEDAPACHELOG的具體內(nèi)容見:

https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/httpd

初始輸入message為:

經(jīng)過grok正則分析后:

示例三(自定義grok表達(dá)式mypattern[A-Z]):

初始輸入message:

經(jīng)過grok正則分析后:

示例四(移除重復(fù)字段):

初始輸入message:

經(jīng)過grok正則解析后(json格式):

示例五(過濾篩選catalina.out文件中的信息,message字段已移除):

【Data在pattern中的定義是:.*? GREEDYDATA在pattern中的定義是:.*】


初始輸入message:

經(jīng)過grok正則解析后(截圖及json格式如下):

常用參數(shù):

1)match:match作用:用來對字段的模式進(jìn)行匹配

2)patterns_dir:用來指定規(guī)則的匹配路徑,如果使用logstash自定義的規(guī)則時,不需要寫此參數(shù)。Patterns_dir可以同時制定多個存放過濾規(guī)則的目錄;

3)remove_field:如果匹配到某個”日志字段,則將匹配的這個日志字段從這條日志中刪除(多個以逗號隔開)

2、 clone-filter:克隆過濾器用于復(fù)制事件

3、? drop-filter:丟棄所有活動

4、? json-filter:解析JSON事件

5、? kv-filter:解析鍵值對


非常用參數(shù):參考教程:https://www.elastic.co/guide/en/logstash/current/filter-plugins.html










Logstash-output插件及插件參數(shù)概覽

所有輸出插件均支持以下配置:

常用插件:

1、Elasticsearch-output:此插件是在Elasticsearch中存儲日志的推薦方法。如果您打算使用Kibana Web界面,則需要使用此輸出


2、file-output:此輸出將事件寫入磁盤上的文件(path字段必填項)


3、kafka-output:將事件寫入Kafka主題(topic_id是必填項)


4、 redis-output:此輸出將使用RPUSH將事件發(fā)送到Redis隊列


5、stdout-output:一個簡單的輸出,打印到運(yùn)行Logstash的shell的STDOUT


非常用插件:參考官網(wǎng)教程鏈接:https://www.elastic.co/guide/en/logstash/current/output-plugins.html





文章原文鏈接:https://www.cnblogs.com/qingqing74647464/p/9378385.html



















我們的交流基地,“JAVA互聯(lián)網(wǎng)技術(shù)交流:789650498”歡迎小伙伴們一起來交流:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容