Apache Doris Broker數據導入使用示例及介紹

1.概要

Broker load 是一個異步的導入方式,支持的數據源取決于 Broker 進程支持的數據源。

用戶需要通過 MySQL 協(xié)議 創(chuàng)建 Broker load 導入,并通過查看導入命令檢查導入結果

主要適用于以下場景:

  • 外部數據源(如 HDFS等)讀取數據,導入到Doris中。
  • 數據量在 幾十到百GB 級別。
  • 主要用于數據遷移,或者定時批量導入

Broker load 支持文件類型:PARQUET、ORC、CSV格式

2. 原理

用戶在提交導入任務后,F(xiàn)E 會生成對應的 Plan 并根據目前 BE 的個數和文件的大小,將 Plan 分給 多個 BE 執(zhí)行,每個 BE 執(zhí)行一部分導入數據。

BE 在執(zhí)行的過程中會從 Broker 拉取數據,在對數據 transform 之后將數據導入系統(tǒng)。所有 BE 均完成導入,由 FE 最終決定導入是否成功

image-20210922153814143.png

3. 使用方式

Apache Doris Broker Load方式是通過 doris 提供的 Broker load SQL語句創(chuàng)建。

3.1 SQL 語法

下面是 SQL 語法,具體使用不清楚的地方也可以在Mysql Client 命令行下執(zhí)行 help broker load查看具體使用方法

LOAD LABEL db_name.label_name 
(data_desc, ...)
WITH BROKER broker_name broker_properties
[PROPERTIES (key1=value1, ... )]

* data_desc:

    DATA INFILE ('file_path', ...)
    [NEGATIVE]
    INTO TABLE tbl_name
    [PARTITION (p1, p2)]
    [COLUMNS TERMINATED BY separator ]
    [(col1, ...)]
    [PRECEDING FILTER predicate]
    [SET (k1=f1(xx), k2=f2(xx))]
    [WHERE predicate]

* broker_properties: 

    (key1=value1, ...)

3.2 實例

這里我們使用Broker load方式,從hive 分區(qū)表中將數據導入到Doris指定的表中

下面是我的hive表結構,數據格式是:ORC,分區(qū)字段是:budat

CREATE TABLE purchase_hive_iostock (
  lgort string NULL,
  mblnr string NULL,
  mblpo string NULL,
  werks string NULL,
  ebeln string NULL,
  ebelp string NULL,
  aufnr string NULL,
  rsnum string NULL,
  rspos string NULL,
  kdauf string NULL,
  kdpos string NULL,
  bwart string NULL,
  menge decimal(18, 3) NULL,,
  meins string NULL ,
  matnr string NULL ,
  bukrs string NULL ,
  waers string NULL ,
  dmbtr decimal(18, 3) NULL ,
  shkzg string NULL    ,
  bstme string NULL    ,
  bstmg decimal(13, 2) NULL ,
  temp1 string NULL ,
  temp2 string NULL ,
  temp3 string NULL ,
  temp4 string NULL ,
  temp5 string NULL ,
  rq datetime NULL        
)
COMMENT '出入庫記錄'
PARTITIONED BY(budat STRING)
....

Doris 中對應的表:

