如何編寫 Pipeline 腳本

前言

Pipeline 編寫較為麻煩,為此,DataKit 中內(nèi)置了簡單的調(diào)試工具,用以輔助大家來編寫 Pipeline 腳本。

調(diào)試 grok 和 pipeline

指定 pipeline 腳本名稱,輸入一段文本即可判斷提取是否成功

Pipeline 腳本必須放在?/pipeline 目錄下。

$ datakit pipeline your_pipeline.p -T '2021-01-11T17:43:51.887+0800? DEBUG io? io/io.go:458? post cost 6.87021ms'

Extracted data(cost: 421.705μs):? ?# 表示切割成功

{? ? "code"? : "io/io.go: 458",? ? ? ? ? ? # 對應(yīng)代碼位置? ?

?"level"? : "DEBUG",? ? ? ? ? ? ? ? ? ? ? ?# 對應(yīng)日志等級? ??

"module" : "io",? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?# 對應(yīng)代碼模塊? ??

"msg"? ? : "post cost 6.87021ms",? ?# 純?nèi)罩緝?nèi)容? ??

"time"? : 1610358231887000000? ? # 日志時間(Unix 納秒時間戳)? ??

"message": "2021-01-11T17:43:51.887+0800? DEBUG io? io/io.g o:458? post cost 6.87021ms"}

提取失敗示例(只有 message 留下了,說明其它字段并未提取出來):

$ datakit pipeline other_pipeline.p -T '2021-01-11T17:43:51.887+0800? DEBUG io? io/io.g o:458? post cost 6.87021ms'

{? ? "message": "2021-01-11T17:43:51.887+0800? DEBUG io? io/io.g o:458? post cost 6.87021ms"}

如果調(diào)試文本比較復(fù)雜,可以將它們寫入一個文件(sample.log),用如下方式調(diào)試:

$?datakit?pipeline?your_pipeline.p?-F?sample.log

更多 Pipeline 調(diào)試命令,參見 datakit help pipeline。

Grok 通配搜索

由于 Grok pattern 數(shù)量繁多,人工匹配較為麻煩。DataKit 提供了交互式的命令行工具?grokq(grok query):

datakit tool --grokq

grokq > Mon Jan 25 19:41:17 CST 2021? # 此處輸入你希望匹配的文本? ? ? ??

2 %{DATESTAMP_OTHER: ?}? ? ? ? # 工具會給出對應(yīng)對的建議,越靠前匹配月精確(權(quán)重也越大)。前面的數(shù)字表明權(quán)重。? ? ? ??

0 %{GREEDYDATA: ?}

grokq > 2021-01-25T18:37:22.016+0800? ? ? ??

4 %{TIMESTAMP_ISO8601: ?}? ? ? # 此處的 ? 表示你需要用一個字段來命名匹配到的文本? ? ? ??

0 %{NOTSPACE: ?}? ? ? ??

0 %{PROG: ?}? ? ? ??

0 %{SYSLOGPROG: ?}? ? ? ??

0 %{GREEDYDATA: ?}? ? ? ? ? ? # 像 GREEDYDATA 這種范圍很廣的 pattern,權(quán)重都較低? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?# 權(quán)重越高,匹配的精確度越大

grokq > Q? ? ? ? ? ? ? ? ? ? ? ? ? ? ? # Q 或 exit 退出

Bye!

Windows 下,請在 Powershell 中執(zhí)行調(diào)試。

多行如何處理

在處理一些調(diào)用棧相關(guān)的日志時,由于其日志行數(shù)不固定,直接用?GREEDYDATA?這個 pattern 無法處理如下情況的日志:

2022-02-10 16:27:36.116 ERROR 1629881 --- [scheduling-1] o.s.s.s.TaskUtils$LoggingErrorHandler? ? : Unexpected error occurred in scheduled task? ??

java.lang.NullPointerException: null? ??

at com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.isSimilarPrize(xxxxxxxxxxxxxxxxx.java:442)? ??

