Apache Flink 以一種特有的方式來(lái)處理數(shù)據(jù)類型和序列化,包括自有的類型描述器、泛型抽取和類型序列化框架,本文將描述其背后的概念和原理。
Flink的類型處理
Flink試圖去推斷出在分布式計(jì)算過(guò)程中交換和存儲(chǔ)的數(shù)據(jù)的類型信息,像數(shù)據(jù)庫(kù)推斷表模式一樣。在大多數(shù)情況下,F(xiàn)link可以自己無(wú)縫地推測(cè)出所有必要的信息。擁有類型信息Flink就可以完成如下的事情了:
1、使用POJO類型,通過(guò)引用它們的字段名稱對(duì)數(shù)據(jù)進(jìn)行分組、連接和聚合操作,如dataSet.keyBy("username")。類型信息允許Flink可以提前對(duì)類型進(jìn)行檢測(cè)(如拼寫(xiě)錯(cuò)誤和類型兼容性),而不是等到運(yùn)行時(shí)再出錯(cuò)。
2、Flink了解的數(shù)據(jù)類型信息越多,序列化和數(shù)據(jù)類型布局模式就更好,這在Flink的內(nèi)存使用范例中是非常有用的。
3、最后,它還讓用戶在大多數(shù)情況下不再擔(dān)心序列化框架和必須注冊(cè)類型
通常,數(shù)據(jù)類型的信息都是在預(yù)處理階段需要,即在程序?qū)ataStream和DataSet的調(diào)用之前,及執(zhí)行execute()、print()、count()和collect()的任何調(diào)用之前。
常見(jiàn)問(wèn)題
用戶與Flink的數(shù)據(jù)類型處理交互最常見(jiàn)的問(wèn)題是:
1、注冊(cè)子類型:如果函數(shù)僅描述了超類型,但是執(zhí)行過(guò)程中實(shí)際上使用的是這些超類的子類型,這樣的話,F(xiàn)link要識(shí)別這些子類型,可能會(huì)導(dǎo)致一定的性能下降。對(duì)此,可以在StreamExecutionEnvironment 或者 ExecutionEnvironment中調(diào)用 .registerType(clazz) 注冊(cè)子類來(lái)解決。
2、注冊(cè)自定義序列化:對(duì)于不適用于自己的序列化框架的數(shù)據(jù)類型,F(xiàn)link會(huì)使用Kryo來(lái)進(jìn)行序列化,并不是所有的類型都與Kryo無(wú)縫連接。如,許多Google Guava集合類型,解決方案是為出問(wèn)題的數(shù)據(jù)類型注冊(cè)額外的序列化類,在StreamExecutionEnvironment 或者 ExecutionEnvironment 中調(diào)用.getConfig().addDefaultKryoSerializer(clazz, serializer)來(lái)實(shí)現(xiàn),額外的Kryo序列化器在許多第三方庫(kù)中都存在的,可以參考(自定義序列化器)[https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/custom_serializers.html]來(lái)了解更多如何創(chuàng)建自定義序列化器的信息。
3、添加類型提示:有時(shí),當(dāng)Flink用盡各種手段都無(wú)法推測(cè)出泛型信息時(shí),用戶需要傳入一個(gè)類型提示,這個(gè)只適用于JAVA API,類型提示部分描述了許多類型提示的信息。
4、手動(dòng)創(chuàng)建一個(gè)TypeInformation:對(duì)于某些API調(diào)用來(lái)說(shuō),這可能是必須的,因?yàn)镕link無(wú)法通過(guò)Java的通用類型擦除來(lái)推斷數(shù)據(jù)類型。查看創(chuàng)建一個(gè)TypeInformation 或者 TypeSerializer來(lái)獲取更多信息。
Flink的TypeInformation類
TypeInformation類是所有類型描述類的基類,它揭示了數(shù)據(jù)類型的一些基礎(chǔ)屬性,并產(chǎn)生序列化器在默寫(xiě)特殊情況下生成類型比較器(注意,F(xiàn)link中的比較器不僅僅定義一個(gè)順序,他們還是處理key的輔助工具utility)。
本質(zhì)上,F(xiàn)link通過(guò)類型來(lái)做出以下區(qū)分:
基礎(chǔ)類型:所有的Java原生類型及它們的封裝類型,包括void,String,Date,BigDecimal和BigInteger。
原生數(shù)組和Object數(shù)組
組合類型:
Flink Java Tuples(Flink Java API的一部分):最多25個(gè)字段,空字段不支持
Scala Case classes(包括Scala tuples):最多25個(gè)字段,空字段不支持
Row:具有任意數(shù)量字段的元組,并支持空字段
POJO:遵循某種Bean模式的類
輔助類型:如:Option,Either,List,Maps,...
泛型類型:這些不會(huì)被Flink自帶的序列化器進(jìn)行序列化,但是可以通過(guò)Kryo來(lái)進(jìn)行
POJO非常有意思,因?yàn)樗鼈冎С謩?chuàng)建復(fù)雜數(shù)據(jù)類型并通過(guò)字段名稱來(lái)定義key:dataSet.join(another).where("name").equalTo("personName"),它們?cè)谶\(yùn)行時(shí)是透明的,并被Flink高效地處理。
POJO類型的規(guī)則
如果一個(gè)數(shù)據(jù)類型滿足如下條件的話,就被認(rèn)為是一個(gè)POJO類型:
?。?、class是public的或者是獨(dú)立的(不是非static內(nèi)部類)
2、class有無(wú)參構(gòu)造函數(shù)
3、所有class中的非靜態(tài)的、非局部字段要么是public類型,要么有public類型的getter和setter方法
創(chuàng)建一個(gè)TypeInformation或者TypeSerializer
為一個(gè)數(shù)據(jù)類型創(chuàng)建一個(gè)TypeInformation,需要根據(jù)不同的語(yǔ)言來(lái)進(jìn)行:
Java
因?yàn)镴ava通常會(huì)擦除類型信息,所以你需要將數(shù)據(jù)類型傳遞給TypeInformation構(gòu)造函數(shù):
對(duì)于非泛型數(shù)據(jù)類型,你可以傳遞Class:
TypeInformation<String> info = TypeInformation.of(String.class);
對(duì)于泛型數(shù)據(jù)類型,你需要通過(guò)TypeHint來(lái)捕獲數(shù)據(jù)累心:
TypeInformation<Tuple2<String,String>> info = TypeInformation.of(new TypeHint<Tuple2<String,String>>(){});
在內(nèi)部,這個(gè)創(chuàng)建了TypeHint的匿名子類,來(lái)捕獲類型的泛型信息并保持知道運(yùn)行時(shí)。
Scala
在Scala中,F(xiàn)link使用編譯時(shí)的宏,并在其任然可用時(shí)捕獲所有通用類型信息
import org.apache.flink.streaming.api.scala._
val stringInfo: TypeInformation[String] = createTypeInformation[String]
val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]
你也可以在Java中以回調(diào)函數(shù)的形式來(lái)使用同樣的方法。
創(chuàng)建一個(gè)TypeSerializer,僅需要在TypeInformation類中調(diào)用typeInfo.createSerializer(config)即可。
config參數(shù)是ExecutionConfig類型的,并保持了程序注冊(cè)的自定義serializer的信息,你可以通過(guò)DataStream或者DataSet的getExecutionCondif()方法來(lái)獲取ExecutionConfig。在函數(shù)內(nèi)部(如:MapFunction),你可以創(chuàng)建一個(gè)RichFunction并調(diào)用getRuntimeContext().getExecutionConfig()來(lái)獲取它。
創(chuàng)建一個(gè)TypeSerializer,僅需要在TypeInformation類中調(diào)用typeInfo.createSerializer(config)即可。
config參數(shù)是ExecutionConfig類型的,并保持了程序注冊(cè)的自定義serializer的信息,你可以通過(guò)DataStream或者DataSet的getExecutionCondif()方法來(lái)獲取ExecutionConfig。在函數(shù)內(nèi)部(如:MapFunction),你可以創(chuàng)建一個(gè)RichFunction并調(diào)用getRuntimeContext().getExecutionConfig()來(lái)獲取它。