前面我們搭建了一個簡單的ELK日志收集系統(tǒng),可以看到其中的Logstash起的作用。Logstash 是一個實時數(shù)據(jù)收集引擎,可收集各類型數(shù)據(jù)并對其進(jìn)行分析,過濾和歸納。按照自己條件分析過濾出符合數(shù)據(jù)導(dǎo)入到可視化界面。Logstash的功能很強(qiáng)大,遠(yuǎn)不止一個input和一個output那幾行配置起的作用。下面介紹Logstash在經(jīng)典場景中的用法。
簡單模式:以logstash作為日志搜索器
架構(gòu):logstash采集、處理、轉(zhuǎn)發(fā)到elasticsearch存儲,在kibana進(jìn)行展示
特點:這種結(jié)構(gòu)因為需要在各個服務(wù)器上部署 Logstash,而它比較消耗 CPU 和內(nèi)存資源,所以比較適合計算資源豐富的服務(wù)器,否則容易造成服務(wù)器性能下降,甚至可能導(dǎo)致無法正常工作(因為Logstash有點重)。

input和output的配置是比較靈活的,首先看最簡單的一種,從控制臺輸入日志,然后輸出到控制臺:
input { stdin { } }
output{ stdout { } }
前面的內(nèi)容介紹了從文件讀取,輸出到es當(dāng)中,輸出也可以到文件中:
output?{
? ? #輸出到文件
? ? file {
? ? ? ? path => "/logs/app/logstash/all.log" #指定寫入文件路徑
? ? ? ? flush_interval => 0????????????????? # 指定刷新間隔,0代表實時寫入
? ? ? ? codec => json
? ? ?}
}
簡單模式基本上能滿足百分之八九十的公司的日志業(yè)務(wù)了,下面介紹的幾種模式適用于日志量十分龐大的系統(tǒng)。
安全模式:beats(Filebeat、Metricbeat、Packetbeat、Winlogbeat等)作為日志搜集器
因為Logstash有點重,所以就有了beats。Beats 是一個面向輕量型采集器的平臺,這些采集器可從邊緣機(jī)器發(fā)送數(shù)據(jù)。我們從文件收集日志常用的是filebeat。下面是幾種beats:
Packetbeat(搜集網(wǎng)絡(luò)流量數(shù)據(jù));
Topbeat(搜集系統(tǒng)、進(jìn)程和文件系統(tǒng)級別的 CPU 和內(nèi)存使用情況等數(shù)據(jù));
Filebeat(搜集文件數(shù)據(jù))-------最常用
Winlogbeat(搜集 Windows 事件日志數(shù)據(jù))。

