1.概覽
多源數(shù)據(jù)目錄(Multi-Catalog)功能,旨在能夠更方便對(duì)接外部數(shù)據(jù)目錄,以增強(qiáng)Doris的數(shù)據(jù)湖分析和聯(lián)邦數(shù)據(jù)查詢能力。
在之前的 Doris 版本中,用戶數(shù)據(jù)只有兩個(gè)層級(jí):Database 和 Table。當(dāng)我們需要連接一個(gè)外部數(shù)據(jù)目錄時(shí),我們只能在Database 或 Table 層級(jí)進(jìn)行對(duì)接。比如通過(guò) create external table 的方式創(chuàng)建一個(gè)外部數(shù)據(jù)目錄中的表的映射,或通過(guò) create external database 的方式映射一個(gè)外部數(shù)據(jù)目錄中的 Database。如果外部數(shù)據(jù)目錄中的 Database 或 Table 非常多,則需要用戶手動(dòng)進(jìn)行一一映射,使用體驗(yàn)不佳。
而新的 Multi-Catalog 功能在原有的元數(shù)據(jù)層級(jí)上,新增一層Catalog,構(gòu)成 Catalog -> Database -> Table 的三層元數(shù)據(jù)層級(jí)。其中,Catalog 可以直接對(duì)應(yīng)到外部數(shù)據(jù)目錄。目前支持的外部數(shù)據(jù)目錄包括:
- Apache Hive
- Apache Iceberg
- Apache Hudi
- Elasticsearch
- JDBC: 對(duì)接數(shù)據(jù)庫(kù)訪問(wèn)的標(biāo)準(zhǔn)接口(JDBC)來(lái)訪問(wèn)各式數(shù)據(jù)庫(kù)的數(shù)據(jù)。
- Apache Paimon(Incubating)
該功能將作為之前外表連接方式(External Table)的補(bǔ)充和增強(qiáng),幫助用戶進(jìn)行快速的多數(shù)據(jù)目錄聯(lián)邦查詢。
這篇教程將展示如何使用 Flink + Hudi + Doris 構(gòu)建實(shí)時(shí)湖倉(cāng)一體的聯(lián)邦查詢分析,Doris 2.0.3 版本提供了 的支持,本文主要展示 Doris 和 Hudi 怎么使用,同時(shí)本教程整個(gè)環(huán)境是都基于偽分布式環(huán)境搭建,大家按照步驟可以一步步完成。完整體驗(yàn)整個(gè)搭建操作的過(guò)程。
2. 環(huán)境
本教程的演示環(huán)境如下:
- Centos7
- Apache doris 2.0.2
- Hadoop 3.3.3
- hive 3.1.3
- Fink 1.17.1
- Apache hudi 0.14
- JDK 1.8.0_311
3. 安裝
- 下載 Flink 1.17.1
wget https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz解壓安裝
tar zxf flink-1.17.1-bin-scala_2.12.tgz - 下載 Flink 和 Hudi 相關(guān)的依賴
wget https://repo1.maven.org/maven2/org/apache/flink/flink-table-planner_2.12/1.17.1/flink-table-planner_2.12-1.17.1.jar
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-hive-sync-bundle/0.14.0/hudi-hive-sync-bundle-0.14.0.jar
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.17-bundle/0.14.0/hudi-flink1.17-bundle-0.14.0.jar
wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-hadoop-mr-bundle/0.14.0/hudi-hadoop-mr-bundle-0.14.0.jar
將上面這些依賴下載到 flink-1.17.1/lib 目錄,然后將之前的 flink-table-planner-loader-1.17.1.jar 刪除或者移除。
3. 創(chuàng)建 Hudi 表并寫(xiě)入數(shù)據(jù)
3.1 啟動(dòng) Flink
bin/start-cluster.sh
啟動(dòng) Flink client
./bin/sql-client.sh embedded shell
設(shè)置返回結(jié)果模式為tableau,讓結(jié)果直接顯示
set sql-client.execution.result-mode=tableau;
3.2 啟動(dòng) Hive MetaStore 和 HiveServer
nohup ./bin/hive --service hiveserver2 >/dev/null 2>&1 &
nohup ./bin/hive --service metastore >/dev/null 2>&1 &
3.3 創(chuàng)建 Hudi 表
我們來(lái)創(chuàng)建 Hudi 表,我們這里使用 Hive MetaStore Service 來(lái)保存 Hudi 的元數(shù)據(jù)。
CREATE TABLE table1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
'connector'='hudi',
'path' = 'hdfs://localhost:9000/user/hive/warehouse/demo.db',
'table.type'='COPY_ON_WRITE',
'hive_sync.enable'='true',
'hive_sync.table'='hudi_hive',
'hive_sync.db'='demo',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://192.168.31.54:9083'
);
- 'table.type'='COPY_ON_WRITE', -- MERGE_ON_READ方式在沒(méi)生成 parquet 文件前,hive不會(huì)有輸出
- 'hive_sync.enable'='true', -- required,開(kāi)啟hive同步功能
- 'hive_sync.table'='${hive_table}', -- required, hive 新建的表名
- 'hive_sync.db'='${hive_db}', -- required, hive 新建的數(shù)據(jù)庫(kù)名
- 'hive_sync.mode' = 'hms', -- required, 將hive sync mode設(shè)置為hms, 默認(rèn)jdbc
- 'hive_sync.metastore.uris' = 'thrift://ip:9083' -- required, metastore的端口
寫(xiě)入數(shù)據(jù):
INSERT INTO table1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

