spark基礎(chǔ)入門

定義

Spark是一個高效,通用的大數(shù)據(jù)處理引擎。

背景

  • 2009年,Spark誕生于伯克利大學(xué)AMPLab,最初屬于伯克利大學(xué)的研究性項目。
  • 2010年,正式開源。
  • 2013年,成為了Apache基金項目,同年,基于spark的開源商業(yè)公司Databricks成立。
  • 2014年,成為Apache基金的頂級項目。
spark相關(guān)組件

MapReduce & Spark

1.png

七個MapReduce作業(yè)意味著需要七次讀取和寫入HDFS,而它們的輸入輸出數(shù)據(jù)存在關(guān)聯(lián),七個作業(yè)輸入輸出數(shù)據(jù)關(guān)系如下圖。


2.jpg

基于MapReduce實現(xiàn)此算法存在以下問題:

  • 為了實現(xiàn)一個業(yè)務(wù)邏輯需要使用七個MapReduce作業(yè),七個作業(yè)間的數(shù)據(jù)交換通過HDFS完成,增加了網(wǎng)絡(luò)和磁盤的開銷。
  • 七個作業(yè)都需要分別調(diào)度到集群中運行,增加了Gaia集群的資源調(diào)度開銷。
  • MR2和MR3重復(fù)讀取相同的數(shù)據(jù),造成冗余的HDFS讀寫開銷。

這些問題導(dǎo)致作業(yè)運行時間大大增長,作業(yè)成本增加。相比與MapReduce編程模型,Spark提供了更加靈活的DAG(Directed Acyclic Graph) 編程模型, 不僅包含傳統(tǒng)的map、reduce接口, 還增加了filter、flatMap、union等操作接口,使得編寫Spark程序更加靈活方便。使用Spark編程接口實現(xiàn)上述的業(yè)務(wù)邏輯如下圖所示。

3.jpg

相對于MapReduce,Spark在以下方面優(yōu)化了作業(yè)的執(zhí)行時間和資源使用。

  • DAG編程模型。 通過Spark的DAG編程模型可以把七個MapReduce簡化為一個Spark作業(yè)。Spark會把該作業(yè)自動切分為八個Stage,每個Stage包含多個可并行執(zhí)行的Tasks。Stage之間的數(shù)據(jù)通過Shuffle傳遞。最終只需要讀取和寫入HDFS一次。減少了六次HDFS的讀寫,讀寫HDFS減少了70%。
  • Spark作業(yè)啟動后會申請所需的Executor資源,所有Stage的Tasks以線程的方式運行,共用Executors,相對于MapReduce方式,Spark申請資源的次數(shù)減少了近90%。
  • Spark引入了RDD(Resilient Distributed Dataset)模型,中間數(shù)據(jù)都以RDD的形式存儲,而RDD分布存儲于slave節(jié)點的內(nèi)存中,這就減少了計算過程中讀寫磁盤的次數(shù)。RDD還提供了Cache機制,例如對上圖的rdd3進(jìn)行Cache后,rdd4和rdd7都可以訪問rdd3的數(shù)據(jù)。相對于MapReduce減少MR2和MR3重復(fù)讀取相同數(shù)據(jù)的問題。

附(spark統(tǒng)計字符串代碼)

WordCount.java

public class WordCount {
    
    // 比較器,其中的Tuple2是模仿的scala寫法,
    // 諸如此類的還有Tuple3,Tuple4,Tuple22
    public static class TupleComparator implements Comparator<Tuple2<String, Integer>>, Serializable {
        @Override
        public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
            return o2._2.compareTo(o1._2);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 使用local模式,不需要啟動spark集群
        SparkConf sparkConf = new SparkConf().setAppName("wordCount ").setMaster("local[2]");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        JavaRDD<String> file = ctx.textFile("分析的文件路徑", 6);
        file.persist(StorageLevel.MEMORY_ONLY());
        file.cache();
        Comparator<Tuple2<String, Integer>> orderCompare = new TupleComparator();
        List<Tuple2<String, Integer>> wordToCounts = file
                .flatMap(line -> Arrays.asList(line.split("")).iterator())
                .mapToPair(word -> new Tuple2<String, Integer>(word, 1))//把分割的內(nèi)容作為key,1作為初始值
                .reduceByKey((s1, s2) -> s1 + s2)// 將相同的key進(jìn)行reduce,并將value相加
                .takeOrdered(50, orderCompare);
        wordToCounts.forEach(line -> System.out.println(line._1() + ":" + line._2()));
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>liao</groupId>
  <artifactId>wordcount</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>wordcount</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.1.0</spark.version>
  </properties>
  <dependencies>

    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.2.4</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>javax.servlet-api</artifactId>
      <version>3.1.0</version>
    </dependency>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!--<dependency>-->
    <!--<groupId>org.apache.hadoop</groupId>-->
    <!--<artifactId>hadoop-client</artifactId>-->
    <!--<version>2.6.5</version>-->
    <!--<scope>compile</scope>-->
    <!--</dependency>-->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>2.1.1</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>2.1.1</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.ansj</groupId>
      <artifactId>ansj_seg</artifactId>
      <version>5.1.6</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>net.alchim31.maven</groupId>
          <artifactId>scala-maven-plugin</artifactId>
          <version>3.4.1</version>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>2.0.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>scala-test-compile</id>
            <phase>process-test-resources</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
        <executions>
          <execution>
            <phase>compile</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容