CREATE TABLE `ods_purchase_hive_iostock_delta` (
  `budat` date NOT NULL    ,
  `lgort` varchar(100) NULL,
  `mblnr` varchar(100) NULL,
  `mblpo` varchar(100) NULL,
  `werks` varchar(100) NULL,
  `ebeln` varchar(100) NULL,
  `ebelp` varchar(100) NULL,
  `aufnr` varchar(100) NULL,
  `rsnum` varchar(100) NULL,
  `rspos` varchar(100) NULL,
  `kdauf` varchar(100) NULL,
  `kdpos` varchar(100) NULL,
  `bwart` varchar(100) NULL,
  `menge` decimal(18, 3) NULL ,
  `meins` varchar(100) NULL,
  `matnr` varchar(100) NULL,
  `bukrs` varchar(100) NULL,
  `waers` varchar(100) NULL,
  `dmbtr` decimal(18, 3) NULL,
  `shkzg` varchar(10) NULL   ,
  `bstme` varchar(20) NULL   ,
  `bstmg` decimal(13, 2) NULL,
  `temp1` varchar(100) NULL ,
  `temp2` varchar(100) NULL ,
  `temp3` varchar(100) NULL ,
  `temp4` varchar(100) NULL ,
  `temp5` varchar(100) NULL ,
  `rq` datetime NULL ,
) ENGINE=OLAP
UNIQUE KEY(`budat`, `lgort`, `mblnr`, `mblpo`, `werks`)
COMMENT "出入庫記錄數據表"
PARTITION BY RANGE(`budat`)
(PARTITION P_000000 VALUES [('0000-01-01'), ('2021-01-01')),
PARTITION P_202101 VALUES [('2021-01-01'), ('2021-02-01')),
PARTITION P_202102 VALUES [('2021-02-01'), ('2021-03-01')),
PARTITION P_202103 VALUES [('2021-03-01'), ('2021-04-01')),
PARTITION P_202104 VALUES [('2021-04-01'), ('2021-05-01')),
PARTITION P_202105 VALUES [('2021-05-01'), ('2021-06-01')),
PARTITION P_202106 VALUES [('2021-06-01'), ('2021-07-01')),
PARTITION P_202107 VALUES [('2021-07-01'), ('2021-08-01')),
PARTITION P_202108 VALUES [('2021-08-01'), ('2021-09-01')),
PARTITION P_202109 VALUES [('2021-09-01'), ('2021-10-01')),
PARTITION P_202110 VALUES [('2021-10-01'), ('2021-11-01')),
PARTITION P_202111 VALUES [('2021-11-01'), ('2021-12-01')))
DISTRIBUTED BY HASH(`werks`) BUCKETS 5
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "MONTH",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "P_",
"dynamic_partition.replication_num" = "3",
"dynamic_partition.buckets" = "5",
"dynamic_partition.start_day_of_month" = "1",
"in_memory" = "false",
"storage_format" = "V2"
);

下面的語句是將Hive 分區(qū)表中的數據導入到Doris 對應的表中

LOAD LABEL order_bill_2021_0915_3
(
    DATA INFILE("hdfs://namenodeservice1/user/data/hive_db/data_ods.db/purchase_hive_iostock/*/*")
    INTO TABLE ods_purchase_hive_iostock_delta
    COLUMNS TERMINATED BY "\\x01"
    FORMAT AS "orc"   (lgort,mblnr,mblpo,werks,ebeln,ebelp,aufnr,rsnum,rspos,kdauf,kdpos,bwart,menge,meins,matnr,bukrs,waers,dmbtr,shkzg,bstme,bstmg,temp1,temp2,temp3,temp4,temp5,rq)
    COLUMNS FROM PATH AS (budat)
    SET (budat=budat,lgort=lgort,mblnr=mblnr,mblpo=mblpo,werks=werks,ebeln=ebeln,ebelp=ebelp,aufnr=aufnr,rsnum=rsnum,rspos=rspos,kdauf=kdauf,kdpos=kdpos,bwart=bwart,menge=menge,meins=meins,matnr=matnr,bukrs=bukrs,waers=waers,dmbtr=dmbtr,shkzg=shkzg,bstme=bstme,bstmg=bstmg,temp1=temp1,temp2=temp2,temp3=temp3,temp4=temp4,temp5=temp5,rq=rq)
    where 1=1
)
WITH BROKER "hdfs_broker"
(
    "dfs.nameservices"="hadoop",
    "dfs.ha.namenodes.eadhadoop" = "nn1,nn2",
    "dfs.namenode.rpc-address.eadhadoop.nn1" = "222:8000",
    "dfs.namenode.rpc-address.eadhadoop.nn2" = "117:8000",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "ddd.COM",
    "kerberos_keytab_content" = "BQHIININ1111URAAE="
)
PROPERTIES
(
    "timeout"="1200",
    "max_filter_ratio"="0.1"
);

然后提交任務就行了

3.3 認證方式

  • 支持簡單認證訪問
  • 支持通過 kerberos 認證訪問
  • 支持 HDFS HA 模式訪問

3.3.1 簡單認證方式

簡單認證即 Hadoop 配置 hadoop.security.authenticationsimple

使用系統(tǒng)用戶訪問 HDFS?;蛘咴?Broker 啟動的環(huán)境變量中添加:HADOOP_USER_NAME。密碼置空即可

