Reading the Flink Source Code: the FileSystem Connector

The code lives in the flink-table-runtime-blink module; see the user guide on the official website for usage.

This is still the legacy implementation; it will be reimplemented along the lines of FLIP-95 (tracked in FLINK-19336).

The entry point is FileSystemTableFactory. How factory discovery works was covered in an earlier post, so I won't repeat it here.

Sink

createTableSink constructs a FileSystemTableSink, passing in the relevant properties:

public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
        Configuration conf = new Configuration();
        context.getTable().getOptions().forEach(conf::setString);

        return new FileSystemTableSink(
                context.getObjectIdentifier(),          // connector identifier
                context.isBounded(),                    // bounded or unbounded stream
                context.getTable().getSchema(),         // table schema
                getPath(conf),                          // file path
                context.getTable().getPartitionKeys(),  // partition keys
                conf.get(PARTITION_DEFAULT_NAME),       // default partition name
                context.getTable().getOptions());       // options
    }

FileSystemTableSink then builds a DataStreamSink from the input DataStream.

consumeDataStream mainly does the following:

  1. Constructs a RowDataPartitionComputer, separating the indices and types of the partition fields from the other fields.
  2. Creates an EmptyMetaStoreFactory, an empty metastore implementation.
  3. Generates the part-file prefix from a UUID.
  4. Builds a FileSystemFactory implementation.
  5. Branches on whether the stream is bounded.
public final DataStreamSink<RowData> consumeDataStream(DataStream<RowData> dataStream) {
        RowDataPartitionComputer computer = new RowDataPartitionComputer(
                defaultPartName,
                schema.getFieldNames(),
                schema.getFieldDataTypes(),
                partitionKeys.toArray(new String[0]));

        EmptyMetaStoreFactory metaStoreFactory = new EmptyMetaStoreFactory(path);
        OutputFileConfig outputFileConfig = OutputFileConfig.builder()
                .withPartPrefix("part-" + UUID.randomUUID().toString())
                .build();
        FileSystemFactory fsFactory = FileSystem::get;

        if (isBounded) {
            FileSystemOutputFormat.Builder<RowData> builder = new FileSystemOutputFormat.Builder<>();
            builder.setPartitionComputer(computer);
            builder.setDynamicGrouped(dynamicGrouping);
            builder.setPartitionColumns(partitionKeys.toArray(new String[0]));
            builder.setFormatFactory(createOutputFormatFactory());
            builder.setMetaStoreFactory(metaStoreFactory);
            builder.setFileSystemFactory(fsFactory);
            builder.setOverwrite(overwrite);
            builder.setStaticPartitions(staticPartitions);
            builder.setTempPath(toStagingPath());
            builder.setOutputFileConfig(outputFileConfig);
            return dataStream.writeUsingOutputFormat(builder.build())
                    .setParallelism(dataStream.getParallelism());
        } else {
            Configuration conf = new Configuration();
            properties.forEach(conf::setString);
            Object writer = createWriter();
            TableBucketAssigner assigner = new TableBucketAssigner(computer);
            TableRollingPolicy rollingPolicy = new TableRollingPolicy(
                    !(writer instanceof Encoder),
                    conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
                    conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());

            BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
            if (writer instanceof Encoder) {
                //noinspection unchecked
                bucketsBuilder = StreamingFileSink.forRowFormat(
                        path, new ProjectionEncoder((Encoder<RowData>) writer, computer))
                        .withBucketAssigner(assigner)
                        .withOutputFileConfig(outputFileConfig)
                        .withRollingPolicy(rollingPolicy);
            } else {
                //noinspection unchecked
                bucketsBuilder = StreamingFileSink.forBulkFormat(
                        path, new ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer))
                        .withBucketAssigner(assigner)
                        .withOutputFileConfig(outputFileConfig)
                        .withRollingPolicy(rollingPolicy);
            }
            return createStreamingSink(
                    conf,
                    path,
                    partitionKeys,
                    tableIdentifier,
                    overwrite,
                    dataStream,
                    bucketsBuilder,
                    metaStoreFactory,
                    fsFactory,
                    conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis());
        }
    }
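The index split performed by RowDataPartitionComputer (step 1 above) can be sketched as follows. This is a simplification for illustration only; the real class also extracts field types and handles the default partition name for null values:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

class PartitionIndexSplit {
    // Indices of the fields that are NOT partition keys; these stay in the written row.
    static int[] nonPartitionIndices(String[] fieldNames, List<String> partitionKeys) {
        return IntStream.range(0, fieldNames.length)
                .filter(i -> !partitionKeys.contains(fieldNames[i]))
                .toArray();
    }

    // Indices of the partition key fields, in partition-key order.
    static int[] partitionIndices(String[] fieldNames, List<String> partitionKeys) {
        List<String> names = Arrays.asList(fieldNames);
        return partitionKeys.stream().mapToInt(names::indexOf).toArray();
    }
}
```

For a schema `(id, name, day)` partitioned by `day`, the written rows keep indices 0 and 1 while index 2 feeds the bucket id.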

Streaming jobs are generally unbounded, so the else branch is taken:

  1. Creates the writer object according to the format type; parquet, for example, is built from a BulkWriter.
  2. Wraps the RowDataPartitionComputer in a TableBucketAssigner.
  3. Constructs a TableRollingPolicy, which controls when files are rolled; with a BulkWriter, files are cut when checkpoints run.
  4. Builds the BucketsBuilder object.
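The rolling decision described in step 3 can be sketched as a plain-Java simplification of TableRollingPolicy. Here `rollOnCheckpoint` corresponds to "the writer is not an Encoder", i.e. a bulk format, which can only cut files at checkpoints:

```java
class RollingPolicySketch {
    final boolean rollOnCheckpoint; // true for bulk formats (writer is not an Encoder)
    final long rollingFileSize;     // sink.rolling-policy.file-size
    final long rollingTimeInterval; // sink.rolling-policy.rollover-interval

    RollingPolicySketch(boolean rollOnCheckpoint, long rollingFileSize, long rollingTimeInterval) {
        this.rollOnCheckpoint = rollOnCheckpoint;
        this.rollingFileSize = rollingFileSize;
        this.rollingTimeInterval = rollingTimeInterval;
    }

    // Bulk formats always roll on checkpoint; row formats only when the part file is big enough.
    boolean shouldRollOnCheckpoint(long partSize) {
        return rollOnCheckpoint || partSize > rollingFileSize;
    }

    // Row formats can also roll mid-checkpoint once the size threshold is crossed.
    boolean shouldRollOnEvent(long partSize) {
        return partSize > rollingFileSize;
    }

    // ...or once the part file has been open for long enough.
    boolean shouldRollOnProcessingTime(long currentTime, long creationTime) {
        return currentTime - creationTime >= rollingTimeInterval;
    }
}
```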

createStreamingSink

  1. Wraps the BucketsBuilder in a StreamingFileWriter, an operator that extends AbstractStreamOperator.
  2. Appends that operator after the inputStream; the main processing logic lives in this operator.
  3. If sink.partition-commit.policy.kind is configured, commit handling is added (e.g. registering the partition in the metastore or writing a _success file), again via an extra operator.
  4. Finally, a DiscardingSink function discards the records, since the operators above have already handled them.
public static DataStreamSink<RowData> createStreamingSink(
            Configuration conf,
            Path path,
            List<String> partitionKeys,
            ObjectIdentifier tableIdentifier,
            boolean overwrite,
            DataStream<RowData> inputStream,
            BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
            TableMetaStoreFactory msFactory,
            FileSystemFactory fsFactory,
            long rollingCheckInterval) {
        if (overwrite) {
            throw new IllegalStateException("Streaming mode not support overwrite.");
        }

        StreamingFileWriter fileWriter = new StreamingFileWriter(
                rollingCheckInterval,
                bucketsBuilder);
        DataStream<CommitMessage> writerStream = inputStream.transform(
                StreamingFileWriter.class.getSimpleName(),
                TypeExtractor.createTypeInfo(CommitMessage.class),
                fileWriter).setParallelism(inputStream.getParallelism());

        DataStream<?> returnStream = writerStream;

        // save committer when we don't need it.
        if (partitionKeys.size() > 0 && conf.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) {
            StreamingFileCommitter committer = new StreamingFileCommitter(
                    path, tableIdentifier, partitionKeys, msFactory, fsFactory, conf);
            returnStream = writerStream
                    .transform(StreamingFileCommitter.class.getSimpleName(), Types.VOID, committer)
                    .setParallelism(1)
                    .setMaxParallelism(1);
        }
        //noinspection unchecked
        return returnStream.addSink(new DiscardingSink()).setParallelism(1);
    }

PS: Note the Java 8 functional-interface idiom here, which can be puzzling on first contact. An interface with exactly one abstract method is a functional interface, and it can be implemented in several ways: most commonly with an anonymous inner class, but also with a lambda or a method/constructor reference. For example:

FileSystemFactory fsFactory = FileSystem::get;
//equivalent anonymous class
        FileSystemFactory fileSystemFactory = new FileSystemFactory() {
            public FileSystem create(URI fsUri) throws IOException {
                return FileSystem.get(fsUri);
            }
        };

//      equivalent lambda
        FileSystemFactory fileSystemFactory = uri -> FileSystem.get(uri);
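As a self-contained, runnable analogue of the three styles, here is the same idea with a hypothetical single-method Parser interface standing in for FileSystemFactory:

```java
@FunctionalInterface
interface Parser {
    int parse(String s); // exactly one abstract method -> functional interface
}

class FunctionalInterfaceDemo {
    public static void main(String[] args) {
        // 1. anonymous inner class
        Parser anon = new Parser() {
            @Override
            public int parse(String s) { return Integer.parseInt(s); }
        };
        // 2. lambda
        Parser lambda = s -> Integer.parseInt(s);
        // 3. method reference, the same shape as FileSystem::get
        Parser ref = Integer::parseInt;

        System.out.println(anon.parse("42") + lambda.parse("42") + ref.parse("42")); // prints 126
    }
}
```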

Writing data to the filesystem

Element processing happens in StreamingFileWriter#processElement:

public void processElement(StreamRecord<RowData> element) throws Exception {
        helper.onElement(
                element.getValue(),
                getProcessingTimeService().getCurrentProcessingTime(),
                element.hasTimestamp() ? element.getTimestamp() : null,
                currentWatermark);
    }

Before that, initializeState creates the Buckets through the BucketsBuilder and wraps them in a StreamingFileSinkHelper:

@Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        buckets = bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask());

        // Set listener before the initialization of Buckets.
        inactivePartitions = new HashSet<>();
        buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {
            @Override
            public void bucketCreated(Bucket<RowData, String> bucket) {
            }

            @Override
            public void bucketInactive(Bucket<RowData, String> bucket) {
                inactivePartitions.add(bucket.getBucketId());
            }
        });

        helper = new StreamingFileSinkHelper<>(
                buckets,
                context.isRestored(),
                context.getOperatorStateStore(),
                getRuntimeContext().getProcessingTimeService(),
                bucketCheckInterval);
        currentWatermark = Long.MIN_VALUE;
    }

Back in processElement: if you follow the code, the data is ultimately written to a file by Bucket#write:

void write(IN element, long currentTime) throws IOException {
  // check whether there is an in-progress part file; if not (or the policy says to roll), start a new one
        if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {

            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                        subtaskIndex, bucketId, element);
            }

            inProgressPart = rollPartFile(currentTime);
        }
        inProgressPart.write(element, currentTime);
    }

The actual write ultimately goes through the write methods of third-party libraries such as hadoop, hive, parquet, or orc.

checkpoint

Checkpointing happens in the snapshotState method; the main logic is in the Buckets class:

public void snapshotState(
            final long checkpointId,
            final ListState<byte[]> bucketStatesContainer,
            final ListState<Long> partCounterStateContainer) throws Exception {

        Preconditions.checkState(
            bucketWriter != null && bucketStateSerializer != null,
                "sink has not been initialized");

        LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
                subtaskIndex, checkpointId, maxPartCounter);

        bucketStatesContainer.clear();
        partCounterStateContainer.clear();

        snapshotActiveBuckets(checkpointId, bucketStatesContainer);
        partCounterStateContainer.add(maxPartCounter);
    }
    
private void snapshotActiveBuckets(
            final long checkpointId,
            final ListState<byte[]> bucketStatesContainer) throws Exception {

        for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
            final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId);

            final byte[] serializedBucketState = SimpleVersionedSerialization
                    .writeVersionAndSerialize(bucketStateSerializer, bucketState);

            bucketStatesContainer.add(serializedBucketState);

            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} checkpointing: {}", subtaskIndex, bucketState);
            }
        }
    }

Here each Bucket in the active state is snapshotted:

BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
        prepareBucketForCheckpointing(checkpointId);

        InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
        long inProgressFileCreationTime = Long.MAX_VALUE;

        if (inProgressPart != null) {
            inProgressFileRecoverable = inProgressPart.persist();
            inProgressFileCreationTime = inProgressPart.getCreationTime();
            this.inProgressFileRecoverablesPerCheckpoint.put(checkpointId, inProgressFileRecoverable);
        }

        return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressFileRecoverable, pendingFileRecoverablesPerCheckpoint); // return the BucketState to be serialized
    }
    
private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
        if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} closing in-progress part file for bucket id={} on checkpoint.", subtaskIndex, bucketId);
            }
            closePartFile();
        }

        if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
            pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
            pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>(); // reset
        }
    }

The core logic is in closePartFile: the in-progress file is closed, its contents are flushed from memory to the file system, and the resulting PendingFileRecoverable is stored in the pendingFileRecoverablesForCurrentCheckpoint list in preparation for the snapshot.

private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
        InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
        if (inProgressPart != null) {
            pendingFileRecoverable = inProgressPart.closeForCommit();
            pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
            inProgressPart = null; // set back to null
        }
        return pendingFileRecoverable;
    }

A file that is being written is in progress and cannot be read yet; when it becomes readable downstream depends on when it is committed. The previous step wrote the data into the file, but the file has not been formally committed. Recall the steps of a checkpoint (see my earlier post if they are unfamiliar): in the last step, the CheckpointCoordinator calls each operator's notifyCheckpointComplete method.

public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        commitUpToCheckpoint(checkpointId);
    }

public void commitUpToCheckpoint(final long checkpointId) throws IOException {
        final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
                activeBuckets.entrySet().iterator();

        LOG.info("Subtask {} received completion notification for checkpoint with id={}.", subtaskIndex, checkpointId);

        while (activeBucketIt.hasNext()) {
            final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
            bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);

            if (!bucket.isActive()) { // after the cleanup above, this bucket will no longer be active
                // We've dealt with all the pending files and the writer for this bucket is not currently open.
                // Therefore this bucket is currently inactive and we can remove it from our state.
                activeBucketIt.remove();
                notifyBucketInactive(bucket);
            }
        }
    }

Files are committed in Bucket#onSuccessfulCompletionOfCheckpoint:

void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
        checkNotNull(bucketWriter);

        Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
                pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)
                        .entrySet().iterator();

        while (it.hasNext()) {
            Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();

            for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : entry.getValue()) {
                bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
            }
            it.remove();
        }

        cleanupInProgressFileRecoverables(checkpointId);
    }
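Putting the pieces together, the part-file lifecycle across checkpoints can be sketched as a small state machine. Plain strings stand in for the recoverable objects here; note how headMap(checkpointId, true) gives the "commit up to and including this checkpoint" semantics:

```java
import java.util.*;

class PartFileLifecycle {
    // in-progress -> pending (on snapshot) -> finished (on notifyCheckpointComplete)
    String inProgress;
    final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    final List<String> finished = new ArrayList<>();

    void write(String file) { inProgress = file; }

    // like closePartFile + snapshot: the in-progress file becomes pending under this checkpoint id
    void snapshot(long checkpointId) {
        if (inProgress != null) {
            pendingPerCheckpoint.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(inProgress);
            inProgress = null;
        }
    }

    // like onSuccessfulCompletionOfCheckpoint: commit everything pending up to this checkpoint
    void notifyComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            finished.addAll(it.next().getValue());
            it.remove(); // committed files leave the state
        }
    }
}
```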

The commit method renames the file so it becomes readable downstream. Here is the hadoop implementation, for example:

@Override
        public void commit() throws IOException {
            final Path src = recoverable.tempFile();
            final Path dest = recoverable.targetFile();
            final long expectedLength = recoverable.offset();

            final FileStatus srcStatus;
            try {
                srcStatus = fs.getFileStatus(src);
            }
            catch (IOException e) {
                throw new IOException("Cannot clean commit: Staging file does not exist.");
            }

            if (srcStatus.getLen() != expectedLength) {
                // something was done to this file since the committer was created.
                // this is not the "clean" case
                throw new IOException("Cannot clean commit: File has trailing junk data.");
            }

            try {
                fs.rename(src, dest);
            }
            catch (IOException e) {
                throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
            }
        }
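The same publish-by-rename pattern can be shown with plain java.nio; this is an illustration of the idea only, not Flink's Hadoop committer:

```java
import java.nio.file.*;

class RenameCommit {
    // Publish a staged file by checking its length and then renaming it atomically.
    static void commit(Path staging, Path target, long expectedLength) throws Exception {
        if (!Files.exists(staging)) {
            throw new IllegalStateException("Cannot clean commit: staging file does not exist.");
        }
        if (Files.size(staging) != expectedLength) {
            // something was written to the file after it was staged
            throw new IllegalStateException("Cannot clean commit: file has trailing junk data.");
        }
        Files.move(staging, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```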

Finally, some state of the in-progress files is cleaned up:

private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
        Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> it =
                inProgressFileRecoverablesPerCheckpoint.headMap(checkpointId, false)
                        .entrySet().iterator();

        while (it.hasNext()) {
            final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = it.next().getValue();

            // this check is redundant, as we only put entries in the inProgressFileRecoverablesPerCheckpoint map
            // list when the requiresCleanupOfInProgressFileRecoverableState() returns true, but having it makes
            // the code more readable.

            final boolean successfullyDeleted = bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable); // returns false for all filesystems except s3
            if (LOG.isDebugEnabled() && successfullyDeleted) {
                LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
            }
            it.remove(); // remove the cleaned-up entry
        }
    }

partition commit

This section covers when a partition commit is triggered and which commit policies run.
The trigger type is either process-time or partition-time.
With process-time, each partition the current checkpoint needs to commit is registered in the pendingPartitions map together with the current system time; at commit time, a partition is committed when its registration time plus the delay is earlier than the current system time, or immediately when delay=0.
So delay=0 means immediate commit, which can commit a partition too early when data arrives late. Setting delay to the partition duration means the partitions from the previous checkpoint are committed after checkpoint interval + delay.

@Override
    public void addPartition(String partition) {
        if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
            this.pendingPartitions.putIfAbsent(partition, procTimeService.getCurrentProcessingTime());
        }
    }

    @Override
    public List<String> committablePartitions(long checkpointId) {
        List<String> needCommit = new ArrayList<>();
        long currentProcTime = procTimeService.getCurrentProcessingTime();
        Iterator<Map.Entry<String, Long>> iter = pendingPartitions.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, Long> entry = iter.next();
            long creationTime = entry.getValue();
            if (commitDelay == 0 || currentProcTime > creationTime + commitDelay) {
                needCommit.add(entry.getKey());
                iter.remove();
            }
        }
        return needCommit;
    }

Partition-time triggering is based on whether the watermark has reached the partition time plus the delay:

@Override
    public void addPartition(String partition) {
        if (!StringUtils.isNullOrWhitespaceOnly(partition)) {
            this.pendingPartitions.add(partition);
        }
    }

    @Override
    public List<String> committablePartitions(long checkpointId) {
        if (!watermarks.containsKey(checkpointId)) {
            throw new IllegalArgumentException(String.format(
                    "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                    checkpointId, watermarks));
        }

        long watermark = watermarks.get(checkpointId);
        watermarks.headMap(checkpointId, true).clear();

        List<String> needCommit = new ArrayList<>();
        Iterator<String> iter = pendingPartitions.iterator();
        while (iter.hasNext()) {
            String partition = iter.next();
            LocalDateTime partTime = extractor.extract(
                    partitionKeys, extractPartitionValues(new Path(partition))); // extract the time from the path, e.g. partition='day=2020-12-01/hour=11/minute=11' becomes 2020-12-01 11:11:00
            if (watermark > toMills(partTime) + commitDelay) {
                needCommit.add(partition);
                iter.remove();
            }
        }
        return needCommit;
    }
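A standalone sketch of that extraction and the watermark check follows. It assumes a 'day=.../hour=.../minute=...' partition layout like the one in the code comment above, and UTC for simplicity; in Flink the extractor is actually pluggable via the partition.time-extractor options:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;

class PartitionTimeSketch {
    // "day=2020-12-01/hour=11/minute=11" -> 2020-12-01T11:11
    static LocalDateTime extract(String partition) {
        Map<String, String> kv = new HashMap<>();
        for (String segment : partition.split("/")) {
            String[] pair = segment.split("=");
            kv.put(pair[0], pair[1]);
        }
        return LocalDateTime.parse(kv.get("day") + "T00:00")
                .plusHours(Long.parseLong(kv.get("hour")))
                .plusMinutes(Long.parseLong(kv.get("minute")));
    }

    // Commit once the watermark has passed partition time + delay.
    static boolean committable(long watermark, LocalDateTime partTime, long delayMillis) {
        return watermark > partTime.toInstant(ZoneOffset.UTC).toEpochMilli() + delayMillis;
    }
}
```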

Source

Reading data is somewhat simpler than writing it.

First a FileSystemTableSource object is created:

public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
        Configuration conf = new Configuration();
        context.getTable().getOptions().forEach(conf::setString);

        return new FileSystemTableSource(
                context.getTable().getSchema(),
                getPath(conf),
                context.getTable().getPartitionKeys(),
                conf.get(PARTITION_DEFAULT_NAME),
                context.getTable().getProperties());
    }

Then the source function is constructed, with the input format passed in to read the source data:

public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
   @SuppressWarnings("unchecked")
   TypeInformation<RowData> typeInfo =
         (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
   // Avoid using ContinuousFileMonitoringFunction
   InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
   DataStreamSource<RowData> source = execEnv.addSource(func, explainSource(), typeInfo);
   return source.name(explainSource());
}

The run method reads records in a loop and emits them to the downstream operators:

public void run(SourceContext<OUT> ctx) throws Exception {
        try {

            Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
            if (isRunning && format instanceof RichInputFormat) {
                ((RichInputFormat) format).openInputFormat();
            }

            OUT nextElement = serializer.createInstance();
            while (isRunning) {
                format.open(splitIterator.next());

                // for each element we also check if cancel
                // was called by checking the isRunning flag

                while (isRunning && !format.reachedEnd()) {
                    nextElement = format.nextRecord(nextElement);
                    if (nextElement != null) {
                        ctx.collect(nextElement);
                    } else {
                        break;
                    }
                }
                format.close();
                completedSplitsCounter.inc();

                if (isRunning) {
                    isRunning = splitIterator.hasNext();
                }
            }
        } finally {
            format.close();
            if (format instanceof RichInputFormat) {
                ((RichInputFormat) format).closeInputFormat();
            }
            isRunning = false;
        }
    }