Flink 的 HDFS Connector

這個Connector提供了一個sink來寫分區(qū)文件到任何Hadoop FileSystem支持的任何文件系統(tǒng)中,為了使用這個Connector,請將下面的依賴添加到你的工程中:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-filesystem_2.10</artifactId>
  <version>1.3.0</version>
</dependency>

注意:streaming connectors目前還不是二進制發(fā)布包的一部分,請參考此處來了解如何在分布式執(zhí)行中關聯(lián)到這些connectors。

Flink Sink分桶

分桶行為跟寫行為都是可以配置的,這個后面我們會講到,你可以通過默認配置來創(chuàng)建一個分桶的sink,將數(shù)據(jù)sink到以時間作為劃分的滾動文件中:
Java 代碼:

DataStream<String> input = ...;

input.addSink(new BucketingSink<String>("/base/path"));

Scala 代碼:

val input: DataStream[String] = ...

input.addSink(new BucketingSink[String]("/base/path"))

這里唯一需要參數(shù)是這些分桶所要存儲的目錄地址,sink還可以通過配置一個自定義的bucketer、writer和批大小來進一步配置。

默認情況下分桶sink是通過元素到達的系統(tǒng)時間來進行切分的,并用"yyyy-MM-dd HH"的時間格式來命名桶,這個時間格式與當前的系統(tǒng)時間傳入SimpleDateFormat來形成一個桶的路徑,當遇到一個新的時間后就會創(chuàng)建一個新的桶。例如:如果你有一個以分鐘作為最細粒度的模式,那么你將每分鐘獲得一個新的分桶。每個分桶本身是一個包含若干分區(qū)文件的目錄,每個并行的sink實例會創(chuàng)建它自己的分區(qū)文件,當分區(qū)文件過大時,sink會緊接著其它分區(qū)文件創(chuàng)建一個新的分區(qū)文件。當一個桶變成非活躍狀態(tài)時,打開的文件會被刷新和關閉,當一個桶不再被寫入時,會被認為是非活躍的。默認情況下,sink會每分鐘檢查一遍是否非活躍,并關閉超過一分鐘沒有數(shù)據(jù)寫入的分桶,這種行為可以通過在BucketingSink
setInactiveBucketCheckInterval()setInactiveBucketThreshold()來配置。

你可以在BucketingSink中使用setBucketer()來指定一個自定義的bucketer,如果需要,bucketer可以使用元素或者元組的屬性來決定bucketer的目錄。

默認的writer是StringWriter,這個writer會調用到達的元素的toString()方法,將數(shù)據(jù)以新的行作為劃分寫入到分區(qū)文件中。你可以在BucketingSink中使用setWriter()來指定一個自定義的writer,如果你想寫到Hadoop SequenceFiles,你可以只用預定義的SequenceFileWriter,這個writer還可以指定壓縮格式。

最后的配置項是批大小,這個配置指定了一個分區(qū)文件何時需要被關閉、新的分區(qū)文件開始。(默認的分區(qū)文件大小是384MB)
例如:
Java 代碼:

DataStream<Tuple2<IntWritable,Text>> input = ...;

BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,

input.addSink(sink);

Scala 代碼:

val input: DataStream[Tuple2[IntWritable, Text]] = ...

val sink = new BucketingSink[String]("/base/path")
sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,

input.addSink(sink)

這個例子會創(chuàng)建一個按下面的模式來寫數(shù)據(jù)到分桶文件的sink:
/base/path/{date-time}/part-{parallel-task}-{count}
這里date-time是我們從date/time模式中獲取的字符串,parallel-task是并行sink實例的索引,count是分區(qū)文件的運行編號,這個運行編號是由于分區(qū)文件的批大小導致的。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容