這個Connector提供了一個sink來寫分區(qū)文件到任何Hadoop FileSystem支持的任何文件系統(tǒng)中,為了使用這個Connector,請將下面的依賴添加到你的工程中:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.10</artifactId>
<version>1.3.0</version>
</dependency>
注意:streaming connectors目前還不是二進制發(fā)布包的一部分,請參考此處來了解如何在分布式執(zhí)行中關聯(lián)到這些connectors。
Flink Sink分桶
分桶行為跟寫行為都是可以配置的,這個后面我們會講到,你可以通過默認配置來創(chuàng)建一個分桶的sink,將數(shù)據(jù)sink到以時間作為劃分的滾動文件中:
Java 代碼:
DataStream<String> input = ...;
input.addSink(new BucketingSink<String>("/base/path"));
Scala 代碼:
val input: DataStream[String] = ...
input.addSink(new BucketingSink[String]("/base/path"))
這里唯一需要參數(shù)是這些分桶所要存儲的目錄地址,sink還可以通過配置一個自定義的bucketer、writer和批大小來進一步配置。
默認情況下分桶sink是通過元素到達的系統(tǒng)時間來進行切分的,并用"yyyy-MM-dd HH"的時間格式來命名桶,這個時間格式與當前的系統(tǒng)時間傳入SimpleDateFormat來形成一個桶的路徑,當遇到一個新的時間后就會創(chuàng)建一個新的桶。例如:如果你有一個以分鐘作為最細粒度的模式,那么你將每分鐘獲得一個新的分桶。每個分桶本身是一個包含若干分區(qū)文件的目錄,每個并行的sink實例會創(chuàng)建它自己的分區(qū)文件,當分區(qū)文件過大時,sink會緊接著其它分區(qū)文件創(chuàng)建一個新的分區(qū)文件。當一個桶變成非活躍狀態(tài)時,打開的文件會被刷新和關閉,當一個桶不再被寫入時,會被認為是非活躍的。默認情況下,sink會每分鐘檢查一遍是否非活躍,并關閉超過一分鐘沒有數(shù)據(jù)寫入的分桶,這種行為可以通過在BucketingSink的
setInactiveBucketCheckInterval() 和 setInactiveBucketThreshold()來配置。
你可以在BucketingSink中使用setBucketer()來指定一個自定義的bucketer,如果需要,bucketer可以使用元素或者元組的屬性來決定bucketer的目錄。
默認的writer是StringWriter,這個writer會調用到達的元素的toString()方法,將數(shù)據(jù)以新的行作為劃分寫入到分區(qū)文件中。你可以在BucketingSink中使用setWriter()來指定一個自定義的writer,如果你想寫到Hadoop SequenceFiles,你可以只用預定義的SequenceFileWriter,這個writer還可以指定壓縮格式。
最后的配置項是批大小,這個配置指定了一個分區(qū)文件何時需要被關閉、新的分區(qū)文件開始。(默認的分區(qū)文件大小是384MB)
例如:
Java 代碼:
DataStream<Tuple2<IntWritable,Text>> input = ...;
BucketingSink<String> sink = new BucketingSink<String>("/base/path");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HHmm"));
sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
input.addSink(sink);
Scala 代碼:
val input: DataStream[Tuple2[IntWritable, Text]] = ...
val sink = new BucketingSink[String]("/base/path")
sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
sink.setWriter(new SequenceFileWriter[IntWritable, Text]())
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
input.addSink(sink)
這個例子會創(chuàng)建一個按下面的模式來寫數(shù)據(jù)到分桶文件的sink:
/base/path/{date-time}/part-{parallel-task}-{count}
這里date-time是我們從date/time模式中獲取的字符串,parallel-task是并行sink實例的索引,count是分區(qū)文件的運行編號,這個運行編號是由于分區(qū)文件的批大小導致的。