通過(guò) Flink 查詢 Hudi 表的數(shù)據(jù)
SELECT * FROM TABLE1

我們可以查看 HDFS 上這個(gè)數(shù)據(jù)文件已經(jīng)存在,在 hive client 下也可以看到這表
hive> use demo;
OK
Time taken: 0.027 seconds
hive> show tables;
OK
hudi_hive

4. Doris On Hudi
Doris 操作訪問(wèn) Hudi 的數(shù)據(jù)很簡(jiǎn)單,我們只需要?jiǎng)?chuàng)建一個(gè) catalog 就可以,不需要再想之前一樣寫(xiě)一個(gè)完整的建表語(yǔ)句,同時(shí)當(dāng) Hudi 數(shù)據(jù)源中增刪表或者增刪字段,Doris 這邊可以通過(guò)配置自動(dòng)刷新或者手動(dòng)刷新Catalog 自動(dòng)感知。
下面我們?cè)贒oris 下創(chuàng)建一個(gè) Catalog 來(lái)訪問(wèn) Hudi 外部表的數(shù)據(jù)
CREATE CATALOG hudi PROPERTIES (
'type'='hms',
'hive.metastore.uris' = 'thrift://192.168.31.54:9083'
);
這里我們上面Hudi的元數(shù)據(jù)是使用HMS存儲(chǔ)的,我們創(chuàng)建的時(shí)候只需要指定上面兩個(gè)信息即可,如果你的HDFS是高可用的,你需要添加NameNode HA的信息:
'hadoop.username' = 'hive',
'dfs.nameservices'='your-nameservice',
'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
具體參照Doris 官網(wǎng)文檔
創(chuàng)建成功之后我們可以通過(guò)下面的紅框標(biāo)識(shí)出來(lái)的步驟去看到 Hudi 的表。

執(zhí)行查詢 Hudi 表:

將 Hudi 表里的數(shù)據(jù)遷移到 Doris
這里我們先創(chuàng)建好 Doris的表,建表語(yǔ)句如下:
CREATE TABLE doris_hudi(
uuid VARCHAR(20) ,
name VARCHAR(10),
age INT,
ts datetime(3),
`partition` VARCHAR(20)
)
UNIQUE KEY(`uuid`)
DISTRIBUTED BY HASH(`uuid`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true"
);
通過(guò) Insert Select 語(yǔ)句將 Hudi 數(shù)據(jù)遷移到 Doris :
insert into doris_hudi select uuid,name,age,ts,partition from hudi.demo.hudi_hive;
查詢 Doris 表
mysql> select * from doris_hudi;
+------+---------+------+-------------------------+-----------+
| uuid | name | age | ts | partition |
+------+---------+------+-------------------------+-----------+
| id1 | Danny | 23 | 1970-01-01 08:00:01.000 | par1 |
| id2 | Stephen | 33 | 1970-01-01 08:00:02.000 | par1 |
| id3 | Julian | 53 | 1970-01-01 08:00:03.000 | par2 |
| id4 | Fabian | 31 | 1970-01-01 08:00:04.000 | par2 |
| id5 | Sophia | 18 | 1970-01-01 08:00:05.000 | par3 |
| id6 | Emma | 20 | 1970-01-01 08:00:06.000 | par3 |
| id7 | Bob | 44 | 1970-01-01 08:00:07.000 | par4 |
| id8 | Han | 56 | 1970-01-01 08:00:08.000 | par4 |
+------+---------+------+-------------------------+-----------+
8 rows in set (0.02 sec)
我們那還可以通過(guò) CATS方式將 hudi數(shù)據(jù)遷移到Doris,Doris 自動(dòng)完成建表
create table doris_hudi_01
PROPERTIES("replication_num" = "1") as
select uuid,name,age,ts,`partition` from hudi.demo.hudi_hive;

5. 總結(jié)
是不是使用非常簡(jiǎn)單,快快體驗(yàn)Doris 湖倉(cāng)一體,聯(lián)邦查詢的能力,來(lái)加速你的數(shù)據(jù)分析性能