Spark Structured Streaming寫Hive HBase Mysql

組件版本

  • spark版本 2.3.1 (hdp)
  • hadoop 3.1.1 (hdp)
  • HDP hive 3.1.2
  • HBase 2.0.0
  • mysql 版本5.x

使用Spark Structured Streaming讀取kafka的數(shù)據(jù)寫入hive、HBase和MySQL在spark里沒有原生支持,整理實(shí)測。

  • pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.insight.spark</groupId>
    <artifactId>SparkDemo</artifactId>
    <version>1.1</version>

    <properties>
        <encoding>UTF-8</encoding>
        <spark.version>2.3.1</spark.version>
    </properties>

    <dependencies>

        <!-- https://mvnrepository.com/artifact/com.sun.jersey/jersey-core -->
        <dependency>
            <groupId>com.sun.jersey</groupId>
            <artifactId>jersey-client</artifactId>
            <version>1.19</version>
        </dependency>

        <!-- Spark核心庫 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jersey-client</artifactId>
                    <groupId>org.glassfish.jersey.core</groupId>
                </exclusion>
            </exclusions>
            <!-- <scope>provided</scope>-->
        </dependency>
        <!--Spark sql庫 提供DF類API -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!--HBase相關(guān)庫-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.htrace</groupId>
            <artifactId>htrace-core</artifactId>
            <version>3.1.0-incubating</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.9</version>
        </dependency>
        
        <!--spark與hive交互 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>



    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>


            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.10</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have classpath issue like NoDefClassError,... -->
                    <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>

主要代碼與使用方法

Usage: StructuredKafkaWordCount <bootstrap-servers> <topics> <number 0/1/2/3> [<checkpoint-location> eg:/tmp/temp-spark-1] ……
程序接收多個(gè)參數(shù):第一個(gè)是kafka的broker地址,第二個(gè)是消費(fèi)的topic名稱、第三個(gè)是輸出類型,有4種,用 0 1 2 3 表示,第4個(gè)是checkpoint的路徑,后續(xù)更多的參數(shù)可以傳遞給連接mysql使用。程序的邏輯是接收kafka的消息,做wordcount處理后輸出結(jié)果。

package com.insight.spark.streaming

import com.insight.spark.util.ConfigLoader
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

import java.sql
import java.sql.{DriverManager, PreparedStatement}
import java.util.UUID

object StructuredStreamingTest {
  System.setProperty("HADOOP_USER_NAME","hdfs")
  val conf: Configuration = HBaseConfiguration.create()

  def main(args: Array[String]): Unit = {
    SetLogLevel.setStreamingLogLevels()
    if (args.length < 2) {
      System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
        " <topics> <number 0/1/2/3> [<checkpoint-location> eg:/tmp/temp-spark-1]")
      System.exit(1)
    }

    val Array(bootstrapServers, topics, number, _*) = args
    val checkpointLocation =
      if (args.length > 3) args(3) else "/tmp/temp-spark-" + UUID.randomUUID.toString

