Apache Druid 平臺化 - 數(shù)據(jù)接入篇

數(shù)據(jù)接入

背景

使用界面化和 sql 的方式將數(shù)據(jù)導入到 druid,提供數(shù)據(jù)給后續(xù)告警,監(jiān)控,查詢等服務使用。

方案設計

數(shù)據(jù)源1:內部消息服務 dclog ,本質上是一個 kafka topic,使用 record header 進行應用劃分,可以抽象為單 Kafka topic 寫多個 Druid datasource 的場景
數(shù)據(jù)源2:Kafka
數(shù)據(jù)源3:Hive

kafka 使用 kis 方式接入,hive 使用 hadoop batch ingestion 接入,這里主要是考慮消費 dclog 寫入 druid 的三種方式:

方案 描述 優(yōu)缺點
KIS 在 kafka indexing service 消費的時候對 header 進行 filter (需要修改源碼在消費時增加 header filter 的功能),然后寫入相應的 datasource。
參考: Druid 消費一個 kafka topic 發(fā)送到不同數(shù)據(jù)源
這種方式不需要額外的外部依賴,修改的源碼也比較少,但是每個 task 都需要遍歷相同 topic 的數(shù)據(jù)對其進行過濾,對 kafka 的壓力比較大,同時每個 datasource 都需要一個 task。
Spark SS + Tranquility 使用 spark struct streaming 進行消費,(目前的 release 版本還不支持直接讀取 kafka header,需要打入一個 patch Add support for Kafka headers in Structured Streaming),消費后直接通過http 的方式發(fā)送到 tranquility server,然后由 tranquility 寫入相應的 datasource。 task 由統(tǒng)一的 tranquility 負責,每次更新配置需要重新啟動配置,該過程中可能會丟失數(shù)據(jù),需要用 batch 任務進行 overwrite。
Spark SS + KIS 同樣使用 spark SS 進行消費,將消費后的數(shù)據(jù)寫入到對應的 kafka topic 中,使用 KIS 消費相應 topic 并將數(shù)據(jù)寫入到對應的 datasource 類似第一種,額外的一些區(qū)別是每個 kis 只需要訪問相應的數(shù)據(jù),但是需要對 topic 中的每種 header 建立一個新的 kafka topic。

出于控制 druid indexing task 和 kafka topic 數(shù)量的目的,最早的時候選擇了方案二:


方案二

但在實際使用的一段過程中,發(fā)現(xiàn)諸多問題,例如:

  1. tranquility 從 druid 0.9 以后就停止更新,因此相關的 ingestion 相關功能缺失,例如 jq 解析,數(shù)值類型的維度列等。
  2. 提供的接口過于簡單,很多狀態(tài)無法獲取,例如 kafka lag 等。
  3. 使用的 stream push 模型,任務異常終端會造成數(shù)據(jù)丟失,需要使用 hive 備份數(shù)據(jù)源進行回補。
  4. 超過時間窗口的數(shù)據(jù)會丟失。
  5. tranquility 配置更新需要重啟服務。

此外同時維護兩套代碼邏輯不僅增加了系統(tǒng)復雜度,還大大增加了編碼和維護的工作量。

后續(xù)改為方案三:


方案三

實際生成的 indexing task 和 kafka topic 數(shù)量其實并不多,不需要把這個因素作為選型的重要指標進行判斷。

該方案數(shù)據(jù)消費的延時和配置更新的間隔對比第二種都有大幅度的減少,目前能做到數(shù)據(jù)1S接入延時,1分鐘內更新配置,且更新配置的過程中不會丟失數(shù)據(jù)。

druid 數(shù)據(jù)接入的一些經(jīng)驗

一. hive 數(shù)據(jù)導入自動填充分區(qū)

因為 hive 分區(qū)信息并不包含在真實數(shù)據(jù)中,可以使用 missing value 進行填充

// 該例子中的 ds 字段為分區(qū)字段,如果該字段在真實數(shù)據(jù)中不存在,就會以 2019-01-01 值進行填充
"timestampSpec": {
    "column": "ds"
    "format": "yyyyMMdd"
    "missingValue": "2019-01-01"
}

二. zstd 編碼支持

有部分 hive 表使用 zstd 壓縮,所以需要 druid 支持相應的編碼。

