Python 讀寫 hbase 數(shù)據(jù)的正確姿勢(一)


title: Python 讀寫 hbase 數(shù)據(jù)的正確姿勢(一)

tags:

  • hbase
  • happybase
  • python

categories:

  • ?Hbase

comments: true
date: 2017-09-09 19:00:00


之前操作 hbase 大都是用 java 寫,或者偶爾用 python 寫幾個一些簡單的 put、get 操作。最近在使用 happybase 庫批量向 hbase 導入數(shù)據(jù),并通過 java 實現(xiàn)查詢的一些復雜的搜索時(scan+filter),遇到了一些有趣的問題。

實驗版本

Hbase 版本:1.0.0
Happybase 版本:1.1.0
Python 版本:2.7.13

問題1:filter 過濾失敗

問題重現(xiàn)

hbase 的使用場景大概是這樣的:

有一個 hbase table,存儲一些文章的基本信息,包括創(chuàng)建時間、文章ID、文章類別ID等,同屬于一個column family,"article"。

查詢的場景則是查找"指定的時間范圍","文章類型ID為N" 的所有文章數(shù)據(jù)。

根據(jù)以上場景,設計如下 table:

  1. hbase table 為 article 。
  2. rowkey 是 "ARTICLE" + 微秒級時間戳(類似OpenTSDB 的rowkey,便于按時間序列查到某一段時間創(chuàng)建的 articles),即 "ARTICLE1504939752000000"。
  3. family 為 "basic",包含 "ArticleID", "ArticleTypeID", "Created", 三個 column。

查詢時通過指定 rowkey start 和 rowkey stop,可以 scan 某一個時間段的數(shù)據(jù)(因為 rowkey 中包含數(shù)值型的時間戳),通過 hbase filter 實現(xiàn)"ArticleTypeID" == N 的過濾條件。

開始導入數(shù)據(jù)、準備查詢,以下是導入數(shù)據(jù)部分代碼 demo:

def save_batch_events(datas, table=None):
    with get_connetion_pool().connection() as conn:
        if table is not None:
            t = conn.table(table)
        else:
            t = conn.table(TABLE)
        b = t.batch(transaction=False)
        for row, data in datas.items():
            b.put(row, data)
        b.send()

def save_main_v1():
    datas = dict()
    for i in range(100):
        article_type_id = i % 2
        timestamp = time.time() + i
        rowkey = "ARTICLE" + str(timestamp * 1000000)
        data = {
            "basic:" + "ArticleID": str(i),
            "basic:" + "ArticleTypeID": str(article_type_id),
            "basic:" + "Created": str(timestamp),
        }
        datas[rowkey] = data
    save_batch_events(datas)

查看一下 hbase 的數(shù)據(jù),100 條數(shù)據(jù)全部正常導入,其中50條數(shù)據(jù) "ArticleTypeID" 為0,50條為1

圖 1:python-happyhbase 寫入的數(shù)據(jù)

接下來就是用 hbase filter 過濾的過程了,假設查詢 "ArticleTypeID" 為 0 的數(shù)據(jù),使用 java 客戶端實現(xiàn)查詢:

    public static void test_hbase_filter() throws IOException {
        TableName tableName = TableName.valueOf("test_article_1");
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);

        // Scan python table `test_article_1`
        System.out.println("Prepare to scan !");
        FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        SingleColumnValueFilter filter1 = new SingleColumnValueFilter(Bytes.toBytes("basic"),
                Bytes.toBytes("ArticleTypeID"), CompareOp.EQUAL, Bytes.toBytes(1L));
        list.addFilter(filter1);
        Scan s = new Scan();
        s.addFamily(Bytes.toBytes("basic"));
        s.setFilter(list);
        ResultScanner scanner = table.getScanner(s);
        int num = 0;
        for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
            num++;
        }
        System.out.println("Found row: " + num);// 預期 50,結果為 0

問題出現(xiàn):使用 java 期望的查詢結果為 50 條,但是查出的結果卻是 0 條!

使用 python 查詢卻可以得到正確的結果:

def recent_events_v1(start, end, table=None, filter_str=None, limit=2000):
    with get_connetion_pool().connection() as conn:
        if table is not None:
            t = conn.table(table)
        else:
            t = conn.table(TABLE)
        start_row = 'ARTICLE' + str(start * 1000000)
        end_row = 'ARTICLE' + str(end * 1000000)
        return t.scan(row_start=start_row, row_stop=end_row, filter=filter_str, limit=limit)

if __name__ == '__main__':
    filter_str = "SingleColumnValueFilter('basic', 'ArticleTypeID', =, 'binary:1')"
    results = recent_events_v1(start=0, end=1505023900, filter_str=filter_str)
    print len([i for i in results])  # 期望值為50, 實際值為 50,正確

