總結(jié)一下,避免后面再重復(fù)踩坑。
Spark Streaming是近實時(near real time)的小批處理系統(tǒng), 可以對接各類消息中間或者直接監(jiān)控Hdfs目錄, 可以做為實時大數(shù)據(jù)流式計算,也可以做一些按時間窗口的數(shù)據(jù)聚合分析,比如流量監(jiān)控之類的, 主要的優(yōu)勢是和spark-sql, spark-mlib, spark-graphx無縫結(jié)合的生態(tài)系統(tǒng)。
官方地址: http://spark.apache.org/docs/2.2.0/streaming-programming-guide.html

上游數(shù)據(jù)可以是Kafka, Flume, Hdfs或者是TCP Sockets;處理后的下游數(shù)據(jù)可以是落到HDFS, 數(shù)據(jù)庫, 或者重新寫回消息中間件,隨意處理。
maven環(huán)境
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>`
</dependency>
spark-streaming2.20適配的消息中間件
| Source | Artifact |
|---|---|
| Kafka | spark-streaming-kafka-0-8_2.11 |
| Flume | spark-streaming-flume_2.11 |
| Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] |
官方給了一些例子:
nc -lk 9999 同一臺機器上socket, 端口9999
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
Spark Streaming的優(yōu)勢在于:
能運行在100+的結(jié)點上,并達到秒級延遲(最小設(shè)置batch-time為500ms,再小就很容易task大量堆積)。
使用基于內(nèi)存的Spark作為執(zhí)行引擎,具有高效和容錯的特性。
能集成Spark的批處理和交互查詢。
為實現(xiàn)復(fù)雜的算法提供和批處理類似的簡單接口

spark Streaming封裝了kafka的高級接口: Kafka Integration Guide.

DStream是spark-streaming提供的一個抽象數(shù)據(jù)類型, 就是按時間切分的一組有序RDD集合。
關(guān)于更多的概念和方法參考官網(wǎng)教程, 這里總結(jié)一下使用的一些坑和優(yōu)化:
一, kerberos 認證問題:
問題: 我們的hadoop訪問有kerberos的認證機制,默認是7天更換,剛開始沒注意這個問題,spark-streaming的程序每隔一周崩一次
解決:
--deploy-mode 由 yarn-client模式改為yarn-cluster模式;
--keytab /home/xxx/xxx.keytab --principal xxx@cloudera.xxx.com (剛開始客戶端是2.1.0沒生效,升級為2.2.0)
二, 優(yōu)雅結(jié)束:
問題:application被人為中斷,當前batch的數(shù)據(jù)沒處理完
解決:源代碼在spark.stop() 之前加了一個鉤子, 來達到優(yōu)雅退出, 保存斷點checkpoint
--conf spark.streaming.stopGracefullyOnShutdown=true;
也可以自己在JVM關(guān)閉之前添加鉤子, 來附加做一些郵件報警之類的事情(發(fā)送kill命令關(guān)閉driver進程,不要使用(-9)強制關(guān)閉,不然鉤子無法捕獲)
Runtime.getRuntime().addShutdownHook(
new Thread() { override def run() {`
log("Gracefully stop Spark Streaming") `
streamingContext.stop(true, true) } }`
)
三, 數(shù)據(jù)緩存和清除:
cache或者persist的數(shù)據(jù)一定要在foreachRDD中清除掉,不然內(nèi)存爆炸
spark.streaming.unpersist=true 這個配置只是自動推測并清除緩存數(shù)據(jù), 最好還是代碼中處理
四,batch的最大處理量,
根據(jù)內(nèi)存和batchDuration設(shè)定合理的值, 保證batchDuration時間內(nèi)能處理完,不造成堆積, 也和流數(shù)據(jù)大小有關(guān)。
– conf spark.streaming.kafka.maxRatePerPartition=1000
五, 應(yīng)用程序失敗自動重啟次數(shù), 和重試間隔
--conf spark.yarn.maxAppAttempts=4
--conf [spark.yarn.am](http://spark.yarn.am).attemptFailuresValidityInterval=1h
六,使用YARN Capacity Scheduler調(diào)度, 且提交到單獨的Yarn隊列
--queue realtime_queue
七,開啟spark推測執(zhí)行
# 推測執(zhí)行開啟
spark.speculation true
# 檢測周期
spark.speculation.interval 100
# 完成task的百分比時啟動推測
spark.speculation.quantile 0.75
# 比其他的慢多少倍時啟動推測
spark.speculation.multiplier 1.5
八, 避免單個任務(wù)阻塞:
spark.streaming.concurrentJobs=4
九,合理的batchDuration:
不要小于500ms, 太小,會積壓數(shù)據(jù), 太大,實時性不好
十,合理GC: 開啟并行Mark-Sweep垃圾回收機制, 其它的參照JVM的調(diào)優(yōu),減少full-GC
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
十一,計算效率:
實時計算對效率要求很高(不然大量任務(wù)堆積), 所以spark的性能優(yōu)化的方法在這里通用, 比如:
合理的并行度partition, 一般是core的2~5倍, spark。 spark.default.parallelism=200
spark.sql.shuffle.partitions 設(shè)置大一點, 個人比較喜歡spark-sql處理邏輯,這個是sql shuffle時的并行度
spark.rdd.compress=true 緩存時壓縮, 默認是false, 節(jié)省內(nèi)存, 但是增加性能損耗
十二, 代碼優(yōu)化:
根據(jù)實際情況優(yōu)化,在線任務(wù)和離線任務(wù)還是區(qū)別很大的,更多關(guān)注效率。
處理空Batch:
空batch比較多, 不判斷直接寫的話會形成很多空文件
if(rdd.count() != 0) 或者 if(!rdd.partitions.isEmpty)
推薦第二種, 數(shù)據(jù)量比較大時 count很費時間的高性能算子(平時要加強總結(jié)):
groupByKey → reduceByKey/aggregateByKey
map → mapPartitions
foreachPartitions → foreach
- 序列化(廣播變量, cache, 自定義對象):
通常圖省事, 直接繼承 java的Serializable 接口。
Spark支持使用Kryo序列化機制, 大概效率是java序列化的10倍, 變少網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù),減少在集群中耗費的內(nèi)存資源。
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrationRequired=true // 應(yīng)用的類沒有注冊會報錯,默認false
-
使用:需要先注冊算子里邊用到的類,不然會存儲每個對象的全類名(full class name),這樣的使用方式往往比默認的 Java serialization 還要浪費更多的空間。
- 需要序列化的類繼承 java.io.Serializable
- 注冊類繼承KryoRegistrato并且注冊那些需要序列化的類
- 在sparkConf中設(shè)置spark.serializer和spark.kryo.registrator
十三,其它
checkpoint: http://bit1129.iteye.com/blog/2217505 沒用到自帶的checkpoint機制
Kyro序列化
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
case class UserInfo(name: String ,age: Int,gender: String, addr: String)
class MyRegisterKryo extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
kryo.register(classOf[UserInfo])
}
}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
/**
* 需要序列化的類繼承java.io.Serializable
* 注冊類繼承KryoRegistrator并且注冊那些需要序列化的類
* 在sparkConf中設(shè)置spark.serializer和spark.kryo.registrator
*/
object KyroExample {
def kyroExample() {
val conf = new SparkConf().setMaster("local[1]").setAppName("KyroTest")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "tools.MyRegisterKryo")
conf.registerKryoClasses(Array(classOf[UserInfo], classOf[scala.collection.mutable.WrappedArray.ofRef[_]]))
val sc = new SparkContext(conf)
val arr = new ArrayBuffer[UserInfo]()
val nameArr = Array[String]("lsw","yyy","lss")
val genderArr = Array[String]("male","female")
val addressArr = Array[String]("beijing","shanghai","shengzhen","wenzhou","hangzhou")
for(i <- 1 to 1000){
val name = nameArr(Random.nextInt(3))
val age = Random.nextInt(100)
val gender = genderArr(Random.nextInt(2))
val address = addressArr(Random.nextInt(5))
arr.+=(UserInfo(name,age,gender,address))
}
val start = System.currentTimeMillis()
val rdd = sc.parallelize(arr)
//序列化的方式將rdd存到內(nèi)存
rdd.persist(StorageLevel.MEMORY_ONLY_SER)
println(System.currentTimeMillis() - start)
sc.stop()
}
}
- 用Kyro來讀寫硬盤文件 https://www.iteblog.com/archives/1328.html
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //使用Kryo序列化庫
val sc = new SparkContext(conf)
def saveAsObjectFile[T: ClassTag](rdd: RDD[T], path: String) {
val kryoSerializer = new KryoSerializer(rdd.context.getConf) // KryoSerializer對象, rdd.context.getConf獲取緩存大小
rdd.mapPartitions(iter => iter.grouped(10)
.map(_.toArray))
.map(splitArray => {
//initializes kyro and calls your registrator class
val kryo = kryoSerializer.newKryo() //map種創(chuàng)建Kryo實例, 線程不安全,只能放在map或者mappartition中
//convert data to bytes
val bao = new ByteArrayOutputStream()
val output = kryoSerializer.newKryoOutput()
output.setOutputStream(bao)
kryo.writeClassAndObject(output, splitArray)
output.close()
// We are ignoring key field of sequence file
val byteWritable = new BytesWritable(bao.toByteArray)
(NullWritable.get(), byteWritable)
}).saveAsSequenceFile(path)
}
def objectFile[T](sc: SparkContext, path: String, minPartitions: Int = 1)
(implicit ct: ClassTag[T]) = {
val kryoSerializer = new KryoSerializer(sc.getConf)
sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable],
minPartitions)
.flatMap(x => {
val kryo = kryoSerializer.newKryo()
val input = new Input()
input.setBuffer(x._2.getBytes)
val data = kryo.readClassAndObject(input)
val dataObject = data.asInstanceOf[Array[T]]
dataObject
})
}
參考:
Kryo讀寫硬盤: https://www.iteblog.com/archives/1328.html
Kryo使用: https://blog.csdn.net/cjuexuan/article/details/51485427