1. Introduction
The HBase server does not expose separate update or delete interfaces; on the server side these operations are all treated as writes, so the update and delete paths in HBase are identical to the write path. Below, the write path is analyzed using the put operation as the example.
2. The HTable Class
2.1 HTable's put method:
@Override
public void put(final Put put) throws IOException {
  getBufferedMutator().mutate(put);
  if (autoFlush) {
    flushCommits();
  }
}

@Override
public void put(final List<Put> puts) throws IOException {
  getBufferedMutator().mutate(puts);
  if (autoFlush) {
    flushCommits();
  }
}
<1> getBufferedMutator().mutate(put);
BufferedMutator getBufferedMutator() throws IOException {
  // mutator: the client-side write buffer
  if (mutator == null) {
    // create the mutator instance, passing the table name, thread pool,
    // write buffer size, and maximum KeyValue size
    this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(
        new BufferedMutatorParams(tableName)
            .pool(pool)
            .writeBufferSize(tableConfiguration.getWriteBufferSize())
            .maxKeyValueSize(tableConfiguration.getMaxKeyValueSize()));
  }
  return mutator;
}
connection.getBufferedMutator actually calls the getBufferedMutator method of ConnectionManager, which ultimately returns an instance created by the BufferedMutatorImpl constructor.
The maxKeyValueSize here limits the size of a single Cell.
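As a rough illustration of that per-cell limit, the standalone sketch below is a hypothetical, simplified stand-in: cells are modeled as plain byte arrays, and only the size comparison is mirrored (the real HTable.validatePut walks the Put's actual cells).

```java
import java.util.List;

public class ValidatePutSketch {
    // Hypothetical, simplified per-cell size check: cells are modeled as
    // plain byte arrays. The real HTable.validatePut walks the Put's cells
    // and skips the check when the configured limit is not positive.
    static void validatePut(List<byte[]> cells, int maxKeyValueSize) {
        if (maxKeyValueSize > 0) {           // non-positive limit means "unchecked"
            for (byte[] cell : cells) {
                if (cell.length > maxKeyValueSize) {
                    throw new IllegalArgumentException("KeyValue size too large");
                }
            }
        }
    }

    public static void main(String[] args) {
        validatePut(List.of(new byte[10], new byte[20]), 32);  // within limit: ok
        boolean rejected = false;
        try {
            validatePut(List.of(new byte[64]), 32);            // over limit
        } catch (IllegalArgumentException e) {
            rejected = true;
        }
        System.out.println("oversized cell rejected: " + rejected);  // prints true
    }
}
```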

The relevant fields in BufferedMutatorImpl:
protected ClusterConnection connection; // non-final so can be overridden in test
private final TableName tableName;
private volatile Configuration conf;
@VisibleForTesting
final ConcurrentLinkedQueue<Mutation> writeAsyncBuffer = new ConcurrentLinkedQueue<Mutation>();
@VisibleForTesting
AtomicLong currentWriteBufferSize = new AtomicLong(0);
private long writeBufferSize;
private final int maxKeyValueSize;
private boolean closed = false;
private final ExecutorService pool;
@VisibleForTesting
protected AsyncProcess ap; // non-final so can be overridden in test
Calling the mutate method with a put request essentially places the request into writeAsyncBuffer:
long toAddSize = 0;
for (Mutation m : ms) {
  // for a put request, validate that each Cell does not exceed the size limit
  if (m instanceof Put) {
    validatePut((Put) m);
  }
  // accumulate the heap size occupied by this request
  toAddSize += m.heapSize();
}

// This behavior is highly non-intuitive... it does not protect us against
// 94-incompatible behavior, which is a timing issue because hasError, the below code
// and setter of hasError are not synchronized. Perhaps it should be removed.
if (ap.hasError()) {
  // a previous asynchronous submission failed:
  // add this request's size (AtomicLong, so the update is atomic)
  currentWriteBufferSize.addAndGet(toAddSize);
  // append the requests to the buffer (ConcurrentLinkedQueue, thread-safe and ordered)
  writeAsyncBuffer.addAll(ms);
  // flush immediately regardless of whether currentWriteBufferSize has
  // reached the threshold, and wait synchronously for the result
  backgroundFlushCommits(true);
} else {
  currentWriteBufferSize.addAndGet(toAddSize);
  writeAsyncBuffer.addAll(ms);
}

// Now try and queue what needs to be queued.
while (currentWriteBufferSize.get() > writeBufferSize) {
  backgroundFlushCommits(false);
}
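The bookkeeping above (an atomic size counter plus a concurrent queue, flushing while over the threshold) can be sketched in isolation. BufferSketch below is hypothetical: mutations are reduced to their heap sizes, and flush is replaced by a simple drain instead of an RPC.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Standalone sketch of mutate()'s bookkeeping: enqueue mutations,
// add their sizes atomically, and flush while the total exceeds the
// write-buffer threshold.
public class BufferSketch {
    final ConcurrentLinkedQueue<Long> writeAsyncBuffer = new ConcurrentLinkedQueue<>();
    final AtomicLong currentWriteBufferSize = new AtomicLong(0);
    final long writeBufferSize;   // flush threshold in bytes
    int flushes = 0;

    BufferSketch(long writeBufferSize) { this.writeBufferSize = writeBufferSize; }

    void mutate(List<Long> ms) {
        long toAddSize = 0;
        for (long heapSize : ms) toAddSize += heapSize;  // sum request sizes
        currentWriteBufferSize.addAndGet(toAddSize);
        writeAsyncBuffer.addAll(ms);
        while (currentWriteBufferSize.get() > writeBufferSize) {
            backgroundFlushCommits();
        }
    }

    void backgroundFlushCommits() {
        // stand-in for the real submission: drain the queue and
        // subtract the drained bytes from the counter
        long drained = 0;
        Long m;
        while ((m = writeAsyncBuffer.poll()) != null) drained += m;
        currentWriteBufferSize.addAndGet(-drained);
        flushes++;
    }

    public static void main(String[] args) {
        BufferSketch b = new BufferSketch(100);
        b.mutate(List.of(40L, 40L));  // 80 <= 100: stays buffered
        System.out.println("buffered=" + b.currentWriteBufferSize.get() + " flushes=" + b.flushes);
        b.mutate(List.of(40L));       // 120 > 100: triggers a flush
        System.out.println("buffered=" + b.currentWriteBufferSize.get() + " flushes=" + b.flushes);
    }
}
```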
To summarize put:
First, getBufferedMutator().mutate(put) is called. Inside this method, requests are submitted asynchronously via AsyncProcess once the buffer reaches its threshold (if a previous asynchronous submission has failed, the next mutate falls back to a synchronous submission without waiting for the threshold).
Then comes the branch if (autoFlush) { flushCommits(); }: when autoFlush is true, an AsyncProcess submission is triggered immediately, regardless of whether the buffer has reached the threshold.
2.2 The backgroundFlushCommits method
When backgroundFlushCommits is called with false, the commit is asynchronous; when called with true, it is synchronous. The synchronous path is taken only in rare cases (when an asynchronous submission has already failed):

In both cases, whether the commit is asynchronous or synchronous, the actual submission is carried out by the AsyncProcess ap.
The key call is ap.submit(tableName, buffer, true, null, false);

The locateRegion method is not covered here; a dedicated post will analyze it later.

The located actions are placed into Map<ServerName, MultiAction<Row>> actionsByServer.
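This grouping step can be sketched as follows; the hash-based locateServer here is purely hypothetical, standing in for the real meta-table lookup done by locateRegion.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupBySketch {
    // Hypothetical locator: picks one of 3 "servers" by hash. The real
    // client resolves rowKey -> region -> server via the meta table
    // (HConnection.locateRegion).
    static String locateServer(String rowKey) {
        return "server-" + Math.floorMod(rowKey.hashCode(), 3);
    }

    // Bucket row keys per destination server, mirroring how AsyncProcess
    // fills Map<ServerName, MultiAction<Row>> actionsByServer.
    static Map<String, List<String>> groupByServer(List<String> rowKeys) {
        Map<String, List<String>> actionsByServer = new HashMap<>();
        for (String row : rowKeys) {
            actionsByServer
                .computeIfAbsent(locateServer(row), s -> new ArrayList<>())
                .add(row);
        }
        return actionsByServer;
    }

    public static void main(String[] args) {
        groupByServer(List.of("row-a", "row-b", "row-c", "row-d"))
            .forEach((server, rows) -> System.out.println(server + " -> " + rows));
    }
}
```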
Next comes the multithreaded RPC submission, following the call chain inside AsyncProcess:

submit → submitMultiActions → sendMultiAction → getNewMultiActionRunnable → the run method of SingleServerRequestRunnable
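The fan-out at the end of that chain amounts to one task per destination server submitted to a shared pool. FanOutSketch below is a hypothetical stand-in: its recorded batch sizes replace the real SingleServerRequestRunnable.run, which builds a MultiServerCallable and issues the RPC via rpcCallerFactory.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FanOutSketch {
    // Submit one task per destination server, mimicking how sendMultiAction
    // wraps each server's MultiAction in a runnable handed to the shared
    // thread pool. Here each "run" just records the batch size.
    static ConcurrentMap<String, Integer> fanOut(Map<String, List<String>> actionsByServer)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        ConcurrentMap<String, Integer> sent = new ConcurrentHashMap<>();
        for (Map.Entry<String, List<String>> e : actionsByServer.entrySet()) {
            pool.submit(() -> sent.put(e.getKey(), e.getValue().size()));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return sent;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, List<String>> actionsByServer = Map.of(
            "server-0", List.of("row-a", "row-c"),
            "server-1", List.of("row-b"));
        System.out.println(fanOut(actionsByServer));
    }
}
```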

3. Summary
(1) Put operations are added to the writeAsyncBuffer queue; when the conditions are met (autoFlush is on, or the buffered size exceeds the writeBufferSize threshold), they are submitted asynchronously in batches via AsyncProcess.
(2) Before submission, each rowkey is resolved to the region server that owns it via HConnection's locateRegion method, and the rowkeys are then grouped by HRegionLocation.
(3) Using multiple threads, a MultiServerCallable<Row> is built for each HRegionLocation and executed through rpcCallerFactory.<MultiResponse> newCaller(). Setting aside failure retries and error handling, the client-side submission ends here.