記一次序列化導(dǎo)致的MR統(tǒng)計(jì)問(wèn)題

類似簡(jiǎn)單的wordcount,MapReduce輸出的統(tǒng)計(jì)結(jié)果卻不符預(yù)期。部分key丟失了,未丟失的key,其value值也不對(duì)。
經(jīng)查,是因?yàn)榇a中指定了SortComparator,導(dǎo)致key比較時(shí)所用的反序列化方式與序列化方式不符,這影響了map在merge時(shí)對(duì)key的排序,也影響了reduce對(duì)key的分組。

下面結(jié)合代碼,對(duì)以下步驟進(jìn)行梳理(代碼來(lái)自hadoop-mapreduce-client-core-2.6.0-cdh5.5.0, hadoop-common-2.6.0-cdh5.5.0, avro-1.7.6)。

  1. map端key的序列化器;
  2. map端merge時(shí)key的比較器;
  3. reduce端對(duì)key的分組;
  4. reduce端key分組時(shí)的比較器;
  5. key的比較。

map端key的序列化器——keySerializer

Map的輸出主要由org.apache.hadoop.mapred.MapTask的內(nèi)部類MapOutputBuffer實(shí)現(xiàn),MapOutputBuffer使用keySerializer對(duì)key進(jìn)行序列化。

keySerializer是什么呢?
keySerializer本身是個(gè)接口,具體實(shí)現(xiàn)類是在MapOutPutBuffer的init()方法中指定的,代碼如下:

public void init(Context context) throws ... {
  ...
  this.keySerializer = this.serializationFactory.getSerializer(this.keyClass);
  ...
}

SerializationFactory又是如何根據(jù)keyClass得到對(duì)應(yīng)的Serializer的呢?
SerializationFactory維護(hù)了一個(gè)鏈表serializations,SerializationFactory會(huì)依次遍歷serializations中的serialization,找到第一個(gè)accept keyclass的serialization。代碼如下:

public <T> Serialization<T> getSerialization(Class<T> c) {
    Iterator i$ = this.serializations.iterator();

    Serialization serialization;
    do {
      if(!i$.hasNext()) {
        return null;
      }

      serialization = (Serialization)i$.next();
    } while(!serialization.accept(c)); //是否accept class

    return serialization;
  }

serializations又有哪些呢?
SerializationFactory的構(gòu)造函數(shù)會(huì)讀取io.serializations配置來(lái)初始化serializations。若沒(méi)有配置io.serializations,則取默認(rèn)的serializations,依次為WritableSerialization、AvroSpecificSerialization和AvroReflectSerialization。代碼如下:

private List<Serialization<?>> serializations = new ArrayList();

public SerializationFactory(Configuration conf) {
  ...
  String[] arr$ = conf.getStrings("io.serializations", new String[]{WritableSerialization.class.getName(), AvroSpecificSerialization.class.getName(), AvroReflectSerialization.class.getName()});
  ...
}

一個(gè)serialization是否accept某個(gè)class,則是由具體的serialization自己實(shí)現(xiàn)的,如WritableSerialization,其接受的是Writable的子類。WritableSerialization的accept()代碼如下:

public boolean accept(Class<?> c) {
    return Writable.class.isAssignableFrom(c);
}

map端merge時(shí)key的比較器

MapOutputBuffer在對(duì)數(shù)據(jù)進(jìn)行merge時(shí),使用了比較器對(duì)key進(jìn)行排序。

Merger.merge(this.job, this.rfs, this.keyClass, this.valClass, this.codec, segmentList, this.job.getInt("io.sort.factor", 100), new Path(mapId.toString()), 
this.job.getOutputKeyComparator(),  //獲取key的比較器
this.reporter, (Counter)null, this.mapTask.spilledRecordsCounter);

getOutputKeyComparator()獲取的又是什么呢?獲取的是mapreduce.job.output.key.comparator.class配置的比較器。

reduce端對(duì)key的分組

reduce端按key分組處理的主流程在org.apache.hadoop.mapreduce.Reducer的run()方法中(這個(gè)Reducer就是我們?cè)趯慚R應(yīng)用程序reduce的部分時(shí),繼承的Reducer),run()方法的代碼如下:

public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws ... {
    this.setup(context);

    try {
      while(context.nextKey()) {
        this.reduce(context.getCurrentKey(), context.getValues(), context);
        ...
      }
    } finally {
      this.cleanup(context);
    }

  }

這個(gè)run()方法里的setup(), reduce()和cleanup()就是經(jīng)常被我們重寫的三個(gè)方法。

key分組的關(guān)鍵,在于理解context的具體實(shí)現(xiàn)——ReduceContextImpl。

ReduceContextImpl

ReduceContextImpl的關(guān)鍵成員如下(其中KEYIN和VALUEIN都是泛型)。

