無論HDFS還是MapReduce,在處理小文件時效率都非常低,但又難免面臨處理大量小文件的場景,此時就需要有相應的解決方案。可以自定義InputFormat實現小文件的合并。(合并後的SequenceFile對外是一個整文件,對內仍保存原先的各個小文件,後續作業讀取它時可節省MapTask;注意本作業本身因每個文件不切片,仍會為每個小文件各啟動一個MapTask)
需求如下:
將多個小文件合并成一個SequenceFile文件(SequenceFile文件是Hadoop用來存儲二進制形式的key-value對的文件格式),SequenceFile里面存儲著多個文件,存儲的形式為:文件路徑+名稱為key,文件內容為value。
1)輸入數據

image
2)期望輸出文件格式

image
步驟

image
程序實現
(1)自定義InputFormat
package cn.mark.mrInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
/**
 * A {@link FileInputFormat} that presents each input file as one unsplit record.
 *
 * <p>The key is the file's full path (directory plus file name) as {@link Text}; the value is the
 * file's complete contents as {@link BytesWritable}. Because {@link #isSplitable} always returns
 * {@code false}, each file is handled by a single {@link WholeRecordReader}.
 */
public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable> {

  /**
   * Disables splitting so that each file is read as exactly one record.
   *
   * @param context the job context (unused)
   * @param filename the file being considered (unused)
   * @return always {@code false}
   */
  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    return false;
  }

  /**
   * Creates the reader that turns a whole file into a single key/value pair.
   *
   * <p>The reader is returned uninitialized on purpose: the MapReduce framework calls
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} on it before the first
   * {@code nextKeyValue()}, and that method is specified to be called only once.
   *
   * @param split the split to read (one whole file)
   * @param context the task attempt context
   * @return a new, not-yet-initialized {@link WholeRecordReader}
   */
  @Override
  public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new WholeRecordReader();
  }
}
(2)自定義RecordReader類(核心)
package cn.mark.mrInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
 * Reads an entire file as a single key/value pair.
 *
 * <p>The key is the file's path ({@code Path.toString()}, i.e. directory plus file name). The
 * value is the first {@code split.getLength()} bytes of the file, read from offset 0. Exactly one
 * record is produced per split, which is only correct when splitting is disabled (as
 * {@code WholeFileInputformat} does).
 */
public class WholeRecordReader extends RecordReader<Text, BytesWritable> {
  // Split and configuration captured in initialize().
  FileSplit split;
  Configuration configuration;
  // Key/value objects reused across calls and refilled by nextKeyValue().
  Text k = new Text();
  BytesWritable v = new BytesWritable();
  // true until this split's single record has been emitted. Once false, nextKeyValue() returns
  // false, which ends Mapper.run()'s "while (context.nextKeyValue())" loop. Without this flag,
  // always returning true would loop forever, and always returning false would emit nothing.
  boolean isProgress = true;

