spark hiveUDF transient的重要性

背景

最近在寫(xiě)hiveUDF的時(shí)候,遇到了一些反序列的問(wèn)題,具體的報(bào)錯(cuò)如下:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 6.0 failed 4 times, most recent failure: Lost task 11.3 in stage 6.0 (TID 105) (dw-csprd-bigdata-athena-dn-096.shizhuang-inc.com executor 3): com.esotericsoftware.kryo.KryoException: Unable to find class: scala.collection.immutable.HashMap$$$Lambda$9/282828951
Serialization trace:
mergef$1 (scala.collection.immutable.HashMap$$anon$1)
defaultMerger (scala.collection.immutable.HashMap$)
_2 (scala.Tuple2)
head (scala.collection.immutable.$colon$colon)
factories (com.fasterxml.jackson.module.scala.deser.UnsortedMapDeserializerModule$$anon$1)
_additionalDeserializers (com.fasterxml.jackson.databind.cfg.DeserializerFactoryConfig)
_factoryConfig (com.fasterxml.jackson.databind.deser.BeanDeserializerFactory)
_factory (com.fasterxml.jackson.databind.deser.DefaultDeserializationContext$Impl)
_deserializationContext (com.fasterxml.jackson.databind.ObjectMapper)
objectMapper (xxx.xxxUDF)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:160)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:133)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:693)
    at org.apache.hadoop.hive.ql.exec.SerializationUtilities$KryoWithHooks.readClass(SerializationUtilities.java:181)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:118)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)

分析

我們的代碼類(lèi)似如下:

class xxxUDF extends GenericUDF {
  @transient
  var argumentOIs: Array[ObjectInspector] = _

  val objectMapper = new ObjectMapper()
  objectMapper.registerModule(DefaultScalaModule)
  objectMapper.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
  objectMapper.configure(Feature.ALLOW_SINGLE_QUOTES, true)

  val result: Text = new Text()

其中spark的配置是使用kryo序列化,spark.serializer=org.apache.spark.serializer.KryoSerializer

可以看到是objectMapper類(lèi)在driver端在把UDF傳給executor的時(shí)候,需要做UDF的序列化,而序列化的時(shí)候,就會(huì)把objectMapper字段進(jìn)行序列化,
這樣在executor端進(jìn)行task.run的時(shí)候會(huì)把 objectMapper反序列化出來(lái),這個(gè)時(shí)候如果對(duì)應(yīng)的類(lèi)的成員方法如果沒(méi)有進(jìn)行kryo的注冊(cè),就會(huì)直接報(bào)序列化的錯(cuò)誤,
而spark目前的默認(rèn)注冊(cè)的kryo類(lèi)在KryoSerializer.scala中,如下:

...
 kryo.register(None.getClass)
 kryo.register(Nil.getClass)
 kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
 kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
 kryo.register(classOf[ArrayBuffer[Any]])
...

解決

在objectMapper加上@transient注解,使該對(duì)象不被序列化,這樣在反序列化的時(shí)候,就不會(huì)反序列化該對(duì)象

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容