Hadoop InputFormat介紹
1 概述
我們?cè)诰帉慚apReduce程序的時(shí)候,在設(shè)置輸入格式的時(shí)候,會(huì)調(diào)用如下代碼:
job.setInputFormatClass(KeyVakueTextInputFormat.class)
通過上面的代碼來保證輸入的文件是按照我們想要的格式被讀取,所有的輸入格式都繼承于InputFormat,這是一個(gè)抽象類,其子類有專門用于讀取普通文件的FileInputFormatt,用于讀取數(shù)據(jù)庫文件的DBInputFromat,用于讀取HBase的TableInputFormat等等。如下圖是InputFormat的圖譜。

2 InputFormat方法
從類圖中可以看出,InputFormat抽象類僅有兩個(gè)抽象方法:
public abstract List<InputSplit> getSplits(JobContext context)
public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context)
getSplits()方法是邏輯上拆分作業(yè)的輸入文件集,然后將每個(gè)InputSplit分配給一個(gè)單獨(dú)的Mapper進(jìn)行處理
注意:拆分是按輸入文件的邏輯分割,而輸入文件不會(huì)被物理分割成塊。每個(gè)切片都是一個(gè)<input-file-path,start,offset>的元組,InputFormat并創(chuàng)建相應(yīng)的RecordReader讀取這些切片。
createRecordReader()方法是為給定的切片創(chuàng)建一個(gè)記錄閱讀器。在切片被使用之前先調(diào)用RecordReader.initialize(InputSplit, TaskAttemptContext)方法。
通過InputFormat,MapReduce框架可以做到:
- 驗(yàn)證作業(yè)輸入的正確性
- 將輸入的文件切割成邏輯分片(InputSplit),一個(gè)InputSplit將會(huì)分配給一個(gè)獨(dú)立的MapTask
- 提供RecordReader實(shí)現(xiàn),讀取InputSplit中的Kv對(duì)供Mapper使用。
不同的InputFormat會(huì)各自實(shí)現(xiàn)不同的文件讀取方法以及分片方式,每個(gè)輸入分片會(huì)被單獨(dú)的MapTask作為數(shù)據(jù)源。下面將介紹InputSplit和RecordReader。
3 InputSplit介紹
MapTask的輸入是一個(gè)輸入切片,稱為InputSplit。InputSplit也是一個(gè)抽象類,它在邏輯上包含給處理這個(gè)InputSplit的Mapper的所有KV對(duì)。不同類型的輸入格式對(duì)應(yīng)不同類型的切片,下圖是InputSplit的類圖。

3.1 InputSplit方法
// 獲取切片大小,并且根據(jù)size對(duì)切片排序
public abstract long getLength()
// 獲取存儲(chǔ)該分片的數(shù)據(jù)所在的節(jié)點(diǎn)位置,其中的數(shù)據(jù)是本地的,位置信息不需要序列號(hào)
public abstract String[] getLocations()
// 獲取有關(guān)切片在那個(gè)節(jié)點(diǎn)上的信息,以及它是如何存儲(chǔ)在每個(gè)位置的
public SplitLocationInfo[] getLocationInfo()
4 RecordReader
RecorderReader將讀入到Map的數(shù)據(jù)拆分成KV對(duì)。RecorderReader也是一個(gè)抽象類。下面是RecordReader的類圖:

接下來看一下RecordReader的源代碼:
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
/**
* 由一個(gè)InputSplit初始化
*/
public abstract void initialize(InputSplit split,
TaskAttemptContext context
) throws IOException, InterruptedException;
/**
* 讀取分片下一個(gè)KV
*/
public abstract
boolean nextKeyValue() throws IOException, InterruptedException;
/**
* Get the current key
*/
public abstract
KEYIN getCurrentKey() throws IOException, InterruptedException;
/**
* Get the current value.
*/
public abstract
VALUEIN getCurrentValue() throws IOException, InterruptedException;
/**
* 跟蹤讀取分片的進(jìn)度
*/
public abstract float getProgress() throws IOException, InterruptedException;
/**
* Close the record reader.
*/
public abstract void close() throws IOException;
}