HBase掃描操作Scan
1 介紹
掃描操作的使用和get()方法類似。同樣,和其他函數(shù)類似,這里也提供了Scan類。但是由于掃描工作方式類似于迭代器,所以用戶無需調(diào)用san()方法創(chuàng)建實例,只需要調(diào)用HTable的getScanner()方法,此方法才是返回真正的掃描器(scanner)實例的同時,用戶也可以使用它迭代獲取數(shù)據(jù),Table中的可用的方法如下:
ResultScanner getScanner(Scan scan)
ResultScanner getScanner(byte[] family)
ResultScanner getScanner(byte[] family, byte[] qualifier)
后兩個為了方便用戶,隱式地幫助用戶創(chuàng)建一個Scan實例,邏輯中最后調(diào)用getScanner(Scan scan)方法。Scan類擁有以下構(gòu)造器:
public Scan()
public Scan(byte [] startRow)
public Scan(byte [] startRow, byte [] stopRow)
public Scan(byte [] startRow, Filter filter)
public Scan(Get get)
public Scan(Scan scan)
用戶可以選擇性的提供startRow參數(shù),來定義掃描讀取HBase表的起始行鍵,即行鍵不是必須指定的。同時可選stopRow參數(shù)來限定讀取到何處停止。
其實行包括在內(nèi),而終止行不包含在內(nèi)。一般區(qū)間表示為[startRow,stopRow)
掃描操作有一個特點:用戶提供的參數(shù)不必精確匹配兩行。掃描會匹配相等或者大于給定的起始行的行鍵。如果沒有顯示地指定起始行,它會從表的起始位置開始獲取數(shù)據(jù)。
當(dāng)遇到了與設(shè)置的終止行相同或者大于終止行的行鍵時,掃描也會終止。如果沒有指定終止鍵,會掃描到表尾。
另一個可選的參數(shù)叫做過濾器(filter),可直接指向Filter實例。盡管Scan實例通常由空白構(gòu)造器構(gòu)造,但其所有可選參數(shù)都有對應(yīng)的getter方法和setter方法。
創(chuàng)建Scan實例后,用戶可能還要給它增加更多限制條件。這種情況下,用戶仍然可以使用空白 參數(shù)的掃描,它可以讀取整個表格,包括所有列族以及它們的所有列。可以用多種方法限制要讀取的數(shù)據(jù):
public Scan addFamily(byte [] family) // 方法限制返回數(shù)據(jù)的列族
public Scan addColumn(byte [] family, byte [] qualifier) // 方法限制返回的列
Scan setTimeRange(long minStamp,long maxStamp) // 設(shè)置時間范圍
Scan setTimeStamp(long timestamp) // 設(shè)置時間戳
Scan setMaxVersions() // 設(shè)置最大版本數(shù)
Scan setMaxVersions(int maxVersions) // 設(shè)置最大版本數(shù)
2 示例代碼
@Test
public void testScan() throws IOException {
Connection conn = ConnectionFactory.createConnection();
Table table = conn.getTable(TableName.valueOf("ns1:t1"));
Scan scan = new Scan(Bytes.toBytes("row"),Bytes.toBytes("row010"));
scan.addFamily(Bytes.toBytes("f1"));
ResultScanner scanner = table.getScanner(scan)
Iterator<Result> results = scanner.iterator();
while (results.hasNext()){
Result r = results.next();
String rowId = Bytes.toString(r.getRow())
Cell cId = r.getColumnLatestCell(Bytes.toBytes("f1"),Bytes.toBytes("id"));
Cell cName = r.getColumnLatestCell(Bytes.toBytes("f1"),Bytes.toBytes("name"));
Cell cAge = r.getColumnLatestCell(Bytes.toBytes("f1"),Bytes.toBytes("age"))
int id = Bytes.toInt(CellUtil.cloneValue(cId));
String name = Bytes.toString(CellUtil.cloneValue(cName));
int age = Bytes.toInt(CellUtil.cloneValue(cAge))
System.out.println("-----------------------------");
System.out.println(rowId + "," + id + "," + age + "," + name);
}
scanner.close();
table.close();
conn.close();
}
上述代碼顯示結(jié)果如下:
-----------------------------
row000,0,0,tom0
-----------------------------
row001,1,1,tom1
-----------------------------
row002,2,2,tom2
-----------------------------
row003,3,3,tom3
-----------------------------
row004,4,4,tom4
-----------------------------
row005,5,5,tom5
-----------------------------
row006,6,6,tom6
-----------------------------
row007,7,7,tom7
-----------------------------
row008,8,8,tom8
-----------------------------
row009,9,9,tom9
3 ResultScanner類
掃描操作不會通過一個RPC請求返回所有匹配的行,而是以行為單位進行返回。很明顯,行的數(shù)目很大,可能有上千條甚至更多,同時在一次請求中發(fā)送大量數(shù)據(jù),會占用大量的系統(tǒng)資源并消耗很長時間。
ResultScanner類把掃描操作轉(zhuǎn)換為類似的get操作,它將每一行數(shù)據(jù)封裝成一個Result實例,并將所有的Result實例放入一個迭代器中。ResultScanner的一些方法如下:
Result next()
Result[] next(int nbRows)
void close()
3.1 掃描器租約
要確保盡早釋放掃描器對象,一個打開的掃描器會占用不少的服務(wù)端資源,累計多了會占用大量的堆空間。當(dāng)使用完ResultScanner之后調(diào)用它的close()方法,同時當(dāng)把close()方法放到try/finally塊中,以保證其在迭代獲取數(shù)據(jù)過程中出現(xiàn)異常和錯誤時,仍然能執(zhí)行close()。
4 設(shè)置掃描器緩存
每一個next()調(diào)用都會為每一行數(shù)據(jù)生成一個單獨的RPC請求,即使使用next(int nbRows)方法也是如此,因為該方法僅僅是在客戶端循環(huán)地調(diào)用next()方法。很顯然,當(dāng)單元格數(shù)據(jù)較少時,這樣做的性能不會很好。因此,如果一次RPC請求可以獲取多行數(shù)據(jù),這樣更有意義。這樣的方法可以由掃描器的緩存實現(xiàn),默認(rèn)情況下,這個緩存是關(guān)閉的。
Scan類中提供了設(shè)置緩存的方法如下:
public Scan setCacheBlocks(boolean cacheBlocks) // 設(shè)置是否應(yīng)用緩存塊來進行掃描
public boolean getCacheBlocks() // 查看是否支持塊緩存
public Scan setCaching(int caching) // 設(shè)置掃描器的緩存行數(shù)
public int getCaching() // 獲取掃描器中的緩存行數(shù)
用戶需要少量的RPC請求次數(shù)和客戶端以及服務(wù)器的內(nèi)存消耗找到平衡點。很多時候,設(shè)置掃描器緩存可以提高性能,不過設(shè)置的太高就會產(chǎn)生不良的影響:每次調(diào)用next()將會占用更長的時間,因為要獲取更多的文件并傳輸?shù)娇蛻舳?,如果返回給客戶端的數(shù)據(jù)超出了其堆的大小,程序就會終止并拋出OutOfMemoryException異常。
Tip
當(dāng)傳輸和處理數(shù)據(jù)的時間超過配置的掃描器租約時間時,用戶將會收到一個ScannerTimeoutException形式拋出的租約過期錯誤。
下面是代碼示例:
/**
* 添加掃描
*/
@Test
public void testScanCacheBatch() throws Exception {
//
Configuration conf = HBaseConfiguration.create();
Connection conn = ConnectionFactory.createConnection(conf);
HTable table = (HTable) conn.getTable(TableName.valueOf("ns1:t2"));
Scan scan = new Scan();
System.out.println(scan.getBatch());
//三行
scan.setCaching(3) ;
//2列
scan.setBatch(2) ;
ResultScanner scanner = table.getScanner(scan);
Iterator<Result> it = scanner.iterator();
while (it.hasNext()) {
Result r = it.next();
outResult(r);
}
scanner.close();
}
private void outResult(Result r){
System.out.println("=========================");
List<Cell> cells = r.listCells();
for(Cell cell : cells){
String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
String f = Bytes.toString(CellUtil.cloneFamily(cell));
String col = Bytes.toString(CellUtil.cloneQualifier(cell));
long ts = cell.getTimestamp();
String value = Bytes.toString(CellUtil.cloneValue(cell));
System.out.println(rowkey+"/"+f+":"+col+"/"+ts + "=" + value);
}
}
4.1 chche
在默認(rèn)情況下,如果你需要從hbase中查詢數(shù)據(jù),在獲取結(jié)果ResultScanner時,hbase會在你每次調(diào)用ResultScanner.next()操作時對返回的每個Row執(zhí)行一次RPC操作。即使你使用ResultScanner.next(int nbRows)時也只是在客戶端循環(huán)調(diào)用RsultScanner.next()操作,你可以理解為hbase將執(zhí)行查詢請求以迭代器的模式設(shè)計,在執(zhí)行next()操作時才會真正的執(zhí)行查詢操作,而對每個Row都會執(zhí)行一次RPC操作。
因此顯而易見的就會想如果我對多個Row返回查詢結(jié)果才執(zhí)行一次RPC調(diào)用,那么就會減少實際的通訊開銷。這個就是hbase配置屬性“hbase.client.scanner.caching”的由來,設(shè)置cache可以在hbase配置文件中顯示靜態(tài)的配置,也可以在程序動態(tài)的設(shè)置。
cache值得設(shè)置并不是越大越好,需要做一個平衡。cache的值越大,則查詢的性能就越高,但是與此同時,每一次調(diào)用next()操作都需要花費更長的時間,因為獲取的數(shù)據(jù)更多并且數(shù)據(jù)量大了傳輸?shù)娇蛻舳诵枰臅r間就越長,一旦你超過了maximum heap the client process 擁有的值,就會報outofmemoryException異常。當(dāng)傳輸rows數(shù)據(jù)到客戶端的時候,如果花費時間過長,則會拋出ScannerTimeOutException異常。
4.2 batch
在cache的情況下,我們一般討論的是相對比較小的row,那么如果一個Row特別大的時候應(yīng)該怎么處理呢?要知道cache的值增加,那么在client process 占用的內(nèi)存就會隨著row的增大而增大。在hbase中同樣為解決這種情況提供了類似的操作:Batch。可以這么理解,cache是面向行的優(yōu)化處理,batch是面向列的優(yōu)化處理。它用來控制每次調(diào)用next()操作時會返回多少列,比如你設(shè)置setBatch(5),那么每一個Result實例就會返回5列,如果你的列數(shù)為17的話,那么就會獲得四個Result實例,分別含有5,5,5,2個列。
下面會以表格的形式來幫助理解,假設(shè)我們擁有10Row,每個row擁有2個family,每個family擁有10個列。(也就是說每個Row含有20列)
| 緩存 | 批量處理 | Result個數(shù) | RPC次數(shù) | 說明 |
|---|---|---|---|---|
| 1 | 1 | 200 | 201 | 每個列都作為一個Result實例返回。最后還多一個RPC確認(rèn)掃描完成 |
| 200 | 1 | 200 | 2 | 每個Result實例都只包含一列的值,不過它們都被一次RPC請求取回 |
| 2 | 10 | 20 | 11 | 批量參數(shù)是一行所包含的列數(shù)的一半,所以200列除以10,需要20個result實例。同時需要10次RPC請求取回。 |
| 5 | 100 | 10 | 3 | 對一行來講,這個批量參數(shù)實在是太大了,所以一行的20列都被放入到了一個Result實例中。同時緩存為5,所以10個Result實例被兩次RPC請求取回。 |
| 5 | 20 | 10 | 3 | 同上,不過這次的批量值與一行列數(shù)正好相同,所以輸出與上面一種情況相同 |
| 10 | 10 | 20 | 3 | 這次把表分成了較小的result實例,但使用了較大的緩存值,所以也是只用了兩次RPC請求就返回了數(shù)據(jù) |
要計算一次掃描操作的RPC請求的次數(shù),用戶需要先計算出行數(shù)和每行列數(shù)的乘積。然后用這個值除以批量大小和每行列數(shù)中較小的那個值。最后再用除得的結(jié)果除以掃描器緩存值。 用數(shù)學(xué)公式表示如下:
RPC請求的次數(shù)=(行數(shù)x每行的列數(shù))/Min(每行的列數(shù),批量大小)/掃描器緩存
原文地址:http://blog.csdn.net/u010521842/article/details/77719433