  /**
   * Stores the split and job configuration for use by {@link #nextKeyValue()}.
   *
   * @param split the split to read; must be a {@link FileSplit}
   * @param context the task attempt context supplying the configuration
   */
  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    this.split = (FileSplit) split;
    configuration = context.getConfiguration();
  }

  /**
   * Loads the whole file into the value and its path into the key on the first call.
   *
   * @return {@code true} on the first successful call, {@code false} on every later call
   * @throws IOException if the file cannot be opened or read, or is too large for one array
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!isProgress) {
      return false; // the single record for this split has already been produced
    }
    Path path = split.getPath();
    byte[] buf = readWholeFile(path, split.getLength());
    // BytesWritable.set copies the given range into its own backing array.
    v.set(buf, 0, buf.length);
    // Path.toString() contains both the directory and the file name.
    k.set(path.toString());
    isProgress = false;
    return true;
  }

  /**
   * Reads the first {@code length} bytes of {@code path} into a new array, always closing the stream.
   *
   * @param path the file to read
   * @param length number of bytes to read from the start of the file
   * @return a new array holding exactly {@code length} bytes
   * @throws IOException if {@code length} does not fit in a Java array, or on any read failure
   *     (including reaching end of file before {@code length} bytes were read)
   */
  private byte[] readWholeFile(Path path, long length) throws IOException {
    if (length > Integer.MAX_VALUE) {
      // A plain (int) cast would silently wrap to a wrong or negative size.
      throw new IOException("File too large to load as a single record: " + path + " (" + length + " bytes)");
    }
    byte[] buf = new byte[(int) length];
    FileSystem fs = path.getFileSystem(configuration);
    // try-with-resources closes the stream even when readFully throws.
    try (FSDataInputStream in = fs.open(path)) {
      IOUtils.readFully(in, buf, 0, buf.length);
    }
    return buf;
  }

  /** @return the current key: the file's path */
  @Override
  public Text getCurrentKey() throws IOException, InterruptedException {
    return k;
  }

  /** @return the current value: the file's contents */
  @Override
  public BytesWritable getCurrentValue() throws IOException, InterruptedException {
    return v;
  }

  /**
   * Reports progress for the single-record split.
   *
   * @return {@code 0} before the record has been read, {@code 1} afterwards
   */
  @Override
  public float getProgress() throws IOException, InterruptedException {
    return isProgress ? 0.0f : 1.0f;
  }

  /** Nothing to release: the input stream is opened and closed within {@link #nextKeyValue()}. */
  @Override
  public void close() throws IOException {
  }
}
(3)編寫SequenceFileMapper類處理流程
package cn.mark.mrInputFormat;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Identity mapper that forwards each (file path, file contents) record unchanged.
 *
 * <p>When used with {@code WholeFileInputformat} (as {@code SequenceFileDriver} configures), each
 * call receives one entire file rather than a single line.
 */
public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {

  @Override
  protected void map(final Text key, final BytesWritable value, final Context context)
      throws IOException, InterruptedException {
    final Text filePath = key;
    final BytesWritable fileContents = value;
    context.write(filePath, fileContents);
  }
}
(4)編寫SequenceFileReducer類處理流程
package cn.mark.mrInputFormat;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Writes every (file path, file contents) pair it receives straight to the job output.
 *
 * <p>Input keys are file paths (e.g. for a.txt and b.txt) and each value is one file's full
 * contents, so the output is {@code <path + file name, file contents>}.
 */
public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

  @Override
  protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
      throws IOException, InterruptedException {
    // Emit each value immediately; each one is the complete contents of a file.
    java.util.Iterator<BytesWritable> contents = values.iterator();
    while (contents.hasNext()) {
      context.write(key, contents.next());
    }
  }
}
(5)編寫SequenceFileDriver類處理流程
package cn.mark.mrInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import java.io.IOException;
/**
 * Driver for the job that packs many small files into a SequenceFile.
 *
 * <p>Usage: {@code SequenceFileDriver <input dir> <output dir>}. When fewer than two arguments are
 * supplied, the hard-coded local paths below are used instead; adjust them to your own machine.
 */
public class SequenceFileDriver {

  /** Input directory used when no paths are passed on the command line. */
  private static final String DEFAULT_INPUT = "C:\\Users\\Administrator\\Downloads\\input\\123";

  /** Output directory used when no paths are passed on the command line. */
  private static final String DEFAULT_OUTPUT = "C:\\Users\\Administrator\\Downloads\\input\\output";

  /**
   * Configures and runs the job, then exits with 0 on success or 1 on failure.
   *
   * @param args {@code [inputPath, outputPath]}; the defaults above are used if fewer than two are given
   */
  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    // Previously args were always overwritten, so paths given on the command line were ignored.
    if (args.length < 2) {
      args = new String[] {DEFAULT_INPUT, DEFAULT_OUTPUT};
    }

    // 1. Create the job.
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    // 2. Set the jar and link the custom mapper and reducer.
    job.setJarByClass(SequenceFileDriver.class);
    job.setMapperClass(SequenceFileMapper.class);
    job.setReducerClass(SequenceFileReducer.class);

    // 3. Read each input file whole, as one (path, contents) record.
    job.setInputFormatClass(WholeFileInputformat.class);

    // 4. Write a binary SequenceFile instead of the default TextOutputFormat.
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    // 5. Map output key/value types.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(BytesWritable.class);

    // 6. Final output key/value types.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(BytesWritable.class);

    // 7. Input and output paths.
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 8. Submit, wait, and report the outcome through the process exit code.
    boolean succeeded = job.waitForCompletion(true);
    System.exit(succeeded ? 0 : 1);
  }
}

image