1. InputFormat 數(shù)據(jù)輸入

1 切片與MapTask并行度決定機(jī)制

  1. MapTask并行度決定Map階段的任務(wù)處理并發(fā)度,進(jìn)而影響Job的處理速度
  2. MapTask并行度決定機(jī)制

數(shù)據(jù)塊:blocks是hdfs在磁盤上對(duì)數(shù)據(jù)進(jìn)行的劃分;
數(shù)據(jù)切片:數(shù)據(jù)切片僅僅是在邏輯上對(duì)數(shù)據(jù)進(jìn)行劃分,不會(huì)在磁盤上對(duì)數(shù)據(jù)進(jìn)改變
2.1 一個(gè)Job的Map的并行度由客戶端在提交Job時(shí)的切片數(shù)決定
2.2 每的split切片分配一個(gè)MapTask并行實(shí)例處理
2.3 默認(rèn)情況下,切片大小=blocksize
2.4 切片時(shí)不考慮數(shù)據(jù)集整體,而是逐個(gè)針對(duì)每一個(gè)文件單獨(dú)切片

2. FileinputFormat

簡(jiǎn)單的按照文件長度和切片大小進(jìn)行切片
例:file1 300M ,file2 10M;此時(shí)切片情況:file1.split1 0-128M,file1.split2 128-256M,file1.split3 256-320M,file2.split 0-10M。切成四片
切片大小的公式 MATH.max(minSize,Math.min(maxSize,blockSize));
切片大小 調(diào)大改變minSize 調(diào)小改變maxSize
獲取切片信息API:

String Name = inputSplit.getPath().getName();
FileSplit inputSplit = (FileSplit) context.getInputSplit();

3. CombineTextInputFormat

用于小文件過多的場(chǎng)景,可以將多個(gè)小文件在邏輯上規(guī)劃為一個(gè)大文件;將多個(gè)小文件交給一個(gè)大文件處理。
切片方法:

虛擬存儲(chǔ)過程
a.txt 1.7M 1.7M<4M 劃分一塊
b.txt 5.1M 5.1M>4M但是小于2* 4M 平均劃分成兩塊
c.txt 3.4M 3.4M<4M 劃分一塊
d.txt 3.4M 6.8M>4M但是小于2* 4M 平均劃分成兩塊

切片過程:(a)判斷虛擬存儲(chǔ)的文件大小是否大于setMaxInputSplitSize的值大于大于等于則單獨(dú)形成一個(gè)切片。
(b)如果不大于則跟下一個(gè)虛擬存儲(chǔ)文件進(jìn)行合并,共同形成一個(gè)切片。
4. TextInputFormat

TextInputFormat是FileInputFormat的默認(rèn)實(shí)現(xiàn)類。按行讀取記錄;讀取到的記錄存在<k,v>中。
k:整個(gè)文件中每行起始字節(jié)偏移量,LongWriter類型。
v:每行的內(nèi)容,不包括終止符(回車符、換行符),Text類型

5. KeyValueTextInputFormat

按照指定分隔符的形式,按行分割字符串。
如果一行當(dāng)中存在多個(gè)指定分隔符,只有第一個(gè)有效。
相關(guān)代碼

public class KVmap extends Mapper<Text, Text, Text, IntWritable> {
     IntWritable v = new IntWritable(1);
      @Override
     protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
           context.write(key, v);
      }
}
public class KVReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
     @Override
     protected void reduce(Text k, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {
         int sum = 0;
         for (IntWritable v : values) {
             sum +=v.get();
        }
         IntWritable i  = new IntWritable(sum);
         context.write(k, i);
    }
}
public class KVDriver {
    public static void main(String[] args) throws 
IOException, ClassNotFoundException, InterruptedException 
{
         Configuration conf = new Configuration();
         conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
         Job job = Job.getInstance(conf);
         job.setJarByClass(KVDriver.class);
         job.setMapperClass(KVmap.class);
         job.setReducerClass(KVReduce.class);
         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(IntWritable.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(IntWritable.class);
         job.setInputFormatClass(KeyValueTextInputFormat.class);
          FileInputFormat.setInputPaths(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));
         job.waitForCompletion(true);
     }
}

輸入文件信息:身邊有人來有人走。 有人去去就回。 有人頭也不回。 我會(huì)釋懷。 我知道, 有人走了, 就會(huì)再有來人替代他。 多年后, 或許我還會(huì)記得, 有人在我的世界里出現(xiàn)過。 或許潮起潮落, 或許波瀾未起。
結(jié)果:身邊有人來有人走。 1

6. NLineInputFormat

如果使用NLineInputFormat,代表每個(gè)map處理inputsplit不再按block快去劃分,而是按NLineInputFormat指定的行數(shù)來劃分。輸入文件的行數(shù)/N=切片數(shù)。

 public class NLineInputFormatMap extends 
 Mapper<LongWritable, Text, Text, IntWritable>{
      String[] strs;
     IntWritable lon = new IntWritable(1);
     @Override
     protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException 
{
          strs = value.toString().split(" ");
          for (String str : strs) {
              Text text = new Text(str);
              context.write(text, lon );
          }
     }
}

 public class NLineInputFormatReduce extends Reducer<Text, 
IntWritable, Text, IntWritable>{
     IntWritable value = new IntWritable();
     @Override
     protected void reduce(Text arg0, 
Iterable<IntWritable> arg1,Context arg2) throws 
IOException, InterruptedException {
          int i = 0;
          for (IntWritable IntWritable : arg1) {
              i += IntWritable.get();
          }
          value.set(i);
          arg2.write(arg0, value);
     }
}
          Configuration conf = new Configuration();
          //1. 獲取job對(duì)象
          Job job = Job.getInstance(conf);
          NLineInputFormat.setNumLinesPerSplit(job, 3);
          //設(shè)置jar存儲(chǔ)位置
          job.setJarByClass(NLineInputFormatDriver.class);
          //3.關(guān)聯(lián)map和reduce
          job.setMapperClass(NLineInputFormatMap.class);
          job.setReducerClass(NLineInputFormatReduce.class);
          //4.設(shè)置map階段輸出的key和value
          job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(IntWritable.class);
          //5.設(shè)置最終數(shù)據(jù)的輸出形式
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(IntWritable.class);
          //設(shè)置文件的輸入形式
          job.setInputFormatClass(NLineInputFormat.class);
          //6設(shè)置輸入輸出路徑
          FileInputFormat.setInputPaths(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));
          //7.提交
          boolean result = job.waitForCompletion(true);
          System.exit(result ? 0 : 1);

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容