Flink集成數(shù)據(jù)湖之實(shí)時(shí)數(shù)據(jù)寫入iceberg

背景

隨著大數(shù)據(jù)處理結(jié)果的實(shí)時(shí)性要求越來越高,越來越多的大數(shù)據(jù)處理從離線轉(zhuǎn)到了實(shí)時(shí),其中以flink為主的實(shí)時(shí)計(jì)算在大數(shù)據(jù)處理中占有重要地位。

Flink消費(fèi)kafka等實(shí)時(shí)數(shù)據(jù)流。然后實(shí)時(shí)寫入hive,在大數(shù)據(jù)處理方面有著廣泛的應(yīng)用。此外由于列式存儲(chǔ)格式如parquet或者orc在查詢性能方面有著顯著的提高,所以大家都會(huì)優(yōu)先選擇列式存儲(chǔ)作為我們的存儲(chǔ)格式。

傳統(tǒng)的這種架構(gòu)看似不錯(cuò),但是還是有很多沒有解決的問題:

  • 實(shí)時(shí)寫入造成大量小文件,需要單獨(dú)的程序來進(jìn)行合并
  • 實(shí)時(shí)的寫入,讀取,還有合并小文件在同時(shí)進(jìn)行,那么如何保證事務(wù),讀取數(shù)據(jù)的時(shí)候不會(huì)出現(xiàn)臟讀。
  • Hdfs的數(shù)據(jù)一般是一次寫入。多次讀寫,但是如果因?yàn)槌绦虺鲥e(cuò)導(dǎo)致數(shù)據(jù)錯(cuò)了,確實(shí)要修改某一條數(shù)據(jù)改怎么辦
  • 消費(fèi)kafka的數(shù)據(jù)落地到hive,有一天kafka的數(shù)據(jù)多了幾個(gè)字段,如何同步到hive?必須刪了重建嗎?
  • 訂單等業(yè)務(wù)數(shù)據(jù)一般存儲(chǔ)在傳統(tǒng)數(shù)據(jù)庫,如mysql等。如何實(shí)時(shí)同步這些cdc數(shù)據(jù)到hive倉庫呢,包括ddl和dml

如果你有上面的需求,那么你可以考慮一下數(shù)據(jù)湖了,目前開源的數(shù)據(jù)湖技術(shù)主要有以下幾個(gè):delta、hudi、iceberg,但是側(cè)重點(diǎn)有所不同,我上面說的問題也不完全都能實(shí)現(xiàn),但是這些都是數(shù)據(jù)湖要做的東西,隨著社區(qū)的不斷發(fā)展,這些功能都會(huì)有的。

但是目前世面上這些數(shù)據(jù)湖技術(shù)都與spark緊密綁定。而我們目前實(shí)時(shí)計(jì)算主要以flink為主,而且我個(gè)人覺得未來實(shí)時(shí)計(jì)算也將以flink為主,所以我選擇了iceberg為我們的數(shù)據(jù)湖,雖然他有一些功能不是很完善,但是有著良好的抽象,并且不強(qiáng)制綁定spark,所以對(duì)于iceberg沒有的功能,我們可以自己給補(bǔ)全,再回饋給社區(qū),一起成長。

iceberg簡介

其實(shí)對(duì)于iceberg,官方的定義是一種表格式。

Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to Presto and Spark that use a high-performance format that works just like a SQL table.

我們可以簡單理解為他是基于計(jì)算層(flink , spark)和存儲(chǔ)層(orc,parqurt)的一個(gè)中間層,我們?cè)趆ive建立一個(gè)iceberg格式的表。用flink或者spark寫入iceberg,然后再通過其他方式來讀取這個(gè)表,比如spark,flink,presto等。

當(dāng)然數(shù)據(jù)湖的概念遠(yuǎn)不止這點(diǎn),我們今天就先簡單的這么理解,后續(xù)寫一篇文章專門介紹一下iceberg。

flink實(shí)時(shí)寫入

準(zhǔn)備sql client環(huán)境