    val spark = SparkSession
      .builder
      .appName("StructuredKafkaWordCount")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of input lines from kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topics)
      .option("startingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Generate running word count
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    /**
      * Start running the query with user params:0 1 2 3
      * 1:結(jié)果寫入hive
      * 2:結(jié)果寫入hbase
      * 3:結(jié)果寫入mysql
      * 0/other:console 結(jié)果打印到控制臺(tái)
      */
    val dsw = number match {
      //寫hive
      case "1" =>
        wordCounts.writeStream
          .outputMode("complete")
          .trigger(Trigger.ProcessingTime("10 seconds"))//批次時(shí)間
          .format("com.insight.spark.streaming.HiveSinkProvider")//自定義HiveSinkProvider
          .option("checkpointLocation", checkpointLocation)
          .queryName("write hive")

      case "2" =>
        wordCounts.writeStream
          .outputMode("update")
          .foreach(new ForeachWriter[Row] {
            var connection: Connection = _

            def open(partitionId: Long, version: Long): Boolean = {
              conf.set("hbase.zookeeper.quorum", ConfigLoader.getString("hbase.zookeeper.list"))
              conf.set("hbase.zookeeper.property.clientPort", ConfigLoader.getString("hbase.zookeeper.port"))
              conf.set("zookeeper.znode.parent", ConfigLoader.getString("zookeeper.znode.parent"))
              import org.apache.hadoop.hbase.client.ConnectionFactory
              connection = ConnectionFactory.createConnection(conf)
              true
            }

            def process(record: Row): Unit = {
              val tableName = TableName.valueOf(ConfigLoader.getString("hbase.table.name")) //表名
              val table = connection.getTable(tableName)
              val put = new Put(Bytes.toBytes(record.mkString))
              put.addColumn("info".getBytes(), Bytes.toBytes("word"), Bytes.toBytes(record(0).toString))
              put.addColumn("info".getBytes(), Bytes.toBytes("count"), Bytes.toBytes(record(1).toString))
              table.put(put)
            }

            def close(errorOrNull: Throwable): Unit = {
              connection.close()
            }
          })
          .option("checkpointLocation", checkpointLocation)
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .queryName("write hbase")

      case "3" =>

        /** 建表語句,先建個(gè)spark庫
          * CREATE TABLE `words` (
          * `id` int(11) NOT NULL AUTO_INCREMENT,
          * `word` varchar(255) NOT NULL,
          * `count` int(11) DEFAULT 0,
          * PRIMARY KEY (`id`),
          * UNIQUE KEY `word` (`word`)
          * ) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8;
          */
        val (url, user, pwd) = (args(4), args(5), args(6))
        wordCounts.writeStream
          .outputMode("complete")
          .foreach(new ForeachWriter[Row] {
            var conn: sql.Connection = _
            var p: PreparedStatement = _
            def open(partitionId: Long, version: Long): Boolean = {
              Class.forName("com.mysql.jdbc.Driver")
              conn = DriverManager.getConnection(url, user, pwd)
              p = conn.prepareStatement("replace into spark.words(word,count) values(?,?)")
              true
            }

            def process(record: Row): Unit = {
              p.setString(1, record(0).toString)
              p.setInt(2, record(1).toString.toInt)
              p.execute()
            }

            def close(errorOrNull: Throwable): Unit = {
              p.close()
              conn.close()
            }
          })
          .option("checkpointLocation", checkpointLocation)
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .queryName("write mysql")

      case _ =>
        wordCounts.writeStream
          .outputMode("update")
          .format("console")
          .trigger(Trigger.ProcessingTime("10 seconds"))
          .option("checkpointLocation", checkpointLocation)
          .queryName("print it")

    }

    dsw.start().awaitTermination()

  }
}

HiveSinkProvider源碼

其中用到的HiveSinkProvider代碼如下:

package com.insight.spark.streaming

import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.slf4j.LoggerFactory


case class HiveSink(sqlContext: SQLContext,
                    parameters: Map[String, String],
                    partitionColumns: Seq[String],
                    outputMode: OutputMode) extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val logger = LoggerFactory.getLogger(this.getClass)

    val schema = StructType(Array(
      StructField("word", StringType),
      StructField("count", IntegerType)
    ))
    val res = data.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row])
    }
    // 轉(zhuǎn)化df格式
    val df = data.sparkSession.createDataFrame(res, schema)
    df.write.mode(SaveMode.Append).format("hive").saveAsTable("words")

  }
}

class HiveSinkProvider extends StreamSinkProvider with DataSourceRegister {
  override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = {
    HiveSink(sqlContext, parameters, partitionColumns, outputMode)
  }

  override def shortName(): String = "HiveSinkProvider"
}

打包運(yùn)行,spark-submit --xxx this.jar ...就可以了。

點(diǎn):結(jié)構(gòu)化流、Spark Structured Streaming、hive、hbase、mysql
線:spark
面:內(nèi)存計(jì)算

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 17.分區(qū)分桶的區(qū)別,為什么要分區(qū) 分區(qū)表:原來的一個(gè)大表存儲(chǔ)的時(shí)候分成不同的數(shù)據(jù)目錄進(jìn)行存儲(chǔ)。如果說是單分區(qū)表,...
    qydong閱讀 764評(píng)論 0 0
  • 表情是什么,我認(rèn)為表情就是表現(xiàn)出來的情緒。表情可以傳達(dá)很多信息。高興了當(dāng)然就笑了,難過就哭了。兩者是相互影響密不可...
    Persistenc_6aea閱讀 129,557評(píng)論 2 7
  • 16宿命:用概率思維提高你的勝算 以前的我是風(fēng)險(xiǎn)厭惡者,不喜歡去冒險(xiǎn),但是人生放棄了冒險(xiǎn),也就放棄了無數(shù)的可能。 ...
    yichen大刀閱讀 7,718評(píng)論 0 4

友情鏈接更多精彩內(nèi)容