Developing and Deploying an HBase Endpoint Coprocessor

I. Foreword

The example coprocessor in this article computes count and avg over specified columns according to scan conditions. A detailed HBase technical community article (recommended reading) explains how to develop and deploy an HBase Endpoint Coprocessor; building on that article, this one shows how to pass parameters such as a Scan or Filter.

II. Environment Setup

1. Download and install Protobuf. Install the same Protobuf version that your HBase cluster uses.
2. Configure the environment variables.
3. Create a test table.
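As a sketch of the test-table step, the following hbase shell commands create a table that matches the later examples. The table name follows the alter command in section V; the `info` column family and the `weight`/`age` columns (whose values must be numeric strings, as the server-side code parses them with Long.parseLong) are assumptions for illustration:

```
create 'test_coprocessor', 'info'
put 'test_coprocessor', 'row1', 'info:weight', '60'
put 'test_coprocessor', 'row1', 'info:age', '20'
put 'test_coprocessor', 'row2', 'info:weight', '70'
put 'test_coprocessor', 'row2', 'info:age', '30'
```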

III. Generating Serialization Classes with Protobuf

Protobuf itself supports only a limited set of data types. What if a parameter needs to carry an object, or a scan range or scan filter? This article focuses on how to pass in a Scan object.

1. Set up the proto environment
In the HBase source, locate the hbase-protocol module; its protobuf directory contains the many proto files that HBase already defines. The Scan message used here is defined in Client.proto. Copy the required proto files (or all of them) to the machine where Protobuf was installed.


2. Create a file named ClifeProto.proto with the following content:

syntax = "proto2";
option java_package = "com.clife.data.hbase.coprocessor.aggregate";
option java_outer_classname = "ClifeProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "Client.proto";

message ClifeAvgRequest {
    required string family = 1;
    required string columns = 2;
    optional Scan scan = 3;
}

message ClifeAvgResponse {
    required int64 count = 1;
    required string values = 2;
}

service ClifeService {
  rpc getAvg(ClifeAvgRequest)
    returns (ClifeAvgResponse);
}

Note that ClifeProto.proto must be placed in the same directory as Client.proto.



3. Generate the Java classes

protoc --java_out=./ ClifeProto.proto

The path after --java_out specifies the output directory for the generated Java classes.
When the command finishes, the generated classes appear under the com directory in the output path, following the java_package option.

IV. Endpoint Coprocessor Server-Side Implementation

1. Set up the Maven project

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0-cdh5.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0-cdh5.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-examples</artifactId>
    <version>1.2.0-cdh5.7.2</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>

These dependencies can be excluded when running maven package, to keep the resulting jar from growing too large.
Copy the Java classes generated in step III into the com.clife.data.hbase.coprocessor.aggregate package configured in ClifeProto.proto.
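One common way to achieve the exclusion mentioned above is to mark the cluster-provided dependencies with provided scope, so Maven omits them from the packaged artifact. A sketch for one dependency (repeat for the others as needed):

```xml
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.7.2</version>
    <scope>provided</scope>
</dependency>
```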
2. Implement the Endpoint class

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

public class ClifeProtosEndPoint extends ClifeProtos.ClifeService implements Coprocessor, CoprocessorService {

    protected static final Log log = LogFactory.getLog(ClifeProtosEndPoint.class);
    private RegionCoprocessorEnvironment env;

    /**
     * Computes the average.
     * Using the scan conditions passed by the client, sums the specified
     * columns; the sums and the row count are returned to the client, which
     * computes the final averages.
     * 1. Client parameters:
     *    1) scan (optional): may set a rowkey range, time range, filter, etc.
     *    2) family (required): each RPC call operates on a single column family
     *    3) columns (required): the columns to aggregate, comma-separated, e.g. "weight,age"
     * 2. Return values:
     *    1) count: long, the number of rows scanned
     *    2) values: String, the per-column sums, e.g. "weight:234,age:345"
     * @param controller
     * @param request
     * @param done
     */
    @Override
    public void getAvg(RpcController controller, ClifeProtos.ClifeAvgRequest request, RpcCallback<ClifeProtos.ClifeAvgResponse> done) {
        ClifeProtos.ClifeAvgResponse response = null;
        long counter = 0L;
        List<Cell> results = new ArrayList<>();
        InternalScanner scanner = null;
        try {
            log.info("Start Clife avg endpoint.........................");
            Scan scan;
            // Protobuf message getters never return null; test the optional field with hasScan()
            if (request.hasScan()) {
                scan = ProtobufUtil.toScan(request.getScan());
                byte[] startRow = scan.getStartRow();
                byte[] stopRow = scan.getStopRow();
                if (startRow.length > 0 && stopRow.length > 0)
                    log.info("StartRow = " + RowKeyUtil.convertByteRunDataRowKeyToString(startRow) +
                            ", StopRow = " + RowKeyUtil.convertByteRunDataRowKeyToString(stopRow));
            } else {
                scan = new Scan();
            }

            byte[] cf = Bytes.toBytes(request.getFamily());
            scan.addFamily(cf);
            // Columns arrive as a comma-separated list, e.g. "weight,age"
            String columns = request.getColumns();
            log.info("Input columns: " + columns);
            Map<String, Long> columnMaps = new HashMap<>();
            for (String column : columns.split(",")) {
                columnMaps.put(column, 0L);
                scan.addColumn(cf, Bytes.toBytes(column));
            }
            scanner = this.env.getRegion().getScanner(scan);
            boolean hasMoreRows = false;
            do {
                hasMoreRows = scanner.next(results);

                if (results.size() > 0) {
                    counter++;
                }
                for (Cell cell : results) {
                    String column = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    Long temp = Long.parseLong(value);
                    columnMaps.put(column, columnMaps.get(column) + temp);
                }

                results.clear();
            } while (hasMoreRows);
            StringBuilder values = new StringBuilder();
            for (String key : columnMaps.keySet()) {
                Long value = columnMaps.get(key);
                values.append(key).append(":").append(value).append(",");
            }
            log.info("Clife avg server result: " + values);
            response = ClifeProtos.ClifeAvgResponse.newBuilder()
                    .setCount(counter)
                    .setValues(values.toString())
                    .build();
        } catch (IOException e) {
            ResponseConverter.setControllerException(controller, e);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException ignored) {
                }
            }
        }
        log.info("Row counter from this region is "
                + env.getRegion().getRegionInfo().getRegionNameAsString() + ": " + counter);
        done.run(response);
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    @Override
    public void stop(CoprocessorEnvironment coprocessorEnvironment) throws IOException {
    }

    @Override
    public Service getService() {
        return this;
    }
}

3. Endpoint Coprocessor Client-Side Implementation

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;

public class ClifeProtosExample {

    /**
     * The most efficient invocation style: calls the coprocessor on every
     * region in the given row range in parallel through
     * Table.coprocessorService(Class, byte[], byte[], Batch.Call, Batch.Callback).
     * @param table   the HBase table
     * @param scan    scan conditions pushed down to the server
     * @param family  the column family to aggregate
     * @param columns comma-separated list of columns, e.g. "weight,age"
     * @return the total row count across all regions
     */
    public static long execFastEndpointCoprocessor(Table table, final Scan scan, final String family, final String columns) {
        long start_t = System.currentTimeMillis();
        // Total row count accumulated across all regions
        final AtomicLong totalRowCount = new AtomicLong();
        final Map<String, AtomicLong> sumMap = new HashMap<>();
        try {
            Batch.Callback<ClifeProtos.ClifeAvgResponse> callback = new Batch.Callback<ClifeProtos.ClifeAvgResponse>() {
                @Override
                public void update(byte[] region, byte[] row, ClifeProtos.ClifeAvgResponse clifeAvgResponse) {
                    // Accumulate this region's row count
                    totalRowCount.getAndAdd(clifeAvgResponse.getCount());
                    String values = clifeAvgResponse.getValues();
                    for (String kv : values.split(",")) {
                        String[] kvs = kv.split(":");
                        String key = kvs[0];
                        Long value = Long.parseLong(kvs[1]);
                        if (!sumMap.containsKey(key)) {
                            final AtomicLong sum = new AtomicLong();
                            sum.getAndAdd(value);
                            sumMap.put(key, sum);
                        } else {
                            sumMap.get(key).getAndAdd(value);
                        }
                    }
                }
            };

            final ClientProtos.Scan cScan = ProtobufUtil.toScan(scan);

            table.coprocessorService(ClifeProtos.ClifeService.class, scan.getStartRow(), scan.getStopRow(),
                    new Batch.Call<ClifeProtos.ClifeService, ClifeProtos.ClifeAvgResponse>() {
                        @Override
                        public ClifeProtos.ClifeAvgResponse call(ClifeProtos.ClifeService aggregationService) throws IOException {
                            ClifeProtos.ClifeAvgRequest request =
                                    ClifeProtos.ClifeAvgRequest.newBuilder()
                                            .setScan(cScan)
                                            .setFamily(family)
                                            .setColumns(columns)
                                            .build();
                            BlockingRpcCallback<ClifeProtos.ClifeAvgResponse> rpcCallback = new BlockingRpcCallback<>();
                            aggregationService.getAvg(null, request, rpcCallback);
                            return rpcCallback.get();
                        }
                    }, callback);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        System.out.println("Elapsed time (ms): " + (System.currentTimeMillis() - start_t));
        System.out.println("totalRowCount:" + totalRowCount.longValue());
        for (String key : sumMap.keySet()) {
            Double value = sumMap.get(key).doubleValue();
            System.out.println(key + " avg = " + value / totalRowCount.longValue());
        }

        return totalRowCount.longValue();
    }
}

Notes:
1) The coprocessor in this article only computes sums on the server side; the averages are computed on the client.
2) The Scan object passed in supports rowkey range, time range, filter, and other scan conditions.
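The client-side merge described in note 1) can be isolated into a small helper. The sketch below (the class and method names are hypothetical, not part of the article's code) combines per-region ClifeAvgResponse-style results, i.e. a row count plus a "col:sum,..." string, into per-column averages:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AvgMergeSketch {

    /**
     * Merges per-region partial results into per-column averages.
     * counts.get(i) and values.get(i) correspond to region i's response;
     * each values entry has the form "weight:10,age:20," as produced by the server.
     */
    public static Map<String, Double> merge(List<Long> counts, List<String> values) {
        long totalCount = 0L;
        for (long c : counts) {
            totalCount += c;
        }
        // Sum each column's partial sums across all regions
        Map<String, Long> sums = new HashMap<>();
        for (String v : values) {
            for (String kv : v.split(",")) {
                if (kv.isEmpty()) {
                    continue; // tolerate the server's trailing comma
                }
                String[] parts = kv.split(":");
                sums.merge(parts[0], Long.parseLong(parts[1]), Long::sum);
            }
        }
        // Divide each total sum by the total row count
        Map<String, Double> avgs = new HashMap<>();
        for (Map.Entry<String, Long> e : sums.entrySet()) {
            avgs.put(e.getKey(), e.getValue().doubleValue() / totalCount);
        }
        return avgs;
    }
}
```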

V. Deployment and Invocation

1. Build with Maven.
2. Upload the built jar to a directory on HDFS, e.g. /hbase/coprocessor. Pay attention to the file's owner and group.


3. Load the coprocessor
Coprocessors can be loaded either dynamically or statically (see the referenced article). The coprocessor described here carries heavy business logic and is not really suited to dynamic loading; the purpose of this article is simply to show how to use objects such as Scan inside a coprocessor.

alter 'test_coprocessor', METHOD => 'table_att', 'coprocessor' => 'hdfs://nameservice1/hbase/coprocessor/clife-data-hbase-1.0.8.jar|com.clife.data.hbase.coprocessor.aggregate.ClifeProtosEndPoint|1001|'
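On HBase 1.x it is common to disable the table before attaching the coprocessor and to verify the table attribute afterwards. A sketch of the full hbase shell sequence (priority 1001 and the empty trailing arguments field are typical choices in the path|class|priority|args format, not values taken from the original article):

```
disable 'test_coprocessor'
alter 'test_coprocessor', METHOD => 'table_att', 'coprocessor' => 'hdfs://nameservice1/hbase/coprocessor/clife-data-hbase-1.0.8.jar|com.clife.data.hbase.coprocessor.aggregate.ClifeProtosEndPoint|1001|'
enable 'test_coprocessor'
describe 'test_coprocessor'
```

If the class fails to load, the region servers may abort; testing on a throwaway table first is prudent.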