[HBase] 寫入流程之客戶端處理階段

1.前言

HBase服務(wù)器端并沒有提供update,delete接口,所以這些操作在服務(wù)器端都被認(rèn)作是寫入操作。因此HBase中更新,刪除操作的流程與寫入流程完全一致。那下面就以put操作為例,進(jìn)行寫入流程的分析。

2.HTable類
2.1 HTable的put方法:
  @Override
  public void put(final Put put) throws IOException {
    getBufferedMutator().mutate(put);
    if (autoFlush) {
      flushCommits();
    }
  }

  @Override
  public void put(final List<Put> puts) throws IOException {
    getBufferedMutator().mutate(puts);
    if (autoFlush) {
      flushCommits();
    }
  }

<1> getBufferedMutator().mutate(put);

  BufferedMutator getBufferedMutator() throws IOException {
    // mutator:寫緩存
    if (mutator == null) {
      this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
          //創(chuàng)建mutator實例,傳入tableName,線程池,寫的buffer大小,最大的keyValue數(shù)量  
          new BufferedMutatorParams(tableName)
              .pool(pool)
              .writeBufferSize(tableConfiguration.getWriteBufferSize())
              .maxKeyValueSize(tableConfiguration.getMaxKeyValueSize())
      );
    }
    return mutator;
  }

connection.getBufferedMutator實際調(diào)用ConnectionManager的getBufferedMutator方法,最終返回new BufferedMutatorImpl構(gòu)造方法構(gòu)造。

這里的maxKeyValueSize是單個Cell的大小

BufferedMutatorImpl中:

  protected ClusterConnection connection; // non-final so can be overridden in test
  private final TableName tableName;
  private volatile Configuration conf;
  @VisibleForTesting
  final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<Mutation>();
  @VisibleForTesting
  AtomicLong currentWriteBufferSize = new AtomicLong(0);

  private long writeBufferSize;
  private final int maxKeyValueSize;
  private boolean closed = false;
  private final ExecutorService pool;

  @VisibleForTesting
  protected AsyncProcess ap; // non-final so can be overridden in test

調(diào)用mutate方法傳入put請求,實際上就是將put請求放入writeAsyncBuffer

   long toAddSize = 0;
    for (Mutation m : ms) {
      //如果是put請求,校驗寫入的每個Cell的大小是否超過限制
      if (m instanceof Put) {
        validatePut((Put) m);
      }
      //累加當(dāng)前請求占據(jù)的堆內(nèi)存大小
      toAddSize += m.heapSize();
    }

    // This behavior is highly non-intuitive... it does not protect us against
    // 94-incompatible behavior, which is a timing issue because hasError, the below code
    // and setter of hasError are not synchronized. Perhaps it should be removed.
    //異步提交請求發(fā)生錯誤
    if (ap.hasError()) {
      //累加當(dāng)前的請求的大小(AtomicLong類型,具有原子性)
      currentWriteBufferSize.addAndGet(toAddSize);
      //將該請求放在緩存中(ConcurrentLinkedQueue,同步且有順序)
      writeAsyncBuffer.addAll(ms);
      //不管當(dāng)前的currentWriteBufferSize是否達(dá)到閾值,直接flush,并且同步等待結(jié)果返回
      backgroundFlushCommits(true);
    } else {
      currentWriteBufferSize.addAndGet(toAddSize);
      writeAsyncBuffer.addAll(ms);
    }

    // Now try and queue what needs to be queued.
    while (currentWriteBufferSize.get() > writeBufferSize) {
      backgroundFlushCommits(false);
    }

總結(jié):put中,
getBufferedMutator().mutate(put);
這個方法中在緩存達(dá)到閾值時,通過AsyncProcess異步提交(如果異步提交進(jìn)程初始化異常,那么就會轉(zhuǎn)為同步提交,且不必等待達(dá)到閾值)
if (autoFlush) { flushCommits(); }這個條件分支:
如果autoFlush為true,不管緩存是否達(dá)到閾值,都會直接觸發(fā)AsyncProcess異步提交

2.2 backgroundFlushCommits方法

當(dāng)傳入backgroudFlushCommits的參數(shù)為false時執(zhí)行的是異步提交,參數(shù)為true時執(zhí)行的是同步提交。
極少數(shù)情況(異步提交發(fā)生異常才會轉(zhuǎn)為同步):

與此同時,可以發(fā)現(xiàn)無論異步提交還是同步提交,實際的提交動作是由AsyncProcess ap執(zhí)行的:
其中最關(guān)鍵的是ap.submit(tableName, buffer, true, null, false);

locateRegion方法這里就先不贅述了,之后會寫一篇專門來分析。


放到Map<ServerName, MultiAction<Row>> actionsByServer

接下里就是多線程的RPC提交:
AsyncProcess中的submit ---》submitMultiActions ---》sendMultiAction ---》

分RegionServer對相應(yīng)Actions創(chuàng)建多線程

---》getNewMultiActionRunnable

最終每個RegionServer一個線程

---》SingleServerRequestRunnable類中的run方法

3.總結(jié)

(1)把put操作添加到writeAsyncBuffer隊列里面,符合條件(自動flush或者超過了閥值writeBufferSize)就通過AsyncProcess異步批量提交。

(2)在提交之前,我們要根據(jù)每個rowkey找到它們歸屬的region server,這個定位的過程是通過HConnection的locateRegion方法獲得的,然后再把這些rowkey按照HRegionLocation分組。

(3)通過多線程,一個HRegionLocation構(gòu)造MultiServerCallable<Row>,然后通過rpcCallerFactory.<MultiResponse> newCaller()執(zhí)行調(diào)用,忽略掉失敗重新提交和錯誤處理,客戶端的提交操作到此結(jié)束。

相關(guān)博客:HBase的put流程源碼分析

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容