續(xù)??一個(gè)基于flinkSql 的實(shí)時(shí)計(jì)算平臺(tái)?http://www.itdecent.cn/p/db1a89e6fa85?文中 剖析fink中kafkaTbaleSource的實(shí)現(xiàn)
此文想看看Kafka 在 flink table 中的實(shí)現(xiàn)。
首先再flink1.7中TbaleSource定義的流程是,先定義 tableFactory 再定義 tableSource 再定義 sourceFunction. 我們就按照這個(gè)套路來(lái)看看源代碼
一、Kafka010TableSourceSinkFactory

看一個(gè)方法createKafkaTableSource()。這個(gè)方法會(huì)再初始化查找到tableFactory 之后就會(huì)被調(diào)用。
至于查找tableFactory? 的過(guò)程是怎樣的可以參考?flink table factory基礎(chǔ)知識(shí) http://www.itdecent.cn/p/6b755ed1a5bb?一文去跟一下源代碼。對(duì)createKafkaTableSource的參數(shù)做下解釋
TableSchema schema :kafka的數(shù)據(jù)最終都會(huì)被映射到table中的一行記錄。這個(gè)schema 就是用來(lái)描述table結(jié)構(gòu)用的,標(biāo)識(shí)kafka中數(shù)據(jù)的列名和數(shù)據(jù)類型
Optional<String> proctimeAttribute:時(shí)間屬性來(lái)標(biāo)識(shí)是用時(shí)間時(shí)間還是處理時(shí)間,默認(rèn)是處理時(shí)間
List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors:時(shí)間描述。如果有類似窗口的作業(yè),基本上都會(huì)有指定時(shí)間,這個(gè)里就是描述你指定的時(shí)間是那個(gè)字段,叫什么名字,水位的定義等
String topic, kafka topic名字
Properties properties, 鏈接kafka的一些屬性比如zk地址等
DeserializationSchema deserializationSchema, 這個(gè)是反序列化器,反序列化的查找過(guò)程同查找tableFactory 的過(guò)程。
StartupMode startupMode: 就是消費(fèi)kafka offset的模式。有EARLIEST,LATEST,默認(rèn)是從上一次消費(fèi)位置開始消費(fèi)
Map<KafkaTopicPartition, Long> specificStartupOffsets:?kafka offset的模式可以選擇指定從某個(gè)offset位置開始消費(fèi),如果是這種模式?specificStartupOffsets 就是用來(lái)標(biāo)識(shí)每個(gè)topic partion 要開始消費(fèi)的offset位置。
createKafkaTableSource() 啥事也么干。開門見(jiàn)山的去?new 了一下 Kafka010TableSource()。參數(shù)原封不動(dòng)的傳給Kafka010TableSource。 下面來(lái)看下?Kafka010TableSource
二、Kafka010TableSource

這個(gè)類也很明了。三個(gè)方法,2個(gè)構(gòu)造方法,一個(gè)創(chuàng)建kafkaCounsumer的方法。構(gòu)造方法可以忽略無(wú)非就是再初始化對(duì)象的時(shí)候?qū)?duì)象的成員變量進(jìn)行初始化,
看到createKafkaConsumer 這個(gè)方法的名字可以感覺(jué)到這是去創(chuàng)建一個(gè)kafka的消費(fèi)者。本文是看kafkaTbaleSource的實(shí)現(xiàn)。數(shù)據(jù)來(lái)源是kafka,flink是作為一個(gè)消費(fèi)端,接受到數(shù)據(jù)然后作為數(shù)據(jù)處理的soure.先不看里面的Consumer代碼長(zhǎng)啥樣,先看下他在哪里被調(diào)用。
createKafkaConsumer()來(lái)自于父類,父類此方法是抽象的,因?yàn)閷?duì)于不同版本的kafka有不一樣的實(shí)現(xiàn)。createKafkaConsumer()這個(gè)方法在父類的?getKafkaConsumer 方法中被調(diào)用如下
protected FlinkKafkaConsumerBase? ?getKafkaConsumer(...) {
????kafkaConsumer =createKafkaConsumer(...)
}
getKafkaConsumer? 被getDataStream 方法調(diào)用如下
public DataStream?getDataStream(StreamExecutionEnvironment env) {
????DeserializationSchema deserializationSchema = getDeserializationSchema();
? ?FlinkKafkaConsumerBase kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);
? return env.addSource(kafkaConsumer).name(explainSource());
}
?env.addSource(kafkaConsumer) 看到這句應(yīng)該熟悉flink api的同學(xué)會(huì)很熟悉。這不是就是添加一個(gè)soure的做法嗎? 我們不用flink sql的時(shí)候也就是這么干的. 這說(shuō)明sql的內(nèi)部也是用stream api去實(shí)現(xiàn)的。只是在之上做了一些包裝把數(shù)據(jù)轉(zhuǎn)化成表。把邏輯封裝成sql語(yǔ)句。
getDataStream 方法來(lái)自于StreamTableSource 如下
trait StreamTableSource[T] extends TableSource[T] {
? def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T]
}
這是一個(gè)獲取流的方法。在flink在將table api轉(zhuǎn)化成執(zhí)行計(jì)劃的過(guò)程會(huì)被調(diào)用。
再來(lái)捋一下上面的流程。在調(diào)用?ConnectTableDescriptor.registerTableSource(tableName)的時(shí)候會(huì)去查找tableFactory。查找到kafka相關(guān)的factory 比如?Kafka010TableSourceSinkFactory。之后會(huì)調(diào)用tableFactory的createStreamTableSource方法。createStreamTableSource 這個(gè)方法會(huì)調(diào)用?createKafkaTableSource。進(jìn)行初始化Kafka010TableSource。 之后再解析執(zhí)行計(jì)劃的時(shí)候會(huì)調(diào)用Kafka010TableSource 的?getDataStream 方法。這個(gè)方法會(huì)調(diào)用?getKafkaConsumer 方法啟動(dòng)一個(gè)kafka的消費(fèi)者,作為dataStream。最后通過(guò)env.addSource(kafkaConsumer)方法為flink添加soure。 這就是整個(gè)Kafka Table 的執(zhí)行過(guò)程。
三、FlinkKafkaConsumerBase
前面說(shuō)到了獲取kafkaConsumer,那他到底是個(gè)啥玩意呢?先看下下基類FlinkKafkaConsumerBase

看到FlinkKafkaConsumerBase 是繼承??RichParallelSourceFunction 應(yīng)該要知道這是給flink添加一個(gè)streamSource的基本方式就是要繼承?RichParallelSourceFunction 、?RichSourceFunction 、ParallelSourceFunction、SourceFunction 。 這塊不明白的請(qǐng)參考flink soruce詳解 http://www.itdecent.cn/p/d6427dcf7ea2
明白RichParallelSourceFunction是個(gè)啥玩意之后。只需要著重看里面的run 方法。再run方法中做了很多初始化工作。比如createFetcher這就是創(chuàng)建真正的消費(fèi)者。最后會(huì)調(diào)用fetcher 的runFetchLoop 方法。 里面就是一個(gè)死循環(huán) 去消費(fèi)kafka數(shù)據(jù)最后把數(shù)據(jù)發(fā)送給flink.如下截圖。
