Filebeat7 Kafka Gunicorn Flask Web應(yīng)用程序日志采集

本文的內(nèi)容

  • 如何用filebeat kafka es做一個好用,好管理的日志收集工具
  • 放棄logstash,使用elastic pipeline
  • gunicron日志格式與filebeat/es配置
  • flask日志格式與異常日志采集與filebeat/es配置
  • 以上的配置

概況

我有一個HTTP請求,經(jīng)過的路徑為

Gateway(kong)-->WebContainer(gunicorn)-->WebApp(flask)

我準(zhǔn)備以下流向處理我的日志

file --> filebeat --> kafka topic--> filebeat --> elastic pipeline --> elasticsearch
                       |
                       |  ----------> HBase

為什么這么做

Logstash去哪里了?

  • Logstash太重了,不過這不是問題,也就是多個機器加點錢的問題。能把事情處理就行。
  • Logstash不美,Logstash雖然是集中管理配置,但是一個logstash好像總是不夠,Logstash好像可以分開配置,但是你永遠不知道如何劃分哪些配置應(yīng)該放在一個配置文件,哪些應(yīng)該分開。
  • 刪除一個配置?不可能的,我怎么知道應(yīng)該刪除什么配置。
  • 如果用了Logstash. As a 'poor Ops guys having to understand and keep up with all the crazy input possibilities. _

Filebeat的痛處

  • 看看這個Issue吧, 萬人血書讓filebeat支持grok, 但是就是不支持,不過給了我們兩條路,比如你可以用存JSON的日志啊, 或者用pipeline
  • Filebeat以前是沒有一個好的kafka-input。只能自己寫kafka-es的轉(zhuǎn)發(fā)工具

簡單點

我想要的日志采集就是簡簡單單,或者說微服務(wù)的內(nèi)聚力。 一條日志采集線就不該和其他業(yè)務(wù)混合。最好的就是以下這種狀態(tài)

onefile -> filebeat_config -> kafka_topic -> filebeat_config -> elastic pipepline -> es index

Gunicorn日志

gunicorn日志

gunicorn日志采集如下的信息

  • time
  • client_ip
  • http method
  • http scheme
  • url
  • url query string
  • response status code
  • client name
  • rt
  • trace id
  • remote ips

日志格式

%(t)s [%(h)s] [%(m)s] [%(H)s] [%(U)s] [%(q)s] [%(s)s] [%(a)s] [%(D)s] [%({Kong-Request-ID}i)s] [%({X-Forwarded-For}i)s]

日志例子

[15/Nov/2019:10:23:37 +0000] [172.31.37.123] [GET] [HTTP/1.1] [/api/v1/_instance/json_schema/Team/list] [a=1] [200] [Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36] [936] [9cbf6a3b-9c3a-4835-a2ef-02e03ee826d7#16] [137.59.103.3, 172.30.17.253, 172.30.18.12]

Es processing解析

es processing是6.0之后的功能,相當(dāng)于es之前自帶了一個logstash.對于復(fù)雜日志有多種processing,
可以使用grok或者dissect.某些情況下dissect更加快一些.
經(jīng)過kafka,再有filebeat打到ES, 需要刪除多余的信息

PUT _ingest/pipeline/gunicorn
{
  "description" : "devops gunicorn pipeline",
  "processors" : [
    {
        "remove": {"field": ["agent", "ecs", "host", "input", "kafka"]}
    },
    {
        "json": {
            "field": "message",
            "add_to_root": true
        }
    },
    {
        "remove": {"field": ["@metadata", "ecs", "agent", "input"]}
    },
    {
      "dissect" : {
        "field": "message",
        "pattern": "[%{@timestamp}] [%{client_ip}] [%{method}] [%{scheme}] [%{path}] [%{query_string}] [%{status}] [%{client}] [%{rt_millo}] [%{trace_id}] [%{remote_ips}]"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }  
  ]
}

Es mapping

這里比較關(guān)鍵的是ES時間格式文檔的定義, 如果某些字段我們覺得有必要分詞,就是用text。否則使用keyword。這樣可以更加
方便的聚合和查詢?nèi)罩緮?shù)據(jù), 開啟_source方便做一些數(shù)據(jù)統(tǒng)計

PUT _template/gunicorn
{
  "index_patterns": ["*gunicorn*"],
  "settings": {
    "number_of_shards": 1
  },
  "version": 1,
  "mappings": {
    "_source": {
      "enabled": true
    },
    "properties": {
      "@timestamp": {
        "type": "date",
        "format": "dd/LLL/yyyy:HH:mm:ss Z"
      },
      "client_ip": {
"type": "ip"
      },
      "method": {
        "type": "keyword"
      },
      "scheme": {
        "type": "keyword"
      },
      "path": {
        "type": "text"
      },
     "query_string": {
        "type": "text"
      },
     "status": {
        "type": "integer"
      },
            "client": {
        "type": "text"
      },
            "rt_millo": {
        "type": "long"
      },
            "trace_id": {
        "type": "keyword"
      },
      "remote_ips": {
        "type": "text"
      }
    }
  }
}

filebeat 采集到kafka配置文件

filebeat.inputs:
  - type: log
    paths:
      - /yourpath/gunicorn-access.log
    multiline.pattern: '^\['
    multiline.negate: true
    multiline.match: after
    tail_files: true

queue.mem:
  events: 4096
  flush.min_events: 512
  flush.timeout: 5s


output.kafka:
  hosts:  ["kafka-01","kafka-02","kafka-03"]
  topic: 'gunicron_access'
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

