Flink的數(shù)據(jù)類型和序列化

Apache Flink 以一種特有的方式來(lái)處理數(shù)據(jù)類型和序列化,包括自有的類型描述器、泛型抽取和類型序列化框架,本文將描述其背后的概念和原理。

Flink的類型處理

Flink試圖去推斷出在分布式計(jì)算過(guò)程中交換和存儲(chǔ)的數(shù)據(jù)的類型信息,像數(shù)據(jù)庫(kù)推斷表模式一樣。在大多數(shù)情況下,F(xiàn)link可以自己無(wú)縫地推測(cè)出所有必要的信息。擁有類型信息Flink就可以完成如下的事情了:
  1、使用POJO類型,通過(guò)引用它們的字段名稱對(duì)數(shù)據(jù)進(jìn)行分組、連接和聚合操作,如dataSet.keyBy("username")。類型信息允許Flink可以提前對(duì)類型進(jìn)行檢測(cè)(如拼寫(xiě)錯(cuò)誤和類型兼容性),而不是等到運(yùn)行時(shí)再出錯(cuò)。
  2、Flink了解的數(shù)據(jù)類型信息越多,序列化和數(shù)據(jù)類型布局模式就更好,這在Flink的內(nèi)存使用范例中是非常有用的。
3、最后,它還讓用戶在大多數(shù)情況下不再擔(dān)心序列化框架和必須注冊(cè)類型
通常,數(shù)據(jù)類型的信息都是在預(yù)處理階段需要,即在程序?qū)ataStream和DataSet的調(diào)用之前,及執(zhí)行execute()、print()、count()和collect()的任何調(diào)用之前。

常見(jiàn)問(wèn)題

用戶與Flink的數(shù)據(jù)類型處理交互最常見(jiàn)的問(wèn)題是:
  1、注冊(cè)子類型:如果函數(shù)僅描述了超類型,但是執(zhí)行過(guò)程中實(shí)際上使用的是這些超類的子類型,這樣的話,F(xiàn)link要識(shí)別這些子類型,可能會(huì)導(dǎo)致一定的性能下降。對(duì)此,可以在StreamExecutionEnvironment 或者 ExecutionEnvironment中調(diào)用 .registerType(clazz) 注冊(cè)子類來(lái)解決。
  2、注冊(cè)自定義序列化:對(duì)于不適用于自己的序列化框架的數(shù)據(jù)類型,F(xiàn)link會(huì)使用Kryo來(lái)進(jìn)行序列化,并不是所有的類型都與Kryo無(wú)縫連接。如,許多Google Guava集合類型,解決方案是為出問(wèn)題的數(shù)據(jù)類型注冊(cè)額外的序列化類,在StreamExecutionEnvironment 或者 ExecutionEnvironment 中調(diào)用.getConfig().addDefaultKryoSerializer(clazz, serializer)來(lái)實(shí)現(xiàn),額外的Kryo序列化器在許多第三方庫(kù)中都存在的,可以參考(自定義序列化器)[https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/custom_serializers.html]來(lái)了解更多如何創(chuàng)建自定義序列化器的信息。
  3、添加類型提示:有時(shí),當(dāng)Flink用盡各種手段都無(wú)法推測(cè)出泛型信息時(shí),用戶需要傳入一個(gè)類型提示,這個(gè)只適用于JAVA API,類型提示部分描述了許多類型提示的信息。
  4、手動(dòng)創(chuàng)建一個(gè)TypeInformation:對(duì)于某些API調(diào)用來(lái)說(shuō),這可能是必須的,因?yàn)镕link無(wú)法通過(guò)Java的通用類型擦除來(lái)推斷數(shù)據(jù)類型。查看創(chuàng)建一個(gè)TypeInformation 或者 TypeSerializer來(lái)獲取更多信息。

Flink的TypeInformation類

TypeInformation類是所有類型描述類的基類,它揭示了數(shù)據(jù)類型的一些基礎(chǔ)屬性,并產(chǎn)生序列化器在默寫(xiě)特殊情況下生成類型比較器(注意,F(xiàn)link中的比較器不僅僅定義一個(gè)順序,他們還是處理key的輔助工具utility)。
本質(zhì)上,F(xiàn)link通過(guò)類型來(lái)做出以下區(qū)分:
  基礎(chǔ)類型:所有的Java原生類型及它們的封裝類型,包括void,StringDate,BigDecimalBigInteger
  原生數(shù)組和Object數(shù)組
  組合類型:
    Flink Java Tuples(Flink Java API的一部分):最多25個(gè)字段,空字段不支持
    Scala Case classes(包括Scala tuples):最多25個(gè)字段,空字段不支持
    Row:具有任意數(shù)量字段的元組,并支持空字段
    POJO:遵循某種Bean模式的類
  輔助類型:如:Option,Either,List,Maps,...
  泛型類型:這些不會(huì)被Flink自帶的序列化器進(jìn)行序列化,但是可以通過(guò)Kryo來(lái)進(jìn)行