(
    "username" = "user",
    "password" = ""
);

3.3.2 Kerberos 認證

上面示例使用的就是Kerberos認證方式。

該認證方式需提供以下信息:

  • hadoop.security.authentication:指定認證方式為 kerberos。
  • kerberos_principal:指定 kerberos 的 principal。
  • kerberos_keytab:指定 kerberos 的 keytab 文件路徑。該文件必須為 Broker 進程所在服務器上的文件的絕對路徑。并且可以被 Broker 進程訪問。
  • kerberos_keytab_content:指定 kerberos 中 keytab 文件內容經過 base64 編碼之后的內容。這個跟 kerberos_keytab 配置二選一即可

示例:

(
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "doris@YOUR.COM",
    "kerberos_keytab" = "/home/doris/my.keytab"
)

或者

(
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "doris@YOUR.COM",
    "kerberos_keytab_content" = "ASDOWHDLAWIDJHWLDKSALDJSDIWALD"
)

如果采用Kerberos認證方式,則部署B(yǎng)roker進程的時候需要krb5.conf文件, krb5.conf文件包含Kerberos的配置信息,通常,您應該將krb5.conf文件安裝在目錄/etc中。您可以通過設置環(huán)境變量KRB5_CONFIG覆蓋默認位置。 krb5.conf文件的內容示例如下:

[libdefaults]
    default_realm = DORIS.HADOOP
    default_tkt_enctypes = des3-hmac-sha1 des-cbc-crc
    default_tgs_enctypes = des3-hmac-sha1 des-cbc-crc
    dns_lookup_kdc = true
    dns_lookup_realm = false

[realms]
    DORIS.HADOOP = {
        kdc = kerberos-doris.hadoop.service:7005
    }

3.3.3 HDFS HA 模式

Doris 只是HDFS HA模式,下面這個配置用于訪問以 HA 模式部署的 HDFS 集群。

  • dfs.nameservices:指定 hdfs 服務的名字,自定義,如:"dfs.nameservices" = "my_ha"。
  • dfs.ha.namenodes.xxx:自定義 namenode 的名字,多個名字以逗號分隔。其中 xxx 為 dfs.nameservices 中自定義的名字,如: "dfs.ha.namenodes.my_ha" = "my_nn"。
  • dfs.namenode.rpc-address.xxx.nn:指定 namenode 的rpc地址信息。其中 nn 表示 dfs.ha.namenodes.xxx 中配置的 namenode 的名字,如:"dfs.namenode.rpc-address.my_ha.my_nn" = "host:port"。
  • dfs.client.failover.proxy.provider:指定 client 連接 namenode 的 provider,默認為:org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider。

示例如下:

