FlinkKafkaConsumer010 Code Reading Notes

0.

FlinkKafkaConsumer010 is the Kafka source connector shipped with Flink 1.6.1. In Flink, a data source has to implement the SourceFunction interface.

@Public
public interface SourceFunction<T> extends Function, Serializable {

    void run(SourceContext<T> ctx) throws Exception;

    void cancel();

    @Public
    interface SourceContext<T> {

        void collect(T element);

        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);

        @PublicEvolving
        void emitWatermark(Watermark mark);

        @PublicEvolving
        void markAsTemporarilyIdle();

        Object getCheckpointLock();

        void close();
    }
}

SourceFunction

SourceFunction declares two interface methods:

  1. run: starts the source and emits elements to produce the data stream;
  2. cancel: cancels the source, that is, makes the logic running inside run stop.

SourceContext

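Flink exposes element output through SourceContext:

  1. collect: emits an element without an explicit timestamp; under ingestion time the element is stamped automatically with the current system time;
  2. collectWithTimestamp: emits an element with a timestamp supplied by the user;
  3. emitWatermark: emits a Watermark manually.

For how the different notions of time relate to Watermarks, see the official documentation; it is not repeated here.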
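
The run/cancel contract can be illustrated without a Flink dependency. The sketch below is only a model: Emitter is a hypothetical stand-in for SourceContext (it is not a Flink type), and CountingSourceSketch is an invented name. It shows the usual shape of a source, where run loops while a volatile flag is set, emits under the checkpoint lock, and cancel clears the flag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SourceContext; not part of Flink.
interface Emitter<T> {
    void collect(T element);
    Object getCheckpointLock();
}

// A counting source that follows the run/cancel contract described above.
class CountingSourceSketch {
    // volatile so that cancel(), called from another thread, is seen by run()
    private volatile boolean running = true;
    private final long limit;

    CountingSourceSketch(long limit) {
        this.limit = limit;
    }

    // Emits 0, 1, 2, ... until the limit is reached or cancel() is called.
    void run(Emitter<Long> ctx) {
        long next = 0;
        while (running && next < limit) {
            // emit and advance under the lock so that a snapshot taken
            // under the same lock never sees a half-updated position
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(next);
                next++;
            }
        }
    }

    void cancel() {
        running = false;
    }

    // Convenience for the example: runs a source and returns what it emitted.
    static List<Long> runToList(long limit) {
        final List<Long> out = new ArrayList<>();
        final Object lock = new Object();
        new CountingSourceSketch(limit).run(new Emitter<Long>() {
            @Override
            public void collect(Long element) {
                out.add(element);
            }

            @Override
            public Object getCheckpointLock() {
                return lock;
            }
        });
        return out;
    }
}
```

In a real SourceFunction the emit call would be ctx.collect(...) on the SourceContext that Flink passes to run.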

1. FlinkKafkaConsumer010

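We start with the class diagram to get an overall picture of FlinkKafkaConsumer010.

FlinkKafkaConsumer010 class diagram

ParallelSourceFunction is a sub-interface of SourceFunction. It is purely a marker interface that tells the system the source can run in parallel.

RichFunction provides two hook methods, open and close, which are called back before processing starts and after it ends. It also provides the setter and getter for the RuntimeContext.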

2. FlinkKafkaConsumer010 construction

Let us look at the construction logic first. Most of it lives in FlinkKafkaConsumer09:

    private FlinkKafkaConsumer09(
            List<String> topics,
            Pattern subscriptionPattern,
            KeyedDeserializationSchema<T> deserializer,
            Properties props) {

        super(
                topics,
                subscriptionPattern,
                deserializer,
                getLong(
                    checkNotNull(props, "props"),
                    KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
                !getBoolean(props, KEY_DISABLE_METRICS, false));

        this.properties = props;
        setDeserializer(this.properties);

        // configure the polling timeout
        try {
            if (properties.containsKey(KEY_POLL_TIMEOUT)) {
                this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
            } else {
                this.pollTimeout = DEFAULT_POLL_TIMEOUT;
            }
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
        }
    }
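
  1. Any deserializer configured by the user is ignored: the underlying official KafkaConsumer is always configured with ByteArrayDeserializer, and deserialization at the FlinkKafkaConsumer level goes through the KeyedDeserializationSchema interface.
  2. If the user does not set flink.poll-timeout, it defaults to 100 ms; this is how long a poll against Kafka waits for data.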
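
The poll-timeout handling in point 2 can be reproduced in isolation with plain java.util.Properties. This is a minimal sketch; the class and method names are invented for illustration, and the key and default value are the ones quoted in the text above.

```java
import java.util.Properties;

class PollTimeoutSketch {
    // Values taken from the text above: key "flink.poll-timeout", default 100 ms.
    static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
    static final long DEFAULT_POLL_TIMEOUT = 100L;

    // Returns the configured poll timeout in milliseconds, or the default
    // when the key is absent. An unparsable value is reported as an
    // IllegalArgumentException, as in the constructor shown above.
    static long resolvePollTimeout(Properties props) {
        if (!props.containsKey(KEY_POLL_TIMEOUT)) {
            return DEFAULT_POLL_TIMEOUT;
        }
        try {
            return Long.parseLong(props.getProperty(KEY_POLL_TIMEOUT));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                "Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + "'", e);
        }
    }
}
```

Calling resolvePollTimeout with an empty Properties yields the default; setting the key to "250" yields 250.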

Following the chain upward, here is the constructor of the parent class FlinkKafkaConsumerBase:

    public FlinkKafkaConsumerBase(
            List<String> topics,
            Pattern topicPattern,
            KeyedDeserializationSchema<T> deserializer,
            long discoveryIntervalMillis,
            boolean useMetrics) {
        this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
        this.deserializer = checkNotNull(deserializer, "valueDeserializer");

        checkArgument(
            discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED || discoveryIntervalMillis >= 0,
            "Cannot define a negative value for the topic / partition discovery interval.");
        this.discoveryIntervalMillis = discoveryIntervalMillis;

        this.useMetrics = useMetrics;
    }
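
  1. KafkaTopicsDescriptor encapsulates how the topics are obtained (a fixed list or a pattern);
  2. discoveryIntervalMillis sets the period of automatic partition discovery; it defaults to Long.MIN_VALUE (PARTITION_DISCOVERY_DISABLED), meaning no automatic discovery.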
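
The way the discovery interval is read and validated can be sketched with plain Java. The property key name below is an assumption (the constructor only refers to it as KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS), and the class and method names are invented for illustration.

```java
import java.util.Properties;

class DiscoveryIntervalSketch {
    // Sentinel meaning "discovery is off", as described in the text above.
    static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
    // Assumed key name; check the constant in FlinkKafkaConsumerBase before relying on it.
    static final String KEY_INTERVAL = "flink.partition-discovery.interval-millis";

    // Reads the interval, falling back to the "disabled" sentinel, and
    // rejects negative values other than the sentinel itself.
    static long resolveDiscoveryInterval(Properties props) {
        String raw = props.getProperty(KEY_INTERVAL);
        long interval = (raw == null) ? PARTITION_DISCOVERY_DISABLED : Long.parseLong(raw);
        if (interval != PARTITION_DISCOVERY_DISABLED && interval < 0) {
            throw new IllegalArgumentException(
                "Cannot define a negative value for the topic / partition discovery interval.");
        }
        return interval;
    }

    static boolean isDiscoveryEnabled(long interval) {
        return interval != PARTITION_DISCOVERY_DISABLED;
    }
}
```

Note that an interval of 0 is accepted and counts as enabled; only the sentinel turns discovery off.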

3. FlinkKafkaConsumer010 initialization

Initialization consists of two parts: CheckpointedFunction.initializeState and RichFunction.open.

a. CheckpointedFunction.initializeState

In FlinkKafkaConsumerBase:

    public final void initializeState(FunctionInitializationContext context) throws Exception {

        OperatorStateStore stateStore = context.getOperatorStateStore();

        ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
            stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

        this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
                OFFSETS_STATE_NAME,
                TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));

        if (context.isRestored() && !restoredFromOldState) {
            restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

            // migrate from 1.2 state, if there is any
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
                restoredFromOldState = true;
                unionOffsetStates.add(kafkaOffset);
            }
            oldRoundRobinListState.clear();

            if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
                throw new IllegalArgumentException(
                    "Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
            }

            // populate actual holder for restored state
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
            }

            LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
        } else {
            LOG.info("No restore state for FlinkKafkaConsumer.");
        }
    }

This restores the offsets from the checkpoint into restoredState.

State written by version 1.2 is also handled for compatibility; the details are skipped here.

b. RichFunction.open

In FlinkKafkaConsumerBase:

    public void open(Configuration configuration) throws Exception {
        // determine the offset commit mode
        this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
                getIsAutoCommitEnabled(),
                enableCommitOnCheckpoints,
                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

        // create the partition discoverer
        this.partitionDiscoverer = createPartitionDiscoverer(
                topicsDescriptor,
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
        this.partitionDiscoverer.open();

        subscribedPartitionsToStartOffsets = new HashMap<>();

        List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();

        if (restoredState != null) {
            for (KafkaTopicPartition partition : allPartitions) {
                if (!restoredState.containsKey(partition)) {
                    restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
                }
            }

            for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
                if (!restoredFromOldState) {
                    // seed the partition discoverer with the union state while filtering out
                    // restored partitions that should not be subscribed by this subtask
                    if (KafkaTopicPartitionAssigner.assign(
                        restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
                            == getRuntimeContext().getIndexOfThisSubtask()){
                        subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                    }
                } else {
                    // when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
                    // in this case, just use the restored state as the subscribed partitions
                    subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                }
            }

            LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
                getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
        } else {
            // use the partition discoverer to fetch the initial seed partitions,
            // and set their initial offsets depending on the startup mode.
            // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
            // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
            // when the partition is actually read.
            switch (startupMode) {
                case SPECIFIC_OFFSETS:
                    if (specificStartupOffsets == null) {
                        throw new IllegalStateException(
                            "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
                                ", but no specific offsets were specified.");
                    }

                    for (KafkaTopicPartition seedPartition : allPartitions) {
                        Long specificOffset = specificStartupOffsets.get(seedPartition);
                        if (specificOffset != null) {
                            // since the specified offsets represent the next record to read, we subtract
                            // it by one so that the initial state of the consumer will be correct
                            subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
                        } else {
                            // default to group offset behaviour if the user-provided specific offsets
                            // do not contain a value for this partition
                            subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                        }
                    }

                    break;
                case TIMESTAMP:
                    if (startupOffsetsTimestamp == null) {
                        throw new IllegalStateException(
                            "Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
                                ", but no startup timestamp was specified.");
                    }

                    for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
                            : fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
                        subscribedPartitionsToStartOffsets.put(
                            partitionToOffset.getKey(),
                            (partitionToOffset.getValue() == null)
                                    // if an offset cannot be retrieved for a partition with the given timestamp,
                                    // we default to using the latest offset for the partition
                                    ? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
                                    // since the specified offsets represent the next record to read, we subtract
                                    // it by one so that the initial state of the consumer will be correct
                                    : partitionToOffset.getValue() - 1);
                    }

                    break;
                default:
                    for (KafkaTopicPartition seedPartition : allPartitions) {
                        subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
                    }
            }

            if (!subscribedPartitionsToStartOffsets.isEmpty()) {
                switch (startupMode) {
                    case EARLIEST:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                        break;
                    case LATEST:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                        break;
                    case TIMESTAMP:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            startupOffsetsTimestamp,
                            subscribedPartitionsToStartOffsets.keySet());
                        break;
                    case SPECIFIC_OFFSETS:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            specificStartupOffsets,
                            subscribedPartitionsToStartOffsets.keySet());

                        List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
                        for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                            if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                                partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                            }
                        }

                        if (partitionsDefaultedToGroupOffsets.size() > 0) {
                            LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
                                    "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                partitionsDefaultedToGroupOffsets.size(),
                                partitionsDefaultedToGroupOffsets);
                        }
                        break;
                    default:
                    case GROUP_OFFSETS:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                }
            } else {
                LOG.info("Consumer subtask {} initially has no partitions to read from.",
                    getRuntimeContext().getIndexOfThisSubtask());
            }
        }
    }

OffsetCommitMode

  1. If getRuntimeContext().isCheckpointingEnabled() is true and enableCommitOnCheckpoints is set, the mode is ON_CHECKPOINTS: offsets are committed once a Flink checkpoint completes;
  2. If isCheckpointingEnabled is false and enableAutoCommit is true, the mode is KAFKA_PERIODIC: the periodic auto-commit mechanism of the official KafkaConsumer is used;
  3. Otherwise the mode is DISABLED: offset committing is turned off.
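
The three rules above amount to a small decision function. The sketch below restates them in standalone Java; the enum and class are local to the example and only mirror the names used in the text, so treat it as a paraphrase of the rules and not as the Flink source.

```java
class OffsetCommitModeSketch {
    // Local enum mirroring the three modes named above.
    enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    // Parameter order follows the call in open():
    // (auto commit enabled, commit on checkpoints, checkpointing enabled).
    static Mode fromConfiguration(
            boolean enableAutoCommit,
            boolean enableCommitOnCheckpoints,
            boolean checkpointingEnabled) {
        if (checkpointingEnabled) {
            // with checkpointing on, the Kafka auto-commit setting is not consulted
            return enableCommitOnCheckpoints ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        return enableAutoCommit ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }
}
```

One consequence worth noticing: with checkpointing enabled and enableCommitOnCheckpoints set to false, the result is DISABLED even if Kafka auto commit is turned on.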

Kafka010PartitionDiscoverer

  1. Based on the KafkaTopicsDescriptor, it calls the Kafka API to fetch the TopicPartitions to consume;
  2. Based on numParallelSubtasks and indexOfThisSubtask, it selects the TopicPartitions that the current task should consume.

    public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
        if (isUndiscoveredPartition(partition)) {
            discoveredPartitions.add(partition);

            return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
        }

        return false;
    }

Recall that this is a ParallelSourceFunction, so several instances run in parallel and the TopicPartitions have to be divided among them.
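
To make the division of partitions concrete, here is a sketch of one deterministic assignment scheme: hash the topic name to pick a starting subtask, then spread that topic's partitions round-robin from there. The formula is an assumption modeled on that idea; the exact computation inside KafkaTopicPartitionAssigner.assign may differ, and the class name is invented.

```java
class PartitionAssignSketch {
    // Maps (topic, partition) to a subtask index in [0, numParallelSubtasks).
    // Every subtask evaluates the same pure function, so they agree on the
    // owner of each partition without talking to each other.
    static int assign(String topic, int partition, int numParallelSubtasks) {
        // mask with 0x7FFFFFFF to keep the hash non-negative before the modulo
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    // The check a subtask would perform for a newly discovered partition.
    static boolean isOwnedBy(String topic, int partition, int numParallelSubtasks, int indexOfThisSubtask) {
        return assign(topic, partition, numParallelSubtasks) == indexOfThisSubtask;
    }
}
```

With this scheme, consecutive partitions of one topic land on consecutive subtasks, so a topic with as many partitions as there are subtasks gives each subtask exactly one partition.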

StartOffsets

If state was restored from a checkpoint, subscribedPartitionsToStartOffsets is filled from the checkpoint contents, and newly added partitions are set to EARLIEST_OFFSET.

If no state was restored, subscribedPartitionsToStartOffsets is filled according to startupMode.

startupMode defaults to GROUP_OFFSETS, that is, consumption resumes from the offsets committed by the Kafka consumer group.
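
Two of the branches in open() can be sketched on plain maps. Assumptions in this sketch: partitions are represented by simple strings such as "t-0", the sentinel constants are placeholders (Flink defines its own values in KafkaTopicPartitionStateSentinel), and the per-subtask filtering done in the real code is left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StartOffsetsSketch {
    // Placeholder sentinels for illustration only; not the values Flink uses.
    static final long EARLIEST_OFFSET = -2L;
    static final long GROUP_OFFSET = -3L;

    // Restored case: keep the checkpointed offsets and start any partition
    // that is missing from the checkpoint at the earliest offset.
    static Map<String, Long> fromRestoredState(List<String> discovered, Map<String, Long> restored) {
        Map<String, Long> result = new HashMap<>(restored);
        for (String partition : discovered) {
            result.putIfAbsent(partition, EARLIEST_OFFSET);
        }
        return result;
    }

    // SPECIFIC_OFFSETS case: the user gives the next record to read, while the
    // stored state is the last record already read, hence the minus one.
    // Partitions without a user-specified offset fall back to the group offset.
    static Map<String, Long> fromSpecificOffsets(List<String> discovered, Map<String, Long> specific) {
        Map<String, Long> result = new HashMap<>();
        for (String partition : discovered) {
            Long nextToRead = specific.get(partition);
            result.put(partition, nextToRead != null ? nextToRead - 1 : GROUP_OFFSET);
        }
        return result;
    }
}
```

For example, a user-specified offset of 100 for "t-0" is stored as 99, because the state records the last offset that has already been consumed.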

4. FlinkKafkaConsumer010 execution

The entry point of the execution logic is SourceFunction's run method.

The concrete implementation is in FlinkKafkaConsumerBase:

    public void run(SourceContext<T> sourceContext) throws Exception {
        if (subscribedPartitionsToStartOffsets == null) {
            throw new Exception("The partitions were not set for the consumer");
        }

        // initialize commit metrics and default offset callback method
        this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
        this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);

        this.offsetCommitCallback = new KafkaCommitCallback() {
            @Override
            public void onSuccess() {
                successfulCommits.inc();
            }

            @Override
            public void onException(Throwable cause) {
                LOG.warn("Async Kafka commit failed.", cause);
                failedCommits.inc();
            }
        };

        // mark the subtask as temporarily idle if there are no initial seed partitions;
        // once this subtask discovers some partitions and starts collecting records, the subtask's
        // status will automatically be triggered back to be active.
        if (subscribedPartitionsToStartOffsets.isEmpty()) {
            sourceContext.markAsTemporarilyIdle();
        }

        // from this point forward:
        //   - 'snapshotState' will draw offsets from the fetcher,
        //     instead of being built from `subscribedPartitionsToStartOffsets`
        //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
        //     Kafka through the fetcher, if configured to do so)
        this.kafkaFetcher = createFetcher(
                sourceContext,
                subscribedPartitionsToStartOffsets,
                periodicWatermarkAssigner,
                punctuatedWatermarkAssigner,
                (StreamingRuntimeContext) getRuntimeContext(),
                offsetCommitMode,
                getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
                useMetrics);

        if (!running) {
            return;
        }

        // depending on whether we were restored with the current state version (1.3),
        // remaining logic branches off into 2 paths:
        //  1) New state - partition discovery loop executed as separate thread, with this
        //                 thread running the main fetcher loop
        //  2) Old state - partition discovery is disabled and only the main fetcher loop is executed

        if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
            final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
            this.discoveryLoopThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // --------------------- partition discovery loop ---------------------

                        List<KafkaTopicPartition> discoveredPartitions;

                        // throughout the loop, we always eagerly check if we are still running before
                        // performing the next operation, so that we can escape the loop as soon as possible

                        while (running) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
                            }

                            try {
                                discoveredPartitions = partitionDiscoverer.discoverPartitions();
                            } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
                                // the partition discoverer may have been closed or woken up before or during the discovery;
                                // this would only happen if the consumer was canceled; simply escape the loop
                                break;
                            }

                            // no need to add the discovered partitions if we were closed during the meantime
                            if (running && !discoveredPartitions.isEmpty()) {
                                kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
                            }

                            // do not waste any time sleeping if we're not running anymore
                            if (running && discoveryIntervalMillis != 0) {
                                try {
                                    Thread.sleep(discoveryIntervalMillis);
                                } catch (InterruptedException iex) {
                                    // may be interrupted if the consumer was canceled midway; simply escape the loop
                                    break;
                                }
                            }
                        }
                    } catch (Exception e) {
                        discoveryLoopErrorRef.set(e);
                    } finally {
                        // calling cancel will also let the fetcher loop escape
                        // (if not running, cancel() was already called)
                        if (running) {
                            cancel();
                        }
                    }
                }
            }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());

            discoveryLoopThread.start();
            kafkaFetcher.runFetchLoop();

            // --------------------------------------------------------------------

            // make sure that the partition discoverer is properly closed
            partitionDiscoverer.close();
            discoveryLoopThread.join();

            // rethrow any fetcher errors
            final Exception discoveryLoopError = discoveryLoopErrorRef.get();
            if (discoveryLoopError != null) {
                throw new RuntimeException(discoveryLoopError);
            }
        } else {
            // won't be using the discoverer
            partitionDiscoverer.close();

            kafkaFetcher.runFetchLoop();
        }
    }
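
The threading pattern in the discovery branch of run (a helper thread whose failure is parked in an AtomicReference and rethrown by the main thread after join) can be shown in miniature. The class and method names below are invented for illustration, and the two Runnables stand in for the discovery loop and the fetch loop.

```java
import java.util.concurrent.atomic.AtomicReference;

class DiscoveryLoopSketch {
    // Runs mainWork on the calling thread while backgroundWork runs on a
    // helper thread. A failure of the helper is rethrown on the calling
    // thread once the helper has finished.
    static void runWithBackgroundLoop(final Runnable backgroundWork, Runnable mainWork) {
        final AtomicReference<Exception> errorRef = new AtomicReference<>();
        Thread helper = new Thread(() -> {
            try {
                backgroundWork.run();
            } catch (Exception e) {
                // park the error; the helper thread has nobody to throw it to
                errorRef.set(e);
            }
        }, "discovery-sketch");

        helper.start();
        mainWork.run();

        try {
            helper.join();
        } catch (InterruptedException e) {
            // restore the flag and fall through; the error check below may
            // then miss a failure that has not been recorded yet
            Thread.currentThread().interrupt();
        }

        Exception error = errorRef.get();
        if (error != null) {
            throw new RuntimeException(error);
        }
    }
}
```

The sketch leaves out what the real code does in its finally block, namely calling cancel() so that the main fetch loop also stops when the discovery loop ends.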

a. Kafka010Fetcher construction

The AbstractFetcher is created through the abstract method createFetcher; the concrete implementation is in the subclass FlinkKafkaConsumer010.

There are three key components: unassignedPartitionsQueue, Handover and consumerThread.

unassignedPartitionsQueue

This is a ClosableBlockingQueue<KafkaTopicPartitionState<KPH>>. The TopicPartitions to consume are added to it at initialization; if periodic TopicPartition discovery is enabled, partitions discovered later are added to it as well.

Handover

It can be thought of as a blocking queue of capacity one that passes the records fetched by consumerThread, or the exception it threw, to the thread on which Flink runs the source.
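
A capacity-one handover that carries either a value or an error can be sketched with wait/notify. This is a simplified model under stated assumptions: the method names produce, pollNext and reportError are chosen for the example, and features such as closing the handover or waking up a blocked producer are omitted.

```java
class HandoverSketch<T> {
    private final Object lock = new Object();
    private T next;          // the single slot; null means empty
    private Throwable error; // set once by the producing side on failure

    // Called by the producing (consumer) thread; blocks while the slot is full.
    void produce(T element) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait();
            }
            next = element;
            lock.notifyAll();
        }
    }

    // Called by the main thread; blocks until an element or an error arrives.
    // An error reported by the producer is rethrown here, wrapped.
    T pollNext() throws Exception {
        synchronized (lock) {
            while (next == null && error == null) {
                lock.wait();
            }
            if (next != null) {
                T taken = next;
                next = null;
                lock.notifyAll(); // wake a producer waiting for the free slot
                return taken;
            }
            throw new Exception("producer thread failed", error);
        }
    }

    // Called by the producing thread when it fails; unblocks pollNext().
    void reportError(Throwable t) {
        synchronized (lock) {
            if (error == null) {
                error = t;
            }
            next = null;
            lock.notifyAll();
        }
    }
}
```

Because the slot holds at most one batch, the producer can run at most one batch ahead of the main thread, which gives simple backpressure between the two threads.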

KafkaConsumerThread

  1. It encapsulates the Kafka consumption logic and, by way of unassignedPartitionsQueue, can pick up new TopicPartitions dynamically.
  2. It encapsulates the offset commit logic. When the commit mode is OffsetCommitMode.ON_CHECKPOINTS, the commit is triggered by the CheckpointListener callback, and the nextOffsetsToCommit structure is used to communicate between the threads.
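
The text only says that nextOffsetsToCommit carries the commit request between threads. One common way to build such a hand-off is a single-slot mailbox on an AtomicReference, sketched below. That shape is an assumption made for illustration, partitions are represented by plain strings, and the class name is invented.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class CommitMailboxSketch {
    // Holds at most one pending commit request; null means nothing pending.
    private final AtomicReference<Map<String, Long>> nextOffsetsToCommit = new AtomicReference<>();

    // Called from the checkpoint-complete callback. Returns true if an older
    // request was still pending and has been replaced by this newer one.
    boolean setOffsetsToCommit(Map<String, Long> offsets) {
        return nextOffsetsToCommit.getAndSet(offsets) != null;
    }

    // Called from the consumer thread inside its poll loop. Takes the pending
    // request, if any, and clears the slot in one atomic step.
    Map<String, Long> takeOffsetsToCommit() {
        return nextOffsetsToCommit.getAndSet(null);
    }
}
```

Replacing an unprocessed request loses nothing here, since a later checkpoint's offsets supersede the earlier ones.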

b. Kafka010Fetcher execution

The runFetchLoop method is executed:

    public void runFetchLoop() throws Exception {
        try {
            final Handover handover = this.handover;

            // kick off the actual Kafka consumer
            consumerThread.start();

            while (running) {
                // this blocks until we get the next records
                // it automatically re-throws exceptions encountered in the consumer thread
                final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

                // get the records for each topic partition
                for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {

                    List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                            records.records(partition.getKafkaPartitionHandle());

                    for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
                        final T value = deserializer.deserialize(
                                record.key(), record.value(),
                                record.topic(), record.partition(), record.offset());

                        if (deserializer.isEndOfStream(value)) {
                            // end of stream signaled
                            running = false;
                            break;
                        }

                        // emit the actual record. this also updates offset state atomically
                        // and deals with timestamps and watermark generation
                        emitRecord(value, partition, record.offset(), record);
                    }
                }
            }
        }
        finally {
            // this signals the consumer thread that no more work is to be done
            consumerThread.shutdown();
        }

        // on a clean exit, wait for the runner thread
        try {
            consumerThread.join();
        }
        catch (InterruptedException e) {
            // may be the result of a wake-up interruption after an exception.
            // we ignore this here and only restore the interruption state
            Thread.currentThread().interrupt();
        }
    }

ConsumerRecords are taken from the handover, deserialized by the deserializer, and then emitted.
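
The inner part of the loop (deserialize, check for end of stream, emit) can be reduced to a few lines on plain collections. In this sketch the records are raw byte arrays decoded as UTF-8 strings, and the end-of-stream marker "END" is a made-up convention standing in for deserializer.isEndOfStream; offsets, timestamps and watermarks are left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

class FetchLoopSketch {
    // Hypothetical end-of-stream value, used only in this example.
    static final String END_MARKER = "END";

    // Processes one polled batch. Returns false when the end-of-stream
    // marker was seen, which tells the caller to leave its while loop;
    // records after the marker in the same batch are not emitted.
    static boolean processBatch(List<byte[]> batch, List<String> out) {
        for (byte[] raw : batch) {
            String value = new String(raw, StandardCharsets.UTF_8);
            if (END_MARKER.equals(value)) {
                return false;
            }
            out.add(value);
        }
        return true;
    }
}
```

A caller would loop with something like: keep polling batches while processBatch returns true.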