POJO非常有意思,因?yàn)樗鼈冎С謩?chuàng)建復(fù)雜數(shù)據(jù)類型并通過(guò)字段名稱來(lái)定義key:dataSet.join(another).where("name").equalTo("personName"),它們?cè)谶\(yùn)行時(shí)是透明的,并被Flink高效地處理。

POJO類型的規(guī)則

如果一個(gè)數(shù)據(jù)類型滿足如下條件的話,就被認(rèn)為是一個(gè)POJO類型:
 ?。?、class是public的或者是獨(dú)立的(不是非static內(nèi)部類)
  2、class有無(wú)參構(gòu)造函數(shù)
  3、所有class中的非靜態(tài)的、非局部字段要么是public類型,要么有public類型的getter和setter方法

創(chuàng)建一個(gè)TypeInformation或者TypeSerializer

為一個(gè)數(shù)據(jù)類型創(chuàng)建一個(gè)TypeInformation,需要根據(jù)不同的語(yǔ)言來(lái)進(jìn)行:

Java

因?yàn)镴ava通常會(huì)擦除類型信息,所以你需要將數(shù)據(jù)類型傳遞給TypeInformation構(gòu)造函數(shù):
對(duì)于非泛型數(shù)據(jù)類型,你可以傳遞Class:
TypeInformation<String> info = TypeInformation.of(String.class);
對(duì)于泛型數(shù)據(jù)類型,你需要通過(guò)TypeHint來(lái)捕獲數(shù)據(jù)累心:
TypeInformation<Tuple2<String,String>> info = TypeInformation.of(new TypeHint<Tuple2<String,String>>(){});
在內(nèi)部,這個(gè)創(chuàng)建了TypeHint的匿名子類,來(lái)捕獲類型的泛型信息并保持知道運(yùn)行時(shí)。

Scala

在Scala中,F(xiàn)link使用編譯時(shí)的宏,并在其任然可用時(shí)捕獲所有通用類型信息

import org.apache.flink.streaming.api.scala._

val stringInfo: TypeInformation[String] = createTypeInformation[String]

val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]

你也可以在Java中以回調(diào)函數(shù)的形式來(lái)使用同樣的方法。

創(chuàng)建一個(gè)TypeSerializer,僅需要在TypeInformation類中調(diào)用typeInfo.createSerializer(config)即可。
config參數(shù)是ExecutionConfig類型的,并保持了程序注冊(cè)的自定義serializer的信息,你可以通過(guò)DataStream或者DataSet的getExecutionCondif()方法來(lái)獲取ExecutionConfig。在函數(shù)內(nèi)部(如:MapFunction),你可以創(chuàng)建一個(gè)RichFunction并調(diào)用getRuntimeContext().getExecutionConfig()來(lái)獲取它。

創(chuàng)建一個(gè)TypeSerializer,僅需要在TypeInformation類中調(diào)用typeInfo.createSerializer(config)即可。
config參數(shù)是ExecutionConfig類型的,并保持了程序注冊(cè)的自定義serializer的信息,你可以通過(guò)DataStream或者DataSet的getExecutionCondif()方法來(lái)獲取ExecutionConfig。在函數(shù)內(nèi)部(如:MapFunction),你可以創(chuàng)建一個(gè)RichFunction并調(diào)用getRuntimeContext().getExecutionConfig()來(lái)獲取它。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 原文鏈接:https://ci.apache.org/projects/flink/flink-docs-rele...
    寫(xiě)B(tài)ug的張小天閱讀 11,157評(píng)論 0 5
  • Basic API Concepts Flink程序是實(shí)現(xiàn)基于分布式采集的轉(zhuǎn)換程序(如:過(guò)濾器,映射,更新?tīng)顟B(tài),連...
    MiyoungCheng閱讀 1,948評(píng)論 0 0
  • Table API和SQL通過(guò)join API集成在一起,這個(gè)join API的核心概念是Table,Table可...
    寫(xiě)B(tài)ug的張小天閱讀 16,888評(píng)論 0 15
  • JAVA序列化機(jī)制的深入研究 對(duì)象序列化的最主要的用處就是在傳遞,和保存對(duì)象(object)的時(shí)候,保證對(duì)象的完整...
    時(shí)待吾閱讀 11,163評(píng)論 0 24
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,502評(píng)論 19 139

友情鏈接更多精彩內(nèi)容