(
    "dfs.nameservices" = "my_ha",
    "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
    "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)

如果HA模式和認證結合使用,示例如下:

(
    "username"="user",
    "password"="passwd",
    "dfs.nameservices" = "my_ha",
    "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
    "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)

關于HDFS集群的配置可以寫入hdfs-site.xml文件中,用戶使用Broker進程讀取HDFS集群的信息時,只需要填寫集群的文件路徑名和認證信息即可。

3.4 主要參數介紹

創(chuàng)建導入的詳細語法執(zhí)行 HELP BROKER LOAD 查看語法幫助。這里主要介紹 Broker load 的創(chuàng)建導入語法中參數意義和注意事項:

3.4.1 Label

導入任務的標識。每個導入任務,都有一個在單 database 內部唯一的 Label。Label 是用戶在導入命令中自定義的名稱。通過這個 Label,用戶可以查看對應導入任務的執(zhí)行情況。

Label 的另一個作用,是防止用戶重復導入相同的數據。強烈推薦用戶同一批次數據使用相同的label。這樣同一批次數據的重復請求只會被接受一次,保證了 At-Most-Once 語義

當 Label 對應的導入作業(yè)狀態(tài)為 CANCELLED 時,可以再次使用該 Label 提交導入作業(yè)

3.4.2 數據描述類參數

數據描述類參數主要指的是 Broker load 創(chuàng)建導入語句中的屬于 data_desc 部分的參數。每組 data_desc 主要表述了本次導入涉及到的數據源地址,ETL 函數,目標表及分區(qū)等信息

  • 多表導入

    Broker load 支持一次導入任務涉及多張表,每個 Broker load 導入任務可在多個 data_desc 聲明多張表來實現(xiàn)多表導入。每個單獨的 data_desc 還可以指定屬于該表的數據源地址。Broker load 保證了單次導入的多張表之間原子性成功或失敗。

  • negative

    data_desc中還可以設置數據取反導入。這個功能主要用于,當數據表中聚合列的類型都為 SUM 類型時。如果希望撤銷某一批導入的數據。則可以通過 negative 參數導入同一批數據。Doris 會自動為這一批數據在聚合列上數據取反,以達到消除同一批數據的功能。

  • partition

    data_desc 中可以指定待導入表的 partition 信息,如果待導入數據不屬于指定的 partition 則不會被導入。同時,不在指定 Partition 的數據會被認為是錯誤數據。

  • set column mapping

    data_desc 中的 SET 語句負責設置列函數變換,這里的列函數變換支持所有查詢的等值表達式變換。如果原始數據的列和表中的列不一一對應,就需要用到這個屬性。

  • preceding filter predicate

    用于過濾原始數據。原始數據是未經列映射、轉換的數據。用戶可以在對轉換前的數據前進行一次過濾,選取期望的數據,再進行轉換。

  • where predicate

    data_desc 中的 WHERE 語句中負責過濾已經完成 transform 的數據,被 filter 的數據不會進入容忍率的統(tǒng)計中。如果多個 data_desc 中聲明了同一張表的多個條件的話,則會 merge 同一張表的多個條件,merge 策略是 AND

  • COLUMNS FROM PATH AS

    因為Hive的分區(qū)是體現(xiàn)在HDFS的路徑上,我們在導入的時候,可以通過這個參數動態(tài)的通過HDFS路徑獲取hive分區(qū)字段,

  • FORMAT

    數據源的數據文件格式,如果是PARQUET或者ORC格式的數據,需要再文件頭的列名與doris表中的列名一致

    示例:

    (tmp_c1,tmp_c2)
    SET
    (
        id=tmp_c2,
        name=tmp_c1
    )
    

代表獲取在parquet或orc中以(tmp_c1, tmp_c2)為列名的列,映射到doris表中的(id, name)列。如果沒有設置set, 則以column中的列作為映射

3.4.3 導入作業(yè)參數

導入作業(yè)參數主要指的是 Broker load 創(chuàng)建導入語句中的屬于 opt_properties部分的參數。導入作業(yè)參數是作用于整個導入作業(yè)的。

  • timeout

    導入作業(yè)的超時時間(以秒為單位),用戶可以在 opt_properties 中自行設置每個導入的超時時間。導入任務在設定的 timeout 時間內未完成則會被系統(tǒng)取消,變成 CANCELLED。Broker load 的默認導入超時時間為4小時。

    通常情況下,用戶不需要手動設置導入任務的超時時間。當在默認超時時間內無法完成導入時,可以手動設置任務的超時時間。

    推薦超時時間

    總文件大?。∕B) / 用戶 Doris 集群最慢導入速度(MB/s) > timeout > ((總文件大小(MB) * 待導入的表及相關 Roll up 表的個數) / (10 * 導入并發(fā)數) )

    導入并發(fā)數見文檔最后的導入系統(tǒng)配置說明,公式中的 10 為目前的導入限速 10MB/s。

    例如一個 1G 的待導入數據,待導入表包含3個 Rollup 表,當前的導入并發(fā)數為 3。則 timeout 的 最小值為 (1 * 1024 * 3 ) / (10 * 3) = 102 秒

    由于每個 Doris 集群的機器環(huán)境不同且集群并發(fā)的查詢任務也不同,所以用戶 Doris 集群的最慢導入速度需要用戶自己根據歷史的導入任務速度進行推測。

  • max_filter_ratio

    導入任務的最大容忍率,默認為0容忍,取值范圍是0~1。當導入的錯誤率超過該值,則導入失敗。

    如果用戶希望忽略錯誤的行,可以通過設置這個參數大于 0,來保證導入可以成功。

    計算公式為:

    max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )

    dpp.abnorm.ALL 表示數據質量不合格的行數。如類型不匹配,列數不匹配,長度不匹配等等。

    dpp.norm.ALL 指的是導入過程中正確數據的條數。可以通過 SHOW LOAD 命令查詢導入任務的正確數據量。

    原始文件的行數 = dpp.abnorm.ALL + dpp.norm.ALL

  • exec_mem_limit

    導入內存限制。默認是 2GB。單位為字節(jié)。

  • strict_mode

    Broker load 導入可以開啟 strict mode 模式。開啟方式為 properties ("strict_mode" = "true") 。默認的 strict mode 為關閉。

    strict mode 模式的意思是:對于導入過程中的列類型轉換進行嚴格過濾。嚴格過濾的策略如下:

    1. 對于列類型轉換來說,如果 strict mode 為true,則錯誤的數據將被 filter。這里的錯誤數據是指:原始數據并不為空值,在參與列類型轉換后結果為空值的這一類數據。
    2. 對于導入的某列由函數變換生成時,strict mode 對其不產生影響。
    3. 對于導入的某列類型包含范圍限制的,如果原始數據能正常通過類型轉換,但無法通過范圍限制的,strict mode 對其也不產生影響。例如:如果類型是 decimal(1,0), 原始數據為 10,則屬于可以通過類型轉換但不在列聲明的范圍內。這種數據 strict 對其不產生影響。
  • merge_type 數據的合并類型,一共支持三種類型APPEND、DELETE、MERGE 其中,APPEND是默認值,表示這批數據全部需要追加到現(xiàn)有數據中,DELETE 表示刪除與這批數據key相同的所有行,MERGE 語義 需要與delete 條件聯(lián)合使用,表示滿足delete 條件的數據按照DELETE 語義處理其余的按照APPEND 語義處理