public class ReduceContextImpl ... {
  private KEYIN key;
  private VALUEIN value;
  private boolean nextKeyIsSame = false;
  public boolean nextKey();
  public boolean nextKeyValue();
  protected class ValueIterator ... {
   public boolean hasNext();
   public VALUEIN next();
  }
}

其中,四個(gè)主要方法的含義如下

  1. nextKeyValue():讀取下對(duì)(key, value)
  2. nextKey():讀取下一個(gè)key,其實(shí)是調(diào)用了nextKeyValue(),同時(shí)也讀取了下一個(gè)value(其實(shí),map寫過(guò)來(lái)的就是一對(duì)對(duì)(key, value),讀的話,也應(yīng)該是一對(duì)對(duì)的讀)
  3. values.hasNext():即ValueIterator.hasNext(),判斷是否還有下一個(gè)value
  4. values.next():即ValueIterator.next(),讀取下一個(gè)value,其實(shí)也是調(diào)用了nextKeyValue(),同時(shí)也讀取了下一個(gè)key

nextKeyValue()每次讀取下一對(duì)(key, value),都將key值和value值存儲(chǔ)在KEYIN keyVALUEIN value中(這就是reduce端,key/value的復(fù)用)。

這里主要闡述ReduceContextImpl 是如何實(shí)現(xiàn)對(duì)key的分組處理。為了說(shuō)明方便,這里將上文中Reducer.run()方法展開如下(其中,假設(shè)用戶實(shí)現(xiàn)的reduce()使用了iterator的形式對(duì)value進(jìn)行了遍歷)。

public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws ... {
  while(context.nextKey()) { //外部while
    while(values.hasNext()) { // 內(nèi)部while,即用戶重寫的reduce()方法
      context.write(key, values.next());
    }
    ...
  }
}

key的分組處理,關(guān)鍵在于nextKeyValue()方法。上述代碼中,未直接體現(xiàn)nextKeyValue()方法,而我們知道values.next()調(diào)用了該方法。nextKeyValue()方法在讀取到當(dāng)前key后,會(huì)將當(dāng)前key與下一個(gè)將要處理的key進(jìn)行比較,若發(fā)現(xiàn)不是同一個(gè)key,就會(huì)將nextKeyIsSame置為false。此后,values.hasNext()方法發(fā)現(xiàn)nextKeyIsSame為false,也會(huì)返回false,從而跳出循環(huán),結(jié)束一個(gè)key的處理。這樣,就保證了每一次內(nèi)部while,處理的都是同一個(gè)key。
下面,將幾處關(guān)鍵代碼摘要如下(這里面省略了firstValue以及很多其他的處理邏輯)。

public boolean nextKeyValue() {
  DataInputBuffer nextKey = this.input.getKey(); //獲取當(dāng)前key
  this.currentRawKey.set(nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition());
  this.key = this.keyDeserializer.deserialize(this.key); //反序列化當(dāng)前key
  
  DataInputBuffer nextVal = this.input.getValue(); //獲取當(dāng)前value
  this.value = this.valueDeserializer.deserialize(this.value); //反序列化當(dāng)前value
  
  this.hasMore = this.input.next(); //是否還有下一個(gè)(key, value)
  if(this.hasMore) {
    //如果有下一個(gè)(key,, value),獲取下一個(gè)key
    nextKey = this.input.getKey();
    //比較當(dāng)前key和下一個(gè)key,設(shè)置nextKeyIsSame
    this.nextKeyIsSame = this.comparator.compare(this.currentRawKey.getBytes(), 0, this.currentRawKey.getLength(), nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition()) == 0;
  } else {
    this.nextKeyIsSame = false;
  }
}

public boolean hasNext() {
  ...
  return ReduceContextImpl.this.nextKeyIsSame;
}

reduce端key分組時(shí)的比較器

我們注意到,nextKeyValue()在比較當(dāng)前key和下一個(gè)key時(shí),用到了一個(gè)比較器。這個(gè)比較器是在org.apache.hadoop.mapred.ReduceTask.run()方法中獲取的。

public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws ... {
      ...
      RawComparator comparator = job.getOutputValueGroupingComparator(); // 獲取比較器
      if(useNewApi) {
        this.runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
      } else {
        ...
      }
      ...
}

getOutputValueGroupingComparator()按如下優(yōu)先級(jí)獲取比較器:

  1. 首先,獲取mapreduce.job.output.group.comparator.class配置的分組比較器;
  2. 其次,獲取mapreduce.job.output.key.comparator.class配置的key比較器(這里就和上文的“map端merge時(shí)key的比較器”相同了);
  3. 最后,獲取如Text、IntWritable這些類型自己定義的比較器。

這里再對(duì)第三點(diǎn)做點(diǎn)補(bǔ)充說(shuō)明,像Text、IntWritable這些類型,都有一段static代碼,將自己定義的Comparator注冊(cè)到WritableComparator中。

