Project Background
Operations engineers need precise visibility into system and business logs in order to analyze the state of systems and services. Logs are usually scattered across many servers, and the traditional approach is to log in to each server through a jump host and inspect logs machine by machine, which is tedious and inefficient. We therefore need a centralized log-management tool that collects logs from different servers into one place, displays them uniformly, and ideally supports charting and analysis.
Introduction to ELK
ELK is an open-source log-analysis stack made up of Elasticsearch, Logstash, and Kibana. It can collect, process, display, and analyze data in real time.
Common architectures
datasource->logstash->elasticsearch->kibana
datasource->filebeat->logstash->elasticsearch->kibana
datasource->filebeat->logstash->redis/kafka->logstash->elasticsearch->kibana
datasource->filebeat->kafka->logstash->elasticsearch->kibana (the most common)
Note: datasource stands for the log data source.
Component overview
Taking the most common architecture as an example:
filebeat: a lightweight log shipper.
kafka: a message queue.
logstash: a data collection and processing engine. It dynamically gathers data from a variety of sources and can filter, parse, enrich, and normalize it before storing it for downstream use.
elasticsearch: a distributed search engine that is highly scalable, highly reliable, and easy to manage. It supports full-text search, structured search, and analytics, and can combine all three. Elasticsearch is written in Java on top of Lucene.
kibana: a visualization platform that searches and displays the data indexed in Elasticsearch, making it easy to analyze data with charts, tables, and maps.
Key components in detail
filebeat
Architecture diagram: (image unavailable)
How it works
When you start Filebeat, it starts one or more inputs that look in the locations you have specified for log data. For each log it locates, Filebeat starts a harvester. Each harvester reads a single log for new content and sends the new log data to libbeat, which aggregates the events and sends the aggregated data to the output configured for Filebeat.
harvester
A harvester is responsible for reading the content of a single file. It reads each file line by line and sends the content to the output. One harvester is started per file. The harvester is responsible for opening and closing the file, which means the file descriptor stays open while the harvester runs. If a file is deleted or renamed while being harvested, Filebeat keeps reading it; the side effect is that the disk space is not released until the harvester closes. By default, Filebeat keeps the file open until close_inactive is reached.
input
An input is responsible for managing the harvesters and finding all sources to read from. If the input type is log, the input finds all files on the drive that match the defined glob paths and starts a harvester for each one.
Filebeat currently supports several input types, and each type can be defined multiple times. The log input checks each file to decide whether a harvester needs to be started, whether one is already running, or whether the file can be ignored (see ignore_older). New lines are only picked up if the file's size has changed since the harvester was closed. Reference: https://www.elastic.co/guide/en/beats/filebeat/7.17/filebeat-input-log.html
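As a minimal sketch, a 7.x log input combining the options discussed above might look like this (the path and durations are only examples):
filebeat.inputs:
- type: log
  paths:
    - /var/log/*.log    # glob; one harvester is started per matching file
  ignore_older: 24h     # ignore files that have not been updated for 24 hours
  close_inactive: 5m    # close a harvester after 5 minutes without new lines (the default)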
Commonly used configuration options explained
Note: several options below (spool_size, prospectors, document_type) date from older Filebeat releases; in 7.x they are replaced by filebeat.inputs and related settings.
filebeat:
  spool_size: 1024                                    # batch up to 1024 events before sending
  idle_timeout: "5s"                                  # otherwise flush at least every 5 seconds
  registry_file: ".filebeat"                          # records read offsets; kept in the current working directory, so running filebeat from a different directory causes logs to be shipped again!
  config_dir: "path/to/configs/contains/many/yaml"    # long configurations can be split into files loaded from a directory
  prospectors:                                        # inputs sharing the same parameters can be grouped into one prospector
    - fields:
        ownfield: "mac"                               # similar to logstash's add_field
      paths:
        - /var/log/system.log                         # where to read files from
        - /var/log/wifi.log
      include_lines: ["^ERR", "^WARN"]                # only ship lines matching these patterns
      exclude_lines: ["^OK"]                          # do not ship lines matching these patterns
      document_type: "apache"                         # the _type value used when writing to ES
      ignore_older: "24h"                             # stop watching files with no updates for 24 hours; on Windows there is also force_close_files, which closes the file handle as soon as the file name changes so the file can be deleted, at the risk of leaving log lines unread
      scan_frequency: "10s"                           # rescan the directory every 10 seconds to refresh the list of files matching the globs
      tail_files: false                               # whether to start reading from the end of the file
      harvester_buffer_size: 16384                    # read 16384 bytes per read
      backoff: "1s"                                   # check for a new line once per second
    - paths:
        - "/var/log/apache/*"                         # globs are allowed
      exclude_files: ["/var/log/apache/error.log"]
    - input_type: "stdin"                             # besides "log" there is also "stdin"
      multiline:                                      # multiline merging
        pattern: '^[[:space:]]'
        negate: false
        match: after
output:
  ...
Multiline log collection with filebeat
parsers:
  - multiline:
      type: pattern
      pattern: '^\['
      negate: true
      match: after
In the example above, the multiline options control how multiline logs are handled. The pattern parameter specifies a regular expression that matches the first line of a log record; here, '^\[' matches lines starting with a square bracket, such as '[INFO]' or '[ERROR]'. The negate parameter inverts the match, so any line that does not match the pattern is treated as a continuation rather than a separate event. Finally, match: after appends those continuation lines to the preceding matching line, assembling one complete multiline event. Reference: https://www.elastic.co/guide/en/beats/filebeat/7.17/multiline-examples.html
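With that configuration, a record such as the following ships as a single event, because the continuation lines do not start with '[' and are therefore appended to the preceding '[ERROR]' line (the sample log is invented for illustration):
[ERROR] request failed
java.net.ConnectException: Connection refused
    at com.example.Client.connect(Client.java:42)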
Adding fields with filebeat
filebeat.inputs:
- type: log
  fields_under_root: true
  processors:
    - add_fields:
        fields:
          project: 'project_test_demo' # the field being added
  enabled: true
  paths:
    - /data/logs/*.log
output.kafka:
  ...
Optional fields can be specified to add extra information to the input; a field can be a scalar value, an array, a dictionary, or any nested combination of these. By default, the fields you specify here are grouped under a fields dictionary in the output document. To store custom fields as top-level fields, set fields_under_root to true.
Adding fields under a target with filebeat
processors:
  - add_fields:
      target: project
      fields:
        name: myproject
        id: '574734885120952459'
The result:
{
  "project": {
    "name": "myproject",
    "id": "574734885120952459"
  }
}
The add_fields processor adds additional fields to the event. Fields can be scalar values, arrays, dictionaries, or any nested combination of these. If the target field already exists, the add_fields processor overwrites it. By default, the fields you specify are grouped under a fields sub-dictionary in the event; to group them under a different sub-dictionary, use the target setting. To store the fields as top-level fields, set target to an empty string.
filebeat output to Kafka
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  # message topic selection + partitioning
  topic: '%{[fields.log_topic]}'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
Events larger than max_message_bytes are dropped, so make sure Filebeat does not generate events larger than this value.
A complete filebeat configuration example
filebeat.inputs:
- type: log
  fields_under_root: true
  processors:
    - add_fields:
        fields:
          project: 'project_test_demo'
  enabled: true
  paths:
    - /data/logs/*.log
output.kafka:
  enabled: true
  hosts: ["192.168.50.38:9092","192.168.50.39:9092","192.168.50.40:9092"]
  topic: "log-demo"
Using the script processor in filebeat
processors:
  - script:
      lang: javascript
      source: >
        var console = require('console');
        function process(event) {
          var logPath = event.Get("log.file.path");
          console.log("xxxxxxxxxxxxxxxxxxxxxxxxxxxxx", logPath);
        }
This prints the path of the file an event came from; more examples appear in the hands-on project section below.
logstash
How it works
Logstash is an open-source data collection engine that gathers data from a variety of sources, transforms it, and sends it to different destinations. It works as follows:
- Inputs: Logstash collects data from multiple inputs, such as files, network sockets, and message queues.
- Filters: data collected from the inputs is passed to filters, which process, transform, and enrich it, for example parsing JSON, adding fields, removing fields, or renaming fields.
- Outputs: the filtered data is sent to the configured destinations, such as Elasticsearch, Redis, or Amazon S3.
- Plugins: Logstash's strength lies in its plugin architecture; users can pick or write plugins to extend it for inputs, filters, outputs, and codecs (encoding/decoding).
- Reliability: to preserve data integrity, Logstash provides mechanisms for buffering, retrying, and replaying failed events so that the data flow can recover after a failure.
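As a minimal sketch of those three stages wired together (the file path, field, hosts, and index name are placeholders):
input {
  file { path => "/var/log/app.log" }
}
filter {
  mutate { add_field => { "source" => "app" } }  # enrich every event with a source field
}
output {
  elasticsearch { hosts => ["127.0.0.1:9200"] index => "app-log" }
}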
input
input {
  kafka {
    bootstrap_servers => ["192.168.50.38:9092,192.168.50.39:9092,192.168.50.40:9092"]
    client_id => "log-demo"
    group_id => "logstash-es"
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => "true"
    topics => ["log-demo"]
    type => "kafka-to-elas"
    codec => json
  }
}
bootstrap_servers: the list of Kafka broker addresses and ports used for the initial connection to the cluster
client_id: the client ID used when communicating with the Kafka brokers. It can be used to trace which Logstash instance is talking to a broker. Multiple clients (Logstash instances) consuming at the same time need distinct client_ids; note that the number of clients in one group should not exceed the number of Kafka partitions
group_id: the consumer-group name; multiple consumers in the same group coordinate to balance the load
auto_offset_reset: where to start reading when no committed offset is available. Here it is set to latest, meaning start from the newest messages
consumer_threads: the number of consumer threads Logstash uses; defaults to 1 if not specified
decorate_events: converts Kafka messages into Logstash events and adds metadata such as topic, partition, and offset. Here it is set to true
topics: the list of Kafka topics to consume
type: assigns a type to the Logstash events; here kafka-to-elas, so they can later be distinguished from other input sources
codec: the codec used to decode messages into Logstash events; here the json codec, which turns JSON-formatted messages into Logstash events
Adding fields with the logstash mutate filter
filter {
  mutate {
    add_field => {
      "env" => "%{[tags][0]}"
    }
  }
}
Values are referenced with %{}; inside the braces, nested JSON fields are addressed with bracket notation.
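Nested fields use the same bracket notation, for example copying the event's host.name into a hypothetical host_name field:
add_field => { "host_name" => "%{[host][name]}" }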
Removing fields with the logstash mutate filter
filter {
  mutate {
    remove_field => ["@version"]
  }
}
When the field being removed is a single-valued top-level field, the %{} wrapper can be omitted.
Adding fields with ruby in logstash
filter {
  mutate {
    add_field => {
      "env" => "%{[tags][0]}"
    }
  }
  ruby {
    code => "
      hostNameValues = event.get('[host][name]')
      event.set('hostvalue',hostNameValues)
    "
  }
}
Here hostvalue is the new field's name and hostNameValues holds its value.
Using if conditionals in logstash
input {
  kafka {
    bootstrap_servers => ["192.168.50.38:9092,192.168.50.39:9092,192.168.50.40:9092"]
    client_id => "elk-log"
    group_id => "elk-log"
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    topics_pattern => "log.*"
    type => "kafka-to-elas"
    codec => json
  }
}
filter {
  mutate {
    add_field => {
      "env" => "%{[tags][0]}"
    }
    remove_field => ["@version"]
  }
  ruby {
    code => "
      hostNameValues = event.get('[host][name]')
      event.set('hostvalue',hostNameValues)
    "
  }
  if [serviceName] == 'go-log' {
    ruby {
      code => "
        envNew = event.get('[env]')
        event.set('envNew',envNew)
      "
    }
  }
}
output {
  stdout {}
}
The logstash auto_offset_reset option
input {
  kafka {
    bootstrap_servers => ["192.168.50.38:9092,192.168.50.39:9092,192.168.50.40:9092"]
    client_id => "elk-log"
    group_id => "elk-log"
    auto_offset_reset => "earliest"
    consumer_threads => 3
    decorate_events => true
    topics_pattern => "log.*"
    type => "kafka-to-elas"
    codec => json
  }
}
auto_offset_reset is a Kafka consumer-group parameter that tells the group where to start reading when no offset is found or the stored offset is invalid. Its values include earliest (start from the oldest messages) and latest (start from the newest).
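auto_offset_reset has no effect once the group has committed offsets; to inspect a group's committed offsets and lag you can use the tool that ships with Kafka (a sketch against the brokers deployed below):
# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.50.38:9092 --describe --group elk-log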
Hands-on Project
Note: the server sizing below is for learning only; size resources to your actual needs in production.
Server specification
OS: CentOS 7
Memory: 8 GB
CPU: 4 vCPUs
Disk: 40 GB
Count: 4 servers
Server layout
| IP | Services |
|---|---|
| 192.168.50.38 | es, kafka, filebeat |
| 192.168.50.39 | es, kafka, filebeat |
| 192.168.50.40 | es, kafka, filebeat |
| 192.168.50.47 | kibana, logstash |
Keep the es, filebeat, logstash, and kibana versions identical.
Component versions
| Component | Version |
|---|---|
| filebeat | 7.17.9 |
| kafka | 3.0.0 |
| logstash | 7.17.9 |
| elasticsearch | 7.17.9 |
| kibana | 7.17.9 |
es cluster deployment
Run on 192.168.50.38, 192.168.50.39, and 192.168.50.40.
Download: https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.9-linux-x86_64.tar.gz
Run on 192.168.50.38:
### Download es
# wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.9-linux-x86_64.tar.gz
### Create the directory that will hold the ELK software
# mkdir /data/soft -p
### Extract es into the target directory
# tar -xvf elasticsearch-7.17.9-linux-x86_64.tar.gz -C /data/soft/
### Edit the es configuration file
# cd /data/soft/elasticsearch-7.17.9/
# cat > config/elasticsearch.yml <<EOF
cluster.name: my-application # cluster name; must be identical on all nodes
node.name: node-1 # node name; must be unique per node
path.data: /data/elk/data # index data path; recommended identical on all nodes
path.logs: /data/elk/logs # log path; recommended identical on all nodes
bootstrap.memory_lock: true # whether es locks memory at startup; true is recommended
network.host: 192.168.50.38 # this es node's IP address
http.port: 9200 # port; recommended identical on all nodes
discovery.seed_hosts: ["192.168.50.38", "192.168.50.39","192.168.50.40"] # hosts the es nodes use to discover each other; list every node
cluster.initial_master_nodes: ["node-1"] # which node bootstraps as master; one node name is enough, identical on all nodes
action.destructive_requires_name: true
EOF
### Create the directories es needs
# mkdir /data/elk/data -p && mkdir /data/elk/logs -p
### Create the es user that will run es
# useradd es
### Change ownership of the files
# chown es /data/ -R
### Adjust the system settings es requires
# cat >> /etc/security/limits.conf <<EOF
es hard nofile 65535
es soft nofile 65535
es nproc 4096
es hard memlock unlimited
es soft memlock unlimited
EOF
# cat >> /etc/sysctl.conf <<EOF
vm.swappiness=1
vm.max_map_count=262144
EOF
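### Apply the kernel parameters now (otherwise they only take effect after a reboot)
# sysctl -p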
# Alternatively, rebooting the server now also ensures all settings take effect
### Switch to the es user
# su es
### Start es; run it in the foreground first to check for errors, then switch to the background
# cd /data/soft/elasticsearch-7.17.9/bin
# ./elasticsearch -d
Run on 192.168.50.39:
### Download es
# wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.9-linux-x86_64.tar.gz
### Create the directory that will hold the ELK software
# mkdir /data/soft -p
### Extract es into the target directory
# tar -xvf elasticsearch-7.17.9-linux-x86_64.tar.gz -C /data/soft/
### Edit the es configuration file
# cd /data/soft/elasticsearch-7.17.9/
# cat > config/elasticsearch.yml <<EOF
cluster.name: my-application # cluster name; must be identical on all nodes
node.name: node-2 # node name; must be unique per node
path.data: /data/elk/data # index data path; recommended identical on all nodes
path.logs: /data/elk/logs # log path; recommended identical on all nodes
bootstrap.memory_lock: true # whether es locks memory at startup; true is recommended
network.host: 192.168.50.39 # this es node's IP address
http.port: 9200 # port; recommended identical on all nodes
discovery.seed_hosts: ["192.168.50.38", "192.168.50.39","192.168.50.40"] # hosts the es nodes use to discover each other; list every node
cluster.initial_master_nodes: ["node-1"] # which node bootstraps as master; one node name is enough, identical on all nodes
action.destructive_requires_name: true
EOF
### Create the directories es needs
# mkdir /data/elk/data -p && mkdir /data/elk/logs -p
### Create the es user that will run es
# useradd es
### Change ownership of the files
# chown es /data/ -R
### Adjust the system settings es requires
# cat >> /etc/security/limits.conf <<EOF
es hard nofile 65535
es soft nofile 65535
es nproc 4096
es hard memlock unlimited
es soft memlock unlimited
EOF
# cat >> /etc/sysctl.conf <<EOF
vm.swappiness=1
vm.max_map_count=262144
EOF
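### Apply the kernel parameters now (otherwise they only take effect after a reboot)
# sysctl -p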
# Alternatively, rebooting the server now also ensures all settings take effect
### Switch to the es user
# su es
### Start es; run it in the foreground first to check for errors, then switch to the background
# cd /data/soft/elasticsearch-7.17.9/bin
# ./elasticsearch -d
Run on 192.168.50.40:
### Download es
# wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.9-linux-x86_64.tar.gz
### Create the directory that will hold the ELK software
# mkdir /data/soft -p
### Extract es into the target directory
# tar -xvf elasticsearch-7.17.9-linux-x86_64.tar.gz -C /data/soft/
### Edit the es configuration file
# cd /data/soft/elasticsearch-7.17.9/
# cat > config/elasticsearch.yml <<EOF
cluster.name: my-application # cluster name; must be identical on all nodes
node.name: node-3 # node name; must be unique per node
path.data: /data/elk/data # index data path; recommended identical on all nodes
path.logs: /data/elk/logs # log path; recommended identical on all nodes
bootstrap.memory_lock: true # whether es locks memory at startup; true is recommended
network.host: 192.168.50.40 # this es node's IP address
http.port: 9200 # port; recommended identical on all nodes
discovery.seed_hosts: ["192.168.50.38", "192.168.50.39","192.168.50.40"] # hosts the es nodes use to discover each other; list every node
cluster.initial_master_nodes: ["node-1"] # which node bootstraps as master; one node name is enough, identical on all nodes
action.destructive_requires_name: true
EOF
### Create the directories es needs
# mkdir /data/elk/data -p && mkdir /data/elk/logs -p
### Create the es user that will run es
# useradd es
### Change ownership of the files
# chown es /data/ -R
### Adjust the system settings es requires
# cat >> /etc/security/limits.conf <<EOF
es hard nofile 65535
es soft nofile 65535
es nproc 4096
es hard memlock unlimited
es soft memlock unlimited
EOF
# cat >> /etc/sysctl.conf <<EOF
vm.swappiness=1
vm.max_map_count=262144
EOF
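### Apply the kernel parameters now (otherwise they only take effect after a reboot)
# sysctl -p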
# Alternatively, rebooting the server now also ensures all settings take effect
### Switch to the es user
# su es
### Start es; run it in the foreground first to check for errors, then switch to the background
# cd /data/soft/elasticsearch-7.17.9/bin
# ./elasticsearch -d
elasticsearch.yml configures Elasticsearch itself
jvm.options configures the Elasticsearch JVM
log4j2.properties configures Elasticsearch logging
Verify the es cluster
Any one of the three es nodes can be queried.
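The cluster can also be checked from the command line (a sketch using node-1's address; any node works):
# curl http://192.168.50.38:9200/_cat/nodes?v
# curl http://192.168.50.38:9200/_cluster/health?pretty
A healthy three-node cluster reports "status" : "green" and "number_of_nodes" : 3.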
kibana deployment
Download: https://artifacts.elastic.co/downloads/kibana/kibana-7.17.9-linux-x86_64.tar.gz
Run on 192.168.50.47:
# wget https://artifacts.elastic.co/downloads/kibana/kibana-7.17.9-linux-x86_64.tar.gz
# mkdir /data/soft -p
# tar -xvf kibana-7.17.9-linux-x86_64.tar.gz -C /data/soft/
# cd /data/soft/kibana-7.17.9-linux-x86_64
# cat > config/kibana.yml <<EOF
server.port: 5601
server.host: "192.168.50.47"
server.name: "TPLN_ELK"
elasticsearch.hosts: ["http://192.168.50.38:9200","http://192.168.50.39:9200","http://192.168.50.40:9200"]
kibana.index: ".kibana"
kibana.defaultAppId: "home"
elasticsearch.pingTimeout: 1500
elasticsearch.requestTimeout: 30000
ops.interval: 5000
i18n.locale: "zh-CN"
EOF
# useradd es
# chown es /data/ -R
### Quote the heredoc delimiter so that $MAINPID is written literally into the unit file
# cat > /usr/lib/systemd/system/kibana.service <<'EOF'
[Unit]
Description=kibana
After=network.target
[Service]
User=es
ExecStart=/data/soft/kibana-7.17.9-linux-x86_64/bin/kibana
ExecStop=/usr/bin/kill -15 $MAINPID
ExecReload=/usr/bin/kill -HUP $MAINPID
Type=simple
RemainAfterExit=yes
PrivateTmp=true
# file size
LimitFSIZE=infinity
# cpu time
LimitCPU=infinity
# virtual memory size
LimitAS=infinity
# open files
LimitNOFILE=65535
# processes/threads
LimitNPROC=64000
# locked memory
LimitMEMLOCK=infinity
# total threads (user+kernel)
TasksMax=infinity
TasksAccounting=false
[Install]
WantedBy=multi-user.target
EOF
# systemctl daemon-reload
# systemctl start kibana.service
Verify by opening kibana at http://192.168.50.47:5601
kafka cluster deployment
If you want to deploy with ZooKeeper, see the separate article; this walkthrough uses Kafka's KRaft mode instead of ZooKeeper.
Download: https://archive.apache.org/dist/kafka/3.0.0/kafka_2.12-3.0.0.tgz
Run on 192.168.50.38:
# wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.12-3.0.0.tgz
# tar -xvf kafka_2.12-3.0.0.tgz -C /data/soft/
# cd /data/soft/kafka_2.12-3.0.0
# cat > config/kraft/server.properties <<EOF
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@192.168.50.38:9093,2@192.168.50.39:9093,3@192.168.50.40:9093
listeners=PLAINTEXT://192.168.50.38:9092,CONTROLLER://192.168.50.38:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://192.168.50.38:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kraft-combined-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
EOF
# yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel -y
### Generate the cluster UUID; this run produced the random string fRSifcmESiqs4nkhNWtoYw
# ./bin/kafka-storage.sh random-uuid
Run on 192.168.50.39:
# wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.12-3.0.0.tgz
# tar -xvf kafka_2.12-3.0.0.tgz -C /data/soft/
# cd /data/soft/kafka_2.12-3.0.0
# cat > config/kraft/server.properties <<EOF
process.roles=broker,controller
node.id=2
controller.quorum.voters=1@192.168.50.38:9093,2@192.168.50.39:9093,3@192.168.50.40:9093
listeners=PLAINTEXT://192.168.50.39:9092,CONTROLLER://192.168.50.39:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://192.168.50.39:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kraft-combined-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
EOF
# yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel -y
Run on 192.168.50.40:
# wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.12-3.0.0.tgz
# tar -xvf kafka_2.12-3.0.0.tgz -C /data/soft/
# cd /data/soft/kafka_2.12-3.0.0
# cat > config/kraft/server.properties <<EOF
process.roles=broker,controller
node.id=3
controller.quorum.voters=1@192.168.50.38:9093,2@192.168.50.39:9093,3@192.168.50.40:9093
listeners=PLAINTEXT://192.168.50.40:9092,CONTROLLER://192.168.50.40:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://192.168.50.40:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kraft-combined-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
EOF
# yum install java-1.8.0-openjdk java-1.8.0-openjdk-devel -y
Run on all of 192.168.50.38, 192.168.50.39, and 192.168.50.40:
# cd /data/soft/kafka_2.12-3.0.0
# ./bin/kafka-storage.sh format -t fRSifcmESiqs4nkhNWtoYw -c ./config/kraft/server.properties
# ./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties
The string fRSifcmESiqs4nkhNWtoYw passed to -t is the UUID generated on 192.168.50.38 in the step above.
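To confirm the KRaft cluster formed correctly, you can create and list a test topic from any broker (a sketch; the topic name is arbitrary):
# cd /data/soft/kafka_2.12-3.0.0
# ./bin/kafka-topics.sh --bootstrap-server 192.168.50.38:9092 --create --topic test-demo --partitions 3 --replication-factor 3
# ./bin/kafka-topics.sh --bootstrap-server 192.168.50.38:9092 --list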
filebeat deployment
Download: https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.17.9-linux-x86_64.tar.gz
Run on 192.168.50.38, 192.168.50.39, and 192.168.50.40:
### Download filebeat
# wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.17.9-linux-x86_64.tar.gz
### Extract filebeat
# tar -xvf filebeat-7.17.9-linux-x86_64.tar.gz -C /data/soft/
### Enter the extracted directory
# cd /data/soft/filebeat-7.17.9-linux-x86_64
### Edit the configuration file
# cat > filebeat.yml <<EOF
filebeat.inputs:
- type: log
  tags: ["dev"]
  fields_under_root: true
  processors:
    - add_fields:
        fields:
          serviceName: ""
    - script:
        lang: javascript
        source: >
          var console = require('console');
          function process(event) {
            var logPath = event.Get("log.file.path");
            var fileName = getFileName(logPath);
            var fileServiceName = getServiceName(fileName);
            event.Put('serviceName',fileServiceName);
          }
          function getFileName(o) {
            var pos = o.lastIndexOf('/');
            return o.substring(pos + 1);
          }
          function getServiceName(o) {
            return o.replace(/\.|_/g,'-')
          }
  enabled: true
  paths:
    - /data/logs/*.log
output.kafka:
  enabled: true
  hosts: ["192.168.50.38:9092","192.168.50.39:9092","192.168.50.40:9092"]
  topic: 'log-%{[serviceName]}'
  max_message_bytes: 5242880
  partition.round_robin:
    reachable_only: true
  keep_alive: 120
  required_acks: 1
EOF
### Create /data/logs so we have logs to collect for this experiment
# mkdir -p /data/logs
### Create the log files
# touch /data/logs/{chiness.log,english.log,go.log,math_achievement_detailed.log}
### Write some arbitrary strings into the files; multiline collection is not used in this experiment — configure it to match your log format if needed
# cd /data/logs
# echo `ifconfig | grep inet` >> go.log && echo `date` >> chiness.log
### Start filebeat in the foreground for now; switch to background mode once it is verified
# ./filebeat -e -c filebeat.yml
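Before deploying logstash, you can check that events are reaching Kafka with the console consumer (a sketch; with the script above, go.log maps to the topic log-go-log):
# cd /data/soft/kafka_2.12-3.0.0
# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.50.38:9092 --topic log-go-log --from-beginning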
logstash deployment
Download: https://artifacts.elastic.co/downloads/logstash/logstash-7.17.9-linux-x86_64.tar.gz
Run on 192.168.50.47:
### Download logstash
# wget https://artifacts.elastic.co/downloads/logstash/logstash-7.17.9-linux-x86_64.tar.gz
### Extract logstash
# tar -xvf logstash-7.17.9-linux-x86_64.tar.gz -C /data/soft/
### Enter the extracted directory
# cd /data/soft/logstash-7.17.9
### Edit the configuration file
# cat > config/logstash.conf <<EOF
input {
  kafka {
    bootstrap_servers => ["192.168.50.38:9092,192.168.50.39:9092,192.168.50.40:9092"]
    client_id => "elk-log"
    group_id => "elk-log"
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    topics_pattern => "log.*"
    type => "kafka-to-elas"
    codec => json
  }
}
filter {
  mutate {
    add_field => {
      "env" => "%{[tags][0]}"
    }
    remove_field => ["@version"]
  }
  ruby {
    code => "
      hostNameValues = event.get('[host][name]')
      event.set('hostvalue',hostNameValues)
    "
  }
  if [serviceName] == 'go-log' {
    ruby {
      code => "
        envNew = event.get('[env]')
        event.set('envNew',envNew)
      "
    }
  }
}
output {
  elasticsearch {
    hosts => ["192.168.50.38:9200","192.168.50.39:9200","192.168.50.40:9200"]
    index => "log-%{[serviceName]}"
    timeout => 300
  }
}
EOF
### Start logstash in the foreground for now; switch to background mode once it is verified
# ./bin/logstash -f config/logstash.conf
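Once logstash is consuming, the new indices can also be listed directly from Elasticsearch (a sketch):
# curl "http://192.168.50.38:9200/_cat/indices/log-*?v"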
Verify the logging pipeline end to end
Open kibana.
Write data into the log files again (on any one of the servers):
# cd /data/logs/
# echo `ifconfig | grep inet` >> go.log && echo `date` >> chiness.log
Here the data was written on 192.168.50.38.
View the es indices in kibana
Indices for the log files we just wrote to have already been created.
Create an index pattern
(Index patterns are created in Kibana under Stack Management; screenshots unavailable.) The pattern can only be created once the name you enter matches an existing index.
View the logs
In Discover, choose the index pattern just created to browse the collected log events.
View the custom fields
The custom fields added along the pipeline (such as env and hostvalue) appear on each document.
Improving the current setup
Configure an es index lifecycle policy
(The policy is created in Kibana under Stack Management, Index Lifecycle Policies; screenshots unavailable.)
Once the hot phase's conditions are met, the index moves to the delete phase. Configure this according to your own needs; keep in mind the delete phase really does delete data.
Attach the lifecycle policy to the index
Edit the logstash output configuration:
output {
  elasticsearch {
    hosts => ["192.168.50.38:9200","192.168.50.39:9200","192.168.50.40:9200"]
    index => "log-%{[serviceName]}"
    ilm_enabled => true
    ilm_policy => "log-dev-policy"
    timeout => 300
  }
}
Restart logstash, then check on the index that the policy has taken effect.
es cross-origin (CORS) settings
Add the following to elasticsearch.yml:
http.cors.enabled: true
http.cors.allow-origin: "*"
Starting filebeat with systemctl
# cat > /etc/systemd/system/filebeat.service <<EOF
[Unit]
Description=Filebeat sends log files to Logstash or directly to Elasticsearch.
Documentation=https://www.elastic.co/guide/en/beats/filebeat/current/index.html
Wants=network-online.target
After=network-online.target
[Service]
User=root
Group=root
Environment="BEAT_LOG_OPTS=-e"
ExecStart=/data/soft/filebeat-7.17.9-linux-x86_64/filebeat -c /data/soft/filebeat-7.17.9-linux-x86_64/filebeat.yml
Restart=always
[Install]
WantedBy=multi-user.target
EOF
# systemctl daemon-reload
# systemctl start filebeat
Adjust the paths in ExecStart to match your own environment.
Starting logstash with systemctl
# cat > /etc/systemd/system/logstash.service <<EOF
[Unit]
Description=Logstash parses log events and ships them to Elasticsearch.
Documentation=https://www.elastic.co/guide/en/logstash/current/index.html
Wants=network-online.target
After=network-online.target
[Service]
User=root
Group=root
ExecStart=/data/soft/logstash-7.17.9/bin/logstash -f /data/soft/logstash-7.17.9/config/logstash.conf
Restart=always
[Install]
WantedBy=multi-user.target
EOF
# systemctl daemon-reload
# systemctl start logstash
elasticsearch tuning
- Heap sizing: adjust the Xms and Xmx values in config/jvm.options to the server's memory. Elastic's guidance is to give the heap no more than about half of physical RAM (and keep it below ~31 GB), leaving the remainder for the OS file cache.
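For example, on the 8 GB machines used in this walkthrough, a plausible jvm.options setting would be (Xms and Xmx must be equal):
-Xms4g
-Xmx4g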
Creating time-based indices in elasticsearch
input {
  kafka {
    bootstrap_servers => ["192.168.50.38:9092,192.168.50.39:9092,192.168.50.40:9092"]
    client_id => "elk-log"
    group_id => "elk-log"
    auto_offset_reset => "earliest"
    consumer_threads => 3
    decorate_events => true
    topics_pattern => "log.*"
    type => "kafka-to-elas"
    codec => json
  }
}
filter {
  date {
    match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
  }
  mutate {
    add_field => {
      "env" => "%{[tags][0]}"
    }
    remove_field => ["@version"]
  }
  ruby {
    code => "
      hostNameValues = event.get('[host][name]')
      event.set('hostvalue',hostNameValues)
    "
  }
  if [serviceName] == 'go-log' {
    ruby {
      code => "
        envNew = event.get('[env]')
        event.set('envNew',envNew)
      "
    }
  }
}
output {
  elasticsearch {
    hosts => ["192.168.50.38:9200","192.168.50.39:9200","192.168.50.40:9200"]
    index => "log-%{[serviceName]}-%{+YYYY.MM.dd}"
    timeout => 300
  }
}
This is the complete logstash configuration: the date filter parses the time out of the timestamp field into @timestamp, and the %{+YYYY.MM.dd} format string in the index name then embeds that date, so a new index is created per day (for example log-go-log-2023.07.13).
Multiline collection with filebeat
Java application logs are full of stack traces; without multiline collection they are very hard to read. The log messages look roughly like this:
2023-04-14 16:32:42.062 ERROR [english-book-workflow,,,] 6 --- [d2-857851adf5dc] c.a.n.client.config.impl.ClientWorker : longPolling error :
java.net.UnknownHostException: nacos-headless
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184) ~[na:1.8.0_161]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_161
2023-04-14 16:32:44.063 ERROR [english-book-workflow,,,] 6 --- [d2-857851adf5dc] c.a.n.c.config.http.ServerHttpAgent : [NACOS IOException httpPost] currentServerAddr: http
java.net.UnknownHostException: nacos-headless
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184) ~[na:1.8.0_161]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_161]
at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_161]
[2023-04-21 10:20:54.884 | TID:a21d429243ec4e098423dce61678f1d9.64.16820436548834911 | | | | | | | | | BASIC | XNIO-1 task-4 INFO c.ienglish.server.book.controller.HealthController - Method: [healthTest] execute times: [0], Response: [{"code":0,"message":"success","success":true}] ]
[2023-04-21 10:20:58.068 | TID:a21d429243ec4e098423dce61678f1d9.64.16820436580674913 | | | | | | | | | BASIC | XNIO-1 task-7 INFO c.ienglish.server.book.controller.HealthController - RequestMapping: [/health], Method: [healthTest], Param: [[]] ]
[2023-04-21 10:20:58.068 | TID:a21d429243ec4e098423dce61678f1d9.64.16820436580674913 | | | | | | | | | BASIC | XNIO-1 task-7 INFO c.ienglish.server.book.controller.HealthController - Method: [healthTest] execute times: [0], Response: [{"code":0,"message":"success","success":true}] ]
They mix error logs with normal logs, and the formats are not entirely uniform.
The filebeat configuration:
filebeat.inputs:
- type: log
  tags: ["dev"]
  fields_under_root: true
  processors:
    - add_fields:
        fields:
          serviceName: ""
    - script:
        lang: javascript
        source: >
          var console = require('console');
          function process(event) {
            var logPath = event.Get("log.file.path");
            var fileName = getFileName(logPath);
            var fileServiceName = getServiceName(fileName);
            event.Put('serviceName',fileServiceName);
          }
          function getFileName(o) {
            var pos = o.lastIndexOf('/');
            return o.substring(pos + 1);
          }
          function getServiceName(o) {
            return o.replace(/\.|_/g,'-')
          }
  enabled: true
  paths:
    - /data/logs/*.log
  multiline.pattern: '^(\[[0-9]{4}-[0-9]{2}-[0-9]{2}\ [0-9]{2}:[0-9]{2}:[0-9]{2}|[0-9]{4}-[0-9]{2}-[0-9]{2}\ [0-9]{2}:[0-9]{2}:[0-9]{2})'
  multiline.negate: true
  multiline.match: after
  multiline.max_lines: 100
output.kafka:
  enabled: true
  hosts: ["192.168.50.38:9092","192.168.50.39:9092","192.168.50.40:9092"]
  topic: 'log-%{[serviceName]}'
  max_message_bytes: 5242880
  partition.round_robin:
    reachable_only: true
  keep_alive: 120
  required_acks: 1
multiline.pattern: the regular expression marking the first line of an event
multiline.negate: whether the pattern is negated; with true, lines that do not match are treated as continuation lines and merged
multiline.match: where continuation lines are attached; after appends them to the preceding matching line
multiline.max_lines: the maximum number of lines merged into one event; lines beyond this limit are dropped
Log alerting
Hooking es up to grafana: Elasticsearch can be added to Grafana as a data source for dashboarding and alerting (screenshot unavailable).
A rough design for log alerting
1. Query the data in es (JSON format) and filter out the required fields; at minimum, message and the field representing the time.
2. Send the alert payload to alertmanager through its API.
3. alertmanager routes the alert to the receiving channels according to its routing groups and alerting rules.
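For step 2, a minimal sketch of pushing one alert through Alertmanager's v2 API (the Alertmanager address and all label values are placeholders):
curl -XPOST http://alertmanager:9093/api/v2/alerts -H 'Content-Type: application/json' -d '[
  {
    "labels": { "alertname": "LogError", "serviceName": "go-log", "severity": "warning" },
    "annotations": { "message": "ERROR entries found in index log-go-log" }
  }
]'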
The alerting implementation lives at zxl/prometheus_script/elkAlert_alertmanager.
Building kibana dashboards
Add panels according to your own needs.
filebeat deployment on k8s
Reference filebeat.yaml:
filebeat.inputs:
- type: container
  paths:
    - /var/log/containers/*.log
    #- /root/tmp/*.log
  document_type: "english-server"
  multiline.pattern: '^\[|[0-9]{4}-[0-9]{2}-[0-9]{2}\ [0-9]{2}:[0-9]{2}:[0-9]{2}|(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)'
  multiline.negate: true
  multiline.match: after
  multiline.max_lines: 100
  processors:
    - add_kubernetes_metadata:
        host: ${NODE_NAME}
        matchers:
          - logs_path:
              logs_path: "/var/log/containers/"
              #logs_path: "/root/tmp/"
    - add_cloud_metadata:
    - add_host_metadata:
    - script:
        lang: js
        id: dispose
        #file: /usr/share/filebeat/dispose.js
        source: >
          var console = require('console');
          var envs = new Array("general-components","english-qa","english-prod","chinese-old-qa","chinese-old","chinese-new-qa","chinese-new","bigdata-dev","bigdata-qa","bigdata-prod","kong-prod","french-prod","math-prod","preschool-prod","quality-prod");
          function process(event) {
            var logPath = event.Get("log.file.path");
            var fileName = getFileName(logPath);
            var curEnv = getEnv(fileName);
            console.log(curEnv);
            if("" != curEnv){
              event.Put('nameSpace',curEnv);
              var idx = fileName.indexOf(curEnv);
              var appStr = fileName.substring(0,idx-1);
              var appName = getAppName(appStr);
              event.Put('appName',appName);
            }else{
              event.Cancel();
              return;
            }
          }
          function getFileName(o) {
            var pos = o.lastIndexOf('/');
            return o.substring(pos + 1);
          }
          function getEnv(o){
            var rs = "";
            for (var i=0; i<envs.length; i++) {
              var value = envs[i];
              if(o.indexOf(value) != -1){
                rs = value;
                break;
              }
            }
            return rs;
          }
          function getAppName(o){
            return o.replace(/(.*)-.*?-.*?$/,'$1');
          }
- type: container
  paths:
    - /var/log/containers/nginx-ingress-controller*kube-system*.log
  document_type: "english-server"
  multiline.pattern: '^(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)\.(25[0-5]|2[0-4]\d|[0-1]\d{2}|[1-9]?\d)'
  multiline.negate: true
  multiline.match: after
  multiline.max_lines: 100
  processors:
    - add_kubernetes_metadata:
        host: ${NODE_NAME}
        matchers:
          - logs_path:
              logs_path: "/var/log/containers/"
    - add_cloud_metadata:
    - add_host_metadata:
    - script:
        lang: js
        id: system_filter
        source: >
          function process(event) {
            event.Put("appName","nginx-ingress-controller")
            event.Put("nameSpace","kube-system")
          }
processors:
  - add_kubernetes_metadata:
      host: ${NODE_NAME}
      matchers:
        - logs_path:
            logs_path: "/var/log/containers/"
cloud.id: ${ELASTIC_CLOUD_ID}
cloud.auth: ${ELASTIC_CLOUD_AUTH}
output.elasticsearch:
  enabled: false
  hosts: ['${ELASTICSEARCH_HOST:elasticsearch}:${ELASTICSEARCH_PORT:9200}']
  username: ${ELASTICSEARCH_USERNAME}
  password: ${ELASTICSEARCH_PASSWORD}
output.kafka:
  enabled: true # enable the kafka output
  hosts: ["10.16.202.197:9092", "10.16.201.158:9092", "10.16.200.67:9092", "10.16.202.211:9092", "10.16.202.210:9092"]
  topic: 'log-%{[appName]}'
  max_message_bytes: 5242880
  partition.round_robin:
    reachable_only: true
  keep_alive: 120
  required_acks: 1
Adjust this to your actual environment.
Get in touch
Blog: https://kubesre.com/
WeChat official account: 云原生運(yùn)維圈