目前官方的測試版本是基于scala 2.12版本的flink。所以我們也用和官方同步的版本來測試下,下載下面的兩個(gè)jar放到flink的lib下面,然后啟動(dòng)一下flink集群,standalone模式。

  • 下載flink :flink-1.11.2-bin-scala_2.12.tgz

  • 下載 iceberg-flink-runtime-xxx.jar

  • 下載flink 集成hive的connector,flink-sql-connector-hive-2.3.6_2.12-1.11.2.jar

  • 目前官方的hive測試版本是 2.3.7,其他的版本可能有不兼容

注意要配置flink的checkpoint,因?yàn)槟壳癴link提交iceberg的信息是在每次checkpoint的時(shí)候提交的。在sql client配置checkpoint的方法如下:

在flink-conf.yaml添加如下配置

execution.checkpointing.interval: 10s   # checkpoint間隔時(shí)間
execution.checkpointing.tolerable-failed-checkpoints: 10  # checkpoint 失敗容忍次數(shù)

創(chuàng)建catalog

目前系統(tǒng)提供的catalog有hivecatalog和hadoopcatalog以及自定義catlog

CREATE CATALOG iceberg WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);

執(zhí)行完之后,顯示如下:

Flink SQL> show catalogs;
default_catalog
iceberg

如果不想每次啟動(dòng)sql client都重新執(zhí)行ddl,可以在sql-client-defaults.yaml 里面皮遏制一下:

catalogs: # empty list
# A typical catalog definition looks like:
  - name: hive
    type: hive
    hive-conf-dir: /Users/user/work/hive/conf
    default-database: default
  - name: iceberg
    type: iceberg
    warehouse: hdfs://localhost/user/hive2/warehouse
    uri: thrift://localhost:9083
    catalog-type: hive

創(chuàng)建db

use catalog iceberg;
CREATE DATABASE iceberg_db;
USE iceberg_db;

創(chuàng)建table

CREATE TABLE iceberg.iceberg_db.iceberg_001 (
    id BIGINT COMMENT 'unique id',
    data STRING
) WITH ('connector'='iceberg','write.format.default'='ORC');

插入數(shù)據(jù)

我們依然創(chuàng)建一個(gè)datagen的connector。

CREATE TABLE sourceTable (
 userid int,
 f_random_str STRING
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='100',
 'fields.userid.kind'='random',
 'fields.userid.min'='1',
 'fields.userid.max'='100',
'fields.f_random_str.length'='10'
)

這時(shí)候我們看到有兩個(gè)表了


Flink SQL> show tables;
iceberg_001
sourcetable

然后執(zhí)行insert into插入數(shù)據(jù):

insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable

查詢

我們這里使用presto來查詢

presto的配置iceberg.properties 如下:

connector.name=iceberg
hive.metastore.uri=thrift://localhost:9083
在這里插入圖片描述

代碼版本

public class Flink2Iceberg{

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(10000);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        tenv.executeSql("CREATE CATALOG iceberg WITH (\n" +
                        "  'type'='iceberg',\n" +
                        "  'catalog-type'='hive'," +
                        "  'hive-conf-dir'='/Users/user/work/hive/conf/'" +
                        ")");

        tenv.useCatalog("iceberg");
        tenv.executeSql("CREATE DATABASE iceberg_db");
        tenv.useDatabase("iceberg_db");

        tenv.executeSql("CREATE TABLE sourceTable (\n" +
                        " userid int,\n" +
                        " f_random_str STRING\n" +
                        ") WITH (\n" +
                        " 'connector' = 'datagen',\n" +
                        " 'rows-per-second'='100',\n" +
                        " 'fields.userid.kind'='random',\n" +
                        " 'fields.userid.min'='1',\n" +
                        " 'fields.userid.max'='100',\n" +
                        "'fields.f_random_str.length'='10'\n" +
                        ")");

        tenv.executeSql(
                "insert into iceberg.iceberg_db.iceberg_001 select * from iceberg.iceberg_db.sourceTable");
    }
}

具體見:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/iceberg/src/main/java/com/Flink2Iceberg.java

總結(jié)

總結(jié)一下,iceberg的資料比較少,很多設(shè)計(jì)或者討論等需要關(guān)注issues,然后再去擼源碼,可能對(duì)于剛?cè)腴T的小伙伴來說有點(diǎn)困難。后續(xù)我也會(huì)多分享一些關(guān)于iceberg的文章

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容