使用 HBase 的 FilterList 過濾器

package wmstat.trip

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import wmutils.WmTimeUtil._

import scala.collection.mutable.ArrayBuffer

object HBaseSpark {
  def main(args:Array[String]): Unit ={

    // 本地模式運行,便于測試
    val sparkConf = new SparkConf().setMaster("local").setAppName("HBaseTest")

    // 創(chuàng)建 HBase 掃描器
    val scan = new Scan()
//    val filter=new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new RegexStringComparator("^[a-zA-Z0-9]+_20180903[0-9]{6}$")) //使用正則表達式過濾近一個月的
//    scan.setFilter(filter)

    // 過去 7 天
    val arrayWeek: ArrayBuffer[String] =  lastestNdays("", 7)

    // filterList
    val filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);

    for (d <- arrayWeek) {
      filterList.addFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator( "_" + d)))
    }

    scan.setFilter(filterList)

    scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("vin"))
    scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("tripStatus"))
    scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("tripStartTime"))
    scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("tripEndTime"))
    scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("tripDistance"))
    scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("startSoc"))
    scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("endSoc"))
    scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("maxSpeed"))
    scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("startMileage"))
    scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("coordinate"))

    var proto = ProtobufUtil.toScan(scan)
    var scanToString = Base64.encodeBytes(proto.toByteArray());

    // 創(chuàng)建hbase configuration
    val hBaseConf = HBaseConfiguration.create()
    hBaseConf.set(TableInputFormat.INPUT_TABLE,"trip_signal")
    hBaseConf.set(TableInputFormat.SCAN, scanToString)


    // 創(chuàng)建 spark context
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // 從數(shù)據(jù)源獲取數(shù)據(jù)
    val hbaseRDD = sc.newAPIHadoopRDD(hBaseConf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])

    // 將數(shù)據(jù)映射為表  也就是將 RDD轉(zhuǎn)化為 dataframe schema
    val df = hbaseRDD.map(r=>(
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("vin"))),
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("tripStatus"))),
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("tripStartTime"))),
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("tripEndTime"))),
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("tripDistance"))),
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("startSoc"))),
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("endSoc"))),
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("maxSpeed"))),
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("startMileage"))),
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("coordinate")))
    )).toDF("vin","tripStatus", "tripStartTime", "tripEndTime", "tripDistance", "startSoc", "endSoc", "maxSpeed", "startMileage", "coordinate")


    df.show(500)

    sc.stop()
  }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容