1 切片與MapTask并行度決定機(jī)制
- MapTask并行度決定Map階段的任務(wù)處理并發(fā)度,進(jìn)而影響Job的處理速度
- MapTask并行度決定機(jī)制
數(shù)據(jù)塊:blocks是hdfs在磁盤上對(duì)數(shù)據(jù)進(jìn)行的劃分;
數(shù)據(jù)切片:數(shù)據(jù)切片僅僅是在邏輯上對(duì)數(shù)據(jù)進(jìn)行劃分,不會(huì)在磁盤上對(duì)數(shù)據(jù)進(jìn)改變
2.1 一個(gè)Job的Map的并行度由客戶端在提交Job時(shí)的切片數(shù)決定
2.2 每的split切片分配一個(gè)MapTask并行實(shí)例處理
2.3 默認(rèn)情況下,切片大小=blocksize
2.4 切片時(shí)不考慮數(shù)據(jù)集整體,而是逐個(gè)針對(duì)每一個(gè)文件單獨(dú)切片
2. FileinputFormat
簡(jiǎn)單的按照文件長度和切片大小進(jìn)行切片
例:file1 300M ,file2 10M;此時(shí)切片情況:file1.split1 0-128M,file1.split2 128-256M,file1.split3 256-320M,file2.split 0-10M。切成四片
切片大小的公式 MATH.max(minSize,Math.min(maxSize,blockSize));
切片大小 調(diào)大改變minSize調(diào)小改變maxSize
獲取切片信息API:String Name = inputSplit.getPath().getName();
FileSplit inputSplit = (FileSplit) context.getInputSplit();
3. CombineTextInputFormat
用于小文件過多的場(chǎng)景,可以將多個(gè)小文件在邏輯上規(guī)劃為一個(gè)大文件;將多個(gè)小文件交給一個(gè)大文件處理。
切片方法:
| 虛擬存儲(chǔ)過程 | |
|---|---|
| a.txt 1.7M | 1.7M<4M 劃分一塊 |
| b.txt 5.1M | 5.1M>4M但是小于2* 4M 平均劃分成兩塊 |
| c.txt 3.4M | 3.4M<4M 劃分一塊 |
| d.txt 3.4M | 6.8M>4M但是小于2* 4M 平均劃分成兩塊 |
切片過程:(a)判斷虛擬存儲(chǔ)的文件大小是否大于setMaxInputSplitSize的值大于大于等于則單獨(dú)形成一個(gè)切片。
(b)如果不大于則跟下一個(gè)虛擬存儲(chǔ)文件進(jìn)行合并,共同形成一個(gè)切片。
4. TextInputFormat
TextInputFormat是FileInputFormat的默認(rèn)實(shí)現(xiàn)類。按行讀取記錄;讀取到的記錄存在<k,v>中。
k:整個(gè)文件中每行起始字節(jié)偏移量,LongWriter類型。
v:每行的內(nèi)容,不包括終止符(回車符、換行符),Text類型
5. KeyValueTextInputFormat
按照指定分隔符的形式,按行分割字符串。
如果一行當(dāng)中存在多個(gè)指定分隔符,只有第一個(gè)有效。
相關(guān)代碼public class KVmap extends Mapper<Text, Text, Text, IntWritable> { IntWritable v = new IntWritable(1); @Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { context.write(key, v); } }public class KVReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text k, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException { int sum = 0; for (IntWritable v : values) { sum +=v.get(); } IntWritable i = new IntWritable(sum); context.write(k, i); } }public class KVDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " "); Job job = Job.getInstance(conf); job.setJarByClass(KVDriver.class); job.setMapperClass(KVmap.class); job.setReducerClass(KVReduce.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setInputFormatClass(KeyValueTextInputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } }輸入文件信息:身邊有人來有人走。 有人去去就回。 有人頭也不回。 我會(huì)釋懷。 我知道, 有人走了, 就會(huì)再有來人替代他。 多年后, 或許我還會(huì)記得, 有人在我的世界里出現(xiàn)過。 或許潮起潮落, 或許波瀾未起。
結(jié)果:身邊有人來有人走。 1
6. NLineInputFormat
如果使用NLineInputFormat,代表每個(gè)map處理inputsplit不再按block快去劃分,而是按NLineInputFormat指定的行數(shù)來劃分。輸入文件的行數(shù)/N=切片數(shù)。
public class NLineInputFormatMap extends
Mapper<LongWritable, Text, Text, IntWritable>{
String[] strs;
IntWritable lon = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException
{
strs = value.toString().split(" ");
for (String str : strs) {
Text text = new Text(str);
context.write(text, lon );
}
}
}
public class NLineInputFormatReduce extends Reducer<Text,
IntWritable, Text, IntWritable>{
IntWritable value = new IntWritable();
@Override
protected void reduce(Text arg0,
Iterable<IntWritable> arg1,Context arg2) throws
IOException, InterruptedException {
int i = 0;
for (IntWritable IntWritable : arg1) {
i += IntWritable.get();
}
value.set(i);
arg2.write(arg0, value);
}
}
Configuration conf = new Configuration();
//1. 獲取job對(duì)象
Job job = Job.getInstance(conf);
NLineInputFormat.setNumLinesPerSplit(job, 3);
//設(shè)置jar存儲(chǔ)位置
job.setJarByClass(NLineInputFormatDriver.class);
//3.關(guān)聯(lián)map和reduce
job.setMapperClass(NLineInputFormatMap.class);
job.setReducerClass(NLineInputFormatReduce.class);
//4.設(shè)置map階段輸出的key和value
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5.設(shè)置最終數(shù)據(jù)的輸出形式
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//設(shè)置文件的輸入形式
job.setInputFormatClass(NLineInputFormat.class);
//6設(shè)置輸入輸出路徑
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//7.提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);