3.4.4 strict mode 與 source data 的導入關系

這里以列類型為 TinyInt 來舉例

注:當表中的列允許導入空值時

源數據 源數據示例 string to int strict_mode result
空值 \N N/A true or false NULL
not null aaa or 2000 NULL true 無效數據(被過濾)
not null aaa NULL false NULL
not null 1 1 true or false 正確數據

這里以列類型為 Decimal(1,0) 舉例

注:當表中的列允許導入空值時

源數據 源數據示例 string to int strict_mode result
空值 \N N/A true or false NULL
not null aaa NULL true 無效數據(被過濾)
not null aaa NULL false NULL
not null 1 or 10 1 true or false 正確數據

注意:10 雖然是一個超過范圍的值,但是因為其類型符合 decimal的要求,所以 strict mode對其不產生影響。10 最后會在其他 ETL 處理流程中被過濾。但不會被 strict mode 過濾。

3.5 查看結果

Broker load 導入方式由于是異步的,所以用戶必須將創(chuàng)建導入的 Label 記錄,并且在查看導入命令中使用 Label 來查看導入結果。查看導入命令在所有導入方式中是通用的,具體語法可執(zhí)行 HELP SHOW LOAD 查看。

mysql> show load order by createtime desc limit 1\G
*************************** 1. row ***************************
         JobId: 76391
         Label: label1
         State: FINISHED
      Progress: ETL:N/A; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
      TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
      ErrorMsg: N/A
    CreateTime: 2019-07-27 11:46:42
  EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:46:44
 LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
           URL: http://192.168.1.10:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e2725415317
    JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}

