Flink SQL FileSystem Connector: Partition Commit and a Custom Small-File Merging Policy

This article has been published, with permission and marked as original, by the "Flink China Community" WeChat public account.

Prologue

When I covered the new Hive Streaming features of Flink 1.11 earlier, I mentioned that Flink SQL's FileSystem Connector has been improved in many ways to fit the broader Flink-Hive integration, the most visible improvement being the partition commit mechanism. This post first takes a quick pass through the source code of the mechanism's two building blocks, the trigger and the policy, and then shows how to write a custom partition commit policy, using small-file merging as the example.

PartitionCommitTrigger

In recent Flink SQL versions, the FileSystem Connector natively supports partitioned data, writing it in the standard Hive partition layout, as shown below.

path
├── datetime=2019-08-25
│   ├── hour=11
│   │   ├── part-0.parquet
│   │   └── part-1.parquet
│   └── hour=12
│       └── part-0.parquet
└── datetime=2019-08-26
    └── hour=6
        └── part-0.parquet

So when does partition data that has already been written become visible to downstream consumers? This is the question of how a partition commit is triggered. According to the official documentation, there are two trigger parameters:

  • sink.partition-commit.trigger: either process-time (trigger based on processing time) or partition-time (trigger based on the partition time extracted from event time).
  • sink.partition-commit.delay: the delay before a partition is committed. With the process-time trigger, a partition is committed once this delay has elapsed since the system timestamp at partition creation; with the partition-time trigger, it is committed once the watermark has passed the partition's own event timestamp by this delay.

Clearly, the process-time trigger cannot cope with hiccups during processing: once data arrives late or the job restarts after a failure, records can no longer land in the correct event-time partitions. In practice, therefore, we almost always pick the partition-time trigger and generate watermarks ourselves. We also have to specify the rule for extracting the partition time (the PartitionTimeExtractor) via the partition.time-extractor.* family of parameters; the official documentation explains these clearly, so I won't repeat it here.
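
For reference, here is a minimal sketch of a sink DDL wired up this way. The table name, columns, and path are made up, and only the commit-trigger options discussed above are shown:

CREATE TABLE hive_sink (
  user_id BIGINT,
  event_type STRING,
  ts_date STRING,
  ts_hour STRING
) PARTITIONED BY (ts_date, ts_hour) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs:///path/to/table',
  'format' = 'parquet',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:00:00'
);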

In the source code, PartitionCommitTrigger comes in two flavors, one per trigger type: ProcTimeCommitTigger and PartitionTimeCommitTigger (the misspelled "Tigger" is from the Flink source itself).

Let's take PartitionTimeCommitTigger, the one triggered by partition time, and briefly walk through its approach. Here is the complete class.

public class PartitionTimeCommitTigger implements PartitionCommitTrigger {
    private static final ListStateDescriptor<List<String>> PENDING_PARTITIONS_STATE_DESC =
            new ListStateDescriptor<>(
                    "pending-partitions",
                    new ListSerializer<>(StringSerializer.INSTANCE));

    private static final ListStateDescriptor<Map<Long, Long>> WATERMARKS_STATE_DESC =
            new ListStateDescriptor<>(
                    "checkpoint-id-to-watermark",
                    new MapSerializer<>(LongSerializer.INSTANCE, LongSerializer.INSTANCE));

    private final ListState<List<String>> pendingPartitionsState;
    private final Set<String> pendingPartitions;

    private final ListState<Map<Long, Long>> watermarksState;
    private final TreeMap<Long, Long> watermarks;
    private final PartitionTimeExtractor extractor;
    private final long commitDelay;
    private final List<String> partitionKeys;

    public PartitionTimeCommitTigger(
            boolean isRestored,
            OperatorStateStore stateStore,
            Configuration conf,
            ClassLoader cl,
            List<String> partitionKeys) throws Exception {
        this.pendingPartitionsState = stateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
        this.pendingPartitions = new HashSet<>();
        if (isRestored) {
            pendingPartitions.addAll(pendingPartitionsState.get().iterator().next());
        }

        this.partitionKeys = partitionKeys;
        this.commitDelay = conf.get(SINK_PARTITION_COMMIT_DELAY).toMillis();
        this.extractor = PartitionTimeExtractor.create(
                cl,
                conf.get(PARTITION_TIME_EXTRACTOR_KIND),
                conf.get(PARTITION_TIME_EXTRACTOR_CLASS),
                conf.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN));

        this.watermarksState = stateStore.getListState(WATERMARKS_STATE_DESC);
        this.watermarks = new TreeMap<>();
        if (isRestored) {
            watermarks.putAll(watermarksState.get().iterator().next());
        }
    }

    @Override
    public void addPartition(String partition) {
        if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
            this.pendingPartitions.add(partition);
        }
    }

    @Override
    public List<String> committablePartitions(long checkpointId) {
        if (!watermarks.containsKey(checkpointId)) {
            throw new IllegalArgumentException(String.format(
                    "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                    checkpointId, watermarks));
        }

        long watermark = watermarks.get(checkpointId);
        watermarks.headMap(checkpointId, true).clear();

        List<String> needCommit = new ArrayList<>();
        Iterator<String> iter = pendingPartitions.iterator();
        while (iter.hasNext()) {
            String partition = iter.next();
            LocalDateTime partTime = extractor.extract(
                    partitionKeys, extractPartitionValues(new Path(partition)));
            if (watermark > toMills(partTime) + commitDelay) {
                needCommit.add(partition);
                iter.remove();
            }
        }
        return needCommit;
    }

    @Override
    public void snapshotState(long checkpointId, long watermark) throws Exception {
        pendingPartitionsState.clear();
        pendingPartitionsState.add(new ArrayList<>(pendingPartitions));

        watermarks.put(checkpointId, watermark);
        watermarksState.clear();
        watermarksState.add(new HashMap<>(watermarks));
    }

    @Override
    public List<String> endInput() {
        ArrayList<String> partitions = new ArrayList<>(pendingPartitions);
        pendingPartitions.clear();
        return partitions;
    }
}

Note that the class maintains two essential pairs of information:

  • pendingPartitions/pendingPartitionsState: the set of partitions waiting to be committed, plus its backing state;
  • watermarks/watermarksState: the mapping of <checkpoint ID, watermark timestamp> (kept in a TreeMap so the entries stay ordered), plus its backing state.

This also shows that enabling checkpointing is a prerequisite for the partition commit mechanism. The snapshotState() method persists this information into managed state, so that partition data stays complete and correct when the job fails over.
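
As a reminder, that means the job writing to such a table must run with checkpointing on, for example (the interval here is arbitrary; this is the plain DataStream way of enabling it):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60 * 1000L); // one checkpoint per minute; partition commits piggyback on checkpoints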

So how does PartitionTimeCommitTigger know which partitions to commit? Look at committablePartitions():

  1. Check that the checkpoint ID is valid;
  2. Fetch the watermark recorded for the current checkpoint ID, then use TreeMap's headMap() and clear() to drop the watermark entries up to and including this checkpoint ID (they are no longer needed);
  3. Iterate over the pending partitions, extracting each one's partition time with the PartitionTimeExtractor defined earlier (e.g. ${year}-${month}-${day} ${hour}:00:00). If the watermark has passed the partition time plus the sink.partition-commit.delay from above, the partition can be committed, and it is returned. The arithmetic is sketched right after this list.
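
To make step 3 concrete, here is a tiny standalone sketch with made-up timestamps (time-zone handling is simplified to UTC here; the real trigger converts through its own toMills() helper):

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class CommitConditionExample {
    public static void main(String[] args) {
        // Partition datetime=2019-08-25/hour=11 with pattern "$datetime $hour:00:00"
        // yields the partition time 2019-08-25T11:00:00
        long partTime = LocalDateTime.of(2019, 8, 25, 11, 0)
                .toInstant(ZoneOffset.UTC).toEpochMilli();
        long commitDelay = Duration.ofHours(1).toMillis(); // sink.partition-commit.delay = 1 h

        // The partition becomes committable once the watermark passes
        // partition time + delay, i.e. 2019-08-25T12:00:00
        long watermark = LocalDateTime.of(2019, 8, 25, 12, 0, 1)
                .toInstant(ZoneOffset.UTC).toEpochMilli();
        System.out.println(watermark > partTime + commitDelay); // true
    }
}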

The PartitionCommitTrigger logic is used inside StreamingFileCommitter, the component that does the actual committing (note that StreamingFileCommitter runs with a fixed parallelism of 1; someone asked about this before). The details of StreamingFileCommitter and StreamingFileWriter (the SQL counterpart of StreamingFileSink) are fairly involved, so I'll save them for a dedicated post.

PartitionCommitPolicy

PartitionCommitTrigger answers when a partition becomes visible downstream; PartitionCommitPolicy answers how that visibility is marked. According to the official documentation, it is configured through the sink.partition-commit.policy.kind parameter, and there are three policies (which can be combined):

  • metastore: update the partition information in the Hive Metastore (only effective when HiveCatalog is used);
  • success-file: write a success-marker file into the partition directory; its name can be customized via the sink.partition-commit.success-file.name parameter and defaults to _SUCCESS;
  • custom: a user-defined policy, whose class name must be given via the sink.partition-commit.policy.class parameter. An example combination follows this list.
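
For instance, to register every committed partition in the Metastore and drop a _SUCCESS marker at the same time, the options would be (a custom policy is appended the same way, as shown later):

'sink.partition-commit.policy.kind' = 'metastore,success-file'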

The implementation side of PartitionCommitPolicy is much simpler: a policy's concrete logic is supplied by overriding the commit() method.

The two built-in implementations, MetastoreCommitPolicy and SuccessFileCommitPolicy, are shown below; both are very easy to follow.

public class MetastoreCommitPolicy implements PartitionCommitPolicy {
    private static final Logger LOG = LoggerFactory.getLogger(MetastoreCommitPolicy.class);

    private TableMetaStore metaStore;

    public void setMetastore(TableMetaStore metaStore) {
        this.metaStore = metaStore;
    }

    @Override
    public void commit(Context context) throws Exception {
        LinkedHashMap<String, String> partitionSpec = context.partitionSpec();
        metaStore.createOrAlterPartition(partitionSpec, context.partitionPath());
        LOG.info("Committed partition {} to metastore", partitionSpec);
    }
}

public class SuccessFileCommitPolicy implements PartitionCommitPolicy {
    private static final Logger LOG = LoggerFactory.getLogger(SuccessFileCommitPolicy.class);

    private final String fileName;
    private final FileSystem fileSystem;

    public SuccessFileCommitPolicy(String fileName, FileSystem fileSystem) {
        this.fileName = fileName;
        this.fileSystem = fileSystem;
    }

    @Override
    public void commit(Context context) throws Exception {
        fileSystem.create(
                new Path(context.partitionPath(), fileName),
                FileSystem.WriteMode.OVERWRITE).close();
        LOG.info("Committed partition {} with success file", context.partitionSpec());
    }
}

Customize PartitionCommitPolicy

Remember the Hive Streaming experiment from a previous post?

As its output showed, when writes are frequent or the sink parallelism is high, every partition fills up with lots of tiny files (with a bulk format such as Parquet, each writer subtask rolls a new file at every checkpoint), which is not something we like to see. Below we try a custom PartitionCommitPolicy that merges those files as part of the partition commit (the storage format is Parquet).

Parquet differs from plain row-oriented formats such as TextFile: it is a self-describing columnar format (it carries its own schema and metadata), with data structures organized according to the Google Dremel model, the same one Protobuf follows. So we should first detect the schema of the written files, then read each file according to that schema and stitch the records together.

Below is ParquetFileMergingCommitPolicy, the complete policy that merges all the small files inside a partition. To keep dependencies from clashing, every Parquet-related class is the version shaded by Flink. The code is fairly tidy, so the inline comments are kept brief.

package me.lmagics.flinkexp.hiveintegration.util;

import org.apache.flink.hive.shaded.parquet.example.data.Group;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.GroupReadSupport;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.flink.hive.shaded.parquet.hadoop.util.HadoopInputFile;
import org.apache.flink.hive.shaded.parquet.schema.MessageType;
import org.apache.flink.table.filesystem.PartitionCommitPolicy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ParquetFileMergingCommitPolicy implements PartitionCommitPolicy {
  private static final Logger LOGGER = LoggerFactory.getLogger(ParquetFileMergingCommitPolicy.class);

  @Override
  public void commit(Context context) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    String partitionPath = context.partitionPath().getPath();

    // Collect the small files (named part-*) inside this partition
    List<Path> files = listAllFiles(fs, new Path(partitionPath), "part-");
    LOGGER.info("{} files in path {}", files.size(), partitionPath);

    // Read the schema from one file; all files in the partition share it
    MessageType schema = getParquetSchema(files, conf);
    if (schema == null) {
      return;
    }
    LOGGER.info("Fetched parquet schema: {}", schema.toString());

    // Merge everything into a single result file, then delete the originals
    Path result = merge(partitionPath, schema, files, fs);
    LOGGER.info("Files merged into {}", result.toString());
  }

  private List<Path> listAllFiles(FileSystem fs, Path dir, String prefix) throws IOException {
    List<Path> result = new ArrayList<>();

    RemoteIterator<LocatedFileStatus> dirIterator = fs.listFiles(dir, false);
    while (dirIterator.hasNext()) {
      LocatedFileStatus fileStatus = dirIterator.next();
      Path filePath = fileStatus.getPath();
      if (fileStatus.isFile() && filePath.getName().startsWith(prefix)) {
        result.add(filePath);
      }
    }

    return result;
  }

  private MessageType getParquetSchema(List<Path> files, Configuration conf) throws IOException {
    if (files.size() == 0) {
      return null;
    }

    HadoopInputFile inputFile = HadoopInputFile.fromPath(files.get(0), conf);
    ParquetFileReader reader = ParquetFileReader.open(inputFile);
    ParquetMetadata metadata = reader.getFooter();
    MessageType schema = metadata.getFileMetaData().getSchema();

    reader.close();
    return schema;
  }

  private Path merge(String partitionPath, MessageType schema, List<Path> files, FileSystem fs) throws IOException {
    Path mergeDest = new Path(partitionPath + "/result-" + System.currentTimeMillis() + ".parquet");
    ParquetWriter<Group> writer = ExampleParquetWriter.builder(mergeDest)
      .withType(schema)
      .withConf(fs.getConf())
      .withWriteMode(Mode.CREATE)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .build();

    // Stream every record of every small file into the single result file
    for (Path file : files) {
      ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
        .withConf(fs.getConf())
        .build();
      Group data;
      while ((data = reader.read()) != null) {
        writer.write(data);
      }
      reader.close();
    }
    writer.close();

    // Delete the originals only after the merged file has been closed
    for (Path file : files) {
      fs.delete(file, false);
    }

    return mergeDest;
  }
}

Don't forget to update the partition commit policy parameters accordingly:

'sink.partition-commit.policy.kind' = 'metastore,success-file,custom', 
'sink.partition-commit.policy.class' = 'me.lmagics.flinkexp.hiveintegration.util.ParquetFileMergingCommitPolicy'

Rerun the earlier Hive Streaming job and watch the log output:

20-08-04 22:15:00 INFO  me.lmagics.flinkexp.hiveintegration.util.ParquetFileMergingCommitPolicy       - 14 files in path /user/hive/warehouse/hive_tmp.db/analytics_access_log_hive/ts_date=2020-08-04/ts_hour=22/ts_minute=13

// If you happen to know Protobuf, you'll notice the schema style here is exactly the same
20-08-04 22:15:00 INFO  me.lmagics.flinkexp.hiveintegration.util.ParquetFileMergingCommitPolicy       - Fetched parquet schema: 
message hive_schema {
  optional int64 ts;
  optional int64 user_id;
  optional binary event_type (UTF8);
  optional binary from_type (UTF8);
  optional binary column_type (UTF8);
  optional int64 site_id;
  optional int64 groupon_id;
  optional int64 partner_id;
  optional int64 merchandise_id;
}

20-08-04 22:15:04 INFO  me.lmagics.flinkexp.hiveintegration.util.ParquetFileMergingCommitPolicy       - Files merged into /user/hive/warehouse/hive_tmp.db/analytics_access_log_hive/ts_date=2020-08-04/ts_hour=22/ts_minute=13/result-1596550500950.parquet

Finally, a quick check confirms the merge succeeded.

The End

Good night, everyone.
