MapReduce數(shù)據(jù)流

1、InputFormat
InputFormat的作用有:
- 驗證作業(yè)輸入的正確性
- 將輸入文件切割成邏輯分片(InputSplit),一個InputSplit將會被分配給一個獨立的MapTask
- 提供RecordReader實現(xiàn),讀取InputSplit中的“K-V對”供Mapper使用
InputFormat抽象類源碼如下:
public abstract class InputFormat<K, V> {
/**
* 僅僅是邏輯分片,并沒有物理分片,所以每一個分片類似于這樣一個元組 <input-file-path, start, offset>
*/
public abstract List<InputSplit> getSplits(JobContext context)
throws IOException, InterruptedException;
/**
* 創(chuàng)建RecordReader,從InputSplit中讀取數(shù)據(jù),解析成<K,V>對
*/
public abstract RecordReader<K, V> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException;
}
InputFormat有兩個抽象方法:
- List<InputSplit> getSplits(),將輸入數(shù)據(jù)切分成Split。
- RecordReader<K,V> createRecordReader(),將InputSplit的內(nèi)容轉換為可以作為map輸入的<k,v>鍵值對。
InputFormat主要類圖圖下:

-
FileInputFormat切片機制:
(1)簡單地按照文件的內(nèi)容長度進行切片
(2)切片大小默認等于Block大小
(3)切片時不考慮數(shù)據(jù)整體,而是逐個針對每一個文件單獨切片 -
TextInputFormat切片機制:
切片方式:TextInputFormat是默認的切片機制,按文件規(guī)劃進行切分。比如切片默認為128M,如果一個文件為200M,則會形成兩個切片,一個是128M,一個是72M,啟動兩個MapTask任務進行處理任務。但是如果一個文件只有1M,也會單獨啟動一個MapTask執(zhí)行此任務,如果是10個這樣的小文件,就會啟動10個MapTask處理小文件任務。
讀取方式:TextInputFormat是按行讀取文件的每條記錄,key代表讀取的文件行在該文件中的起始字節(jié)偏移量,key為LongWritable類型;value為讀取的行內(nèi)容,不包括任何行終止符(換行符/回車符), value為Text類型,相當于java中的String類型。 -
CombineFileInputFormat切片機制:
框架默認的TextInputFormat切片機制是對任務按文件規(guī)劃切片,不管文件多小,都會是一個單獨的切片,都會交給一個MapTask,這樣如果有大量小文件,就會產(chǎn)生大量的MapTask,處理效率極其低下。
(1)應用場景:CombineTextInputFormat用于小文件過多的場景,它可以將多個小文件從邏輯上規(guī)劃到一個切片中,這樣,多個小文件就可以交給一個MapTask處理。
(2)切片機制:生成切片過程包括:虛擬存儲過程和切片過程二部分。 -
KeyValueTextInputFormat切片機制:
KeyValueTextInputFormat與TextInputFormat相似,按行讀入記錄,每個文件形成一個切片,但KeyValueTextInputFormat在讀入一行后可以指定切割符,把一行內(nèi)容按切割符分割成鍵值對的形式。
例如:
A-this is a
B-this is b
C-this is c
C-this is c
經(jīng)過mapper階段后被切割成:
(A,this is a)
(B,this is b)
(C,this is c)
(C,this is c)
-
NLineInputForma切片機制:
NLineInputFormat可以指定切分文件時按指定的行數(shù)進行切分,比如文件總行數(shù)為n,切分行數(shù)為N,則切片數(shù)為:如果n/N整除,切片數(shù)為n/N;如果不能整除,切片數(shù)為(n/N + 1)。 -
SequenceFileInputFormat切片機制:
SequenceFileInputFormat只能處理SequenceFile類型的文件,輸出為普通文件
1.1 InputSplit
MapTask的并行度決定Map階段的任務處理并發(fā)度,每一個Split切片分配一個MapTask并行實例處理。默認情況下,切片大小=BlockSize。切片時不考慮數(shù)據(jù)集整體,而是逐個針對每一個文件單獨切片??丛创a可知,InputSplit也是一個抽象類,它在邏輯上包含了提供給處理這個InputSplit的Mapper的所有K-V對。
public abstract class InputSplit {
/**
* 獲取Split的大小,單位是字節(jié)
*/
public abstract long getLength() throws IOException, InterruptedException;
/**
* 返回存儲該數(shù)據(jù)塊的數(shù)據(jù)節(jié)點的名稱,例如:String[0]="Slave1",String[1]="Slave2".
*/
public abstract
String[] getLocations() throws IOException, InterruptedException;
}
InputSplit主要類圖如下:

1.2 RecordReader
RecordReader的作用就是將數(shù)據(jù)切分成key/value的形式然后作為輸入傳給Mapper。
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
/**
* 初始化RecordReader,框架會在開始的時候調(diào)用一次
*/
public abstract void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException;
/**
* 讀取分片下一個<key, value>對
*/
public abstract boolean nextKeyValue() throws IOException,
InterruptedException;
/**
* 獲取當前的key
*/
public abstract KEYIN getCurrentKey() throws IOException,
InterruptedException;
/**
* 獲取當前的value
*/
public abstract VALUEIN getCurrentValue() throws IOException,
InterruptedException;
/**
* 跟蹤讀取分片的進度
*/
public abstract float getProgress() throws IOException,
InterruptedException;
/**
* 關閉RecordReader
*/
public abstract void close() throws IOException;
}
RecordReader主要類圖如下:
