Apache Doris 整合 FLINK 、 Hudi 構(gòu)建湖倉(cāng)一體的聯(lián)邦查詢?nèi)腴T

1.概覽

多源數(shù)據(jù)目錄(Multi-Catalog)功能,旨在能夠更方便對(duì)接外部數(shù)據(jù)目錄,以增強(qiáng)Doris的數(shù)據(jù)湖分析和聯(lián)邦數(shù)據(jù)查詢能力。

在之前的 Doris 版本中,用戶數(shù)據(jù)只有兩個(gè)層級(jí):Database 和 Table。當(dāng)我們需要連接一個(gè)外部數(shù)據(jù)目錄時(shí),我們只能在Database 或 Table 層級(jí)進(jìn)行對(duì)接。比如通過(guò) create external table 的方式創(chuàng)建一個(gè)外部數(shù)據(jù)目錄中的表的映射,或通過(guò) create external database 的方式映射一個(gè)外部數(shù)據(jù)目錄中的 Database。如果外部數(shù)據(jù)目錄中的 Database 或 Table 非常多,則需要用戶手動(dòng)進(jìn)行一一映射,使用體驗(yàn)不佳。

而新的 Multi-Catalog 功能在原有的元數(shù)據(jù)層級(jí)上,新增一層Catalog,構(gòu)成 Catalog -> Database -> Table 的三層元數(shù)據(jù)層級(jí)。其中,Catalog 可以直接對(duì)應(yīng)到外部數(shù)據(jù)目錄。目前支持的外部數(shù)據(jù)目錄包括:

  1. Apache Hive
  2. Apache Iceberg
  3. Apache Hudi
  4. Elasticsearch
  5. JDBC: 對(duì)接數(shù)據(jù)庫(kù)訪問(wèn)的標(biāo)準(zhǔn)接口(JDBC)來(lái)訪問(wèn)各式數(shù)據(jù)庫(kù)的數(shù)據(jù)。
  6. Apache Paimon(Incubating)

該功能將作為之前外表連接方式(External Table)的補(bǔ)充和增強(qiáng),幫助用戶進(jìn)行快速的多數(shù)據(jù)目錄聯(lián)邦查詢。

這篇教程將展示如何使用 Flink + Hudi + Doris 構(gòu)建實(shí)時(shí)湖倉(cāng)一體的聯(lián)邦查詢分析,Doris 2.0.3 版本提供了 的支持,本文主要展示 Doris 和 Hudi 怎么使用,同時(shí)本教程整個(gè)環(huán)境是都基于偽分布式環(huán)境搭建,大家按照步驟可以一步步完成。完整體驗(yàn)整個(gè)搭建操作的過(guò)程。

2. 環(huán)境

本教程的演示環(huán)境如下:

  1. Centos7
  2. Apache doris 2.0.2
  3. Hadoop 3.3.3
  4. hive 3.1.3
  5. Fink 1.17.1
  6. Apache hudi 0.14
  7. JDK 1.8.0_311

3. 安裝

  1. 下載 Flink 1.17.1
    wget https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz

    解壓安裝

    tar zxf flink-1.17.1-bin-scala_2.12.tgz
  2. 下載 Flink 和 Hudi 相關(guān)的依賴
wget https://repo1.maven.org/maven2/org/apache/flink/flink-table-planner_2.12/1.17.1/flink-table-planner_2.12-1.17.1.jar
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-hive-sync-bundle/0.14.0/hudi-hive-sync-bundle-0.14.0.jar
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.17-bundle/0.14.0/hudi-flink1.17-bundle-0.14.0.jar
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-hadoop-mr-bundle/0.14.0/hudi-hadoop-mr-bundle-0.14.0.jar

將上面這些依賴下載到 flink-1.17.1/lib 目錄,然后將之前的 flink-table-planner-loader-1.17.1.jar 刪除或者移除。

3. 創(chuàng)建 Hudi 表并寫(xiě)入數(shù)據(jù)

3.1 啟動(dòng) Flink

bin/start-cluster.sh

啟動(dòng) Flink client

./bin/sql-client.sh embedded shell

設(shè)置返回結(jié)果模式為tableau,讓結(jié)果直接顯示

set sql-client.execution.result-mode=tableau;

3.2 啟動(dòng) Hive MetaStore 和 HiveServer

nohup ./bin/hive --service hiveserver2 >/dev/null 2>&1  &
nohup ./bin/hive --service metastore >/dev/null 2>&1  &

3.3 創(chuàng)建 Hudi 表

我們來(lái)創(chuàng)建 Hudi 表,我們這里使用 Hive MetaStore Service 來(lái)保存 Hudi 的元數(shù)據(jù)。