Add Codec for ZStandard Compression hadoop 在2.9版本提供了 zstd 的支持。

我們目前使用的版本為 hadoop2.6.0-cdh5.15.0,原生還不支持 zstd,需要打入相關補丁,重新編譯獲取 hadoop-common 后,將 druid 目錄下的 hadoop-dependencies/hadoop-client/2.6.0-cdh5.15.0 / extensions/druid-hdfs-storage / extensions/druid-kerberos 中的 hadoop-common 包進行替換。

然后在 hadoop 導入作業(yè)中指定包含 zstd.so 的 hadoop_native 地址

"mapreduce.reduce.java.opts": "-Djava.library.path=/usr/install/libraries/hadoop_native"
"mapreduce.map.java.opts": "-Djava.library.path=/usr/install/libraries/hadoop_native"

三. keberos

historical 節(jié)點需要 kinit 登錄,否則 historical 無法連接 hdfs

四. druid-0.15.0 進程自動退出

druid-0.15.0 以后提供了新的服務啟動方式,例如 /bin/start-cluster-data-server,如果用 nohup 啟動后沒有使用 exit 命令退出終端,在終端斷開時會被認為是異常中斷,相應進程也會被關閉,日志信息如下:

[Fri Aug 16 18:36:22 2019] Sending signal[15] to command[broker] (timeout 360s).
[Fri Aug 16 18:36:22 2019] Sending signal[15] to command[coordinator-overlord] (timeout 360s).
[Fri Aug 16 18:36:22 2019] Sending signal[15] to command[router] (timeout 360s).
[Fri Aug 16 18:36:22 2019] Command[router] exited (pid = 37865, exited = 143)
[Fri Aug 16 18:36:23 2019] Command[broker] exited (pid = 37864, exited = 143)
[Fri Aug 16 18:36:23 2019] Exiting.
[Fri Aug 16 18:37:44 2019] Command[coordinator-overlord] exited (pid = 42773, exited = 143)
[Fri Aug 16 18:37:44 2019] Exiting.

五. 啟動 supervisor 失敗

啟動 supervisor 時提示錯誤

"error": "Instantiation of [simple type, class io.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig] value failed: Failed to create directory within 10000 attempts (tried 1508304116732-0 to 1508304116732-9999)"

overlord 節(jié)點 java.io.tmpdir 指定路徑不存在導致。

六. druid 對數(shù)據(jù)進行 ETL

一些簡單的 ETL 使用 ingestion 中的 transformSpec 中的 filter / transform 和 flattenSpec 就能解決,較為復雜的例如數(shù)據(jù)1對多等可以借助于 flink / spark SS 的計算能力。下面舉3個??

  1. dimension 重命名

    "dataSchema" : {
        "dimensionsSpec" : {
            "dimensions" : ["mb"]
        }
    }
      
    "transformSpec" : {
        "transforms" : [{
            "type" : "expression",
            "name" : "mb",
            "expression" : "mobile"
        }]
    }
    
  2. 從 seqId 中抽取前 13 位作為 timestampSpec 的時間戳

    "flattenSpec": {
        "fields": [{
             "expr": ".seqId[0:13]",
                 "name": "ts",
                 "type": "jq"
         }]
    },
    "timestampSpec": {
        "column": "ts",
         "format": "auto"
    }
    
  3. 一個相對復雜的判斷

    數(shù)據(jù)格式:將記錄只有存在 event_type 為 a 的數(shù)據(jù)才接入 datasource,service_info 的值可能是一個 json 對象也可能是一個 json 數(shù)組

    // record1
    "service_info":[{"event_type","a"},{"event_type","b"}]
    
    // record2
    "service_info":{"event_type","a"}
    

    ingestion 描述:

    "flattenSpec": {
        "fields": [{
             "expr": "[try .service_info[].event_type, try .service_info.event_type] | contains([\"a\"])",
                 "name": "isA",
                 "type": "jq"
         }]
    }
                 
    "transformSpec": {
        "filter": {
            "type": "and",
            "fields": [{
                 "type": "selector",
                     "dimension": "isA",
                     "value": "true",
                     "extractionFn": null
                  }]
         },
        "transforms": []
    }
    

一些截圖

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容