> 新鮮文章,昨天剛經(jīng)過線上驗(yàn)證過的,使用它導(dǎo)出了3億的用戶數(shù)據(jù)出來,花了半個(gè)小時(shí),性能還是穩(wěn)穩(wěn)的,好了不吹牛皮了,直接上代碼吧。
## MR
考查了Hbase的各種MR,沒有發(fā)現(xiàn)哪一個(gè)是能實(shí)現(xiàn)的,如果有請(qǐng)通知我,我給他發(fā)紅包。
所以我們只能自己來寫一個(gè)MR了,編寫一個(gè)Hbase的MR,官方文檔上也有相應(yīng)的例子。
我們用來加以化妝就得到我們想要的了。
導(dǎo)出的CSV格式為
```
admin,22,北京
admin,23,天津
```
依賴 [hbase-mapreduce](https://mvnrepository.com/artifact/org.apache.hbase/hbase-mapreduce)
## 擼scala代碼了
定義Map轉(zhuǎn)換類
```
class MyMapper extends TableMapper[Text, Text] {
? val keyText = new Text()
? val valueText = new Text()
? override def map(key: ImmutableBytesWritable, value: Result, context: Mapper[ImmutableBytesWritable, Result, Text, Text]#Context): Unit = {
? ? val maps = result2Map(value)
? ? keyText.set(maps.get("userId"))
? ? valueText.set(s"${maps.get("regTime")}")
? ? context.write(keyText, valueText)
? }
? //將Result轉(zhuǎn)換為Map
? def result2Map(result: Result): util.HashMap[lang.String, lang.String] = {
? ? val map = new util.HashMap[lang.String, lang.String]()
? ? result.rawCells().foreach {
? ? ? cell =>
? ? ? ? val column: Array[Byte] = CellUtil.cloneQualifier(cell)
? ? ? ? val value: Array[Byte] = CellUtil.cloneValue(cell)
? ? ? ? val qualifierByte = cell.getQualifierArray
? ? ? ? if (qualifierByte != null && qualifierByte.nonEmpty) {
? ? ? ? ? if (value == null || value.length == 0) {
? ? ? ? ? ? map.put(Bytes.toString(column), "")
? ? ? ? ? } else {
? ? ? ? ? ? map.put(Bytes.toString(column), Bytes.toString(value))
? ? ? ? ? }
? ? ? ? }
? ? }
? ? map
? }
}
```
定義Reducer類
```
class MyReducer extends Reducer[Text, Text, Text, Text] {
? override def reduce(key: Text, values: lang.Iterable[Text], context: Reducer[Text, Text, Text, Text]#Context): Unit = {
? ? val iter = values.iterator()
? ? while (iter.hasNext) {
? ? //這樣可以只保留下Key字段,也就只有一行數(shù)據(jù)了
? ? ? val tmpText = iter.next()
? ? ? val mergeKey = new Text()
? ? ? mergeKey.set(key.toString + "," + tmpText.toString)
? ? ? val v = new Text()
? ? ? v.set("")
? ? ? context.write(mergeKey, v)
? ? }
? }
}
```
ExportCsv核心
```
class ExportCsv extends Configured with Tool {
? override def run(args: Array[String]): Int = {
? ? val conf = HBaseConfiguration.create()
? ? conf.addResource(new FileInputStream(new File("/etc/hbase/conf/hbase-site.xml")))
? ? conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, "/tmp/hbasecsv")
? ? conf.set("mapreduce.job.running.map.limit", "8") //最多有多少個(gè)Task同時(shí)跑
? ? val job = Job.getInstance(conf, "HbaseExportCsv")
? ? job.setJarByClass(classOf[ExportCsv])
? ? val scan = new Scan()
? ? //過濾我們想要的數(shù)據(jù)
? ? scan.addFamily(Bytes.toBytes("ext"))
? ? scan.addColumn(Bytes.toBytes("ext"), Bytes.toBytes("userId"))
? ? scan.addColumn(Bytes.toBytes("ext"), Bytes.toBytes("regTime"))
? ? scan.setBatch(1000)
? ? scan.setCacheBlocks(false)
? ? TableMapReduceUtil.initTableMapperJob(
? ? ? "USER_TABLE",
? ? ? scan,
? ? ? classOf[MyMapper],
? ? ? classOf[Text],
? ? ? classOf[Text],
? ? ? job
? ? )
? ? job.setReducerClass(classOf[MyReducer])
? ? val jobConf = new JobConf(job.getConfiguration)
? ? FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/hbasecsv"))
? ? val isDone = job.waitForCompletion(true)
? ? if (isDone) 0 else 1
? }
}
```
要跑了任務(wù)了
```
hadoop jar ExportCsv.jar
```
---