尋找原因

經(jīng)過 N 次確認,java 的讀操作是沒有問題的,python 實現(xiàn)的讀寫也得到了預期的效果。進一步探究,特意用 java 完整的實現(xiàn)的數(shù)據(jù)的導入和查詢:

public static void test_hbase_filter1() throws IOException {        
        tableName = TableName.valueOf("test_article_java_1");
        table = conn.getTable(tableName);
        System.out.println("Prepare create table !");
        Admin admin = conn.getAdmin();
        if (!admin.tableExists(tableName)) {
            HTableDescriptor td = new HTableDescriptor(tableName);
            HColumnDescriptor basic = new HColumnDescriptor("basic");
            td.addFamily(basic);
            admin.createTable(td);
            System.out.println("Created !");
        }

        // Put value to test_article_java_1
        System.out.println("Prepare to write data to: " + table.getName().toString());
        for (int i = 0; i < 100; i++) {
            Put p = new Put(Bytes.toBytes("ARTICLE" + (System.currentTimeMillis() + 1000) * 1000));
            p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("ArticleTypeID"), Bytes.toBytes(Long.valueOf(i % 2)));
            table.put(p);
        }

        // scan test_article_java_1
        scanner = table.getScanner(s);
        num = 0;
        for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
            num++;
        }
        System.out.println("Found row: " + num);// 預期 50,結果為 50
}

可見,用 java 寫的數(shù)據(jù),用 java 讀是沒問題的,用 python 寫的數(shù)據(jù)用 python 讀也沒問題。但 java 讀 python 寫的數(shù)據(jù)就存在異常,難道是 python 寫的數(shù)據(jù)和 java 寫的數(shù)據(jù)不一樣?為此分別對比一下 python 和 java 寫入 hbase 的數(shù)據(jù):

圖 2:java 寫入的數(shù)據(jù)

仔細觀察圖 1 和圖 2 中的數(shù)據(jù)可以發(fā)現(xiàn),python 寫入的數(shù)據(jù)中對應的 ArticleTypeID 值為 01,而 java 則是一串 bytes。突然意識到一個問題,hbase 讀寫的時候要求傳入的數(shù)據(jù)類型為 bytes,而使用 python 傳輸?shù)倪^程中這種整形數(shù)據(jù)是直接通過 str() 方法轉成字符串存儲到 hbase 中的,并不是以 bytes 的形式存于 hbase,所以使用 java 用轉化成 bytes 的 filter 讀才沒能得到預期的結果。

正確的 filter 姿勢

既然找到了原因,解決問題就比較簡單了,存儲的時候將整型數(shù)據(jù)全部都通過 struct.pack 方法轉成 bytes 存入,這樣就可以被通用的查詢了,同時 使用 python 查詢的時候也將 filter 中的整型數(shù)值替換成 bytes 格式。

使用 struct.pack 方法將整型轉成 bytes 時,注意選擇使用 big-endian 的 Byte order,即 pack 方法的第一個參數(shù)使用 >。因為 java 官方 client 采用這種字節(jié)序,下面是 Bytes.toBytes 的實現(xiàn)源碼,可見采用的是 big-endian

  /**
   * Convert a long value to a byte array using big-endian.
   *
   * @param val value to convert
   * @return the byte array
   */
  public static byte[] toBytes(long val) {
    byte [] b = new byte[8];
    for (int i = 7; i > 0; i--) {
      b[i] = (byte) val;
      val >>>= 8;
    }
    b[0] = (byte) val;
    return b;

寫入的代碼:

def save_main_v2():
    datas = dict()
    for i in range(100):
        article_type_id = i % 2
        timestamp = time.time() + i
        rowkey = "ARTICLE" + str(timestamp * 1000000)
        data = {
            "basic:" + "ArticleID": str(i),
            "basic:" + "ArticleTypeID": struct.pack('>q', article_type_id),
            "basic:" + "Created": str(timestamp),
        }
        datas[rowkey] = data
    save_batch_events(datas, table="test_article_2")

查詢是的filter:

filter_str = "SingleColumnValueFilter('basic', 'ArticleTypeID', =, 'binary:{value}')".format(value=struct.pack('>q', 1))

這樣就沒有問題了~

總結

使用 python 讀寫 hbase 數(shù)據(jù),直接傳輸整型參數(shù)時,hbase 的 thrift 接口會拋出 TDecodeException: Field 'value(3)' of 'Mutation' needs type 'STRING' 異常,被告知只接受 string 類型的數(shù)據(jù)。這時注意將整型數(shù)據(jù)轉化成 bytes 形式的 str,而不要直接使用 str() 方法強轉,否則難以避免的會出現(xiàn)一些非預期的結果。

以為這樣就沒問題了? 請關注看下文~

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容