at com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.lambda$getSimilarPrizeSnapUpDo$0(xxxxxxxxxxxxxxxxx.java:595)? ??

at java.util.stream.ReferencePipeline$3$1.accept(xxxxxxxxxxxxxxxxx.java:193)? ??

at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(xxxxxxxxx.java:1382)? ??

at java.util.stream.AbstractPipeline.copyInto(xxxxxxxxxxxxxxxx.java:481)? ??

at java.util.stream.AbstractPipeline.wrapAndCopyInto(xxxxxxxxxxxxxxxx.java:471)? ??

at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(xxxxxxxxx.java:708)? ??

at java.util.stream.AbstractPipeline.evaluate(xxxxxxxxxxxxxxxx.java:234)????

at?java.util.stream.ReferencePipeline.collect(xxxxxxxxxxxxxxxxx.java:499)

此處可以使用?GREEDYLINES?規(guī)則來通配,如(/usr/local/datakit/pipeline/test.p):

add_pattern('_dklog_date', '%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}%{INT}')

grok(_, '%{_dklog_date:log_time}\\s+%{LOGLEVEL:Level}\\s+%{NUMBER:Level_value}\\s+---\\s+\\[%{NOTSPACE:thread_name}\\]\\s+%{GREEDYDATA:Logger_name}\\s+(\\n)?(%{GREEDYLINES:stack_trace})'

# 此處移除 message 字段便于調(diào)試

drop_origin_data()

將上述多行日志存為?multi-line.log,調(diào)試一下:

$?datakit?--pl?test.p?--txt?"$(<multi-line.log)"

得到如下切割結(jié)果:

{??

"Level": "ERROR",??

"Level_value": "1629881",?

?"Logger_name": "o.s.s.s.TaskUtils$LoggingErrorHandler? ? : Unexpected error occurred in scheduled task",? "log_time": "2022-02-10 16:27:36.116",??

"stack_trace": "java.lang.NullPointerException: null\n\tat com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.isSimilarPrize(xxxxxxxxxxxxxxxxx.java:442)\n\tat com.xxxxx.xxxxxxxxxxx.xxxxxxx.impl.SxxxUpSxxxxxxImpl.lambda$getSimilarPrizeSnapUpDo$0(xxxxxxxxxxxxxxxxx.java:595)\n\tat java.util.stream.ReferencePipeline$3$1.accept(xxxxxxxxxxxxxxxxx.java:193)\n\tat java.util.ArrayList$ArrayListSpliterator.forEachRemaining(xxxxxxxxx.java:1382)\n\tat java.util.stream.AbstractPipeline.copyInto(xxxxxxxxxxxxxxxx.java:481)\n\tat java.util.stream.AbstractPipeline.wrapAndCopyInto(xxxxxxxxxxxxxxxx.java:471)\n\tat java.util.stream.ReduceOps$ReduceOp.evaluateSequential(xxxxxxxxx.java:708)\n\tat java.util.stream.AbstractPipeline.evaluate(xxxxxxxxxxxxxxxx.java:234)\n\tat java.util.stream.ReferencePipeline.collect(xxxxxxxxxxxxxxxxx.java:499)",??

"thread_name": "scheduling-1"}

Pipeline 字段命名注意事項

在所有 Pipeline 切割出來的字段中,它們都是指標(biāo)(field)而不是標(biāo)簽(tag)。由于行協(xié)議約束,我們不應(yīng)該切割出任何跟 tag 同名的字段。這些 Tag 包含如下幾類:

DataKit 中的全局 Tag

日志采集器中自定義的 Tag

另外,所有采集上來的日志,均存在如下多個保留字段。我們不應(yīng)該去覆蓋這些字段,否則可能導(dǎo)致數(shù)據(jù)在查看器頁面顯示不正常。

字段名類型說明

source? ? ? ? ?string(tag)? ? ? ? ? ? ?日志來源

service? ? ? ? ?string(tag)? ? ? ? ? ? 日志對應(yīng)的服務(wù),默認(rèn)跟service一樣

status? ? ? ? ? ?string(tag)? ? ? ? ? ? 日志對應(yīng)的等級

message? ? ? string(field)? ? ? ? ? ?原始日志

time? ? ? ? ? ? ? int? ? ? ? ? ? ? ? ? ? ? ? ?日志對應(yīng)的時間戳

當(dāng)然我們可以通過特定的 Pipeline 函數(shù)覆蓋上面這些 tag 的值。

一旦 Pipeline 切割出來的字段跟已有 Tag 重名(大小寫敏感),都會導(dǎo)致如下數(shù)據(jù)報錯。故建議在 Pipeline 切割中,繞開這些字段命名。

# 該錯誤在 DataKit monitor 中能看到

same?key?xxx?in?tag?and?field

完整 Pipeline 示例

這里以 DataKit 自身的日志切割為例。DataKit 自身的日志形式如下:

2021-01-11T17:43:51.887+0800??DEBUG?io??io/io.go:458??post?cost?6.87021ms

編寫對應(yīng) pipeline:

# pipeline for datakit log

# Mon Jan 11 10:42:41 CST 2021

# auth: tanb

grok(_, '%{_dklog_date:log_time}%{SPACE}%{_dklog_level:level}%{SPACE}%{_dklog_mod:module}%{SPACE}%{_dklog_source_file:code}%{SPACE}%{_dklog_msg:msg}')

rename("time", log_time)? ? ? # 將 log_time 重名命名為 time

default_time(time)? ? ? ? ? ? ? ? ?# 將 time 字段作為輸出數(shù)據(jù)的時間戳

drop_origin_data()? ? ? ? ? ? ? ? #?丟棄原始日志文本(不建議這么做)

這里引用了幾個用戶自定義的 pattern,如?_dklog_date、_dklog_level。我們將這些規(guī)則存放?<datakit安裝目錄>/pipeline/pattern 下。

注意,用戶自定義 pattern 如果需要==全局生效==(即在其它 Pipeline 腳本中應(yīng)用),必須放置在?<DataKit安裝目錄/pipeline/pattern/>?目錄下):

