ELK +kafka作為日志和簡單數(shù)據(jù)處理
1.0 使用背景介紹
使用背景:讓服務每天產生大量的日志文件數(shù)據(jù)對公司業(yè)務做一部分貢獻,基于日志數(shù)據(jù)做簡單的數(shù)據(jù)加工用于替 代現(xiàn)在通過SQL視圖方式的統(tǒng)計,基于錯誤日志的簡單報警,和接口性能統(tǒng)計等
Kafka Stream 做簡單的流式處理嘗試,ELK的版本6.6.6, kafka 版本 2.1.0
使用kafka可以靈活擴展結束其他數(shù)據(jù)處理框架
2.0 流程示例圖

ELK技術棧.png
3.0 Filebeat
用于收集各個服務器上的日志,filebeat是Beat家族的一部分,用戶讀取日志文件,可以直接發(fā)送到Elasticsearch,Logstash或者其他消息隊列,filebeat是一個輕量級的日志代理,占用服務資源很少,新版本有"壓敏傳感器"可以根據(jù)流量自動調節(jié)傳送大小,詳見說明參考官方文檔
3.1 下載安裝
地址 :[https://www.elastic.co/downloads/beats/filebeat]選擇版本和安裝包的方式,個人選擇使用壓縮包的方式,下載完后解壓可以找到名為filebeat.yml的配置文件
filebeat.prospectors:
- type: log
paths:
- /data/wwwlogs/*.log
tail_files: true
ignore_older: 2h
fields:
logSource: nginx-test
fields_under_root: true
processors:
- drop_fields:
fields: ["host","beat","prospector","log","input","offset"]
output.file:
enabled: false
path: "/tmp/filebeat"
filename: filebeat
output.kafka:
enabled: true
hosts: ["host:port"]
version: 2.0.0
topic: 'filebeat_nginx_test'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
#------log--------------------------
logging.level: info
logging.to_files: true
logging.to_syslog: false
logging.files:
path: /home/filebeat/logfilebeat/
keepfiles: 4
name: mybeat.log
4.0 Logstash 配置文件,具體配置參考 https://www.elastic.co/guide/index.html 中 input-kafka介紹
input{
kafka{
bootstrap_servers => ["10.0.20.116:9092"]
client_id => "kafaka-test"
group_id => "kafaka-test"
auto_offset_reset => "latest"
consumer_threads => 5
decorate_events => true
topics => ["filebeat_nginx_test_01"]
type => "nginx"
codec => "json"
}
}
filter {
mutate {
add_field => {
"kafka" => "%{[@metadata][kafka]}"
}
}
}
output {
elasticsearch {
hosts => "host:port"
index => "yourindex-%{+YYYY.MM}"
}
kafka {
bootstrap_servers => "host:port"
topic_id => "nginx-access-log"
compression_type => "snappy"
}
}