Hive和Java API操作HBase實踐

本博客采用創(chuàng)作共用版權(quán)協(xié)議, 要求署名、非商業(yè)用途和保持一致. 轉(zhuǎn)載本博客文章必須也遵循署名-非商業(yè)用途-保持一致的創(chuàng)作共用協(xié)議.

由于五一假期, 成文較為簡略, 一些細(xì)節(jié)部分并沒有詳細(xì)介紹, 如有需求, 可以參考之前幾篇相關(guān)MapReduce主題的博文.

HBase實踐

  • 修改MapReduce階段倒排索引的信息通過文件輸出, 而每個詞及其對應(yīng)的平均出現(xiàn)次數(shù)信息寫入到HBase的表Wuxia中(具體的要求可以查看之前的博文MapReduce實戰(zhàn)之倒排索引)
  • 編寫Java程序, 遍歷上一步保存在HBase中的表, 并把表格的內(nèi)容保存到本地文件中.
  • 使用Hive Shell命令行創(chuàng)建表(表名: Wuxia, (word string, count double)), 導(dǎo)入平均出現(xiàn)次數(shù)的數(shù)據(jù)
    • 查詢出現(xiàn)次數(shù)大于300的詞語
    • 查詢前100個出現(xiàn)次數(shù)最多的詞語
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.util.Bytes;



/**
 * MapReduce job that builds an inverted-index-style aggregate: for every word in the
 * input corpus it computes the average number of occurrences per document, and writes
 * one row per word into the HBase table "Wuxia" (column family "content", qualifier
 * "average").
 *
 * Fixes over the original version:
 *  - The old combiner rewrote the key from "word:file" to "word". Hadoop combiners are
 *    optional and may run zero or more times, so when the combiner was skipped the
 *    reducer received "word:file" keys and wrote wrong row keys; when a file spanned
 *    several splits the document was counted twice. Records are now (word, "file:count"),
 *    which keeps the combiner key-preserving and associative.
 *  - HBaseAdmin is closed after use (resource leak).
 */