$ cat pipeline/pattern/datakit

# 注意:自定義的這些 pattern,命名最好加上特定的前綴,以免跟內(nèi)置的命名沖突(內(nèi)置 pattern 名稱不允許覆蓋)

# 自定義 pattern 格式為:

#? ? <pattern-name><空格><具體 pattern 組合>

_dklog_date %{YEAR}-%{MONTHNUM}-%{MONTHDAY}T%{HOUR}:%{MINUTE}:%{SECOND}%{INT}

_dklog_level (DEBUG|INFO|WARN|ERROR|FATAL)

_dklog_mod %{WORD}

_dklog_source_file (/?[\w_%!$@:.,-]?/?)(\S+)?

_dklog_msg?%{GREEDYDATA}

現(xiàn)在 pipeline 以及其引用的 pattern 都有了,就能通過 DataKit 內(nèi)置的 pipeline 調(diào)試工具,對這一行日志進(jìn)行切割:

# 提取成功示例

$ ./datakit --pl dklog_pl.p --txt '2021-01-11T17:43:51.887+0800? DEBUG io? io/io.go:458? post cost 6.87021ms'

Extracted data(cost: 421.705μs):

{? ??

"code": "io/io.go:458",? ??

"level": "DEBUG",? ??

"module": "io",? ??

"msg": "post cost 6.87021ms",? ??

"time": 1610358231887000000

}

FAQ

Pipeline 調(diào)試時,為什么變量無法引用?

Pipeline 為:

json(_, message, "message")

json(_, thread_name, "thread")

json(_, level, "status")

json(_,?@timestamp,?"time")

其報錯如下:

[E]?new?piepline?failed:?4:8?parse?error:?unexpected?character:?'@'

