Flink Kafka Source Code Analysis (Part 1)

Main Flow

A Kafka source is typically created in Flink with code like the following:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// KafkaEventSchema is a custom deserialization schema for the record fields
env.addSource(new FlinkKafkaConsumer<>("foo", new KafkaEventSchema(), properties));

Kafka's own KafkaConsumer API, by contrast, consumes a topic through the poll method:

Properties props = new Properties(); // bootstrap.servers, group.id, deserializers, etc.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("foo"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

This article traces how Flink gets from the env.addSource call all the way down to the consumer.poll call.

Source Code Analysis

Initialization

When env.addSource is executed, a StreamSource object is created: final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);. Here function is the FlinkKafkaConsumer object that was passed in. The StreamSource constructor hands this object to the userFunction field of its parent class AbstractUdfStreamOperator:

StreamSource.java

public StreamSource(SRC sourceFunction) {
    super(sourceFunction);
    this.chainingStrategy = ChainingStrategy.HEAD;
}

AbstractUdfStreamOperator.java

public AbstractUdfStreamOperator(F userFunction) {
   this.userFunction = requireNonNull(userFunction);
   checkUdfCheckpointingPreconditions();
}

Task Execution

task啟動后會調(diào)用到SourceStreamTask中的performDefaultAction()方法,這里面會啟動一個線程sourceThread.start();,部分源碼如下

private final LegacySourceFunctionThread sourceThread;

@Override
protected void performDefaultAction(ActionContext context) throws Exception {
    sourceThread.start();
}
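The pattern above can be illustrated with a small, self-contained sketch. MiniSourceThread below is a hypothetical, stripped-down stand-in for LegacySourceFunctionThread (not Flink's actual class): a dedicated thread that runs the head operator's work and reports completion or failure through a future.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical miniature of a legacy source thread: runs the source work
// and signals completion/failure via a CompletableFuture.
class MiniSourceThread extends Thread {
    private final Runnable headOperatorRun;            // stands in for headOperator.run(...)
    final CompletableFuture<Void> completion = new CompletableFuture<>();

    MiniSourceThread(Runnable headOperatorRun) {
        this.headOperatorRun = headOperatorRun;
    }

    @Override
    public void run() {
        try {
            headOperatorRun.run();
            completion.complete(null);                  // source finished normally
        } catch (Throwable t) {
            completion.completeExceptionally(t);        // propagate failure to the task
        }
    }
}

public class SourceThreadDemo {
    public static void main(String[] args) throws Exception {
        MiniSourceThread t = new MiniSourceThread(() -> System.out.println("source running"));
        t.start();          // what performDefaultAction does with sourceThread.start()
        t.completion.get(); // the task can later wait on the source's completion
    }
}
```

Running the source on its own thread lets the task's mailbox loop keep handling checkpoints and control messages while the source blocks on input.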

The run method of LegacySourceFunctionThread calls headOperator.run, which ultimately invokes the run method of StreamSource (excerpt):

public void run(final Object lockingObject,
                final StreamStatusMaintainer streamStatusMaintainer,
                final Output<StreamRecord<OUT>> collector,
                final OperatorChain<?, ?> operatorChain) throws Exception {

  // some code omitted
  this.ctx = StreamSourceContexts.getSourceContext(
    timeCharacteristic,
    getProcessingTimeService(),
    lockingObject,
    streamStatusMaintainer,
    collector,
    watermarkInterval,
    -1);

  try {
    userFunction.run(ctx);
    // some code omitted
  } finally {
    // make sure that the context is closed in any case
    ctx.close();
    if (latencyEmitter != null) {
      latencyEmitter.close();
    }
  }
}

這里最重要的就是userFunction.run(ctx);,這個userFunction就是在上面初始化的時候傳入的FlinkKafkaConsumer對象,也就是說這里實際調(diào)用了FlinkKafkaConsumer中的run方法,而具體的方法實現(xiàn)在其父類FlinkKafkaConsumerBase中,至此,進(jìn)入了真正的kafka消費(fèi)階段。

Kafka Consumption Phase

FlinkKafkaConsumerBase#run creates a KafkaFetcher object and eventually calls kafkaFetcher.runFetchLoop(); an excerpt of that method:

/** The thread that runs the actual KafkaConsumer and hands the record batches to this fetcher. */
private final KafkaConsumerThread consumerThread;

@Override
public void runFetchLoop() throws Exception {
  try {
    final Handover handover = this.handover;

    // kick off the actual Kafka consumer
    consumerThread.start();
    
    // some code omitted
}
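The fetcher and the consumer thread exchange record batches through the handover object. The producer-consumer pattern behind it can be sketched as follows; MiniHandover is a hypothetical one-slot exchange point, not Flink's actual Handover class (which adds error reporting and wakeup handling on top of this idea).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical simplified handover: a one-element exchange point between
// the consumer thread (producer) and the fetcher's loop (consumer).
class MiniHandover<T> {
    private final BlockingQueue<T> slot = new ArrayBlockingQueue<>(1);

    void produce(T batch) throws InterruptedException {
        slot.put(batch);    // blocks until the fetcher has taken the previous batch
    }

    T pollNext() throws InterruptedException {
        return slot.take(); // blocks until the consumer thread hands over a batch
    }
}

public class HandoverDemo {
    public static void main(String[] args) throws Exception {
        MiniHandover<String> handover = new MiniHandover<>();
        Thread consumerThread = new Thread(() -> {
            try {
                handover.produce("batch-1"); // stands in for the result of consumer.poll(...)
            } catch (InterruptedException ignored) {
            }
        });
        consumerThread.start();              // like consumerThread.start() in runFetchLoop
        String batch = handover.pollNext();  // the fetch loop receiving the batch
        consumerThread.join();
        System.out.println(batch);
    }
}
```

The single slot applies natural backpressure: the consumer thread cannot poll Kafka faster than the fetcher can emit records downstream.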

As you can see, it actually starts a KafkaConsumerThread. Below is an excerpt of KafkaConsumerThread#run; see KafkaConsumerThread.java for the full source.

@Override
public void run() {
  // early exit check
  if (!running) {
    return;
  }
  // This method initializes the KafkaConsumer and guarantees it is torn down properly.
  // This is important, because the consumer has multi-threading issues,
  // including concurrent 'close()' calls.
  try {
    this.consumer = getConsumer(kafkaProperties);
  } catch (Throwable t) {
    handover.reportError(t);
    return;
  }
  try {

    // main fetch loop
    while (running) {
      try {
        if (records == null) {
          try {
            records = consumer.poll(pollTimeout);
          } catch (WakeupException we) {
            continue;
          }
        }
      }
      // end main fetch loop
    }
  } catch (Throwable t) {
    handover.reportError(t);
  } finally {
    handover.close();
    try {
      consumer.close();
    } catch (Throwable t) {
      log.warn("Error while closing Kafka consumer", t);
    }
  }
}

至此,終于走到了真正從kafka拿數(shù)據(jù)的代碼,即records = consumer.poll(pollTimeout);。因為KafkaConsumer不是線程安全的,所以每個線程都需要生成獨(dú)立的KafkaConsumer對象,即this.consumer = getConsumer(kafkaProperties);

KafkaConsumer<byte[], byte[]> getConsumer(Properties kafkaProperties) {
  return new KafkaConsumer<>(kafkaProperties);
}
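The thread-confinement rule behind getConsumer can be shown with a small stdlib-only sketch. NotThreadSafeClient is a hypothetical stand-in for KafkaConsumer: the factory returns a fresh instance per call, so no two threads ever share one.

```java
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a non-thread-safe client such as KafkaConsumer.
class NotThreadSafeClient {
    private final Properties config;

    NotThreadSafeClient(Properties config) {
        this.config = config;
    }
}

public class PerThreadClientDemo {
    // Mirrors getConsumer(kafkaProperties): always a fresh instance.
    static NotThreadSafeClient getClient(Properties props) {
        return new NotThreadSafeClient(props);
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        ConcurrentMap<String, NotThreadSafeClient> byThread = new ConcurrentHashMap<>();
        // Each worker thread builds and records its own client instance.
        Runnable work = () -> byThread.put(Thread.currentThread().getName(), getClient(props));
        Thread t1 = new Thread(work, "t1");
        Thread t2 = new Thread(work, "t2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // Two threads, two distinct instances: no shared mutable client state.
        System.out.println(byThread.get("t1") != byThread.get("t2"));
    }
}
```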

Summary

This article covered only the key path by which Flink consumes Kafka data. A follow-up will describe in more detail how FlinkKafkaConsumer manages offsets under AT_LEAST_ONCE and EXACTLY_ONCE semantics.

注:本文基于flink 1.9.0和kafka 2.3

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容