背景
隨著大數(shù)據(jù)處理結(jié)果的實(shí)時(shí)性要求越來越高,越來越多的大數(shù)據(jù)處理從離線轉(zhuǎn)到了實(shí)時(shí),其中以flink為主的實(shí)時(shí)計(jì)算在大數(shù)據(jù)處理中占有重要地位。
Flink消費(fèi)kafka等實(shí)時(shí)數(shù)據(jù)流。然后實(shí)時(shí)寫入hive,在大數(shù)據(jù)處理方面有著廣泛的應(yīng)用。此外由于列式存儲(chǔ)格式如parquet或者orc在查詢性能方面有著顯著的提高,所以大家都會(huì)優(yōu)先選擇列式存儲(chǔ)作為我們的存儲(chǔ)格式。
傳統(tǒng)的這種架構(gòu)看似不錯(cuò),但是還是有很多沒有解決的問題:
- 實(shí)時(shí)寫入造成大量小文件,需要單獨(dú)的程序來進(jìn)行合并
- 實(shí)時(shí)的寫入,讀取,還有合并小文件在同時(shí)進(jìn)行,那么如何保證事務(wù),讀取數(shù)據(jù)的時(shí)候不會(huì)出現(xiàn)臟讀。
- Hdfs的數(shù)據(jù)一般是一次寫入。多次讀寫,但是如果因?yàn)槌绦虺鲥e(cuò)導(dǎo)致數(shù)據(jù)錯(cuò)了,確實(shí)要修改某一條數(shù)據(jù)改怎么辦
- 消費(fèi)kafka的數(shù)據(jù)落地到hive,有一天kafka的數(shù)據(jù)多了幾個(gè)字段,如何同步到hive?必須刪了重建嗎?
- 訂單等業(yè)務(wù)數(shù)據(jù)一般存儲(chǔ)在傳統(tǒng)數(shù)據(jù)庫,如mysql等。如何實(shí)時(shí)同步這些cdc數(shù)據(jù)到hive倉庫呢,包括ddl和dml
如果你有上面的需求,那么你可以考慮一下數(shù)據(jù)湖了,目前開源的數(shù)據(jù)湖技術(shù)主要有以下幾個(gè):delta、hudi、iceberg,但是側(cè)重點(diǎn)有所不同,我上面說的問題也不完全都能實(shí)現(xiàn),但是這些都是數(shù)據(jù)湖要做的東西,隨著社區(qū)的不斷發(fā)展,這些功能都會(huì)有的。
但是目前世面上這些數(shù)據(jù)湖技術(shù)都與spark緊密綁定。而我們目前實(shí)時(shí)計(jì)算主要以flink為主,而且我個(gè)人覺得未來實(shí)時(shí)計(jì)算也將以flink為主,所以我選擇了iceberg為我們的數(shù)據(jù)湖,雖然他有一些功能不是很完善,但是有著良好的抽象,并且不強(qiáng)制綁定spark,所以對(duì)于iceberg沒有的功能,我們可以自己給補(bǔ)全,再回饋給社區(qū),一起成長。
iceberg簡介
其實(shí)對(duì)于iceberg,官方的定義是一種表格式。
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.
我們可以簡單理解為他是基于計(jì)算層(flink , spark)和存儲(chǔ)層(orc,parqurt)的一個(gè)中間層,我們?cè)趆ive建立一個(gè)iceberg格式的表。用flink或者spark寫入iceberg,然后再通過其他方式來讀取這個(gè)表,比如spark,flink,presto等。
當(dāng)然數(shù)據(jù)湖的概念遠(yuǎn)不止這點(diǎn),我們今天就先簡單的這么理解,后續(xù)寫一篇文章專門介紹一下iceberg。
flink實(shí)時(shí)寫入
準(zhǔn)備sql client環(huán)境
目前官方的測試版本是基于scala 2.12版本的flink。所以我們也用和官方同步的版本來測試下,下載下面的兩個(gè)jar放到flink的lib下面,然后啟動(dòng)一下flink集群,standalone模式。
下載flink :flink-1.11.2-bin-scala_2.12.tgz
下載 iceberg-flink-runtime-xxx.jar
下載flink 集成hive的connector,flink-sql-connector-hive-2.3.6_2.12-1.11.2.jar
目前官方的hive測試版本是 2.3.7,其他的版本可能有不兼容
注意要配置flink的checkpoint,因?yàn)槟壳癴link提交iceberg的信息是在每次checkpoint的時(shí)候提交的。在sql client配置checkpoint的方法如下:
在flink-conf.yaml添加如下配置
execution.checkpointing.interval: 10s # checkpoint間隔時(shí)間
execution.checkpointing.tolerable-failed-checkpoints: 10 # checkpoint 失敗容忍次數(shù)
創(chuàng)建catalog
目前系統(tǒng)提供的catalog有hivecatalog和hadoopcatalog以及自定義catlog
CREATE CATALOG iceberg WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://localhost:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://nn:8020/warehouse/path'
);
執(zhí)行完之后,顯示如下:
Flink SQL> show catalogs;
default_catalog
iceberg
如果不想每次啟動(dòng)sql client都重新執(zhí)行ddl,可以在sql-client-defaults.yaml 里面皮遏制一下:
catalogs: # empty list
# A typical catalog definition looks like:
- name: hive
type: hive
hive-conf-dir: /Users/user/work/hive/conf
default-database: default
- name: iceberg
type: iceberg
warehouse: hdfs://localhost/user/hive2/warehouse
uri: thrift://localhost:9083
catalog-type: hive
創(chuàng)建db
use catalog iceberg;
CREATE DATABASE iceberg_db;
USE iceberg_db;
創(chuàng)建table
CREATE TABLE iceberg.iceberg_db.iceberg_001 (
id BIGINT COMMENT 'unique id',
data STRING
) WITH ('connector'='iceberg','write.format.default'='ORC');
插入數(shù)據(jù)
我們依然創(chuàng)建一個(gè)datagen的connector。
CREATE TABLE sourceTable (
userid int,
f_random_str STRING
) WITH (
'connector' = 'datagen',
'rows-per-second'='100',
'fields.userid.kind'='random',
'fields.userid.min'='1',
'fields.userid.max'='100',
'fields.f_random_str.length'='10'
)
這時(shí)候我們看到有兩個(gè)表了
Flink SQL> show tables;
iceberg_001
sourcetable
然后執(zhí)行insert into插入數(shù)據(jù):
insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable
查詢
我們這里使用presto來查詢
presto的配置iceberg.properties 如下:
connector.name=iceberg
hive.metastore.uri=thrift://localhost:9083
代碼版本
public class Flink2Iceberg{
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(10000);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
tenv.executeSql("CREATE CATALOG iceberg WITH (\n" +
" 'type'='iceberg',\n" +
" 'catalog-type'='hive'," +
" 'hive-conf-dir'='/Users/user/work/hive/conf/'" +
")");
tenv.useCatalog("iceberg");
tenv.executeSql("CREATE DATABASE iceberg_db");
tenv.useDatabase("iceberg_db");
tenv.executeSql("CREATE TABLE sourceTable (\n" +
" userid int,\n" +
" f_random_str STRING\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='100',\n" +
" 'fields.userid.kind'='random',\n" +
" 'fields.userid.min'='1',\n" +
" 'fields.userid.max'='100',\n" +
"'fields.f_random_str.length'='10'\n" +
")");
tenv.executeSql(
"insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable");
}
}
具體見:
總結(jié)
總結(jié)一下,iceberg的資料比較少,很多設(shè)計(jì)或者討論等需要關(guān)注issues,然后再去擼源碼,可能對(duì)于剛?cè)腴T的小伙伴來說有點(diǎn)困難。后續(xù)我也會(huì)多分享一些關(guān)于iceberg的文章