public class InvertedIndexHbase {
    /**
     * Creates (or drops and recreates) the target table with a single
     * column family "content".
     *
     * @param conf      HBase-aware configuration (must point at the cluster)
     * @param tablename table to (re)create
     * @throws IOException on any HBase admin failure
     */
    public static void createHBaseTable(Configuration conf, String tablename) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            if (admin.tableExists(tablename)) {
                // Drop the old table so each run starts from a clean state.
                System.out.println("table exists, Trying recreate table!");
                admin.disableTable(tablename);  // a table must be disabled before deletion
                admin.deleteTable(tablename);
            }
            HTableDescriptor htd = new HTableDescriptor(tablename);
            htd.addFamily(new HColumnDescriptor("content")); // single column family
            System.out.println("Create new table: " + tablename);
            admin.createTable(htd);
        } finally {
            admin.close(); // NOTE(review): HBaseAdmin implements Closeable in this API era — confirm against cluster version
        }
    }

    /**
     * Mapper: tokenizes each line and emits (word, "fileName:1") for every token.
     * Keeping the word alone as the key lets Hadoop group all per-document counts
     * of the same word into one reduce call.
     */
    public static class Map
    extends Mapper<Object, Text, Text, Text> {
        private Text keyWord = new Text();
        private Text valueDocCount = new Text();

        public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
            // The document name comes from the input split.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                keyWord.set(itr.nextToken());
                valueDocCount.set(fileName + ":1"); // value carries document + count
                context.write(keyWord, valueDocCount);
            }
        }
    }

    /**
     * Combiner: locally merges "file:count" values per word without changing the key,
     * so it is safe whether Hadoop runs it zero, one, or many times.
     */
    public static class InvertedIndexCombiner
        extends Reducer<Text, Text, Text, Text> {
        private Text merged = new Text();

        public void reduce(Text key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            HashMap<String, Integer> perDoc = new HashMap<String, Integer>();
            for (Text value : values) {
                String s = value.toString();
                int splitIndex = s.lastIndexOf(':'); // file names may contain ':' only before the count
                String doc = s.substring(0, splitIndex);
                int count = Integer.parseInt(s.substring(splitIndex + 1));
                Integer prev = perDoc.get(doc);
                perDoc.put(doc, prev == null ? count : prev + count);
            }
            // Re-emit one aggregated record per document, same key as the input.
            for (java.util.Map.Entry<String, Integer> e : perDoc.entrySet()) {
                merged.set(e.getKey() + ":" + e.getValue());
                context.write(key, merged);
            }
        }
    }

    /**
     * Reducer: merges per-document counts for a word, computes the average count
     * per document, and stores it in HBase as row=word,
     * column=content:average, value=decimal string of the average.
     */
    public static class Reduce
        extends TableReducer<Text, Text, NullWritable> {

        public void reduce(Text key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            HashMap<String, Integer> perDoc = new HashMap<String, Integer>();
            int sum = 0;
            for (Text value : values) {
                String s = value.toString();
                int splitIndex = s.lastIndexOf(':');
                String doc = s.substring(0, splitIndex);
                int count = Integer.parseInt(s.substring(splitIndex + 1));
                sum += count;
                Integer prev = perDoc.get(doc);
                perDoc.put(doc, prev == null ? count : prev + count);
            }
            // average = total occurrences / number of distinct documents containing the word
            float averageCount = (float) sum / (float) perDoc.size();
            // One HBase row per word; value stored as the float's decimal string
            // (same format FloatWritable.toString() produced before).
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("content"), Bytes.toBytes("average"),
                    Bytes.toBytes(Float.toString(averageCount)));
            context.write(NullWritable.get(), put);
        }
    }

    /**
     * Entry point. args[0] = HDFS input dir, args[1] = (unused by TableOutputFormat
     * but kept for compatibility) output path.
     */
    public static void main(String[] args) throws Exception {
        String tablename = "Wuxia";
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableOutputFormat.OUTPUT_TABLE, tablename); // target table for TableOutputFormat
        createHBaseTable(conf, tablename);
        Job job = Job.getInstance(conf, "Wuxia");
        job.setJarByClass(InvertedIndexHbase.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(Reduce.class);
        // Map output types; the reducer writes NullWritable/Put through TableOutputFormat.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

然后在Hadoop執(zhí)行操作.

$ hdfs dfs -mkdir /user
$ hdfs dfs -mkdir /user/input
$ hdfs dfs -put /Users/andrew_liu/Java/Hadoop/wuxia_novels/*  /user/input
$ hadoop jar WorkSpace/InvertedIndexHbase.jar InvertedIndexHbase  /user/input output1

執(zhí)行成功結(jié)束后, 打開HBase Shell的操作

$ hbase shell
> scan 'Wuxia'

HBase中數(shù)據(jù)寫入本地文件

import java.io.FileWriter;
import java.io.IOException;
import java.io.FileWriter;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;


/**
 * Dumps an entire HBase table to a local text file, one line per cell:
 * "rowkey\tvalue".
 *
 * Fixes over the original version:
 *  - The old code concatenated byte[] arrays into a String
 *    (r.getRow() + "\t" + kv.getValue()), which prints array references such as
 *    "[B@1a2b" instead of the cell contents. Bytes are now decoded explicitly.
 *  - ResultScanner/FileWriter/HTable are closed in a finally block; previously
 *    rs.close() could NPE and the writer leaked on error.
 *  - IOExceptions propagate to the caller (the method already declares them)
 *    instead of being swallowed after a printStackTrace().
 */
public class Hbase2Local {
    static Configuration conf = HBaseConfiguration.create();

    /**
     * Scans every row of {@code tableName} and appends its cells to
     * {@code filePath} as "rowkey\tvalue" lines.
     *
     * @param tableName HBase table to scan
     * @param filePath  local file to append to
     * @throws IOException if the scan or the file write fails
     */
    public static void getResultScan(String tableName, String filePath) throws IOException {
        HTable table = new HTable(conf, Bytes.toBytes(tableName));
        ResultScanner rs = null;
        FileWriter writer = null;
        try {
            rs = table.getScanner(new Scan()); // full-table scan, no filters
            writer = new FileWriter(filePath, true); // append mode, as before
            for (Result r : rs) {
                for (KeyValue kv : r.raw()) {
                    // Decode row key and cell value from bytes before concatenating.
                    writer.write(Bytes.toString(r.getRow()) + "\t"
                            + Bytes.toString(kv.getValue()) + "\n");
                }
            }
        } finally {
            if (writer != null) {
                writer.close();
            }
            if (rs != null) {
                rs.close();
            }
            table.close();
        }
    }

    public static void main(String[] args) throws Exception {
        String tableName = "Wuxia";
        String filePath = "/Users/andrew_liu/Java/WorkSpace/Hbaes2Local/bin/Wuxia";
        getResultScan(tableName, filePath);
    }
}

Hive實踐

將本地數(shù)據(jù)導(dǎo)入Hive

hive> create table Wuxia(word string, count double) row format delimited fields terminated by '\t' stored as textfile;
Time taken: 0.049 seconds
hive> load data local inpath '/Users/andrew_liu/Downloads/Wuxia.txt' into table Wuxia;
Loading data to table default.wuxia
Table default.wuxia stats: [numFiles=1, totalSize=2065188]
OK
Time taken: 0.217 seconds

輸出出現(xiàn)次數(shù)大于300的詞語

select * from Wuxia where count > 300;

查詢前100個出現(xiàn)次數(shù)最多的詞語

select * from Wuxia order by count desc limit 100;
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容