0. 蜜汁參數(shù)
在做 HBase 客戶端 scan 優(yōu)化時(shí),經(jīng)常會(huì)碰到以下幾個(gè)參數(shù),總是讓人迷惑 ,不知從何優(yōu)化起。
-
.setCache(緩存大小? 字節(jié)數(shù)?行數(shù)?) -
.setMaxResultSize(最大結(jié)果數(shù)?) -
.setBatch(批量?)
造成這種困擾很大的原因是命名問(wèn)題。
先說(shuō)下結(jié)論,如果把名字改成如下,語(yǔ)義會(huì)清晰很多 。[1]
-
.setCaching=>.setNumberOfRowsFetchSize(客戶端每次 rpc fetch 的行數(shù)) -
.setMaxResultSize=>.setMaxResultByteSize(客戶端緩存的最大字節(jié)數(shù)) -
.setBatch=>.setColumnsChunkSize(客戶端每次獲取的列數(shù))
1. Client Scan 原理及相關(guān)源碼解讀
HBase 每次 scan 的數(shù)據(jù)量可能會(huì)比較大,客戶端不會(huì)一次性全部把數(shù)據(jù)從服務(wù)端拉回來(lái)。而是通過(guò)多次 rpc 分批次的拉取。類似于 TCP 協(xié)議里面一段一段的傳輸,可以做到細(xì)粒度的流量控制。至于如何調(diào)優(yōu),控制每次 rpc 拉取的數(shù)據(jù)量,就可以通過(guò)以上三個(gè)比較蛋疼的參數(shù)來(lái)控制。
我們可以先看一段來(lái)自 HBase scan 里面的核心類 ClientScanner 里的讀取邏輯,通過(guò)它來(lái)了解整個(gè)流程。
@Override
public Result next() throws IOException {
// If the scanner is closed and there's nothing left in the cache, next is a no-op.
if (cache.size() == 0 && this.closed) {
return null;
}
// 緩沖中沒(méi)有就 RPC 調(diào)用讀取數(shù)據(jù)進(jìn)緩存
if (cache.size() == 0) {
loadCache();
}
// 緩沖中有直接從緩存中取
if (cache.size() > 0) {
return cache.poll();
}
// if we exhausted this scanner before calling close, write out the scan metrics
writeScanMetrics();
return null;
}
每次從緩存 cache 中讀,緩存為空則 loadCache , 實(shí)際上 cache 是通過(guò)一個(gè)鏈表來(lái)實(shí)現(xiàn)的,定義如下:
protected final LinkedList<Result> cache = new LinkedList<Result>();
繼續(xù)看 loadCache() ,為了弄清大體主流程,我刪除了部分代碼
protected void loadCache() throws IOException {
Result[] values = null;
// 剩余最大容量
long remainingResultSize = maxScannerResultSize;
// 行數(shù)計(jì)數(shù) 為 setCaching 的值
int countdown = this.caching;
// 配置 rpc 請(qǐng)求的條數(shù)
callable.setCaching(this.caching);
boolean serverHasMoreResults = false;
// do while 循環(huán),循環(huán)次數(shù)即為 rpc 次數(shù)
do {
try {
// rpc 從 server 拉數(shù)據(jù),請(qǐng)求的條數(shù)為 this.caching 默認(rèn)為 Integer.Max_VALUE
values = call(callable, caller, scannerTimeout);
} catch (DoNotRetryIOException | NeedUnmanagedConnectionException e) {
// 異常處理 這里略過(guò)
}
// Groom the array of Results that we received back from the server before adding that
// Results to the scanner's cache
// 將數(shù)據(jù)放入緩存前,先對(duì)數(shù)據(jù)進(jìn)行一些處理,主要是處理對(duì)于部分對(duì)調(diào)用這不可見(jiàn)的數(shù)據(jù)
List<Result> resultsToAddToCache =
getResultsToAddToCache(values, callable.isHeartbeatMessage());
if (!resultsToAddToCache.isEmpty()) {
// 遍歷 results 寫(xiě)入 cache
for (Result rs : resultsToAddToCache) {
cache.add(rs);
// We don't make Iterator here
for (Cell cell : rs.rawCells()) {
// 估算每個(gè) cell 的大小,計(jì)算剩余 byte size
remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
}
// 剩余行數(shù) --
countdown--;
this.lastResult = rs;
}
}
// We expect that the server won't have more results for us when we exhaust
// the size (bytes or count) of the results returned. If the server *does* inform us that
// there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
// get results is the moreResults context valid.
if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
serverHasMoreResults = callable.getServerHasMoreResults() & partialResults.isEmpty();
}
// Values == null means server-side filter has determined we must STOP
} while (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
&& (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)));
// 循環(huán)條件
}
這里重點(diǎn)看下do while 循環(huán)的條件 doneWithRegion()
/**
* @param remainingResultSize
* @param remainingRows
* @param regionHasMoreResults
*/
private boolean doneWithRegion(long remainingResultSize, int remainingRows,
boolean regionHasMoreResults) {
// 同時(shí)滿足這些才行這里的
// remainingResultSize 初始值即為配置的 setMaxResultSize
// remainingRows 初始值為配置的 setCaching (實(shí)際為:行數(shù))
return remainingResultSize > 0 && remainingRows > 0 && !regionHasMoreResults;
}
因?yàn)槊看?while 循環(huán)會(huì)進(jìn)行一次 rpc 調(diào)用,不同的參數(shù)配置組合配置導(dǎo)致 rpc 調(diào)用的次數(shù)不同。必須同時(shí)滿足行數(shù)與字節(jié)數(shù)的的限制才行。
3. 官方配置與建議
這幾個(gè)參數(shù)對(duì)應(yīng)的配置如下
hbase.client.scanner.caching - (setCaching):HBase-0.98 默認(rèn)值為為 100,HBase-1.2 默認(rèn)值為 2147483647,即 Integer.MAX_VALUE。Scan.next() 的一次 RPC 請(qǐng)求 fetch 的記錄條數(shù)。配置建議:這個(gè)參數(shù)與 下面的hbase.client.scanner.max.result.size - (setMaxResultSize)配置使用,在網(wǎng)絡(luò)狀況良好的情況下,自定義設(shè)置不宜太小, 可以直接采用默認(rèn)值,不配置。hbase.client.scanner.max.result.size - (setMaxResultSize):HBase-0.98 無(wú)該項(xiàng)配置,HBase-1.2 默認(rèn)值為 210241024,即 2M。Scan.next() 的一次 RPC 請(qǐng)求 fetch 的數(shù)據(jù)量大小,目前 HBase-1.2 在 Caching 為默認(rèn)值(Integer Max)的時(shí)候,實(shí)際使用這個(gè)參數(shù)控制 RPC 次數(shù)和流量。配置建議:如果網(wǎng)絡(luò)狀況較好(萬(wàn)兆網(wǎng)卡),scan 的數(shù)據(jù)量非常大,可以將這個(gè)值配置高一點(diǎn)。如果配置過(guò)高:則可能 loadCache 速度比較慢,導(dǎo)致 scan timeout 異常hbase.server.scanner.max.result.size:服務(wù)端配置。HBase-0.98 無(wú)該項(xiàng)配置,HBase-1.2 新增,默認(rèn)值為 10010241024,即 100M。該參數(shù)表示當(dāng) Scan.next() 發(fā)起 RPC 后,服務(wù)端返回給客戶端的最大字節(jié)數(shù),防止 Server OOM。[2]setBatch()坑爹的命名,這個(gè)實(shí)際上是配置獲取的列數(shù),假如表有兩個(gè)列簇 cf,info,每個(gè)列簇5個(gè)列。這樣每行可能有10列了,setBatch() 可以控制每次獲取的最大列數(shù),進(jìn)一步從列級(jí)別控制流量。配置建議:當(dāng)列數(shù)很多,數(shù)據(jù)量大時(shí)考慮配置此參數(shù),例如100列每次只獲取50列。一般情況可以默認(rèn)值(-1 不受限)。
參考:
[1] HBase client’s weird API names
[2] HBase Client配置參數(shù)說(shuō)明