HBase分析之Filter

Filter的作用是謂詞下推,就是在Scan查詢數(shù)據(jù)時(shí),將過濾數(shù)據(jù)的操作放到服務(wù)端進(jìn)行,減少數(shù)據(jù)的傳輸,減少網(wǎng)絡(luò)IO。

介紹Filter使用方法的文章很多,就不再贅述了,主要記錄下如何自定義Filter。

解析

在一次Scan的過程中,F(xiàn)ilter存在于2個(gè)地方:

  1. RegionScannerImpl中,用來做整理流程的把控、StoreScanner的過濾和行級(jí)別的過濾處理。
  2. ScanQueryMatcher中,做Cell級(jí)別的過濾處理。

RegionScannerImpl中調(diào)用Filter的方法:

  1. 首先是isFamilyEssential,在HRegion創(chuàng)建Scanner時(shí),用于篩選StoreScanner,當(dāng)有多個(gè)列族時(shí),篩選不通過的StoreScanner將放進(jìn)joinedHeap中。joinedHeap中的StoreScanner不參與主流程的查詢,只在主要StoreScanner查詢完,確定了要這行數(shù)據(jù),才會(huì)執(zhí)行查詢,并且只執(zhí)行ScanQueryMatcher中調(diào)用Filter的方法。
  2. hasFilterRow。返回true的話,才有機(jī)會(huì)執(zhí)行filterRowCells、filterRowCellsWithRet。
  3. filterRowCells。
  4. filterRowKey。這里Filter判斷是否要過濾掉這一行,true表示過濾掉這一行。
  5. filterRowCellsWithRet。
  6. filterRow。執(zhí)行差不多了,再判斷一次,是否要過濾掉這一行。
  7. 在完成了一行的查詢之后,會(huì)調(diào)用reset,清空臨時(shí)狀態(tài)。另外有種特殊情況,當(dāng)Size、Time或Batch達(dá)到上限時(shí),查詢會(huì)提前返回,這時(shí)候,是不調(diào)用reset的,狀態(tài)會(huì)持續(xù)到下一次請(qǐng)求。
  8. filterAllRemaining。true表示篩選是否完成了,沒有更多數(shù)據(jù)了,整個(gè)Scan完成。

ScanQueryMatcher中調(diào)用Filter的方法:

  1. filterAllRemaining。里面也給了一次機(jī)會(huì),判斷是否執(zhí)行完成了Scan。
  2. filterKeyValue。ColumnTracker返回值是MatchCode.INCLUDE時(shí),默認(rèn)一定是MatchCode.INCLUDE,會(huì)調(diào)用filterKeyValue,執(zhí)行Filter的過濾操作,返回值是MatchCode。
  3. transformCell、getNextCellHint。嚴(yán)格來說,這2個(gè)方法是在StoreScanner里執(zhí)行的
    1. 判斷返回值為INCLUDE_AND_SEEK_NEXT_COL時(shí),執(zhí)行Filter.transformCell。
    2. 判斷返回值為SEEK_NEXT_USING_HINT時(shí),執(zhí)行Filter.getNextCellHint。

Filter還有2個(gè)方法,用于RPC時(shí),將Filter發(fā)送到服務(wù)端

  1. toByteArray。首先需要轉(zhuǎn)成字節(jié)碼發(fā)送,調(diào)用的就是Filter的toByteArray方法。

  2. parseFrom。到服務(wù)端再轉(zhuǎn)成Filter,調(diào)用的是Filter的類方法parseFrom。

Protobuf安裝

既然要發(fā)送實(shí)例到服務(wù)端,就需要Protobuf了。HBase1.3.2版本使用的是protobuf2.5.0,所以這里裝個(gè)2.5.0。其他版本在Git Release上找下,安裝流程是一樣的。2.x和3.x生成的Java代碼并不兼容,已有3.x也按照下面步驟執(zhí)行就行。

  1. 下載protobuf-2.5.0.zip,解壓。
  2. 檢查下是否安裝autoreconf和make,執(zhí)行brew install autoconf && brew install automake
  3. 到目錄下執(zhí)行./autogen.sh && ./configure && make
  4. 檢查下make狀態(tài),make check,沒問題執(zhí)行make install
  5. 檢查下是否安裝成功,protoc --version顯示libprotoc 2.5.0

基本示例

先寫個(gè)Filter.proto文件,版本proto2。外部類名字不能和內(nèi)部類一樣,名就取個(gè)Filter。

syntax = "proto2";

option java_package = "cn.edu.bupt.hbase.protobuf.generated";
option java_outer_classname = "Filter";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message AnFilter {
}

執(zhí)行protoc編譯命令,生成Protobuf版的Filter類。

protoc --proto_path=src/main/protobuf/ --java_out=src/main/java/ src/main/protobuf/Filter.proto

寫個(gè)Java的Filter類,繼承FilterBase,引用前面生成的Filter.AnFilter。

public class AnFilter extends FilterBase {

    @Override
    public ReturnCode filterKeyValue(Cell cell) throws IOException {
        // 默認(rèn)值
        return null;
    }

    public byte[] toByteArray() {
        // 使用Filter.AnFilter
        Filter.AnFilter.Builder builder = Filter.AnFilter.newBuilder();
        return builder.build().toByteArray();
    }

    public static AnFilter parseFrom(byte[] pbBytes) throws DeserializationException {
        // 這里暫時(shí)不解析了,直接new一個(gè)
        return new AnFilter();
    }
}