static {
  // register this comparator
  WritableComparator.define(Text.class, new Comparator());
}

key的比較

在對(duì)key進(jìn)行比較時(shí),是根據(jù)序列化后的字節(jié)碼進(jìn)行比較的。comparator.compare()的形式如下:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);

假設(shè)key的類型是Text,獲取到的比較器是Text定義的Comparator。Text在比較時(shí),會(huì)先讀取一個(gè)整數(shù),這個(gè)整數(shù)存儲(chǔ)了Text的長(zhǎng)度,再根據(jù)這個(gè)長(zhǎng)度比較Text的內(nèi)容(也就是說(shuō)Text在存儲(chǔ)的時(shí)候,先是存儲(chǔ)了Text的長(zhǎng)度,再存儲(chǔ)Text內(nèi)容本身)。

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  int n1 = WritableUtils.decodeVIntSize(b1[s1]);
  int n2 = WritableUtils.decodeVIntSize(b2[s2]);
  return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
}

可以看到,在對(duì)Text類型的key進(jìn)行比較時(shí),沒(méi)有將整個(gè)Text都反序列化,而時(shí)盡量在字節(jié)碼的層面上進(jìn)行比較。

經(jīng)過(guò)上面的梳理,我們知道了

  1. key在map端輸出時(shí),用keySeriailzer進(jìn)行了序列化;
  2. key在map端merge時(shí),用comparator在序列化后的結(jié)果上進(jìn)行比較;
  3. key在reduce端分組時(shí),用comparator在序列化后的結(jié)果上進(jìn)行比較;
  4. comparator在比較的時(shí)候,因?yàn)榛诘氖切蛄谢蟮慕Y(jié)果,所以涉及到一些反序列化。

下面,我們來(lái)詳細(xì)闡述下本文開頭的問(wèn)題。

問(wèn)題是如何發(fā)生的?

問(wèn)題代碼中

  1. 配置了用戶自定義的SortComparator(也就是配置了mapred.output.key.comparator.class)。這個(gè)用戶定義的比較器在比較時(shí),用的是org.apache.avro.io.BinaryData的compare,而BinaryData在比較時(shí),序列化和反序列化用的是BinaryEncoder和BinaryDecoder。
  2. key是Text類型,沒(méi)有對(duì)keySerializer進(jìn)行特殊配置,所以獲取到的是默認(rèn)的WritableSerialization,最終調(diào)用的是Text的序列化方法。

這導(dǎo)致key使用Text進(jìn)行了序列化,但在比較時(shí),卻嘗試使用BinaryDecoder進(jìn)行反序列化。序列化和反序列化不符,導(dǎo)致map端的merge和reduce端的key分組,在進(jìn)行比較時(shí)都出錯(cuò)了,所以最后統(tǒng)計(jì)結(jié)果不對(duì)。

最后再補(bǔ)充一些細(xì)節(jié)。

BinaryData的比較方法

BinaryData是根據(jù)BinaryDecoder(可以讀取key的值)和key的schma進(jìn)行比較的。這里的schema在avro中用來(lái)標(biāo)識(shí)類型,如RECORD可理解為復(fù)雜的Class類型,而SRING、LONG等的含義顯而易見。
下面的代碼摘取了對(duì)RECORD和STRING類型進(jìn)行比較時(shí)的一些細(xì)節(jié)(省略了一些和avro Field處理有關(guān)的細(xì)節(jié))。我們可以看到,在對(duì)RECORD進(jìn)行處理時(shí),使用了遞歸的方式,并且在遇到第一個(gè)不相等的字段時(shí),便返回結(jié)果,不再處理后續(xù)的字節(jié)碼,也就節(jié)省了部分反序列化的時(shí)間。而對(duì)STRING的處理,和Text很相似,都是先讀取文本的長(zhǎng)度,再根據(jù)該長(zhǎng)度比較文本本身。

private static int compare(Decoders d, Schema schema) throws IOException {
    Decoder d1 = d.d1; Decoder d2 = d.d2;
    switch (schema.getType()) {
    case RECORD: {
      for (Field field : schema.getFields()) {
        int c = compare(d, field.schema()); //遞歸調(diào)用
        if (c != 0) //一旦遇到一個(gè)字段不相等,直接返回
          return (field.order() != Field.Order.DESCENDING) ? c : -c;
      }
      return 0;
    }
  ...
    case STRING: {
      int l1 = d1.readInt();
      int l2 = d2.readInt();
      int c = compareBytes(d.d1.getBuf(), d.d1.getPos(), l1,
                           d.d2.getBuf(), d.d2.getPos(), l2);
      d.d1.skipFixed(l1);
      d.d2.skipFixed(l2);
      return c;
    }
...

BinaryData的序列化方式

可見avro官網(wǎng)的Binary Encoding一節(jié)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容