Flink 1.10 sql-client連接hive、kafka

1、首先在集群上安裝flink服務(wù)

2、安裝完成,我們需要使用將指定的連接的jar,放在flink的lib下

本次使用hive的版本是3.1.0 ,kafka的版本2.0.0,集群版本是2.7.3

根據(jù)官網(wǎng)的提示:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/

找不到包可以在阿里云下載:https://maven.aliyun.com/mvn/search

連接hive的yaml配置


sql-client-hive.yaml

sql-client-hive.yaml


3、配置完成,重啟hive,以及flink

切換到flink的根目錄(bin的上一級(jí)目錄)

cd /usr/hdp/xxxx/flink

./bin/start-cluster.sh

#-s flink 指定yarn session的名稱(chēng),可不加

./bin/sql-client.sh embedded -e ./conf/sql-client-hive.yaml?

啟動(dòng)成功如下

show catalogs;

use catalog myhive;

show tables;


create table students(id Int,name String) WITH ('is_generic'='false');

insert into students(id,name) values(1,'lilei');

insert into students(id,name) values(2,'tom');

insert into students(id,name) values(3,'liming');

select * from students;


3、連接kafka和hive

根據(jù)官網(wǎng)提示:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

kafka的版本2.0.0,所以選擇:universal,將這3個(gè)jar包放到flink的lib下

先將kafka安裝好,并且調(diào)通,生產(chǎn)和消費(fèi)數(shù)據(jù)都沒(méi)問(wèn)題

還是啟動(dòng)./bin/sql-client.sh embedded -d ./conf/sql-client-hive.yaml


創(chuàng)建連接kafka的表

CREATE TABLE mykafka (name String, age Int) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'flink_in', 'connector.properties.zookeeper.connect' = 'ip:2181', 'connector.properties.bootstrap.servers' = 'ip:9092', 'format.type' = 'csv', 'update-mode' = 'append');

創(chuàng)建kafka消費(fèi)


select * from?mykafka;

參考鏈接:https://cloud.tencent.com/developer/article/1616330

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容