Spark writing to Elasticsearch: EsHadoopRemoteException: version_conflict_engine_exception

Background

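  • Use case: a Spark batch job writes to ES through the es-hadoop (elasticsearch-hadoop) connector
  • The batch job runs on a schedule
  • CDH 5.5.3 cluster, Spark 2.3, Elasticsearch 6.4.3
  • The _id of each document in the target index is generated by the program and guaranteed to be globally unique
  • The error only occurs in the test environment and is hard to reproduce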

Problem description

The full error message is as follows:

19/05/20 11:08:54 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 24.0 failed 4 times, most recent failure: Lost task 2.3 in stage 24.0 (TID 849, p016d052n01, executor 6): org.elasticsearch.hadoop.EsHadoopException: Could not write all entries for bulk operation [24/1000]. Error sample (first [5] error messages):
    org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][OZVIK_2462056_2019-05-18]: version conflict, document already exists (current version [1])
    {"update":{"_id":"OZVIK_2462056_2019-05-18"}}
{"doc_as_upsert":true,"doc":{"id":"OZVIK_2462056_2019-05-18","product_no":"OZVIK","cust_id":"2462056","p106":32,"p107":61,"p108":55,"p109":"YGM6E","p110":1,"p111":46,"p112":11126,"p113":189,"p114":70,"p115":6,"p116":60,"p117":"male","p118":"gg","p119":19,"p120":2,"p121":1544025600000,"p122":69,"p123":"FL0SS","dt":"2019-05-18","absum01":71,"testday01":76,"testday02":11202,"testday03":"7611202","testday04":"70male","testday04_2":22404,"testday05":"761120270male761120222404","amount01":"YGM6E2462056","amount02":22252,"amount03":"OZVIK","aa":11197,"testb21":93,"fix_const_999_0222":999,"0304tf":"999 2462056 YGM6E","0305test_long":11173,"hello":87,"datetest":"2019-05-18","binarytest":32,"nestedtest":"YGM6E","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"OZVIK","floattest02":1,"__namelist_54":"0"}}

    org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][OZWTC_148752_2019-05-18]: version conflict, document already exists (current version [1])
    {"update":{"_id":"OZWTC_148752_2019-05-18"}}
{"doc_as_upsert":true,"doc":{"id":"OZWTC_148752_2019-05-18","product_no":"OZWTC","cust_id":"148752","p106":88,"p107":20,"p108":13,"p109":"3BIW6","p110":1,"p111":79,"p112":15107,"p113":183,"p114":62,"p115":85,"p116":68,"p117":"female","p118":"nn","p119":51,"p120":80,"p121":1534867200000,"p122":87,"p123":"VOG2J","dt":"2019-05-18","absum01":63,"testday01":147,"testday02":15254,"testday03":"14715254","testday04":"62female","testday04_2":30508,"testday05":"1471525462female1471525430508","amount01":"3BIW6148752","amount02":30214,"amount03":"OZWTC","aa":15170,"testb21":108,"fix_const_999_0222":999,"0304tf":"999 148752 3BIW6","0305test_long":15187,"hello":101,"datetest":"2019-05-18","binarytest":88,"nestedtest":"3BIW6","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"OZWTC","floattest02":1,"__namelist_54":"0"}}

    org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][P08Y7_3310671_2019-05-18]: version conflict, document already exists (current version [1])
    {"update":{"_id":"P08Y7_3310671_2019-05-18"}}
{"doc_as_upsert":true,"doc":{"id":"P08Y7_3310671_2019-05-18","product_no":"P08Y7","cust_id":"3310671","p106":27,"p107":62,"p108":40,"p109":"5JPCP","p110":0,"p111":93,"p112":17036,"p113":185,"p114":68,"p115":54,"p116":24,"p117":"female","p118":"aa","p119":43,"p120":88,"p121":1536508800000,"p122":43,"p123":"HI31Q","dt":"2019-05-18","absum01":68,"testday01":122,"testday02":17158,"testday03":"12217158","testday04":"68female","testday04_2":34316,"testday05":"1221715868female1221715834316","amount01":"5JPCP3310671","amount02":34072,"amount03":"P08Y7","aa":17104,"testb21":89,"fix_const_999_0222":999,"0304tf":"999 3310671 5JPCP","0305test_long":17129,"hello":67,"datetest":"2019-05-18","binarytest":27,"nestedtest":"5JPCP","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"P08Y7","floattest02":0,"__namelist_54":"0"}}

    org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][P0TI9_8523_2019-05-18]: version conflict, document already exists (current version [1])
    {"update":{"_id":"P0TI9_8523_2019-05-18"}}
{"doc_as_upsert":true,"doc":{"id":"P0TI9_8523_2019-05-18","product_no":"P0TI9","cust_id":"8523","p106":20,"p107":68,"p108":36,"p109":"YIP72","p110":0,"p111":24,"p112":13632,"p113":197,"p114":73,"p115":70,"p116":90,"p117":"male","p118":"aa","p119":75,"p120":11,"p121":1532361600000,"p122":82,"p123":"8KUUS","dt":"2019-05-18","absum01":73,"testday01":143,"testday02":13775,"testday03":"14313775","testday04":"73male","testday04_2":27550,"testday05":"1431377573male1431377527550","amount01":"YIP728523","amount02":27264,"amount03":"P0TI9","aa":13705,"testb21":88,"fix_const_999_0222":999,"0304tf":"999 8523 YIP72","0305test_long":13656,"hello":56,"datetest":"2019-05-18","binarytest":20,"nestedtest":"YIP72","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"P0TI9","floattest02":0,"__namelist_54":"0"}}

    org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [offline_quota_library_s][P1J8O_2619118_2019-05-18]: version conflict, document already exists (current version [1])
    {"update":{"_id":"P1J8O_2619118_2019-05-18"}}
{"doc_as_upsert":true,"doc":{"id":"P1J8O_2619118_2019-05-18","product_no":"P1J8O","cust_id":"2619118","p106":99,"p107":57,"p108":53,"p109":"NR3QD","p110":1,"p111":83,"p112":17171,"p113":157,"p114":55,"p115":8,"p116":20,"p117":"male","p118":"oo","p119":42,"p120":4,"p121":1516636800000,"p122":62,"p123":"FO4IS","dt":"2019-05-18","absum01":56,"testday01":63,"testday02":17234,"testday03":"6317234","testday04":"55male","testday04_2":34468,"testday05":"631723455male631723434468","amount01":"NR3QD2619118","amount02":34342,"amount03":"P1J8O","aa":17227,"testb21":156,"fix_const_999_0222":999,"0304tf":"999 2619118 NR3QD","0305test_long":17255,"hello":152,"datetest":"2019-05-18","binarytest":99,"nestedtest":"NR3QD","aaaaaaaaaaaaaaaaabaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa":"P1J8O","floattest02":1,"__namelist_54":"0"}}

Bailing out...
    at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.flush(BulkProcessor.java:519)
    at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.add(BulkProcessor.java:127)
    at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:192)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:172)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
    at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
    at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:101)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

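In other words, when the ES client wrote the document, the document's current version in the index did not match what the write expected. Here the message "document already exists (current version [1])" means the write expected the document not to exist yet, but another write (from another client) had already created it.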

Troubleshooting

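1) First line of thought: since _id is guaranteed to be globally unique, under normal circumstances the data for a given _id is written exactly once, by a single Spark task. ES uses optimistic concurrency control: the current client gets a version conflict only if another client modifies the document between the current client's read and its write. So what could make at least two clients write the same document?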
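To make the optimistic-locking argument concrete, here is a minimal in-memory sketch of per-document versioning. It is not the Elasticsearch API: the names `VersionedIndex`, `VersionConflictException`, `create` and `indexIfVersion` are hypothetical. It models only two rules: a create-style write (the path an upsert takes when it finds no existing document) fails if the `_id` already exists, and a conditional write fails if the version the client last read is no longer current.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical exception standing in for a 409 version_conflict_engine_exception.
class VersionConflictException extends RuntimeException {
    VersionConflictException(String message) {
        super(message);
    }
}

// Hypothetical in-memory model of per-document versions; not the Elasticsearch API.
class VersionedIndex {
    private final Map<String, Long> versions = new HashMap<>();
    private final Map<String, String> sources = new HashMap<>();

    // Create semantics: succeeds only if the id is absent; the first version is 1.
    long create(String id, String source) {
        Long current = versions.get(id);
        if (current != null) {
            throw new VersionConflictException(
                    "version conflict, document already exists (current version [" + current + "])");
        }
        versions.put(id, 1L);
        sources.put(id, source);
        return 1L;
    }

    // Optimistic locking: write only if nobody changed the document since the client read expectedVersion.
    long indexIfVersion(String id, String source, long expectedVersion) {
        long current = versions.getOrDefault(id, 0L);
        if (current != expectedVersion) {
            throw new VersionConflictException(
                    "version conflict: expected [" + expectedVersion + "], current [" + current + "]");
        }
        long next = current + 1;
        versions.put(id, next);
        sources.put(id, source);
        return next;
    }

    // 0 means "document does not exist" in this model.
    long version(String id) {
        return versions.getOrDefault(id, 0L);
    }
}
```

Two clients that both saw the document as missing will both call `create`; whichever arrives second gets the conflict, matching the message in the log above.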

Possibility 1: Spark dynamic resource allocation

In CDH, Spark dynamic resource allocation can indeed multiply the number of executors and schedule tasks onto the new executors, but it does not cause the same data (partition) to be processed by more than one task, so it is ruled out.

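Possibility 2: speculative task execution

To keep one slow task from holding up the whole task set, speculative execution launches extra attempts for the same data; the result of whichever attempt finishes first is used and the others are killed. This can indeed lead to several clients writing the same document and hitting version conflicts. However, Spark does not enable it by default (spark.speculation defaults to false) and the program did not enable it either, so it is ruled out as well.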

2) Debugging the source code: because the problem is hard to reproduce, this did not produce enough useful information either.

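3) Then I noticed that, besides the large number of version-conflict errors, the Spark UI also showed, tucked away in a corner, an EsHadoopNoNodesLeftException: Connection error. Combined with Spark's task retry mechanism, this pointed to the answer: because of network problems the connection to ES failed, but the documents already written could not be rolled back. Spark then rescheduled the task, and the new attempt wrote those documents expecting them not to exist yet (version 0), while the copies already written had been bumped to version 1, hence the version conflict.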
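The hypothesis rests on two facts: a bulk request is not transactional, so items applied before a connection error stay applied, and a retried Spark task re-sends its whole partition. The sketch below is a simplified, hypothetical model (`RetryConflictModel` and `Op` are made-up names) that assumes each item has create semantics, i.e. it expects the document to be absent, which is the path an upsert takes when it finds no existing document. Under that assumption every item applied by the failed attempt conflicts on the retry, while a plain overwrite does not conflict at all.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a non-transactional bulk write followed by a task retry.
class RetryConflictModel {
    enum Op { CREATE, OVERWRITE }

    private final Map<String, Integer> versions = new HashMap<>(); // _id -> version

    // Applies items in order; throws after `failAfter` items to mimic a dropped connection.
    // Items already applied are NOT rolled back. Returns the ids that hit a 409 conflict.
    List<String> bulk(List<String> ids, Op op, int failAfter) {
        List<String> conflicts = new ArrayList<>();
        for (int i = 0; i < ids.size(); i++) {
            if (i == failAfter) {
                throw new IllegalStateException("Connection error after " + i + " items");
            }
            String id = ids.get(i);
            Integer current = versions.get(id);
            if (op == Op.CREATE && current != null) {
                conflicts.add(id); // document already exists: this item fails, the rest of the bulk continues
                continue;
            }
            versions.put(id, current == null ? 1 : current + 1);
        }
        return conflicts;
    }

    // Attempt 1 fails part-way through; attempt 2 re-sends the whole partition.
    static int conflictsOnRetry(Op op, int batchSize, int appliedBeforeFailure) {
        RetryConflictModel es = new RetryConflictModel();
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            ids.add("doc-" + i);
        }
        try {
            es.bulk(ids, op, appliedBeforeFailure);
        } catch (IllegalStateException connectionError) {
            // the task attempt fails, but the first items are already in the index
        }
        return es.bulk(ids, op, Integer.MAX_VALUE).size();
    }
}
```

In a real cluster a `doc_as_upsert` item only takes the create path if it does not find the document, so this model is most plausible when the retry overlaps with a write that is still in flight, for example a bulk request the server kept processing after the client-side connection error.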

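4) First, deal with the version conflicts. The only requirement is that no data is lost, and a conflicting document has already been written, so it is enough to ignore documents that hit a version conflict.
Following the official documentation, configure the following error handler: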

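import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteErrorHandler;
import org.elasticsearch.hadoop.rest.bulk.handler.BulkWriteFailure;
import org.elasticsearch.hadoop.rest.bulk.handler.DelayableErrorCollector;

// Drops bulk entries rejected with HTTP 409 (version conflict); any other error is passed on.
public class IgnoreConflictsHandler extends BulkWriteErrorHandler {
    @Override
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector)
            throws Exception
    {
        // 409 Conflict: the document is already in the index, so mark the entry handled and skip it
        if (entry.getResponseCode() == 409) {
            return HandlerResult.HANDLED;
        }
        // not a conflict: let the next handler in the chain decide
        return collector.pass("Not a conflict response code.");
    }
}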
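The handler class is not picked up on its own; es-hadoop looks up error handlers by name through its settings. A minimal sketch, assuming the handler lives at the hypothetical class name `com.example.IgnoreConflictsHandler` and is registered under the name `ignoreConflict`. The `es.write.rest.error.handlers` and `es.write.rest.error.handler.<name>` keys follow the es-hadoop error-handler documentation; the helper class `EsWriteSettings` is made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that adds bulk-write error-handler settings to an es-hadoop configuration map.
class EsWriteSettings {
    static Map<String, String> withErrorHandler(Map<String, String> base, String name, String handlerClass) {
        Map<String, String> cfg = new LinkedHashMap<>(base); // copy, so the caller's map is not mutated
        // Comma-separated list of handler names, consulted in the listed order.
        cfg.put("es.write.rest.error.handlers", name);
        // Maps the handler name to its fully qualified class name.
        cfg.put("es.write.rest.error.handler." + name, handlerClass);
        return cfg;
    }
}
```

The resulting map can then be passed as the configuration argument of the save call (from Java, for example, `JavaEsSparkSQL.saveToEs(dataset, resource, cfg)`; from Scala, convert it to a Scala `Map` first), together with settings such as `es.write.operation=upsert`, which is an assumption consistent with the `doc_as_upsert` requests in the log. If re-applying the update is preferable to dropping it, es-hadoop's `es.update.retry.on.conflict` setting (default 0) may be worth evaluating as an alternative.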

Verified: the version conflict errors no longer appear; the UI only shows EsHadoopNoNodesLeftException: Connection error.

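5) Fix EsHadoopNoNodesLeftException: Connection error
The cluster runs on Docker containers and Elasticsearch is co-located with the CDH cluster, so overall performance is poor. On top of that, Spark dynamic resource allocation is enabled by default on the cluster, which multiplied the write parallelism. Together these caused the connection errors.
Fix: disable dynamic allocation with --conf spark.dynamicAllocation.enabled=false and tune the parallelism, i.e. limit the number of clients writing to ES concurrently.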
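Each running task opens its own ES client, so the number of concurrent bulk writers is roughly the number of task slots, capped by the number of partitions. A minimal sketch, assuming `spark.task.cpus=1` (one task per core); the class `EsWriteParallelism` is hypothetical, while `--num-executors`, `--executor-cores` and `spark.dynamicAllocation.enabled` are standard spark-submit options and Spark settings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for reasoning about how many ES clients write at the same time.
class EsWriteParallelism {
    // Concurrent writers = concurrently running tasks, assuming one task per core (spark.task.cpus=1).
    static int maxConcurrentWriters(int executors, int coresPerExecutor, int partitions) {
        return Math.min(partitions, executors * coresPerExecutor);
    }

    // spark-submit arguments that fix the executor count instead of letting dynamic allocation grow it.
    static List<String> sparkSubmitArgs(int executors, int coresPerExecutor) {
        List<String> args = new ArrayList<>();
        args.add("--conf");
        args.add("spark.dynamicAllocation.enabled=false");
        args.add("--num-executors");
        args.add(Integer.toString(executors));
        args.add("--executor-cores");
        args.add(Integer.toString(coresPerExecutor));
        return args;
    }
}
```

For example, 4 executors with 2 cores each cap the job at 8 simultaneous writers however many partitions the DataFrame has; reducing the partition count before the write (for instance with `Dataset.coalesce`) lowers the cap further.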

Verified: the connection errors no longer occur.

Source code verification

Starting from dataframe.saveToEs(to, map), the call chain is:

SparkDataFrameFunctions#saveToEs
  └─ EsSparkSQL#saveToEs
      └─ SparkContext#runJob

Skipping details such as DAG stage splitting and task scheduling, focus on the runJob call:
sparkCtx.runJob(srdd.toDF().rdd, new EsDataFrameWriter(srdd.schema, esCfg.save()).write _)

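The write method of EsDataFrameWriter (inherited from EsRDDWriter, as the stack trace shows) is turned into a function via write _ and passed to runJob, which later invokes it on the executors once per partition: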

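def write(taskContext: TaskContext, data: Iterator[T]) {
  // one writer (and thus one ES REST client) per task attempt
  val writer = RestService.createWriter(settings, taskContext.partitionId.toLong, -1, log)

  // close the writer when the task completes, whether it succeeded or failed
  taskContext.addTaskCompletionListener((TaskContext) => writer.close())

  if (runtimeMetadata) {
    writer.repository.addRuntimeFieldExtractor(metaExtractor)
  }

  // stream the partition's records into the bulk buffer
  while (data.hasNext) {
    writer.repository.writeToIndex(processData(data))
  }
}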
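Two details matter here: `write` runs once per task attempt, creating a fresh writer (and ES client) each time, and a retried attempt re-reads its partition from the beginning. The sketch below is a simplified, hypothetical model of that lifecycle in plain Java (`MiniRunJob` is not a Spark class): a `finally` block plays the role of the task-completion listener, and a failed attempt is retried up to a maximum number of attempts, similar to `spark.task.maxFailures` (default 4, which matches the "failed 4 times" in the log).

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical model of SparkContext#runJob plus a per-partition write function; not Spark's API.
class MiniRunJob {
    private final Set<String> failOnce; // docs whose first write hits a simulated connection error
    private int writersOpened = 0;
    private int writersClosed = 0;
    private int docsSent = 0;

    MiniRunJob(Set<String> failOnce) {
        this.failOnce = new HashSet<>(failOnce);
    }

    // Analogous to EsRDDWriter.write: one writer per task attempt, closed when the attempt ends.
    private void write(Iterator<String> data) {
        writersOpened++; // like RestService.createWriter
        try {
            while (data.hasNext()) {
                String doc = data.next();
                if (failOnce.remove(doc)) {
                    throw new IllegalStateException("Connection error while writing " + doc);
                }
                docsSent++;
            }
        } finally {
            writersClosed++; // like the task-completion listener calling writer.close()
        }
    }

    // Analogous to runJob: run the write function on every partition, retrying a failed task.
    void runJob(List<List<String>> partitions, int maxAttempts) {
        for (List<String> partition : partitions) {
            for (int attempt = 1; ; attempt++) {
                try {
                    write(partition.iterator()); // a retry starts again from the first record
                    break;
                } catch (IllegalStateException e) {
                    if (attempt >= maxAttempts) {
                        throw e;
                    }
                }
            }
        }
    }

    int writersOpened() { return writersOpened; }
    int writersClosed() { return writersClosed; }
    int docsSent() { return docsSent; }
}
```

With partitions `[a, b]` and `[c]` and a connection error on the first write of `b`, three writers are opened and closed, and `a` is sent twice: once by the failed attempt and once by the retry.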

The call chain is:
RestRepository#writeToIndex
  └─ RestRepository#doWriteToIndex
      └─ BulkProcessor#add
          └─ BulkProcessor#flush
              └─ BulkProcessor#tryFlush
                  └─ RestClient#bulk
                      └─ NetworkClient#execute

The core method, NetworkClient#execute:

public Response execute(Request request) {
    Response response = null;

    boolean newNode;
    do {
        // the same request, bulk body included, is rebuilt on every iteration
        SimpleRequest routedRequest = new SimpleRequest(request.method(), null, request.path(), request.params(), request.body());

        newNode = false;
        try {
            response = currentTransport.execute(routedRequest);
            ByteSequence body = routedRequest.body();
            if (body != null) {
                stats.bytesSent += body.length();
            }
        } catch (Exception ex) {
            if (ex instanceof EsHadoopIllegalStateException) {
                throw (EsHadoopException) ex;
            }
            // issues with the SSL handshake, bail out instead of retry, for security reasons
            if (ex instanceof javax.net.ssl.SSLException) {
                throw new EsHadoopTransportException(ex);
            }
            // check for fatal, non-recoverable network exceptions
            if (ex instanceof BindException) {
                throw new EsHadoopTransportException(ex);
            }

            if (log.isTraceEnabled()) {
                log.trace(
                        String.format(
                                "Caught exception while performing request [%s][%s] - falling back to the next node in line...",
                                currentNode, request.path()), ex);
            }

            String failed = currentNode;

            failedNodes.put(failed, ex);

            // move on to the next configured node, if any
            newNode = selectNextNode();

            log.error(String.format("Node [%s] failed (%s); "
                    + (newNode ? "selected next node [" + currentNode + "]" : "no other nodes left - aborting..."),
                    failed, ex.getMessage()));

            // every node has failed: give up
            if (!newNode) {
                throw new EsHadoopNoNodesLeftException(failedNodes);
            }
        }
    } while (newNode); // loop again only after switching to a new node

    return response;
}

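This is where EsHadoopNoNodesLeftException is thrown: after a failure the request is retried on the next node, and once every node has failed the exception is raised.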
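Stripped of logging and statistics, the loop above boils down to: send the request to the current node; on a recoverable error, remember the node and move to the next one; when no nodes are left, give up. The sketch below is a hypothetical stand-alone model (`FailoverClient`, `Transport` and `NoNodesLeftException` are made-up names) that omits the SSL and `BindException` early exits. It makes one point visible: the same request body is re-sent to the next node, so if the failed node had in fact received and started applying the bulk, the same documents can be written twice.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for EsHadoopNoNodesLeftException.
class NoNodesLeftException extends RuntimeException {
    final Map<String, Exception> failedNodes;

    NoNodesLeftException(Map<String, Exception> failedNodes) {
        super("all nodes failed: " + failedNodes.keySet());
        this.failedNodes = failedNodes;
    }
}

// Hypothetical model of the node-failover loop in NetworkClient#execute.
class FailoverClient {
    interface Transport {
        String execute(String node, String body) throws Exception;
    }

    private final List<String> nodes;
    private final Transport transport;

    FailoverClient(List<String> nodes, Transport transport) {
        this.nodes = nodes;
        this.transport = transport;
    }

    String execute(String body) {
        Map<String, Exception> failedNodes = new LinkedHashMap<>();
        for (String node : nodes) {
            try {
                return transport.execute(node, body); // the same body is re-sent to every node tried
            } catch (Exception ex) {
                failedNodes.put(node, ex); // remember the failure and fall back to the next node
            }
        }
        throw new NoNodesLeftException(failedNodes);
    }
}
```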

Summary

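This confirms a familiar lesson once again: read error messages patiently and completely. Here the version conflicts that filled the UI were a symptom, while the root cause was the connection error tucked away elsewhere; focusing only on the most visible error can easily lead you astray.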

© Copyright belongs to the author; for reprints or content collaboration, please contact the author.
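Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and provides only information storage services.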
