剖析flink中kafkaTbaleSource的實(shí)現(xiàn)

續(xù)??一個(gè)基于flinkSql 的實(shí)時(shí)計(jì)算平臺(tái)?http://www.itdecent.cn/p/db1a89e6fa85?文中 剖析fink中kafkaTbaleSource的實(shí)現(xiàn)

此文想看看Kafka 在 flink table 中的實(shí)現(xiàn)。

首先再flink1.7中TbaleSource定義的流程是,先定義 tableFactory 再定義 tableSource 再定義 sourceFunction. 我們就按照這個(gè)套路來(lái)看看源代碼

一、Kafka010TableSourceSinkFactory

Kafka010TableSourceSinkFactory?

看一個(gè)方法createKafkaTableSource()。這個(gè)方法會(huì)再初始化查找到tableFactory 之后就會(huì)被調(diào)用。

至于查找tableFactory? 的過(guò)程是怎樣的可以參考?flink table factory基礎(chǔ)知識(shí) http://www.itdecent.cn/p/6b755ed1a5bb?一文去跟一下源代碼。對(duì)createKafkaTableSource的參數(shù)做下解釋

TableSchema schema :kafka的數(shù)據(jù)最終都會(huì)被映射到table中的一行記錄。這個(gè)schema 就是用來(lái)描述table結(jié)構(gòu)用的,標(biāo)識(shí)kafka中數(shù)據(jù)的列名和數(shù)據(jù)類型

Optional<String> proctimeAttribute:時(shí)間屬性來(lái)標(biāo)識(shí)是用時(shí)間時(shí)間還是處理時(shí)間,默認(rèn)是處理時(shí)間

List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors:時(shí)間描述。如果有類似窗口的作業(yè),基本上都會(huì)有指定時(shí)間,這個(gè)里就是描述你指定的時(shí)間是那個(gè)字段,叫什么名字,水位的定義等

String topic, kafka topic名字

Properties properties, 鏈接kafka的一些屬性比如zk地址等

DeserializationSchema deserializationSchema, 這個(gè)是反序列化器,反序列化的查找過(guò)程同查找tableFactory 的過(guò)程。

StartupMode startupMode: 就是消費(fèi)kafka offset的模式。有EARLIEST,LATEST,默認(rèn)是從上一次消費(fèi)位置開始消費(fèi)

Map<KafkaTopicPartition, Long> specificStartupOffsets:?kafka offset的模式可以選擇指定從某個(gè)offset位置開始消費(fèi),如果是這種模式?specificStartupOffsets 就是用來(lái)標(biāo)識(shí)每個(gè)topic partion 要開始消費(fèi)的offset位置。

createKafkaTableSource() 啥事也么干。開門見(jiàn)山的去?new 了一下 Kafka010TableSource()。參數(shù)原封不動(dòng)的傳給Kafka010TableSource。 下面來(lái)看下?Kafka010TableSource

二、Kafka010TableSource

Kafka010TableSource

這個(gè)類也很明了。三個(gè)方法,2個(gè)構(gòu)造方法,一個(gè)創(chuàng)建kafkaCounsumer的方法。構(gòu)造方法可以忽略無(wú)非就是再初始化對(duì)象的時(shí)候?qū)?duì)象的成員變量進(jìn)行初始化,

看到createKafkaConsumer 這個(gè)方法的名字可以感覺(jué)到這是去創(chuàng)建一個(gè)kafka的消費(fèi)者。本文是看kafkaTbaleSource的實(shí)現(xiàn)。數(shù)據(jù)來(lái)源是kafka,flink是作為一個(gè)消費(fèi)端,接受到數(shù)據(jù)然后作為數(shù)據(jù)處理的soure.先不看里面的Consumer代碼長(zhǎng)啥樣,先看下他在哪里被調(diào)用。

createKafkaConsumer()來(lái)自于父類,父類此方法是抽象的,因?yàn)閷?duì)于不同版本的kafka有不一樣的實(shí)現(xiàn)。createKafkaConsumer()這個(gè)方法在父類的?getKafkaConsumer 方法中被調(diào)用如下

protected FlinkKafkaConsumerBase? ?getKafkaConsumer(...) {

????kafkaConsumer =createKafkaConsumer(...)

}

getKafkaConsumer? getDataStream 方法調(diào)用如下

public DataStream?getDataStream(StreamExecutionEnvironment env) {

????DeserializationSchema deserializationSchema = getDeserializationSchema();

? ?FlinkKafkaConsumerBase kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);

? return env.addSource(kafkaConsumer).name(explainSource());

}

?env.addSource(kafkaConsumer) 看到這句應(yīng)該熟悉flink api的同學(xué)會(huì)很熟悉。這不是就是添加一個(gè)soure的做法嗎? 我們不用flink sql的時(shí)候也就是這么干的. 這說(shuō)明sql的內(nèi)部也是用stream api去實(shí)現(xiàn)的。只是在之上做了一些包裝把數(shù)據(jù)轉(zhuǎn)化成表。把邏輯封裝成sql語(yǔ)句。

getDataStream 方法來(lái)自于StreamTableSource 如下

trait StreamTableSource[T] extends TableSource[T] {

? def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]

}

這是一個(gè)獲取流的方法。在flink在將table api轉(zhuǎn)化成執(zhí)行計(jì)劃的過(guò)程會(huì)被調(diào)用。

再來(lái)捋一下上面的流程。在調(diào)用?ConnectTableDescriptor.registerTableSource(tableName)的時(shí)候會(huì)去查找tableFactory。查找到kafka相關(guān)的factory 比如?Kafka010TableSourceSinkFactory。之后會(huì)調(diào)用tableFactory的createStreamTableSource方法。createStreamTableSource 這個(gè)方法會(huì)調(diào)用?createKafkaTableSource。進(jìn)行初始化Kafka010TableSource。 之后再解析執(zhí)行計(jì)劃的時(shí)候會(huì)調(diào)用Kafka010TableSource 的?getDataStream 方法。這個(gè)方法會(huì)調(diào)用?getKafkaConsumer 方法啟動(dòng)一個(gè)kafka的消費(fèi)者,作為dataStream。最后通過(guò)env.addSource(kafkaConsumer)方法為flink添加soure。 這就是整個(gè)Kafka Table 的執(zhí)行過(guò)程。

三、FlinkKafkaConsumerBase

前面說(shuō)到了獲取kafkaConsumer,那他到底是個(gè)啥玩意呢?先看下下基類FlinkKafkaConsumerBase

FlinkKafkaConsumerBase

看到FlinkKafkaConsumerBase 是繼承??RichParallelSourceFunction 應(yīng)該要知道這是給flink添加一個(gè)streamSource的基本方式就是要繼承?RichParallelSourceFunction 、?RichSourceFunction 、ParallelSourceFunction、SourceFunction 。 這塊不明白的請(qǐng)參考flink soruce詳解 http://www.itdecent.cn/p/d6427dcf7ea2

明白RichParallelSourceFunction是個(gè)啥玩意之后。只需要著重看里面的run 方法。再run方法中做了很多初始化工作。比如createFetcher這就是創(chuàng)建真正的消費(fèi)者。最后會(huì)調(diào)用fetcher 的runFetchLoop 方法。 里面就是一個(gè)死循環(huán) 去消費(fèi)kafka數(shù)據(jù)最后把數(shù)據(jù)發(fā)送給flink.如下截圖。

flink kafka 消費(fèi)者
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容