從架構(gòu)圖可以看到,我們的應(yīng)用服務(wù)器收集日志的不再是Logstash,而且對應(yīng)的beats,Beats 將搜集到的數(shù)據(jù)發(fā)送到 Logstash,經(jīng) Logstash 解析、過濾后,將其發(fā)送到 Elasticsearch 存儲,并由 Kibana 呈現(xiàn)給用戶。
這種架構(gòu)解決了 Logstash 在各服務(wù)器節(jié)點上占用系統(tǒng)資源高的問題。相比 Logstash,Beats 所占系統(tǒng)的 CPU 和內(nèi)存幾乎可以忽略不計。另外,Beats 和 Logstash 之間支持 SSL/TLS 加密傳輸,客戶端和服務(wù)器雙向認(rèn)證,保證了通信安全。因此這種架構(gòu)適合對數(shù)據(jù)安全性要求較高,同時各服務(wù)器性能比較敏感的場景。?
來看一個filebeat的配置:
#=========== Filebeat prospectors ===========
filebeat.prospectors:?
- input_type: log
?paths:
??? - /home/admin/helloworld/logs/*.log
#--------------------------- Logstash output --------------------------------
output.logstash:
?hosts: ["192.168.80.34:5044"]
Logstash的input配置:
input?{
? ? ?beats {
? ? ? ? port => 5044
? ? ? ? codec => "json"
? ? }
}
Logstash的output配置:
output?{
?#?輸出到控制臺
??? # stdout { }
?#?輸出到redis
??? redis {
??????? host => "192.168.80.32"?? # redis主機(jī)地址
??????? port => 6379????????????? # redis端口號
?? ?????password => "123456"????????? # redis 密碼
?????? ?#db => 8?????????????????? # redis數(shù)據(jù)庫編號
??????? data_type => "channel"??? # 使用發(fā)布/訂閱模式
??????? key => "logstash_list_0"? # 發(fā)布通道名稱
}
#輸出到kafka
??? kafka {
??????? bootstrap_servers => "192.168.80.42:9092"
??????? topic_id???????? => "test"?
?????? }
#輸出到es
elasticsearch {
??????? hosts => "node18:9200"
??????? codec => json
??????? }
}
從上面可以看出,Logstash不僅可以輸出到es和控制臺,還可以輸出到redis緩存中和kafka中,通過配置可以實現(xiàn)強(qiáng)大的數(shù)據(jù)傳輸功能。
消息模式:Beats 還不支持輸出到消息隊列(新版本除外:5.0版本及以上)
在消息隊列前后兩端只能是 Logstash 實例。logstash從各個數(shù)據(jù)源搜集數(shù)據(jù),不經(jīng)過任何處理轉(zhuǎn)換僅轉(zhuǎn)發(fā)出到消息隊列(kafka、redis、rabbitMQ等),后logstash從消息隊列取數(shù)據(jù)進(jìn)行轉(zhuǎn)換分析過濾,輸出到elasticsearch,并在kibana進(jìn)行圖形化展示。
架構(gòu)(Logstash進(jìn)行日志解析所在服務(wù)器性能各方面必須要足夠好):


模式特點:這種架構(gòu)適合于日志規(guī)模比較龐大的情況。但由于 Logstash 日志解析節(jié)點和 Elasticsearch 的負(fù)荷比較重,可將他們配置為集群模式,以分擔(dān)負(fù)荷。引入消息隊列,均衡了網(wǎng)絡(luò)傳輸,從而降低了網(wǎng)絡(luò)閉塞,尤其是丟失數(shù)據(jù)的可能性,但依然存在 Logstash 占用系統(tǒng)資源過多的問題
工作流程:Filebeat采集—>? logstash轉(zhuǎn)發(fā)到kafka—>? logstash處理從kafka緩存的數(shù)據(jù)進(jìn)行分析—>? 輸出到es—>? 顯示在kibana
從服務(wù)器接收日志的Logstash的配置:
input?{
??? beats {
??? port => 5044
??? codec => "json"
?????? }
??? syslog{
?????? }
}
output?{
??? # 輸出到控制臺
??? # stdout { }
??? # 輸出到redis
??? redis {
??????? host => "192.168.80.32"?? # redis主機(jī)地址
??????? port => 6379????????????? # redis端口號
??????? password => "123456"????????? # redis 密碼
?????? #db => 8?????????????????? # redis數(shù)據(jù)庫編號
??????? data_type => "channel"??? # 使用發(fā)布/訂閱模式
??????? key => "logstash_list_0"? # 發(fā)布通道名稱
??? }
??#輸出到kafka
??? kafka {
??????? bootstrap_servers => "192.168.80.42:9092"
??????? topic_id????????? => "test"?
?????? }?????
}
從kafka接收日志到es的Logstash的配置:
input{
??? kafka {
? ? ? ? ? ?bootstrap_servers => "192.168.80.42:9092"
?????? ??? topics????????? => ["test"]
?????? ??? #decroate_events?? => true
? ? ? ? ? ?group_id????????? => "consumer-test"(消費(fèi)組)
?????? ??? #decroate_events? => true
? ? ? ? ? ??auto_offset_reset => "earliest"(初始消費(fèi),相當(dāng)于from beginning,不設(shè)置,相當(dāng)于是監(jiān)控啟動后的kafka的消息生產(chǎn))
? ? ? ? }
}
output?{
?elasticsearch {
?????? hosts => "192.168.80.18:9200"???
?????? codec => json
?????? }
}
消息模式:logstash從kafka消息隊列直接讀取數(shù)據(jù)并處理、輸出到es(因為從kafka內(nèi)部直接讀取,相當(dāng)于是已經(jīng)在緩存內(nèi)部,直接logstash處理后就可以進(jìn)行輸出,輸出到文件、es等)
工作模式:【數(shù)據(jù)已存在kafka對應(yīng)主題內(nèi)】單獨(dú)的logstash,kafka讀取,經(jīng)過處理輸出到es并在kibana進(jìn)行展示
Logstash配置如下:
input{
??? kafka {
? ? ? ? ? bootstrap_servers => "192.168.80.42:9092"
?????? ???? topics??? ??????=> ["test"]
? ? ? ? ? ?group_id????? ?=> "consumer-test"
? ? ? ? ? ?#decroate_events? => true
? ? ? ? ? ? auto_offset_reset => "earliest"
? ? ?}
}?
output?{
?????? elasticsearch {
?????? hosts => "192.168.80.18:9200"
?????? codec => json
?????? }
}
filebeat新版本(5.0以上)支持直接支持輸出到kafka,而無需經(jīng)過logstash接收轉(zhuǎn)發(fā)到kafka

Filebeat采集完畢直接入到kafka消息隊列,進(jìn)而logstash取出數(shù)據(jù),進(jìn)行處理分析輸出到es,并在kibana進(jìn)行展示。
filebeat配置:?
#======== Filebeat prospectors=================
filebeat.prospectors:?
- input_type: log
?paths:
??? - /home/admin/helloworld/logs/*.log?
#-----------------------------kafka? output-----------------------------------
output.kafka:
? hosts: ["192.168.80.42:9092"]
? topic: test
? required_acks: 1
Logstash的配置:
input{
??? kafka {
??????? bootstrap_servers => "192.168.80.42:9092"
?????? ???? topics????????? => ["test"]
???????? group_id?????? => "consumer-test"
?????? ? #decroate_events? => true
?????? auto_offset_reset => "earliest"
? ?}
}
output?{
?????? elasticsearch {
?????? hosts => "192.168.80.18:9200"
?????? codec => json
?????? }
}
SSL加密傳輸(增強(qiáng)安全性,僅配置了秘鑰和證書的filebeat服務(wù)器和logstash服務(wù)器才能進(jìn)行日志文件數(shù)據(jù)的傳輸)
Logstash的配置文件:
ssl_certificate_authorities :filebeat端傳來的證書所在位置
ssl_certificate?=>?本端生成的證書所在的位置
ssl_key?=>?/本端生成的密鑰所在的位置
ssl_verify_mode?=> "force_peer"
input {
??? beats {
??? port => 5044
??? codec => "json"
ssl => true
?? ssl_certificate_authorities => ["/usr/local/logstash-5.6.10/pki/tls/certs/filebeat.crt"]
?? ssl_certificate => "/usr/local/logstash-5.6.10/pki/tls/certs/logstash.crt"
?? ssl_key => "/usr/local/logstash-5.6.10/pki/tls/private/logstash.key"
ssl_verify_mode => "force_peer"#(需與ssl_certificate_authorities一起使用)
?????? }
??? syslog{
?????? }
}
?
output {
??? # 輸出到控制臺
??? # stdout { }
??? # 輸出到redis
??? redis {
??????? host => "192.168.80.32"?? # redis主機(jī)地址
??????? port => 6379????????????? # redis端口號
??????? password => "123456"????????? # redis 密碼
?????? #db => 8?????????????????? # redis數(shù)據(jù)庫編號
??????? data_type => "channel"??? # 使用發(fā)布/訂閱模式
??????? key => "logstash_list_0" ?# 發(fā)布通道名稱
??? }
??? #輸出到kafka
??? kafka {
??????? bootstrap_servers => "192.168.80.42:9092"
??????? topic_id????????? => "test"?
?????? }?????
??? #輸出到es
??? elasticsearch {
?????? hosts => "node18:9200"
?????? codec => json
?????? }
}
filebeat的配置文件:?
#=================== Filebeat prospectors ========================
filebeat.prospectors:?
- input_type: log
?paths:
??? - /home/admin/helloworld/logs/*.log
#----------------------------- Logstash output --------------------------------
output.logstash:
# The Logstash hosts
?hosts: ["192.168.80.18:5044"]
#加密傳輸
?ssl.certificate_authorities: ["/usr/local/filebeat-5.6.10/pki/tls/certs/logstash.crt"]
? ssl.certificate: "/usr/local/filebeat-5.6.10/pki/tls/certs/filebeat.crt"
? ssl.key: "/usr/local/filebeat-5.6.10/pki/tls/private/filebeat.key"?
logstash(非filebeat)進(jìn)行文件采集,輸出到kafka緩存,讀取kafka數(shù)據(jù)并處理輸出到文件或es
從文件采集到kafka:
input?{
?file?{
??????? path => [
??????????? # 這里填寫需要監(jiān)控的文件
??????????? "/home/admin/helloworld/logs/catalina.out"
??????? ]
??? }
}
output?{
?kafka?{
??? # 輸出到控制臺
??? # stdout { }
?#?輸出到kafka
??bootstrap_servers => "192.168.80.42:9092"
??? topic_id????????? => "test"
??? }
}
從kafka或者redis讀取數(shù)據(jù)輸出到es或者文件:
input{
#從redis讀取
?redis {
??????? host => "192.168.80.32"?? # redis主機(jī)地址
??????? port => 6379????????????? # redis端口號
?????? password? => "123456"??? ? # redis 密碼
??????? #db => 8?????????????????? # redis數(shù)據(jù)庫編號
??????? data_type => "channel"??? # 使用發(fā)布/訂閱模式
??????? key => "logstash_list_0"? # 發(fā)布通道名稱
}
#從kafka讀取
kafka {
??????? bootstrap_servers => "192.168.80.42:9092"
?????? ??? topics????????? => ["test"]
??????? auto_offset_reset => "earliest"
?????? }
}
?
output?{
??#輸出到文件
??? file {
??????? path => "/usr/local/logstash-5.6.10/data/log/logstash/all1.log" # 指定寫入文件路徑
#?????? message_format => "%{host} %{message}"???????? # 指定寫入格式
??????? flush_interval => 0???????????????????????????? # 指定刷新間隔,0代表實時寫入
?? ? codec => json
?????? }
?#輸出到es
?? elasticsearch {
?????? hosts => "node18:9200"
?????? codec => json
?????? }
}
logstash同步mysql數(shù)據(jù)庫數(shù)據(jù)到es(logstash5版本以上已集成jdbc插件,無需下載安裝,直接使用)
從mysql讀取數(shù)據(jù)到es:
input {
?stdin { }
??? jdbc {
??jdbc_connection_string => "jdbc:mysql://192.168.80.18:3306/fyyq-mysql"
??????? jdbc_user => "fyyq"
??????? jdbc_password => "fyyq@2017"
jdbc_driver_library => "/usr/local/logstash-5.6.10/mysql-connector-java-5.1.46.jar"
???? ???jdbc_driver_class => "com.mysql.jdbc.Driver"
??????? jdbc_paging_enabled => "true"
??????? statement_filepath => "/usr/local/logstash-5.6.10/mysql2es.sql"
??????? #schedule => "* * * * *"
??? }
?}
?
?output {
???? stdout {
??????? codec => json_lines
??? }
??elasticsearch {
??????? hosts => "node18:9200"
??????? #index => "mainIndex"
??????? #document_type => "user"
??????? #document_id => "%{id}"
??? }
}
Logstash-input插件及插件參數(shù)概覽
所有輸入插件都支持以下配置選項:

codec:可選
json?(json格式編解碼器)
msgpack?(msgpack格式編解碼器)
plain(文本格式編解碼器)
multiline(將多行文本event合并成一個event,eg:將java中的異常跟蹤日志合并成一條消)]
常用輸入插件:
1、beat-input:Receives events from the Elastic Beats framework,從框架接收事件? ? Settings:

2、file-input:來自文件的Streams事件(path字段必填項)
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html
3、stdin-input:從標(biāo)準(zhǔn)輸入讀取事件
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-stdin.html
4、syslog-input:將syslog消息作為事件讀取
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-syslog.html
5、tcp-input:從TCP讀取事件(port字段必填項)
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html
6、udp-input:通過UDP讀取事件(port字段必填項)
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-udp.html
7、twitter-input:從Twitter Streaming API讀取事件(相對常用場景)
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-twitter.html
(consumer_key、consumer_secret、oauth_token、oauth_token_secret必填項)
8、redis-input:從Redis實例讀取事件
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-redis.html
(data_type["list", "channel", "pattern_channel"]、key必填項,)
9、kafka-input:從Kafka主題中讀取事件
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
(參數(shù)過多,自行查看)
10、jdbc-input:從JDBC數(shù)據(jù)創(chuàng)建事件
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
(jdbc_connection_string、jdbc_driver_class、jdbc_user必填項)
11、http-input:通過HTTP或HTTPS接收事件
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-http.html
12、elasticsearch-input:從Elasticsearch集群讀取查詢結(jié)果
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-elasticsearch.html
13、exec-input:將shell命令的輸出捕獲為事件(command字段必填項)
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-exec.html
非 常用輸入插件:
自行進(jìn)入logstash的插件中心進(jìn)行查看,有需要自行配置
總:https://www.elastic.co/guide/en/logstash/current/input-plugins.html
Logstash-filter插件及插件參數(shù)概覽
所有處理插件均支持的配置:

常用處理插件:
1、 grok-filter:可以將非結(jié)構(gòu)化日志數(shù)據(jù)解析為結(jié)構(gòu)化和可查詢的內(nèi)容
https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html#_grok_basics
grok模式的語法是%{SYNTAX:SEMANTIC}
SYNTAX是與您的文本匹配的模式的名稱
SEMANTIC是您為匹配的文本提供的標(biāo)識符
grok是通過系統(tǒng)預(yù)定義的正則表達(dá)式或者通過自己定義正則表達(dá)式來匹配日志中的各個值
正則解析式比較容易出錯,建議先調(diào)試(地址):
grok debugger調(diào)試:http://grokdebug.herokuapp.com/
grok事先已經(jīng)預(yù)定義好了許多正則表達(dá)式規(guī)則,該規(guī)則文件存放路徑:
/usr/local/logstash-5.6.10/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.2/patterns


等等,可自行進(jìn)入查看。
示例一:

初始輸入的message是:

經(jīng)過grok的正則分析后:

示例二:

COMBINEDAPACHELOG的具體內(nèi)容見:
https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/httpd
初始輸入message為:

經(jīng)過grok正則分析后:

示例三(自定義grok表達(dá)式mypattern[A-Z]):

初始輸入message:

經(jīng)過grok正則分析后:

示例四(移除重復(fù)字段):

初始輸入message:

經(jīng)過grok正則解析后(json格式):

示例五(過濾篩選catalina.out文件中的信息,message字段已移除):

【Data在pattern中的定義是:.*? GREEDYDATA在pattern中的定義是:.*】
初始輸入message:

經(jīng)過grok正則解析后(截圖及json格式如下):

常用參數(shù):
1)match:match作用:用來對字段的模式進(jìn)行匹配
2)patterns_dir:用來指定規(guī)則的匹配路徑,如果使用logstash自定義的規(guī)則時,不需要寫此參數(shù)。Patterns_dir可以同時制定多個存放過濾規(guī)則的目錄;

3)remove_field:如果匹配到某個”日志字段,則將匹配的這個日志字段從這條日志中刪除(多個以逗號隔開)

2、 clone-filter:克隆過濾器用于復(fù)制事件
3、? drop-filter:丟棄所有活動
4、? json-filter:解析JSON事件
5、? kv-filter:解析鍵值對
非常用參數(shù):參考教程:https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
Logstash-output插件及插件參數(shù)概覽
所有輸出插件均支持以下配置:

常用插件:
1、Elasticsearch-output:此插件是在Elasticsearch中存儲日志的推薦方法。如果您打算使用Kibana Web界面,則需要使用此輸出
2、file-output:此輸出將事件寫入磁盤上的文件(path字段必填項)
3、kafka-output:將事件寫入Kafka主題(topic_id是必填項)
4、 redis-output:此輸出將使用RPUSH將事件發(fā)送到Redis隊列
5、stdout-output:一個簡單的輸出,打印到運(yùn)行Logstash的shell的STDOUT
非常用插件:參考官網(wǎng)教程鏈接:https://www.elastic.co/guide/en/logstash/current/output-plugins.html
文章原文鏈接:https://www.cnblogs.com/qingqing74647464/p/9378385.html
我們的交流基地,“JAVA互聯(lián)網(wǎng)技術(shù)交流:789650498”歡迎小伙伴們一起來交流:
