目前最為主流的容器編排工具主要有kubernetes、mesos、swarm,個人不評價誰好誰壞因為每個東西都有自己的優(yōu)勢。不過個人認(rèn)為目前關(guān)注度最高的應(yīng)該當(dāng)屬kubernetes,現(xiàn)在越來越多的公司采用kubernetes作為底層編排工具開發(fā)自己的容器調(diào)度平臺。既然是一個PAAS平臺那么就應(yīng)該提供一個計算監(jiān)控等一體的服務(wù),因為是在kubernetes運行上面的容器大多數(shù)都是無狀態(tài)服務(wù),所以統(tǒng)一的日志管理又是其中必不可少的一部分。下面我們就講一下如何基于filebeat開發(fā)屬于自己的日志采集。
目前用的最多的日志管理技術(shù)應(yīng)該是ELK,E應(yīng)該沒有太多的疑問基本上很多公司都是采用的這個作為存儲索引引擎。L及l(fā)ogstash是一個日志采集工具支持文件采集等多種方式,但是基于容器的日志采集又跟傳統(tǒng)的文件采方式略有不同,雖然docker本身提供了一些log driver但是還是無法很好的滿足我們的需求?,F(xiàn)在kubernetes官方有一個日志解決方案是基于fluentd的。至于為什么最后選擇采用filebeat而沒有用fluentd主要有一下幾點:
- 首先filebeat是go寫的,我本身是go開發(fā),fluentd是ruby寫的很抱歉我看不太懂
- filbeat比較輕量,filbeat現(xiàn)在功能雖然比較簡單但是已經(jīng)基本上夠用,而且打出來鏡像只有幾十M
- filbeat性能比較好,沒有具體跟fluentd對比過,之前跟logstash對比過確實比logstash好不少,logtash也是ruby寫的我想應(yīng)該會比fluentd好不少
- filbeat雖然功能簡單,但是代碼結(jié)構(gòu)非常易于進(jìn)行定制開發(fā)
- 還有就是雖然用了很久fluentd但是fluentd的配置文件實在是讓我很難懂
filebeat如何采集kubernetes日志
所以基于以上幾點決定采用filebeat開發(fā)了自己的日志采集。
filebeat的Github地址是https://github.com/elastic/beats里面囊括了好幾個項目其中就包括filebeat。
和其他的日志采集處理一樣filebeat也有幾個部分分別是input、processors、output,不過filebeat提供的能力還比較少,不過無所謂夠用就好。
filebeat提供了一個add_kubernetes_metadata的processor,文件的采集路徑就要配成/var/lib/docker/containers/*/*-json.log主要是監(jiān)聽kubernetes的apiserver把容器對應(yīng)的pod的信息存到內(nèi)存里面,從文件日志source里面(就是上面的那個路徑)里面獲取容器id匹配得到pod的信息。
因為json.log文件里面的日志都是json格式的所以需要對日志進(jìn)行json格式化,filebeat有一個processor叫decode_json_fields這些processor都支持條件判斷,可以通過條件判斷來絕對是否要對某一條日志進(jìn)行處理。filebeat默認(rèn)的日志字段是message但是*-json.log解析出來以后的日志字段是log,如果同時配置了其他的日志采集這個時候所用的存儲日志的字段就不一樣了,所以需要對它們進(jìn)行處理讓它們使用同一個字段,但是filebeat并沒有提供這個功能所以自己寫了一個add_fields的功能。
整理后的配置文件如下:
filebeat.prospectors:
- type: log
paths:
- /var/lib/docker/containers/*/*-json.log
- /var/log/containers/applogs/*
processors:
- add_kubernetes_metadata:
in_cluster: false
host: "127.0.0.1"
kube_config: /root/.kube/config
- add_fields:
fields:
log: '{message}'
- decode_json_fields:
when:
regexp:
log: "{*}"
fields: ["log"]
overwrite_keys: true
target: ""
- drop_fields:
fields: ["source", "beat.version", "beat.name", "message"]
- parse_level:
levels: ["fatal", "error", "warn", "info", "debug"]
field: "log"
logging.level: info
setup.template.enabled: true
setup.template.name: "filebeat-%{+yyyy.MM.dd}"
setup.template.pattern: "filebeat-*"
#setup.template.fields: "${path.config}/fields.yml"
setup.template.fields: "/fields.yml"
setup.template.overwrite: true
setup.template.settings:
index:
analysis:
analyzer:
enncloud_analyzer:
filter: ["standard", "lowercase", "stop"]
char_filter: ["my_filter"]
type: custom
tokenizer: standard
char_filter:
my_filter:
type: mapping
mappings: ["-=>_"]
output:
elasticsearch:
hosts: ["127.0.0.1:9200"]
index: "filebeat-%{+yyyy.MM.dd}"
如果線上環(huán)境filebeat也是以daemonset的方式運行在kubernetes集群里面,所以in_cluster就需要設(shè)置成true,對應(yīng)的kube_config則不需要配置了,host參數(shù)則是監(jiān)聽的某一個節(jié)點的pod,所以這個值應(yīng)該是filebeat運行所在節(jié)點的pod的名稱,當(dāng)然也可以不寫,那樣的話就是監(jiān)聽全局的pod,不過這個對于filebeat來說是沒必要的也是不好的。
add_fieldsprocessor可以添加自己想要的字段,值可以是字符串也可以是{message}格式,如果是這種格式則會從已有的字段里面取值進(jìn)行填充。
parse_levelprocessor是用于一個匹配日志格式的功能,如果日志文件最前面出現(xiàn)的那個日志級別則這個日志加一個相應(yīng)級別的字段。
filebeat還有對于template處理的功能的功能可以指定所用的mapping。
開發(fā)filebeat processor
使用的過程中主要是針對一些不滿足的processor進(jìn)行了開發(fā),filebeat的代碼結(jié)構(gòu)非常清晰抽象也很好,可以很簡單的進(jìn)行開發(fā)。
filebeat的processor功能主要放在libbeat和filbeat同級的目錄下,在這個目錄下就叫processors??梢钥吹嚼锩嬗?code>actions,add_cloud_metadata、add_kubernetes_metadata、add_docker_metadata所以filebeat也只支持直接docker的processor的,比較普通的processor都是放在actions下面的所以如果我們需要開發(fā)一些簡單的processor的話可以直接放到下面,包括decode_json和drop_event等也是放在下面的。以add_field為例:
package actions
import (
"fmt"
"regexp"
"strings"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)
type addFields struct {
Fields map[string]string
reg *regexp.Regexp
}
func init() {
processors.RegisterPlugin("add_fields",
configChecked(newAddFields,
requireFields("fields"),
allowedFields("fields", "when")))
}
func newAddFields(c *common.Config) (processors.Processor, error) {
config := struct {
Fields map[string]string `config:"fields"`
}{}
err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("fail to unpack the add_fields configuration: %s", err)
}
f := &addFields{Fields: config.Fields, reg: regexp.MustCompile("{(.*)}")}
return f, nil
}
func (f *addFields) Run(event *beat.Event) (*beat.Event, error) {
var errors []string
for field, value := range f.Fields {
matchers := f.reg.FindAllStringSubmatch(value, -1)
if len(matchers) == 0 {
event.PutValue(field, value)
} else {
if len(matchers[0]) >= 2 {
val, err := event.GetValue(strings.Trim(matchers[0][1], " "))
if err != nil {
errors = append(errors, err.Error())
} else {
event.PutValue(field, val)
}
}
}
}
return event, nil
}
func (f *addFields) String() string {
var fields []string
for field, _ := range f.Fields {
fields = append(fields, field)
}
return "add_fields=" + strings.Join(fields, ", ")
}
需要定義自己的struct, newAddFields方法通過配置文件初始化自己的struct。并在init里面通過RegisterPlugin把自己的processor注冊進(jìn)去。這個struct主要是要實現(xiàn)Run方法,這個方法就是對于每一條日志event的具體處理。
到這就基本上實現(xiàn)了對接kubernetes的對接改造就基本上完成了,當(dāng)然還有其他很多工作可以做,比如golang本身的regex和encoding/json性能比較差,這些都是可以優(yōu)化的地方。
我自己fork出來的地址是https://github.com/yiqinguo/beats增加了Makefile直接編譯打鏡像,和filebeat-ds.yml直接發(fā)到kubernetes集群里面。