StreamSets update和delete分離之后導(dǎo)致的操作順序錯(cuò)亂

現(xiàn)象:

當(dāng)期的操作流程如下圖:


clipboard.png

這樣的處理流程會造成如下問題:

1、當(dāng)binlog解析出的批次數(shù)據(jù)中,數(shù)據(jù)包含了對同一條數(shù)據(jù)的刪除和修改操作時(shí),無法保證操作執(zhí)行的順序。

解決方案(針對kudu的Destination):
Kudu的Destination中有個(gè)設(shè)置Default Operation ,這個(gè)設(shè)置的說明是:

default operation to perform if sdc.operation.type is not set in record header.

所以我們可以通過Record的Header Attribute 中的sdc.opeation.type來直接控制數(shù)據(jù)在kudu的Destination中執(zhí)行的操作。
對數(shù)據(jù)的操作不進(jìn)行分離,通過sdc.operation.type加入的Record的Header Attributes中進(jìn)行控制。
sdc.operation.type的值的說明如下:

  • INSERT_CODE = 1;
  • DELETE_CODE = 2;
  • UPDATE_CODE = 3;
  • UPSERT_CODE = 4;

在Javascript Evaluator 中的JS中增加如下的代碼:

for(var i = 0; i < records.length; i++) {
try {
  var newRecord = sdcFunctions.createRecord(true);
  var attributes = records[i].attributes
  if(records[i].value['Type'] =='DELETE'){
    newRecord.attributes['sdc.operation.type']='2';
    newRecord.value = records[i].value['OldData'];
  }else{
    newRecord.attributes['sdc.operation.type']='4'
    newRecord.value = records[i].value['Data'];
  }
  newRecord.value.Type = records[i].value['Type'];
  newRecord.value.Database = records[i].value['Database'];
  newRecord.value.Table = records[i].value['Table'];
  output.write(newRecord);
} catch (e) {
  // Send record to error
  error.write(records[i], e);
  }
}
``
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容