前言
大家應該都聽說過ELK,一般ELK都是用來做分布式系統(tǒng)的集中日志管理,ELK的優(yōu)點這里就不介紹了,好處太多,今天主要介紹下其中的Logstash。數(shù)據(jù)傳給logstash,它將數(shù)據(jù)進行過濾和格式化(轉(zhuǎn)成JSON格式),然后傳給數(shù)據(jù)存儲或者消息隊列Broker,用于后續(xù)加工處理。
最近要做app埋點監(jiān)控,app將埋點日志發(fā)送到埋點日志網(wǎng)關(guān),在埋點日志網(wǎng)關(guān)通過部署logstash,將日志發(fā)送到logstash,再由logstash發(fā)送到kafka,最后由kafka入mongodb,由大數(shù)據(jù)系統(tǒng)定時跑批,將埋點統(tǒng)計結(jié)果加工出來提供給前端查詢。這里就記錄下對于springboot的項目如何和logstash集成,完成日志數(shù)據(jù)通過logstash對外輸出。
logstash官網(wǎng) https://www.elastic.co/products/logstash
logstash安裝&配置
logstash的安裝流程這里就不介紹了,大家可以看下官網(wǎng)的guide
下面主要來說下logstash的配置,logstash一般常用的有兩種模式,一種是應用服務器寫本地日志,在logstash中配置讀取本地日志文件,對指定的日志文件進行數(shù)據(jù)抽取,這種模式一般適用于一般的應用或者系統(tǒng)日志采集到中央日志系統(tǒng);還有一種模式是將應用系統(tǒng)需要大數(shù)據(jù)系統(tǒng)分析的數(shù)據(jù)直接將信息流直接發(fā)送給logstash進行JSON格式化,不需要在應用系統(tǒng)本地先落地文件,這種模式一般適用于大數(shù)據(jù)系統(tǒng)從應用系統(tǒng)采集數(shù)據(jù),看起來有點像kafka這類消息隊列的功能,從消息流傳遞角度看,確實比較類似,但是logstash最大的特點還在于可以對收到的數(shù)據(jù)進行各種形式的流式加工,加工成上游大數(shù)據(jù)系統(tǒng)需要的數(shù)據(jù),這樣可以方便大數(shù)據(jù)系統(tǒng)進行二次加工,有點類似流水線的意思。
下面我們看下logstash的config文件,從config文件中可以看出logstash的主要處理流程,配置主要分為3部分:input、filter、output。這三部份的配置,從名字也很好理解含義:
input就是輸入,從這個例子中可以看出配置的tcp網(wǎng)絡信息流的模式,這樣應用系統(tǒng)本地數(shù)據(jù)就不落地了,可以減少系統(tǒng)本地的磁盤空間占用,也減少了系統(tǒng)的磁盤IO壓力;當然input除了tcp模式還有file模式,也很簡單,配置的內(nèi)容當然就是文件的路徑信息,編碼模式之類的都是一樣的。
filter就是過濾,對于從input輸入的數(shù)據(jù),在filter層可以通過配置ruby腳本很輕易的將輸入數(shù)據(jù)加工成輸出需要的數(shù)據(jù)格式,增加字段、刪減字段之類都是很簡單的應用。
output當然就是輸出,從官網(wǎng)的介紹就很容易了解到logstash不管是輸入還是輸出都支持很多種格式,有點類似linux里的pipline的意思,不管輸入是什么都可以輸出到下一個管道,例子中的output是輸出到kafka集群,當然也可以輸出的ES,或者數(shù)據(jù)庫。
## logtash.conf
input {
tcp {
host => "10.xxx.xx.xx"
port => 9250
mode => "server"
tags => ["tags"]
codec => plain{charset=>"UTF-8"}
}
}
filter {
ruby{
code => "event['readunixtime']= event.timestamp.time.getlocal.to_f.to_s"
add_field =>{
"app_name"=>"xxx_sdk_apm"
"app_stage"=>"dev"
"readtimestamp"=>"%{@timestamp}"
}
}
}
# output { stdout { codec => rubydebug } }
output {
kafka{
# 大數(shù)據(jù)kafka集群地址
bootstrap_servers=> "10.132.XX.XX:9092,10.132.XX.XX:9092,10.132.XX.XX:9092,10.132.37.XX:9092"
# 分配給應用的topic
topic_id=> "xx_xx_apm"
# 異步傳輸,不保證消息一定傳輸,速度最快
acks=>"0"
# 單位是字節(jié),16k
batch_size=>16384
codec=>json
}
}
logback配置
上面介紹了logstash的配置,下面就到了本文的重頭戲,那就是在springboot中集成logstash,當然logstash為我們提供了常見的日志框架的encoder,可以方便我們將各種日志框架的日志輸出到logstash中作為input。下面介紹下我這次做的本地不落日志,直接將logback的日志輸出的logstash中,
發(fā)送logstash input本地不留存日志。
首先引入pom依賴
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>5.1</version>
</dependency>
在logback的xml配置文件中只需要配置下appender就可以了,例子中使用的是LogstashTcpSocketAppender,當然還提供其他的appender,具體用法可以查看https://github.com/logstash/logstash-logback-encoder,如圖:

logback中的配置也很簡單,只需要配置logstash的ip地址和端口和encoder的編碼類就可以了。
<appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>10.132.xx.xx:9250</destination>
<queueSize>1048576</queueSize>
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder" />
</appender>
<logger name="APMInfoDev" level="INFO" additivity="false">
<appender-ref ref="LOGSTASH"/>
</logger>
因為是單獨定義了logger不是root logger,所以在使用的時候需要通過LoggerFactory指定logger name去獲取logger:
Logger apmInfoLogger = LoggerFactory.getLogger("APMInfoDev");
apmInfoLogger.info("XXXXXX");
好了下面就可以去logstash輸出的存儲或者隊列中查看輸出的日志信息了。