SpringBoot集成logstash

前言

大家應該都聽說過ELK,一般ELK都是用來做分布式系統(tǒng)的集中日志管理,ELK的優(yōu)點這里就不介紹了,好處太多,今天主要介紹下其中的Logstash。數(shù)據(jù)傳給logstash,它將數(shù)據(jù)進行過濾和格式化(轉(zhuǎn)成JSON格式),然后傳給數(shù)據(jù)存儲或者消息隊列Broker,用于后續(xù)加工處理。

最近要做app埋點監(jiān)控,app將埋點日志發(fā)送到埋點日志網(wǎng)關(guān),在埋點日志網(wǎng)關(guān)通過部署logstash,將日志發(fā)送到logstash,再由logstash發(fā)送到kafka,最后由kafka入mongodb,由大數(shù)據(jù)系統(tǒng)定時跑批,將埋點統(tǒng)計結(jié)果加工出來提供給前端查詢。這里就記錄下對于springboot的項目如何和logstash集成,完成日志數(shù)據(jù)通過logstash對外輸出。

logstash官網(wǎng) https://www.elastic.co/products/logstash

logstash安裝&配置

logstash的安裝流程這里就不介紹了,大家可以看下官網(wǎng)的guide

下面主要來說下logstash的配置,logstash一般常用的有兩種模式,一種是應用服務器寫本地日志,在logstash中配置讀取本地日志文件,對指定的日志文件進行數(shù)據(jù)抽取,這種模式一般適用于一般的應用或者系統(tǒng)日志采集到中央日志系統(tǒng);還有一種模式是將應用系統(tǒng)需要大數(shù)據(jù)系統(tǒng)分析的數(shù)據(jù)直接將信息流直接發(fā)送給logstash進行JSON格式化,不需要在應用系統(tǒng)本地先落地文件,這種模式一般適用于大數(shù)據(jù)系統(tǒng)從應用系統(tǒng)采集數(shù)據(jù),看起來有點像kafka這類消息隊列的功能,從消息流傳遞角度看,確實比較類似,但是logstash最大的特點還在于可以對收到的數(shù)據(jù)進行各種形式的流式加工,加工成上游大數(shù)據(jù)系統(tǒng)需要的數(shù)據(jù),這樣可以方便大數(shù)據(jù)系統(tǒng)進行二次加工,有點類似流水線的意思。

下面我們看下logstash的config文件,從config文件中可以看出logstash的主要處理流程,配置主要分為3部分:input、filter、output。這三部份的配置,從名字也很好理解含義:

input就是輸入,從這個例子中可以看出配置的tcp網(wǎng)絡信息流的模式,這樣應用系統(tǒng)本地數(shù)據(jù)就不落地了,可以減少系統(tǒng)本地的磁盤空間占用,也減少了系統(tǒng)的磁盤IO壓力;當然input除了tcp模式還有file模式,也很簡單,配置的內(nèi)容當然就是文件的路徑信息,編碼模式之類的都是一樣的。

filter就是過濾,對于從input輸入的數(shù)據(jù),在filter層可以通過配置ruby腳本很輕易的將輸入數(shù)據(jù)加工成輸出需要的數(shù)據(jù)格式,增加字段、刪減字段之類都是很簡單的應用。

output當然就是輸出,從官網(wǎng)的介紹就很容易了解到logstash不管是輸入還是輸出都支持很多種格式,有點類似linux里的pipline的意思,不管輸入是什么都可以輸出到下一個管道,例子中的output是輸出到kafka集群,當然也可以輸出的ES,或者數(shù)據(jù)庫。

## logtash.conf
input {
  tcp {
    host => "10.xxx.xx.xx"
    port => 9250
    mode => "server"
    tags => ["tags"]
    codec => plain{charset=>"UTF-8"}
    }
}
filter {
    ruby{
     code => "event['readunixtime']= event.timestamp.time.getlocal.to_f.to_s"
     add_field =>{
        "app_name"=>"xxx_sdk_apm"
        "app_stage"=>"dev"
        "readtimestamp"=>"%{@timestamp}"
        }
     }
}
# output { stdout { codec => rubydebug } }
output {
    kafka{
# 大數(shù)據(jù)kafka集群地址
    bootstrap_servers=> "10.132.XX.XX:9092,10.132.XX.XX:9092,10.132.XX.XX:9092,10.132.37.XX:9092"
# 分配給應用的topic
    topic_id=> "xx_xx_apm"
# 異步傳輸,不保證消息一定傳輸,速度最快
    acks=>"0"
# 單位是字節(jié),16k
    batch_size=>16384
    codec=>json
  }
}

logback配置

上面介紹了logstash的配置,下面就到了本文的重頭戲,那就是在springboot中集成logstash,當然logstash為我們提供了常見的日志框架的encoder,可以方便我們將各種日志框架的日志輸出到logstash中作為input。下面介紹下我這次做的本地不落日志,直接將logback的日志輸出的logstash中,

發(fā)送logstash input本地不留存日志。

首先引入pom依賴

<dependency>
   <groupId>net.logstash.logback</groupId>
   <artifactId>logstash-logback-encoder</artifactId>
   <version>5.1</version>
</dependency>

在logback的xml配置文件中只需要配置下appender就可以了,例子中使用的是LogstashTcpSocketAppender,當然還提供其他的appender,具體用法可以查看https://github.com/logstash/logstash-logback-encoder,如圖:

Screenshot 2018-07-24 15.40.35

logback中的配置也很簡單,只需要配置logstash的ip地址和端口和encoder的編碼類就可以了。

<appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
    <destination>10.132.xx.xx:9250</destination>
    <queueSize>1048576</queueSize>
    <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder" />
</appender>
   
<logger name="APMInfoDev" level="INFO" additivity="false">
   <appender-ref ref="LOGSTASH"/>
</logger> 

因為是單獨定義了logger不是root logger,所以在使用的時候需要通過LoggerFactory指定logger name去獲取logger:

Logger apmInfoLogger = LoggerFactory.getLogger("APMInfoDev");

apmInfoLogger.info("XXXXXX");

好了下面就可以去logstash輸出的存儲或者隊列中查看輸出的日志信息了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容