A: 對于有特殊字符的變量,需將其用兩個?`?修飾一下:

json(_,?`@timestamp`,?"time")

參見【?Pipeline 的基本語法規(guī)則?】https://docs.guance.com/developers/pipeline/#basic-syntax

Pipeline 調(diào)試時,為什么找不到對應(yīng)的 Pipeline 腳本?

命令如下:

$ datakit pipeline test.p -T "..."[E]?get?pipeline?failed:?stat?/usr/local/datakit/pipeline/test.p:?no?such?file?or?directory

A: 調(diào)試用的 Pipeline 腳本,需將其放置到?/pipeline?目錄下。

如何在一個 Pipeline 中切割多種不同格式的日志?

在日常的日志中,因為業(yè)務(wù)的不同,日志會呈現(xiàn)出多種形態(tài),此時,需寫多個 Grok 切割,為提高 Grok 的運行效率,可根據(jù)日志出現(xiàn)的頻率高低,優(yōu)先匹配出現(xiàn)頻率更高的那個 Grok,這樣,大概率日志在前面幾個 Grok 中就匹配上了,避免了無效的匹配。

在日志切割中,Grok 匹配是性能開銷最大的部分,故避免重復(fù)的 Grok 匹配,能極大的提高 Grok 的切割性能。

grok(_, "%{NOTSPACE:client_ip} %{NOTSPACE:http_ident} ...")

if client_ip != nil {? ??

# 證明此時上面的 grok 已經(jīng)匹配上了,那么就按照該日志來繼續(xù)后續(xù)處理? ??

...

} else {? ??

# 這里說明是不同的日志來了,上面的 grok 沒有匹配上當(dāng)前的日志? ??

grok(_, "%{date2:time} \\[%{LOGLEVEL:status}\\] %{GREEDYDATA:msg} ...")? ??

if status != nil {? ? ? ?

?# 此處可再檢查上面的 grok 是否匹配上...? ??

} else {? ? ? ??

# 未識別的日志,或者,在此可再加一個 grok 來處理,如此層層遞進(jìn)? ??

}

}

如何丟棄字段切割

在某些情況下,我們需要的只是日志==中間的幾個字段==,但不好跳過前面的部分,比如

200?356?1?0?44?30032?other?messages

其中,我們只需要 44 這個值,它可能代碼響應(yīng)延遲,那么可以這樣切割(即 Grok 中不附帶?:some_field 這個部分):

grok(_,?"%{INT}?%{INT}?%{INT}?%{INT:response_time}?%{GREEDYDATA}")

add_pattern()?轉(zhuǎn)義問題

大家在使用 add_pattern()?添加局部模式時,容易陷入轉(zhuǎn)義問題,比如如下這個 pattern(用來通配文件路徑以及文件名):

(/?[\w_%!$@:.,-]?/?)(\S+)?

如果我們將其放到全局 pattern 目錄下(即?pipeline/pattern?目錄),可這么寫:

# my-testsource_file?(/?[\w_%!$@:.,-]?/?)(\S+)?

如果使用 add_pattern(),就需寫成這樣:

# my-test.padd_pattern('source_file','(/?[\\w_%!$@:.,-]?/?)(\\S+)?')

即這里面反斜杠需要轉(zhuǎn)義。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,628評論 19 139
  • 標(biāo)簽(空格分隔): 架構(gòu) 日志 聚合 一、背景需求&常見方案 1、日志分類和作用 2、傳統(tǒng)日志管理方案&存在的問題...
    ivan_cloud閱讀 15,929評論 1 80
  • 官網(wǎng)地址:https://www.elastic.co/cn/ 官網(wǎng)權(quán)威指南:https://www.elasti...
    Anwar_ec28閱讀 6,907評論 0 11
  • Java 常見英語單詞 (1.0 版本) 1. Java 基礎(chǔ)常見英語詞匯(70 個) OO: object-or...
    Nemo359閱讀 2,637評論 0 0
  • Java基礎(chǔ)常見英語詞匯(共70個)['?bd?ekt] ['?:rientid]導(dǎo)向的 ...
    今夜子辰閱讀 3,487評論 1 34

友情鏈接更多精彩內(nèi)容