Definition
Spark is an efficient, general-purpose engine for large-scale data processing.
Background
- 2009: Spark was created at UC Berkeley's AMPLab, initially as a university research project.
- 2010: Spark was officially open-sourced.
- 2013: Spark became an Apache Software Foundation project; in the same year Databricks, a commercial company built around Spark, was founded.
- 2014: Spark became a top-level Apache project.

Spark-related components
MapReduce & Spark

[Figure 1.png: the business logic implemented as seven chained MapReduce jobs]
Seven MapReduce jobs mean seven rounds of reading from and writing to HDFS, and the jobs' inputs and outputs are interdependent. The input/output data relationships among the seven jobs are shown in the figure below.

[Figure 2.jpg: input/output data relationships among the seven MapReduce jobs]
Implementing this algorithm with MapReduce raises the following problems:
- A single piece of business logic requires seven MapReduce jobs, and the seven jobs exchange data through HDFS, which adds network and disk overhead.
- Each of the seven jobs must be scheduled onto the cluster separately, which adds resource-scheduling overhead on the Gaia cluster.
- MR2 and MR3 read the same data independently, causing redundant HDFS reads.
These problems greatly lengthen the jobs' running time and increase their cost. Compared with the MapReduce programming model, Spark provides the more flexible DAG (Directed Acyclic Graph) programming model: besides the traditional map and reduce interfaces, it adds operators such as filter, flatMap and union, which makes Spark programs far more flexible and convenient to write. The figure below shows the same business logic implemented with the Spark programming interface.

[Figure 3.jpg: the same business logic implemented as a single Spark job, a DAG of RDDs]
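To make the operator chaining concrete, here is a minimal generic sketch, not the pipeline from the figure; the input paths and the ERROR predicate are made-up placeholders. Several operators are chained into one job, Spark compiles the whole chain into a DAG, and external storage is touched only at the ends:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DagSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("dagSketch").setMaster("local[2]");
        JavaSparkContext ctx = new JavaSparkContext(conf);
        JavaRDD<String> logsA = ctx.textFile("path/to/inputA"); // hypothetical input paths
        JavaRDD<String> logsB = ctx.textFile("path/to/inputB");
        // The chain below is one Spark job: Spark builds a DAG from it and reads
        // external storage once per input, with no intermediate HDFS writes.
        long errors = logsA.union(logsB)                              // merge the two inputs
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .filter(word -> word.startsWith("ERROR"))             // made-up predicate
                .count();                                             // single action triggers DAG execution
        System.out.println("error tokens: " + errors);
        ctx.stop();
    }
}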
Relative to MapReduce, Spark optimizes job execution time and resource usage in the following ways.
- DAG programming model. Spark's DAG model collapses the seven MapReduce jobs into a single Spark job, which Spark automatically splits into eight Stages, each containing multiple Tasks that run in parallel. Data is passed between Stages via Shuffle, so HDFS is read and written only once overall, eliminating six of the seven rounds of HDFS reads and writes (a reduction of roughly 86%).
- A Spark job requests the Executor resources it needs once, at startup; the Tasks of all Stages then run as threads sharing those Executors. Compared with the MapReduce approach, this reduces the number of resource requests by nearly 90%.
- Spark introduces the RDD (Resilient Distributed Dataset) model. Intermediate data is stored as RDDs distributed across the memory of the worker nodes, which reduces disk reads and writes during the computation. RDDs also provide a Cache mechanism: once rdd3 in the figure above is cached, both rdd4 and rdd7 can read rdd3's data directly, avoiding the redundant reads of the same data that MR2 and MR3 perform in the MapReduce version (see the sketch after this list).
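The following minimal sketch illustrates the Cache mechanism from the last point, with rdd3 standing in for the shared intermediate result; the input path, the filter, and the two downstream actions (playing the roles of rdd4 and rdd7) are hypothetical:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CacheSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("cacheSketch").setMaster("local[2]");
        JavaSparkContext ctx = new JavaSparkContext(conf);
        // rdd3 plays the role of the shared intermediate result in the figure.
        JavaRDD<String> rdd3 = ctx.textFile("path/to/input")   // hypothetical path
                .filter(line -> !line.isEmpty());
        rdd3.cache(); // keep rdd3 in executor memory once it has been computed
        long rdd4Side = rdd3.count();            // first action computes rdd3 and fills the cache
        long rdd7Side = rdd3.distinct().count(); // second action reads rdd3 from the cache
        System.out.println(rdd4Side + " lines, " + rdd7Side + " distinct");
        ctx.stop();
    }
}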
Appendix: Spark word-count code
WordCount.java
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

public class WordCount {
    // Comparator that sorts (word, count) pairs by count, descending.
    // Tuple2 follows the Scala convention; Tuple3 up to Tuple22 also exist.
    public static class TupleComparator implements Comparator<Tuple2<String, Integer>>, Serializable {
        @Override
        public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
            return o2._2.compareTo(o1._2);
        }
    }

    public static void main(String[] args) {
        // local mode: no Spark cluster needs to be running
        SparkConf sparkConf = new SparkConf().setAppName("wordCount").setMaster("local[2]");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        JavaRDD<String> file = ctx.textFile("path/to/input", 6); // path of the file to analyze
        file.persist(StorageLevel.MEMORY_ONLY()); // equivalent to file.cache()
        Comparator<Tuple2<String, Integer>> orderCompare = new TupleComparator();
        List<Tuple2<String, Integer>> wordToCounts = file
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator()) // split each line into words
                .mapToPair(word -> new Tuple2<String, Integer>(word, 1))    // each word becomes the key, 1 the initial count
                .reduceByKey((s1, s2) -> s1 + s2)                           // merge identical keys by adding their counts
                .takeOrdered(50, orderCompare);                             // keep the 50 most frequent words
        wordToCounts.forEach(line -> System.out.println(line._1() + ":" + line._2()));
        ctx.stop();
    }
}
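Run as-is, the program needs no cluster because of local[2]; it prints at most 50 lines of the form word:count, ordered from the most frequent word to the least.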
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>liao</groupId>
  <artifactId>wordcount</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>wordcount</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.1.0</spark.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
      <version>2.9.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>1.2.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>javax.servlet-api</artifactId>
      <version>3.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>2.1.1</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>2.1.1</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.ansj</groupId>
      <artifactId>ansj_seg</artifactId>
      <version>5.1.6</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>net.alchim31.maven</groupId>
          <artifactId>scala-maven-plugin</artifactId>
          <version>3.4.1</version>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>2.0.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
          <execution>
            <id>scala-test-compile</id>
            <phase>process-test-resources</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
        <executions>
          <execution>
            <phase>compile</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
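With this pom, mvn package builds the example, and a command along the lines of spark-submit --class WordCount target/wordcount-1.0-SNAPSHOT.jar runs it (the jar name follows from the artifactId and version above; no --master flag is needed because the code hard-codes local[2]).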