Catalog 提供了元數(shù)據(jù)信息,例如數(shù)據(jù)庫(kù)、表、分區(qū)、視圖以及數(shù)據(jù)庫(kù)或其他外部系統(tǒng)中存儲(chǔ)的函數(shù)和信息。
數(shù)據(jù)處理最關(guān)鍵的方面之一是管理元數(shù)據(jù)。 元數(shù)據(jù)可以是臨時(shí)的,例如臨時(shí)表、或者通過(guò) TableEnvironment 注冊(cè)的 UDF。 元數(shù)據(jù)也可以是持久化的,例如 Hive Metastore 中的元數(shù)據(jù)。Catalog 提供了一個(gè)統(tǒng)一的API,用于管理元數(shù)據(jù),并使其可以從 Table API 和 SQL 查詢語(yǔ)句中來(lái)訪問(wèn)。
前面用到Connector其實(shí)就是在使用Catalog
Catalog類型
GenericInMemoryCatalog
GenericInMemoryCatalog 是基于內(nèi)存實(shí)現(xiàn)的 Catalog,所有元數(shù)據(jù)只在 session 的生命周期內(nèi)可用。JdbcCatalog
JdbcCatalog 使得用戶可以將 Flink 通過(guò) JDBC 協(xié)議連接到關(guān)系數(shù)據(jù)庫(kù)。PostgresCatalog 是當(dāng)前實(shí)現(xiàn)的唯一一種 JDBC Catalog。HiveCatalog
HiveCatalog 有兩個(gè)用途:作為原生 Flink 元數(shù)據(jù)的持久化存儲(chǔ),以及作為讀寫(xiě)現(xiàn)有 Hive 元數(shù)據(jù)的接口。 Flink 的 Hive 文檔 提供了有關(guān)設(shè)置 HiveCatalog 以及訪問(wèn)現(xiàn)有 Hive 元數(shù)據(jù)的詳細(xì)信息。
HiveCatalog
- 導(dǎo)入相關(guān)的依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>1.13.1</version>
</dependency>
<!-- Hive Dependency -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
啟動(dòng) Hadoop
copy hive-site.xml
啟動(dòng)元數(shù)據(jù)服務(wù)
<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop162:9083</value>
</property>
在hive-site.xml中配置元數(shù)據(jù)服務(wù),所以需要啟動(dòng)該服務(wù),否則無(wú)法讀取數(shù)據(jù)。
hive service -metestore
元數(shù)據(jù)服務(wù)的作用:若沒(méi)有元數(shù)據(jù)服務(wù)器,所有的請(qǐng)求將直接訪問(wèn)到hive,這樣可能導(dǎo)致hive吃不消,有了元數(shù)據(jù)服務(wù)之后,請(qǐng)求將全部交由元數(shù)據(jù)服務(wù)器,元數(shù)據(jù)服務(wù)再去請(qǐng)求hive,起到一個(gè)緩沖的作用。
- 創(chuàng)建對(duì)應(yīng)的表
隨便創(chuàng)建一個(gè)庫(kù)
create database gmall;
隨便創(chuàng)建一張表
use gmall;
create table student(
id int,
name string,
age int,
sex string
);
往表里插入一些數(shù)據(jù)
insert into student(id, name, age, sex)
values
(1,'張三',18,'男'),
(2,'李四',56,'女'),
(3,'王五',34,'男'),
(4,'趙六',33,'女'),
(5,'孫七',78,'男');
- 連接hive
@Test
public void test1(){
// 環(huán)境準(zhǔn)備
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
/**
* 創(chuàng)建 hivecatalog
*
* my_hive catalog 名稱(隨意
* gmall 指定數(shù)據(jù)庫(kù)
* input/ 配置hive-site.xml 的目錄。
*/
HiveCatalog hiveCatalog = new HiveCatalog("my_hive", "gmall", "input/");
/**
* 注冊(cè)Catalog
* my_hive2 名稱隨意,通常和 new HiveCatalog 中的一致
*/
tableEnv.registerCatalog("my_hive2",hiveCatalog);
/**
* 指定數(shù)據(jù)庫(kù),查詢
* my_hive tableEnv.registerCatalog 中指定的
* gmall :庫(kù)名
* student:表名
*/
tableEnv.sqlQuery("select * from my_hive2.gmall.student").execute().print();
}
- 查詢結(jié)果
+----+-------------+--------------------------------+-------------+--------------------------------+
| op | id | name | age | sex |
+----+-------------+--------------------------------+-------------+--------------------------------+
| +I | 1 | 張三 | 18 | 男 |
| +I | 2 | 李四 | 56 | 女 |
| +I | 3 | 王五 | 34 | 男 |
| +I | 4 | 趙六 | 33 | 女 |
| +I | 5 | 孫七 | 78 | 男 |
+----+-------------+--------------------------------+-------------+--------------------------------+
- 全局設(shè)置
select * from my_hive2.gmall.student
這樣寫(xiě)比較麻煩(如上),通常我們可以將其他提出去,設(shè)置成全局
tableEnv.useCatalog("my_hive2");
tableEnv.useDatabase("gmall");
/**
* 指定數(shù)據(jù)庫(kù),查詢
* my_hive tableEnv.registerCatalog 中指定的
* gmall :庫(kù)名
* student:表名
*/
tableEnv.sqlQuery("select * from student").execute().print();