1. First, install the Flink service on the cluster.
2. After installation, place the required connector jars into Flink's lib directory.
This walkthrough uses Hive 3.1.0, Kafka 2.0.0, and cluster version 2.7.3.
Follow the instructions on the official site: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/
If a jar cannot be found, it can be downloaded from the Aliyun Maven repository: https://maven.aliyun.com/mvn/search


YAML configuration for connecting to Hive:

sql-client-hive.yaml
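The contents of the YAML file are not shown above; a minimal sketch based on the Flink 1.10 SQL Client docs might look like the following (the `hive-conf-dir` path is an assumption for an HDP-style cluster — point it at the directory that actually holds your hive-site.xml; the catalog name `myhive` matches the `use catalog` statement later in this post):

```yaml
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /etc/hive/conf   # assumed path; must contain hive-site.xml
    hive-version: 3.1.0

execution:
  planner: blink
  type: batch
  current-catalog: myhive
  current-database: default
```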

3. Once the configuration is done, restart Hive and Flink.
Switch to Flink's root directory (the parent directory of bin):
cd /usr/hdp/xxxx/flink
./bin/start-cluster.sh
# -s flink names the YARN session; optional
./bin/sql-client.sh embedded -e ./conf/sql-client-hive.yaml

If it starts successfully, you can inspect the catalog:

show catalogs;
use catalog myhive;
show tables;


create table students(id Int,name String) WITH ('is_generic'='false');
insert into students(id,name) values(1,'lilei');
insert into students(id,name) values(2,'tom');
insert into students(id,name) values(3,'liming');
select * from students;
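Because the table was created with 'is_generic'='false', it is stored as a regular Hive table, so it should also be visible and queryable from the Hive side. A sketch using beeline (the HiveServer2 connection URL is an assumption — substitute your own host and port):

```
# Query the same table through HiveServer2 (URL is a placeholder)
beeline -u "jdbc:hive2://ip:10000" -e "select * from students;"
```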
4. Connect Kafka and Hive
Follow the official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
The Kafka version is 2.0.0, so choose the "universal" connector, and place the three connector jars into Flink's lib directory.
First make sure Kafka itself is installed and working: both producing and consuming messages should succeed.

Start the SQL client again: ./bin/sql-client.sh embedded -d ./conf/sql-client-hive.yaml

Create a table backed by Kafka:
CREATE TABLE mykafka (name String, age Int) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'flink_in',
  'connector.properties.zookeeper.connect' = 'ip:2181',
  'connector.properties.bootstrap.servers' = 'ip:9092',
  'format.type' = 'csv',
  'update-mode' = 'append'
);
Start consuming from Kafka:

select * from mykafka;
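For the query above to return rows, there must be CSV-formatted messages (matching the `name String, age Int` schema) in the topic. A sketch using the console producer that ships with Kafka, run from the Kafka installation directory (the broker address is the same placeholder used in the DDL above):

```
# Push a couple of CSV rows into the flink_in topic
echo "lilei,18" | ./bin/kafka-console-producer.sh --broker-list ip:9092 --topic flink_in
echo "tom,20"   | ./bin/kafka-console-producer.sh --broker-list ip:9092 --topic flink_in
```

Each line becomes one Kafka message, which the `csv` format parses into one row of the mykafka table.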

Reference: https://cloud.tencent.com/developer/article/1616330