I. Introduction
This article walks through an example Endpoint Coprocessor that counts and averages specified columns under a given scan condition. A recommended article from the HBase technical community explains in great detail how to develop and deploy an HBase Endpoint Coprocessor; building on that foundation, this article shows how to pass a Scan, Filter, and similar objects as request parameters.
II. Environment Setup
1. Download and install Protobuf. Install the same Protobuf version that your HBase cluster uses.
2. Configure the environment variables.
3. Create a test table.
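For illustration only (the table name, column family, columns, and values here are assumptions for this walkthrough), a test table could be created in the HBase shell like this:

```
create 'test_coprocessor', 'info'
put 'test_coprocessor', 'row1', 'info:weight', '60'
put 'test_coprocessor', 'row1', 'info:age', '25'
put 'test_coprocessor', 'row2', 'info:weight', '70'
put 'test_coprocessor', 'row2', 'info:age', '35'
```

Note that the values are stored as ASCII digit strings, because the server-side code below parses cell values with Long.parseLong.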
III. Generating Serialization Classes with Protobuf
Protobuf itself supports only a limited set of data types. What if a request parameter needs to carry an object, a scan range, or a scan filter? This article focuses on how to pass in a Scan object.
1. Set up the proto environment
In the HBase source you can find the hbase-protocol module, whose protobuf directory contains many proto files that HBase has already defined. The Scan message used in this article is defined in Client.proto. Copy the proto files you need (or all of them) to the machine where Protobuf was installed above.

2. Create a file named ClifeProto.proto with the following content:
syntax = "proto2";
option java_package = "com.clife.data.hbase.coprocessor.aggregate";
option java_outer_classname = "ClifeProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "Client.proto";

message ClifeAvgRequest {
    required string family = 1;
    required string columns = 2;
    optional Scan scan = 3;
}

message ClifeAvgResponse {
    required int64 count = 1;
    required string values = 2;
}

service ClifeService {
    rpc getAvg(ClifeAvgRequest) returns (ClifeAvgResponse);
}
Note that ClifeProto.proto must be in the same directory as Client.proto, as shown in the figure:

3. Generate the Java classes
protoc --java_out=./ ClifeProto.proto
The --java_out flag specifies the output directory for the generated Java classes.
After the command completes, the generated Java classes can be found under the com directory shown in the figure above.
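If Client.proto is not in the current directory, the import can be resolved by pointing protoc at the directory that contains it with --proto_path (the path below is illustrative; substitute your own checkout location):

```
protoc --proto_path=/path/to/hbase-protocol/src/main/protobuf --proto_path=. --java_out=./ ClifeProto.proto
```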
IV. Endpoint Coprocessor Server-Side Implementation
1. Create a Maven project with the following dependencies:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-cdh5.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0-cdh5.7.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-examples</artifactId>
<version>1.2.0-cdh5.7.2</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
These dependencies can be excluded when running mvn package, to avoid producing an oversized jar; the cluster already provides them at runtime.
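One way to keep the packaged jar small (a sketch; adjust to your own build) is to mark the cluster-provided dependencies with provided scope so they are used at compile time but left out of the package:

```xml
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.7.2</version>
    <scope>provided</scope>
</dependency>
```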
Copy the Java classes generated in step III into the com.clife.data.hbase.coprocessor.aggregate package configured in ClifeProto.proto.
2. Build the Endpoint implementation class
public class ClifeProtosEndPoint extends ClifeProtos.ClifeService implements Coprocessor, CoprocessorService {

    protected static final Log log = LogFactory.getLog(ClifeProtosEndPoint.class);
    private RegionCoprocessorEnvironment env;

    /**
     * Compute averages.
     * Sums the specified columns under the scan condition passed by the client,
     * and returns the per-column sums together with the row count; the client
     * computes the final averages.
     * 1. Request parameters:
     *    1) scan (optional): rowkey range, time range, filter, etc.
     *    2) family (required): each RPC may operate on only one column family
     *    3) columns (required): the columns to aggregate, comma-separated, e.g. "weight,age"
     * 2. Response:
     *    1) count: long, the number of rows scanned
     *    2) values: String, the per-column sums, e.g. "weight:234,age:345"
     * @param controller
     * @param request
     * @param done
     */
    @Override
    public void getAvg(RpcController controller, ClifeProtos.ClifeAvgRequest request,
                       RpcCallback<ClifeProtos.ClifeAvgResponse> done) {
        ClifeProtos.ClifeAvgResponse response = null;
        long counter = 0L;
        List<Cell> results = new ArrayList<>();
        InternalScanner scanner = null;
        try {
            log.info("Start Clife avg endpoint.........................");
            Scan scan;
            // For an optional protobuf message field, check hasScan() rather than
            // null: getScan() never returns null, it returns the default instance.
            if (request.hasScan()) {
                scan = ProtobufUtil.toScan(request.getScan());
                byte[] startRow = scan.getStartRow();
                byte[] stopRow = scan.getStopRow();
                // Scan returns empty arrays, not null, when no range was set
                if (startRow.length > 0 && stopRow.length > 0) {
                    log.info("StartRow = " + RowKeyUtil.convertByteRunDataRowKeyToString(startRow)
                            + ", StopRow = " + RowKeyUtil.convertByteRunDataRowKeyToString(stopRow));
                }
            } else {
                scan = new Scan();
            }
            byte[] cf = Bytes.toBytes(request.getFamily());
            scan.addFamily(cf);
            // Columns arrive as a comma-separated list, e.g. "weight,age"
            String columns = request.getColumns();
            log.info("Input columns: " + columns);
            Map<String, Long> columnSums = new HashMap<>();
            for (String column : columns.split(",")) {
                columnSums.put(column, 0L);
                scan.addColumn(cf, Bytes.toBytes(column));
            }
            scanner = this.env.getRegion().getScanner(scan);
            boolean hasMoreRows;
            do {
                hasMoreRows = scanner.next(results);
                if (results.size() > 0) {
                    counter++;
                }
                for (Cell cell : results) {
                    String column = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    columnSums.put(column, columnSums.get(column) + Long.parseLong(value));
                }
                results.clear();
            } while (hasMoreRows);
            StringBuilder values = new StringBuilder();
            for (Map.Entry<String, Long> entry : columnSums.entrySet()) {
                values.append(entry.getKey()).append(":").append(entry.getValue()).append(",");
            }
            log.info("Clife avg server result: " + values);
            response = ClifeProtos.ClifeAvgResponse.newBuilder()
                    .setCount(counter)
                    .setValues(values.toString())
                    .build();
        } catch (IOException e) {
            ResponseConverter.setControllerException(controller, e);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {
                }
            }
        }
        log.info("Row counter from this region is "
                + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + counter);
        done.run(response);
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
    }

    @Override
    public Service getService() {
        return this;
    }
}
3. Endpoint Coprocessor client-side implementation
public class ClifeProtosExample {

    /**
     * The most efficient approach: call
     * coprocessorService(Class, byte[], byte[], Batch.Call, Batch.Callback) on
     * the table, which invokes the endpoint on every region in the scan range
     * and merges the per-region results in the callback.
     * @param table the HBase table
     * @return the total row count across all regions
     */
    public static long execFastEndpointCoprocessor(Table table, final Scan scan,
                                                   final String family, final String columns) {
        long startTime = System.currentTimeMillis();
        // Total row count across all regions
        final AtomicLong totalRowCount = new AtomicLong();
        // Per-column running sums across all regions
        final Map<String, AtomicLong> sumMap = new HashMap<>();
        try {
            Batch.Callback<ClifeProtos.ClifeAvgResponse> callback =
                    new Batch.Callback<ClifeProtos.ClifeAvgResponse>() {
                @Override
                public void update(byte[] region, byte[] row,
                                   ClifeProtos.ClifeAvgResponse clifeAvgResponse) {
                    // Accumulate this region's row count
                    totalRowCount.getAndAdd(clifeAvgResponse.getCount());
                    // Accumulate this region's per-column sums, e.g. "weight:234,age:345,"
                    String values = clifeAvgResponse.getValues();
                    for (String kv : values.split(",")) {
                        String[] kvs = kv.split(":");
                        String key = kvs[0];
                        long value = Long.parseLong(kvs[1]);
                        if (!sumMap.containsKey(key)) {
                            sumMap.put(key, new AtomicLong(value));
                        } else {
                            sumMap.get(key).getAndAdd(value);
                        }
                    }
                }
            };
            final ClientProtos.Scan cScan = ProtobufUtil.toScan(scan);
            table.coprocessorService(ClifeProtos.ClifeService.class,
                    scan.getStartRow(), scan.getStopRow(),
                    new Batch.Call<ClifeProtos.ClifeService, ClifeProtos.ClifeAvgResponse>() {
                        @Override
                        public ClifeProtos.ClifeAvgResponse call(ClifeProtos.ClifeService service)
                                throws IOException {
                            ClifeProtos.ClifeAvgRequest request =
                                    ClifeProtos.ClifeAvgRequest.newBuilder()
                                            .setScan(cScan)
                                            .setFamily(family)
                                            .setColumns(columns)
                                            .build();
                            BlockingRpcCallback<ClifeProtos.ClifeAvgResponse> rpcCallback =
                                    new BlockingRpcCallback<>();
                            service.getAvg(null, request, rpcCallback);
                            return rpcCallback.get();
                        }
                    }, callback);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        System.out.println("Elapsed time (ms): " + (System.currentTimeMillis() - startTime));
        System.out.println("totalRowCount: " + totalRowCount.longValue());
        for (Map.Entry<String, AtomicLong> entry : sumMap.entrySet()) {
            double value = entry.getValue().doubleValue();
            System.out.println(entry.getKey() + " avg = " + value / totalRowCount.longValue());
        }
        return totalRowCount.longValue();
    }
}
Notes:
1) The coprocessor in this article only computes sums on the server side; the averages are computed on the client.
2) The Scan object passed in supports rowkey range, time range, filter, and other scan conditions.
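To make the client-side merge concrete, here is a small self-contained sketch (AvgMerger is a hypothetical helper written for this article, not part of the project) of how per-region "col:sum" response strings are combined and averaged:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone helper mirroring the client-side merge logic:
// each region returns a "col:sum,col:sum," string plus a row count, and the
// client combines them into per-column averages.
public class AvgMerger {

    // Add one region's "col:sum,col:sum," response string into the running sums.
    public static void merge(Map<String, Long> sums, String regionValues) {
        for (String kv : regionValues.split(",")) {
            if (kv.isEmpty()) {
                continue; // tolerate the trailing comma the server appends
            }
            String[] parts = kv.split(":");
            sums.merge(parts[0], Long.parseLong(parts[1]), Long::sum);
        }
    }

    // Average of one column, given the total row count across all regions.
    public static double avg(Map<String, Long> sums, String column, long totalRows) {
        return sums.get(column) / (double) totalRows;
    }

    // Convenience: merge several region responses, then average one column.
    public static double avgOf(String column, long totalRows, String... regionResponses) {
        Map<String, Long> sums = new HashMap<>();
        for (String response : regionResponses) {
            merge(sums, response);
        }
        return avg(sums, column, totalRows);
    }
}
```

For example, two regions returning "weight:234,age:345," and "weight:100,age:55," over 8 rows in total yield an age average of 400 / 8 = 50.0.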
V. Deployment and Invocation
1. Build with Maven.
2. Upload the built jar to a directory on HDFS, e.g. /hbase/coprocessor. Pay attention to the file's owner and group.

3. Load the coprocessor
Coprocessors can be loaded either dynamically or statically; see the referenced article. The coprocessor in this article carries heavy business logic and is not well suited to dynamic loading; the goal here is only to show how to use objects such as Scan inside a coprocessor.
alter 'test_coprocessor', METHOD => 'table_att', 'coprocessor' => 'hdfs://nameservice1/hbase/coprocessor/clife-data-hbase-1.0.8.jar|com.clife.data.hbase.coprocessor.aggregate.ClifeProtosEndPoint|1001|'
Note the attribute format: path, class name, priority, and arguments, separated by single | characters.
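After the alter, loading can be verified from the HBase shell; the table's attributes should now list the coprocessor entry:

```
describe 'test_coprocessor'
```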