CREATE TABLE table1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
  'connector'='hudi',
  'path' = 'hdfs://localhost:9000/user/hive/warehouse/demo.db',
  'table.type'='COPY_ON_WRITE',       
  'hive_sync.enable'='true',           
  'hive_sync.table'='hudi_hive',        
  'hive_sync.db'='demo',            
  'hive_sync.mode' = 'hms',         
  'hive_sync.metastore.uris' = 'thrift://192.168.31.54:9083' 
);

  1. 'table.type'='COPY_ON_WRITE', -- MERGE_ON_READ方式在沒(méi)生成 parquet 文件前,hive不會(huì)有輸出
  2. 'hive_sync.enable'='true', -- required,開(kāi)啟hive同步功能
  3. 'hive_sync.table'='${hive_table}', -- required, hive 新建的表名
  4. 'hive_sync.db'='${hive_db}', -- required, hive 新建的數(shù)據(jù)庫(kù)名
  5. 'hive_sync.mode' = 'hms', -- required, 將hive sync mode設(shè)置為hms, 默認(rèn)jdbc
  6. 'hive_sync.metastore.uris' = 'thrift://ip:9083' -- required, metastore的端口

寫(xiě)入數(shù)據(jù):

INSERT INTO table1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
image.png

通過(guò) Flink 查詢 Hudi 表的數(shù)據(jù)

SELECT * FROM TABLE1
image.png

我們可以查看 HDFS 上這個(gè)數(shù)據(jù)文件已經(jīng)存在,在 hive client 下也可以看到這表

hive> use demo;
OK
Time taken: 0.027 seconds
hive> show tables;
OK
hudi_hive
image.png

4. Doris On Hudi

Doris 操作訪問(wèn) Hudi 的數(shù)據(jù)很簡(jiǎn)單,我們只需要?jiǎng)?chuàng)建一個(gè) catalog 就可以,不需要再想之前一樣寫(xiě)一個(gè)完整的建表語(yǔ)句,同時(shí)當(dāng) Hudi 數(shù)據(jù)源中增刪表或者增刪字段,Doris 這邊可以通過(guò)配置自動(dòng)刷新或者手動(dòng)刷新Catalog 自動(dòng)感知。

下面我們?cè)贒oris 下創(chuàng)建一個(gè) Catalog 來(lái)訪問(wèn) Hudi 外部表的數(shù)據(jù)

CREATE CATALOG hudi PROPERTIES (
    'type'='hms',
    'hive.metastore.uris' = 'thrift://192.168.31.54:9083'
);

這里我們上面Hudi的元數(shù)據(jù)是使用HMS存儲(chǔ)的,我們創(chuàng)建的時(shí)候只需要指定上面兩個(gè)信息即可,如果你的HDFS是高可用的,你需要添加NameNode HA的信息:

'hadoop.username' = 'hive',
'dfs.nameservices'='your-nameservice',
'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'

具體參照Doris 官網(wǎng)文檔

創(chuàng)建成功之后我們可以通過(guò)下面的紅框標(biāo)識(shí)出來(lái)的步驟去看到 Hudi 的表。


image.png

執(zhí)行查詢 Hudi 表:

image.png

將 Hudi 表里的數(shù)據(jù)遷移到 Doris

這里我們先創(chuàng)建好 Doris的表,建表語(yǔ)句如下:

CREATE TABLE doris_hudi(
  uuid VARCHAR(20) ,
  name VARCHAR(10),
  age INT,
  ts datetime(3),
  `partition` VARCHAR(20)
)
UNIQUE KEY(`uuid`)
DISTRIBUTED BY HASH(`uuid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true"
);

通過(guò) Insert Select 語(yǔ)句將 Hudi 數(shù)據(jù)遷移到 Doris :

insert into doris_hudi select uuid,name,age,ts,partition from hudi.demo.hudi_hive;

查詢 Doris 表

mysql> select * from doris_hudi;
+------+---------+------+-------------------------+-----------+
| uuid | name    | age  | ts                      | partition |
+------+---------+------+-------------------------+-----------+
| id1  | Danny   |   23 | 1970-01-01 08:00:01.000 | par1      |
| id2  | Stephen |   33 | 1970-01-01 08:00:02.000 | par1      |
| id3  | Julian  |   53 | 1970-01-01 08:00:03.000 | par2      |
| id4  | Fabian  |   31 | 1970-01-01 08:00:04.000 | par2      |
| id5  | Sophia  |   18 | 1970-01-01 08:00:05.000 | par3      |
| id6  | Emma    |   20 | 1970-01-01 08:00:06.000 | par3      |
| id7  | Bob     |   44 | 1970-01-01 08:00:07.000 | par4      |
| id8  | Han     |   56 | 1970-01-01 08:00:08.000 | par4      |
+------+---------+------+-------------------------+-----------+
8 rows in set (0.02 sec)

我們那還可以通過(guò) CATS方式將 hudi數(shù)據(jù)遷移到Doris,Doris 自動(dòng)完成建表

create table doris_hudi_01
PROPERTIES("replication_num" = "1")  as  
select uuid,name,age,ts,`partition` from hudi.demo.hudi_hive;
image.png

5. 總結(jié)

是不是使用非常簡(jiǎn)單,快快體驗(yàn)Doris 湖倉(cāng)一體,聯(lián)邦查詢的能力,來(lái)加速你的數(shù)據(jù)分析性能

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容