Environment
- Flink 1.17.2
- Hive 3.1.0
- Hudi 0.15.0
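Flink needs to work with both Hudi and Hive, so hive-exec-3.1.0.jar and hudi-flink1.17-bundle-0.15.0.jar were added to its lib directory.
Error log
When inserting data into a Hudi COW table from the Flink SQL Client, the following error occurs and the job keeps restarting.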
java.lang.NoSuchMethodError: org.apache.parquet.schema.Types$PrimitiveBuilder.as(Lorg/apache/parquet/schema/LogicalTypeAnnotation;)Lorg/apache/parquet/schema/Types$Builder;
at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:184)
at org.apache.parquet.avro.AvroSchemaConverter.convertUnion(AvroSchemaConverter.java:256)
at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:213)
at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:159)
at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:275)
at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:153)
at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:144)
at org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory.getHoodieAvroWriteSupport(HoodieAvroFileWriterFactory.java:129)
at org.apache.hudi.io.hadoop.HoodieAvroFileWriterFactory.newParquetFileWriter(HoodieAvroFileWriterFactory.java:67)
at org.apache.hudi.io.storage.HoodieFileWriterFactory.getFileWriterByFormat(HoodieFileWriterFactory.java:67)
at org.apache.hudi.io.storage.HoodieFileWriterFactory.getFileWriter(HoodieFileWriterFactory.java:53)
at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:108)
at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:84)
at org.apache.hudi.io.FlinkCreateHandle.<init>(FlinkCreateHandle.java:66)
at org.apache.hudi.io.FlinkCreateHandle.<init>(FlinkCreateHandle.java:59)
at org.apache.hudi.io.FlinkWriteHandleFactory$BaseCommitWriteHandleFactory.create(FlinkWriteHandleFactory.java:121)
at org.apache.hudi.client.HoodieFlinkWriteClient.getOrCreateWriteHandle(HoodieFlinkWriteClient.java:459)
at org.apache.hudi.client.HoodieFlinkWriteClient.access$000(HoodieFlinkWriteClient.java:77)
at org.apache.hudi.client.HoodieFlinkWriteClient$AutoCloseableWriteHandle.<init>(HoodieFlinkWriteClient.java:515)
at org.apache.hudi.client.HoodieFlinkWriteClient$AutoCloseableWriteHandle.<init>(HoodieFlinkWriteClient.java:507)
at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:148)
at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$1(StreamWriteFunction.java:192)
at org.apache.hudi.sink.StreamWriteFunction.writeBucket(StreamWriteFunction.java:495)
at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:467)
at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:463)
at org.apache.hudi.sink.StreamWriteFunction.endInput(StreamWriteFunction.java:157)
at org.apache.hudi.sink.common.AbstractWriteOperator.endInput(AbstractWriteOperator.java:48)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:750)
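Analysis and solution
Dependency analysis shows that Hudi and hive-exec depend on different versions of parquet-hadoop-bundle, and the one bundled in hive-exec is older. When Flink starts, hive-exec is loaded before the Hudi bundle, so the older Parquet classes win. Those classes do not have the Types$PrimitiveBuilder.as(LogicalTypeAnnotation) method that the Hudi write path calls, which produces the NoSuchMethodError above.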
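The dependency analysis can be reproduced by checking which jars in the lib directory bundle the class named in the stack trace. The following is a minimal sketch, not the method used in the original investigation; the function name jars_containing and the lib directory argument are illustrative assumptions.

```python
import zipfile
from pathlib import Path


def jars_containing(lib_dir, class_name):
    """Return the file names of jars in lib_dir that bundle class_name."""
    # A class a.b.C$D is stored inside a jar as the entry a/b/C$D.class.
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    # Jars are visited in sorted file-name order so the result is stable.
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        with zipfile.ZipFile(jar) as zf:
            if entry in zf.namelist():
                hits.append(jar.name)
    return hits
```

Calling jars_containing("/path/to/flink/lib", "org.apache.parquet.schema.Types$PrimitiveBuilder") with the real lib path lists every jar that ships its own copy of the class. If more than one jar is returned, the copies can conflict in the way described above.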
The fix is to manually make Flink load the Hudi bundle first. A simple way is to rename the Hudi bundle jar so that it sorts alphabetically before hive-exec, for example renaming hudi-flink1.17-bundle-0.15.0.jar to ahudi-flink1.17-bundle-0.15.0.jar.
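Why the rename works can be seen from a plain lexicographic sort of the two file names. The sketch below assumes that the jars in lib are placed on the classpath in sorted file-name order, as the rename trick implies; load_order is a hypothetical helper, and Python's code-point ordering may differ from a locale-aware shell sort for other file names.

```python
def load_order(jar_names):
    """Sort jar names lexicographically (assumed to match the classpath order)."""
    return sorted(jar_names)


# Original names: "hi" sorts before "hu", so hive-exec comes first.
before = load_order(["hudi-flink1.17-bundle-0.15.0.jar", "hive-exec-3.1.0.jar"])
# Renamed bundle: the leading "a" sorts before "h", so the Hudi bundle comes first.
after = load_order(["ahudi-flink1.17-bundle-0.15.0.jar", "hive-exec-3.1.0.jar"])

print(before[0])  # hive-exec-3.1.0.jar
print(after[0])   # ahudi-flink1.17-bundle-0.15.0.jar
```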
Alternative solution
Modify packaging/hudi-flink-bundle/pom.xml and add the following inside the relocations tag:
<relocation>
  <pattern>org.apache.parquet</pattern>
  <shadedPattern>${flink.bundle.shade.prefix}org.apache.parquet</shadedPattern>
</relocation>
Then rebuild the bundle.
Reference:
https://github.com/apache/hudi/issues/3042
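After rebuilding, one way to sanity-check the relocation is to confirm that the bundle jar no longer contains class files under the original org/apache/parquet/ path. This is a rough sketch under that assumption; unrelocated_classes is a hypothetical helper, and the actual shaded path depends on the value of flink.bundle.shade.prefix in the build.

```python
import zipfile


def unrelocated_classes(jar_path, package="org.apache.parquet"):
    """List .class entries that still sit under the original package path."""
    prefix = package.replace(".", "/") + "/"
    with zipfile.ZipFile(jar_path) as zf:
        # Relocated classes live under a different top-level path,
        # so they do not start with this prefix and are not reported.
        return [name for name in zf.namelist()
                if name.startswith(prefix) and name.endswith(".class")]
```

An empty list for the rebuilt bundle suggests the Parquet classes were relocated and can no longer clash with the copies in hive-exec.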
References
https://segmentfault.com/a/1190000045284194
https://blog.csdn.net/m0_66705151/article/details/125781898
https://github.com/apache/hudi/issues/3042