filebeat 從kafka消費配置文件

filebeat.inputs:
- type: kafka
  hosts:  ["kafka-01","kafka-02","kafka-03"]
  topics: ["gunicron_access"]
  group_id: "filebeat_gunicron"


output.elasticsearch:
  hosts: ["es-url"]
  pipeline: "gunicorn"
  index: "gunicorn-%{+yyyy.MM.dd}"
  
setup.template.name: "gunicorn"
setup.template.pattern: "gunicorn-*"
setup.ilm.enabled: false
setup.template.enabled: false

Flask日志

Flask日志是我們程序打印的,用于查看一些異常和錯誤的日志。在上線初期,info日志是可以打開debug的日志的。這樣方便我們進行調(diào)試。
在穩(wěn)定之后應(yīng)該將日志接受級別調(diào)高。info日志不適合做統(tǒng)計,只是除了問題我們可以快速定位問題所在。 異常應(yīng)該打到info日志中

INFO日志可以使用我建議的格式。我們關(guān)心

  • time
  • levelname: 日志級別
  • host, process, thread: 用于定位到某臺機器的某個進程下的某個線程(一些復(fù)雜的bug需要,或者開啟了異步進程)
  • name, funcname, filename, lineno: 用于定位日志發(fā)生的代碼位置
  • message: 日志內(nèi)容

日志格式

{
    "format": "[%(asctime)s.%(msecs)03d] [%(levelname)s] [{}:%(process)d:%(thread)d] [%(name)s:%(funcName)s] [%(filename)s:%(lineno)d] %(message)s".format(HOST),
    "datefmt": "%Y-%m-%d %H:%M:%S"
}

日志例子

[2019-11-18 08:47:49.424] [INFO] [cmdb-008069:5990:140482161399552] [cmdb:execute_global_worker] [standalone_scheduler.py:116] RUN_INFO: tiny_collector_ali starting at 2019-11-18 08:47:49, next run will be at approximately 2019-11-18 09:47:49
[2019-11-18 08:11:27.715] [ERROR] [cmdb-008069:5985:140184204932928] [cmdb:common_handler] [error.py:48] 404 Not Found: The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.
Traceback (most recent call last):
  File "/home/server/venv3/lib/python3.6/site-packages/flask/app.py", line 1805, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/server/venv3/lib/python3.6/site-packages/flask/app.py", line 1783, in dispatch_request
    self.raise_routing_exception(req)
  File "/home/server/venv3/lib/python3.6/site-packages/flask/app.py", line 1766, in raise_routing_exception
    raise request.routing_exception
  File "/home/server/venv3/lib/python3.6/site-packages/flask/ctx.py", line 336, in match_request
    self.url_adapter.match(return_rule=True)
  File "/home/server/venv3/lib/python3.6/site-packages/werkzeug/routing.py", line 1799, in match
    raise NotFound()
werkzeug.exceptions.NotFound: 404 Not Found: The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.

Es processing解析

經(jīng)過kafka,再有filebeat打到ES, 需要刪除多余的信息

PUT _ingest/pipeline/info
{
  "description" : "devops info pipeline",
  "processors" : [
    {
        "remove": {"field": ["agent", "ecs", "host", "input", "kafka"]}
    },
    {
        "json": {
            "field": "message",
            "add_to_root": true
        }
    },
    {
        "remove": {"field": ["@metadata", "ecs", "agent", "input"]}
    },
    {
      "dissect" : {
        "field": "message",
        "pattern": "[%{@timestamp}] [%{level}] [%{host}:%{process_id}:%{thread_id}] [%{name}:%{func_name}] [%{file}:%{line_no}] %{content}"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }  
  ]
}

Es mapping

thread_id 要給一個long字段, python如果獲取不到會給一個超出integer范圍的數(shù)字

PUT _template/info
{
  "index_patterns": ["*info*"],
  "settings": {
    "number_of_shards": 1
  },
  "version": 1,
  "mappings": {
    "_source": {
      "enabled": true
    },
    "properties": {
      "@timestamp": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss.SSS"
      },
      "level": {
        "type": "keyword"
      },
      "host": {
        "type": "keyword"
      },
      "process_id": {
        "type": "integer"
      },
     "thread_id": {
        "type": "long"
      },
       "name": {
        "type": "keyword"
      },
            "func_name": {
        "type": "keyword"
      },
             "file": {
        "type": "keyword"
      },
             "line_no": {
        "type": "integer"
      },
      "content": {
          "type": "text"
      }
    }
  }
}

filebeat 采集到Kafka配置文件

這里采用^\[20\d{2}來區(qū)分行首

filebeat.inputs:
  - type: log
    paths:
      - /you_path/app.log
    multiline.pattern: '^\[20\d{2}'
    multiline.negate: true
    multiline.match: after
    tail_files: true

queue.mem:
  events: 4096
  flush.min_events: 512
  flush.timeout: 5s

output.kafka:
  hosts: ["kafka-01", "kafka-02", "kafka-03"]
  topic: 'devops_app'
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

filebeat 從kafka消費配置文件

filebeat.inputs:
- type: kafka
  hosts:   ["kafka-01", "kafka-02", "kafka-03"]
  topics: ["devops_app"]
  group_id: "filebeat_app"


output.elasticsearch:
  hosts: ["es_url"]
  pipeline: "info"
  index: "app-info-%{+yyyy.MM.dd}"
  
setup.template.name: "info"
setup.template.pattern: "app-info-*"
setup.ilm.enabled: false
setup.template.enabled: false
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容