下面主要介紹了查看導入命令返回結果集中參數意義:

  • JobId

    導入任務的唯一ID,每個導入任務的 JobId 都不同,由系統(tǒng)自動生成。與 Label 不同的是,JobId永遠不會相同,而 Label 則可以在導入任務失敗后被復用。

  • Label

    導入任務的標識。

  • State

    導入任務當前所處的階段。在 Broker load 導入過程中主要會出現(xiàn) PENDING 和 LOADING 這兩個導入中的狀態(tài)。如果 Broker load 處于 PENDING 狀態(tài),則說明當前導入任務正在等待被執(zhí)行;LOADING 狀態(tài)則表示正在執(zhí)行中。

    導入任務的最終階段有兩個:CANCELLED 和 FINISHED,當 Load job 處于這兩個階段時,導入完成。其中 CANCELLED 為導入失敗,F(xiàn)INISHED 為導入成功。

  • Progress

    導入任務的進度描述。分為兩種進度:ETL 和 LOAD,對應了導入流程的兩個階段 ETL 和 LOADING。目前 Broker load 由于只有 LOADING 階段,所以 ETL 則會永遠顯示為 N/A

    LOAD 的進度范圍為:0~100%。

    LOAD 進度 = 當前完成導入的表個數 / 本次導入任務設計的總表個數 * 100%

    如果所有導入表均完成導入,此時 LOAD 的進度為 99% 導入進入到最后生效階段,整個導入完成后,LOAD 的進度才會改為 100%。

    導入進度并不是線性的。所以如果一段時間內進度沒有變化,并不代表導入沒有在執(zhí)行。

  • Type

    導入任務的類型。Broker load 的 type 取值只有 BROKER。

  • EtlInfo

    主要顯示了導入的數據量指標 unselected.rows , dpp.norm.ALLdpp.abnorm.ALL。用戶可以根據第一個數值判斷 where 條件過濾了多少行,后兩個指標驗證當前導入任務的錯誤率是否超過 max_filter_ratio

    三個指標之和就是原始數據量的總行數。

  • TaskInfo

    主要顯示了當前導入任務參數,也就是創(chuàng)建 Broker load 導入任務時用戶指定的導入任務參數,包括:cluster,timeoutmax_filter_ratio

  • ErrorMsg

    在導入任務狀態(tài)為CANCELLED,會顯示失敗的原因,顯示分兩部分:type 和 msg,如果導入任務成功則顯示 N/A

    type的取值意義:

    USER_CANCEL: 用戶取消的任務
    ETL_RUN_FAIL:在ETL階段失敗的導入任務
    ETL_QUALITY_UNSATISFIED:數據質量不合格,也就是錯誤數據率超過了 max_filter_ratio
    LOAD_RUN_FAIL:在LOADING階段失敗的導入任務
    TIMEOUT:導入任務沒在超時時間內完成
    UNKNOWN:未知的導入錯誤
    
  • CreateTime/EtlStartTime/EtlFinishTime/LoadStartTime/LoadFinishTime

    這幾個值分別代表導入創(chuàng)建的時間,ETL階段開始的時間,ETL階段完成的時間,Loading階段開始的時間和整個導入任務完成的時間。

    Broker load 導入由于沒有 ETL 階段,所以其 EtlStartTime, EtlFinishTime, LoadStartTime 被設置為同一個值。

    導入任務長時間停留在 CreateTime,而 LoadStartTime 為 N/A 則說明目前導入任務堆積嚴重。用戶可減少導入提交的頻率。

    LoadFinishTime - CreateTime = 整個導入任務所消耗時間
    LoadFinishTime - LoadStartTime = 整個 Broker load 導入任務執(zhí)行時間 = 整個導入任務所消耗時間 - 導入任務等待的時間
    
  • URL

    導入任務的錯誤數據樣例,訪問 URL 地址既可獲取本次導入的錯誤數據樣例。當本次導入不存在錯誤數據時,URL 字段則為 N/A。

  • JobDetails

    顯示一些作業(yè)的詳細運行狀態(tài)。包括導入文件的個數、總大?。ㄗ止?jié))、子任務個數、已處理的原始行數,運行子任務的 BE 節(jié)點 Id,未完成的 BE 節(jié)點 Id。

    {"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}
    

    其中已處理的原始行數,每 5 秒更新一次。該行數僅用于展示當前的進度,不代表最終實際的處理行數。實際處理行數以 EtlInfo 中顯示的為準。

3.6 取消導入

當 Broker load 作業(yè)狀態(tài)不為 CANCELLED 或 FINISHED 時,可以被用戶手動取消。取消時需要指定待取消導入任務的 Label 。取消導入命令語法可執(zhí)行 HELP CANCEL LOAD查看

3.7 Broker 導入相關參數

以下三個參數主要是為了控制導入的速度

  • min_bytes_per_broker_scanner/max_bytes_per_broker_scanner/max_broker_concurrency

    前兩個配置限制了單個 BE 處理的數據量的最小和最大值。第三個配置限制了一個作業(yè)的最大的導入并發(fā)數。最小處理的數據量,最大并發(fā)數,源文件的大小和當前集群 BE 的個數 共同決定了本次導入的并發(fā)數。

    本次導入并發(fā)數 = Math.min(源文件大小/最小處理量,最大并發(fā)數,當前BE節(jié)點個數)
    本次導入單個BE的處理量 = 源文件大小/本次導入的并發(fā)數
    

    通常一個導入作業(yè)支持的最大數據量為 max_bytes_per_broker_scanner * BE 節(jié)點數。如果需要導入更大數據量,則需要適當調整 max_bytes_per_broker_scanner 參數的大小。

    默認配置:

    參數名:min_bytes_per_broker_scanner, 默認 64MB,單位bytes。
    參數名:max_broker_concurrency, 默認 10。
    參數名:max_bytes_per_broker_scanner,默認 3G,單位bytes。
    

4. 最佳實踐

使用 Broker load 最適合的場景就是原始數據在文件系統(tǒng)(HDFS等)中的場景。其次,由于 Broker load 是單次導入中唯一的一種異步導入的方式,所以如果用戶在導入大文件中,需要使用異步接入,也可以考慮使用 Broker load

下面以單個BE使用Broker方式導入,在導入的數據量上的建議,如果你有多個BE,數據量應該乘以BE的數量來進行計算

  1. 3G數據

    用戶可以直接提交 Broker load 創(chuàng)建導入請求。

  2. 3G以上

    由于單個導入 BE 最大的處理量為 3G,超過 3G 的待導入文件就需要通過調整 Broker load 的導入參數來實現(xiàn)大文件的導入

    • 最大掃描量和最大并發(fā)數

      根據當前 BE 的個數和原始文件的大小修改單個 BE 的最大掃描量和最大并發(fā)數

      修改 fe.conf 中配置
      
      max_broker_concurrency = BE 個數
      當前導入任務單個 BE 處理的數據量 = 原始文件大小 / max_broker_concurrency
      max_bytes_per_broker_scanner >= 當前導入任務單個 BE 處理的數據量
      
      比如一個 100G 的文件,集群的 BE 個數為 10 個
      max_broker_concurrency = 10
      max_bytes_per_broker_scanner >= 10G = 100G / 10
      

      修改后,所有的 BE 會并發(fā)的處理導入任務,每個 BE 處理原始文件的一部分。

      注意:上述兩個 FE 中的配置均為系統(tǒng)配置,也就是說其修改是作用于所有的 Broker load的任務的。

    • 超時時間(timeout)

      在創(chuàng)建導入的時候自定義當前導入任務的 timeout 時間

      當前導入任務單個 BE 處理的數據量 / 用戶 Doris 集群最慢導入速度(MB/s) >= 當前導入任務的 timeout 時間 >= 當前導入任務單個 BE 處理的數據量 / 10M/s
      
      比如一個 100G 的文件,集群的 BE 個數為 10個
      timeout >= 1000s = 10G / 10M/s
      
    • 默認的導入最大超時時間

      當用戶發(fā)現(xiàn)第二步計算出的 timeout 時間超過系統(tǒng)默認的導入最大超時時間 4小時

      這時候不推薦用戶將導入最大超時時間直接改大來解決問題。單個導入時間如果超過默認的導入最大超時時間4小時,最好是通過切分待導入文件并且分多次導入來解決問題。主要原因是:單次導入超過4小時的話,導入失敗后重試的時間成本很高。

      可以通過如下公式計算出 Doris 集群期望最大導入文件數據量:

      期望最大導入文件數據量 = 14400s * 10M/s * BE 個數
      比如:集群的 BE 個數為 10個
      期望最大導入文件數據量 = 14400s * 10M/s * 10 = 1440000M ≈ 1440G
      
      注意:一般用戶的環(huán)境可能達不到 10M/s 的速度,所以建議超過 500G 的文件都進行文件切分,再導入。
      

5. 性能分析

可以在提交 LOAD 作業(yè)前,先執(zhí)行 set is_report_success=true 打開會話變量。然后提交導入作業(yè)。待導入作業(yè)完成后,可以在 FE 的 web 頁面的 Queris 標簽中查看到導入作業(yè)的 Profile。

這個 Profile 可以幫助分析導入作業(yè)的運行狀態(tài)。

當前只有作業(yè)成功執(zhí)行后,才能查看 Profile。

這個同樣適用去其他作業(yè),包括查詢

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容