把代碼打包,放入HBase的classpath里,重啟HBase。

在ProtobufUtil.toFilter打個(gè)斷點(diǎn),客戶端執(zhí)行Scan代碼,即進(jìn)入斷點(diǎn),看到成功在Server端生成了Filter。

進(jìn)階示例

讓我們寫個(gè)a+b=c的Filter,并且只返回一行。隨便放些數(shù)據(jù)進(jìn)去

f:a f:b f:v
row0 value0
row1 1 1
row2 2 2 value2
row3 3 value3
row4 4
row5 5 5 value5
row6 4 6 value6

更改Filter.proto,增加一個(gè)變量c,這個(gè)變量是要傳給服務(wù)端的

message AnFilter {
    required int32 c = 1;
}

重新執(zhí)行protoc編譯命令,編譯生成Filter。

修改Java的AnFilter,也增加上變量c,修改toByteArray把c設(shè)置進(jìn)去、parseFrom再從byte[]里把c讀出來。

public class AnFilter extends FilterBase {
    private int c;
    
    public AnFilter(int c) {
        this.c = c;
    }
    
    public byte[] toByteArray() {
        Filter.AnFilter.Builder builder = Filter.AnFilter.newBuilder();
        builder.setC(c);
        return builder.build().toByteArray();
    }

    public static AnFilter parseFrom(byte[] pbBytes) throws DeserializationException {
        Filter.AnFilter filter;
        try {
            filter = Filter.AnFilter.parseFrom(pbBytes);
        } catch (InvalidProtocolBufferException e) {
            throw new DeserializationException(e);
        }
        return new AnFilter(filter.getC());
    }

首先要找到a+b=c的行,找到qualifier是a,記錄下值;找到qualifier是b,記錄下值。a、b都找到了,判斷下和是不是等于c,等于就返回ReturnCode.INCLUDE,把當(dāng)前行找齊;否則跳過,返回ReturnCode.NEXT_ROW,繼續(xù)找下一行。如果這一行還沒找到a、b,還是得先返回ReturnCode.INCLUDE,不然這列就不會(huì)包含進(jìn)去了。

private int sum;

private boolean hasA;
private boolean hasB;
private boolean foundC;

@Override
public ReturnCode filterKeyValue(Cell cell) throws IOException {
    String qualifier = Bytes.toString(cell.getQualifierArray(), 
                                      cell.getQualifierOffset(),
                                      cell.getQualifierLength());
    if ("a".equals(qualifier)) {
        sum += Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), 
                           cell.getValueLength());
        hasA = true;
    } else if ("b".equals(qualifier)) {
        sum += Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), 
                           cell.getValueLength());
        hasB = true;
    }
    if (hasA && hasB) {
        if (sum == c) {
            foundC = true;
            return ReturnCode.INCLUDE;
        } else {
            return ReturnCode.NEXT_ROW;
        }
    }
    return ReturnCode.INCLUDE;
}

那列都包含進(jìn)去了,如果后面發(fā)現(xiàn)a+b!=c怎么辦?沒事,還有filterRow方法,沒找到就返回true,把當(dāng)前列過濾掉。

@Override
public boolean filterRow() throws IOException {
    return !foundC;
}

一行執(zhí)行完成,要重置下臨時(shí)狀態(tài)。

@Override
public void reset() throws IOException {
    sum = 0;
    hasA = false;
    hasB = false;
}

找齊了這行就不繼續(xù)找了,filterAllRemaining返回true。找到了a、b,不能直接返回true,因?yàn)榭赡苓€有其他列,filterAllRemaining是每個(gè)cell都執(zhí)行的方法。所以在找齊一列的時(shí)候設(shè)置下值。

private boolean filterAllRemaining;
@Override
public void reset() throws IOException {
    ……
    filterAllRemaining = foundC;
}
@Override
public boolean filterAllRemaining() throws IOException {
    return filterAllRemaining;
}

打包放進(jìn)HBase,重啟HBase,客戶端執(zhí)行代碼。

scan.setFilter(new AnFilter(10));

返回找到的那行。

row5    5   5   value5
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、簡(jiǎn)介 Hbase:全名Hadoop DataBase,是一種開源的,可伸縮的,嚴(yán)格一致性(并非最終一致性)的分...
    菜鳥小玄閱讀 2,605評(píng)論 0 12
  • 1、通過CocoaPods安裝項(xiàng)目名稱項(xiàng)目信息 AFNetworking網(wǎng)絡(luò)請(qǐng)求組件 FMDB本地?cái)?shù)據(jù)庫組件 SD...
    陽明AI閱讀 16,203評(píng)論 3 119
  • 日子一天天過去,步入社會(huì)也快一年多了,離開校園,離開父母,一個(gè)嶄新的世界。經(jīng)歷了不同的生活,體驗(yàn)了人人平等的社會(huì)...
    搬磚不用手閱讀 260評(píng)論 0 1
  • 歸零Eno閱讀 506評(píng)論 4 6
  • 也許,生活會(huì)時(shí)不時(shí)來點(diǎn)小插曲,似乎證明你還是活著的,太過平靜,就如一攤死水,某時(shí)某刻,覺得,大致我們的一生就是這個(gè)...
    夏筱棉眠閱讀 115評(píng)論 0 1

友情鏈接更多精彩內(nèi)容