在WeTest輿情項(xiàng)目中,需要對(duì)每天千萬(wàn)級(jí)的游戲評(píng)論信息進(jìn)行詞頻統(tǒng)計(jì),在生產(chǎn)者一端,我們將數(shù)據(jù)按照每天的拉取時(shí)間存入了Kafka當(dāng)中,而在消費(fèi)者一端,我們利用了spark streaming從kafka中不斷拉取數(shù)據(jù)進(jìn)行詞頻統(tǒng)計(jì)。本文首先對(duì)spark streaming嵌入kafka的方式進(jìn)行歸納總結(jié),之后簡(jiǎn)單闡述Spark streaming+kafka在輿情項(xiàng)目中的應(yīng)用,最后將自己在Spark Streaming+kafka的實(shí)際優(yōu)化中的一些經(jīng)驗(yàn)進(jìn)行歸納總結(jié)。(如有任何紕漏歡迎補(bǔ)充來(lái)踩,我會(huì)第一時(shí)間改正^v^)
Spark streaming接收Kafka數(shù)據(jù)
用spark streaming流式處理kafka中的數(shù)據(jù),第一步當(dāng)然是先把數(shù)據(jù)接收過(guò)來(lái),轉(zhuǎn)換為spark streaming中的數(shù)據(jù)結(jié)構(gòu)Dstream。接收數(shù)據(jù)的方式有兩種:1.利用Receiver接收數(shù)據(jù),2.直接從kafka讀取數(shù)據(jù)。
這種方式利用接收器(Receiver)來(lái)接收kafka中的數(shù)據(jù),其最基本是使用Kafka高階用戶API接口。對(duì)于所有的接收器,從kafka接收來(lái)的數(shù)據(jù)會(huì)存儲(chǔ)在spark的executor中,之后spark streaming提交的job會(huì)處理這些數(shù)據(jù)。如下圖:

在使用時(shí),我們需要添加相應(yīng)的依賴包:
org.apache.sparkspark-streaming-kafka_2.101.6.3
而對(duì)于Scala的基本使用方式如下:
import org.apache.spark.streaming.kafka._valkafkaStream = KafkaUtils.createStream(streamingContext,? ? ? [ZK quorum], [consumer group id], [per-topic numberofKafka partitionstoconsume])
還有幾個(gè)需要注意的點(diǎn):
在Receiver的方式中,Spark中的partition和kafka中的partition并不是相關(guān)的,所以如果我們加大每個(gè)topic的partition數(shù)量,僅僅是增加線程來(lái)處理由單一Receiver消費(fèi)的主題。但是這并沒(méi)有增加Spark在處理數(shù)據(jù)上的并行度。
對(duì)于不同的Group和topic我們可以使用多個(gè)Receiver創(chuàng)建不同的Dstream來(lái)并行接收數(shù)據(jù),之后可以利用union來(lái)統(tǒng)一成一個(gè)Dstream。
如果我們啟用了Write Ahead Logs復(fù)制到文件系統(tǒng)如HDFS,那么storage level需要設(shè)置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式?jīng)]有receiver這一層,其會(huì)周期性的獲取Kafka中每個(gè)topic的每個(gè)partition中的最新offsets,之后根據(jù)設(shè)定的maxRatePerPartition來(lái)處理每個(gè)batch。其形式如下圖:

這種方法相較于Receiver方式的優(yōu)勢(shì)在于:
簡(jiǎn)化的并行:在Receiver的方式中我們提到創(chuàng)建多個(gè)Receiver之后利用union來(lái)合并成一個(gè)Dstream的方式提高數(shù)據(jù)傳輸并行度。而在Direct方式中,Kafka中的partition與RDD中的partition是一一對(duì)應(yīng)的并行讀取Kafka數(shù)據(jù),這種映射關(guān)系也更利于理解和優(yōu)化。
高效:在Receiver的方式中,為了達(dá)到0數(shù)據(jù)丟失需要將數(shù)據(jù)存入Write Ahead Log中,這樣在Kafka和日志中就保存了兩份數(shù)據(jù),浪費(fèi)!而第二種方式不存在這個(gè)問(wèn)題,只要我們Kafka的數(shù)據(jù)保留時(shí)間足夠長(zhǎng),我們都能夠從Kafka進(jìn)行數(shù)據(jù)恢復(fù)。
精確一次:在Receiver的方式中,使用的是Kafka的高階API接口從Zookeeper中獲取offset值,這也是傳統(tǒng)的從Kafka中讀取數(shù)據(jù)的方式,但由于Spark Streaming消費(fèi)的數(shù)據(jù)和Zookeeper中記錄的offset不同步,這種方式偶爾會(huì)造成數(shù)據(jù)重復(fù)消費(fèi)。而第二種方式,直接使用了簡(jiǎn)單的低階Kafka API,Offsets則利用Spark Streaming的checkpoints進(jìn)行記錄,消除了這種不一致性。
以上主要是對(duì)官方文檔[1]的一個(gè)簡(jiǎn)單翻譯,詳細(xì)內(nèi)容大家可以直接看下官方文檔這里不再贅述。
不同于Receiver的方式,是從Zookeeper中讀取offset值,那么自然zookeeper就保存了當(dāng)前消費(fèi)的offset值,那么如果重新啟動(dòng)開(kāi)始消費(fèi)就會(huì)接著上一次offset值繼續(xù)消費(fèi)。而在Direct的方式中,我們是直接從kafka來(lái)讀數(shù)據(jù),那么offset需要自己記錄,可以利用checkpoint、數(shù)據(jù)庫(kù)或文件記錄或者回寫(xiě)到zookeeper中進(jìn)行記錄。這里我們給出利用Kafka底層API接口,將offset及時(shí)同步到zookeeper中的通用類,我將其放在了github上:
示例中KafkaManager是一個(gè)通用類,而KafkaCluster是kafka源碼中的一個(gè)類,由于包名權(quán)限的原因我把它單獨(dú)提出來(lái),ComsumerMain簡(jiǎn)單展示了通用類的使用方法,在每次創(chuàng)建KafkaStream時(shí),都會(huì)先從zooker中查看上次的消費(fèi)記錄offsets,而每個(gè)batch處理完成后,會(huì)同步offsets到zookeeper中。
Spark向kafka中寫(xiě)入數(shù)據(jù)
上文闡述了Spark如何從Kafka中流式的讀取數(shù)據(jù),下面我整理向Kafka中寫(xiě)數(shù)據(jù)。與讀數(shù)據(jù)不同,Spark并沒(méi)有提供統(tǒng)一的接口用于寫(xiě)入Kafka,所以我們需要使用底層Kafka接口進(jìn)行包裝。
最直接的做法我們可以想到如下這種方式:
input.foreachRDD(rdd =>// 不能在這里創(chuàng)建KafkaProducerrdd.foreachPartition(partition =>? ? partition.foreach{casex:String=>{? ? ? ? val props =newHashMap[String,Object]()? ? ? ? props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)? ? ? ? props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")? ? ? ? props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")? ? ? ? println(x)? ? ? ? val producer =newKafkaProducer[String,String](props)? ? ? ? val message=newProducerRecord[String,String]("output",null,x)? ? ? ? producer.send(message)? ? ? }? ? }? ))
但是這種方式缺點(diǎn)很明顯,對(duì)于每個(gè)partition的每條記錄,我們都需要?jiǎng)?chuàng)建KafkaProducer,然后利用producer進(jìn)行輸出操作,注意這里我們并不能將KafkaProducer的新建任務(wù)放在foreachPartition外邊,因?yàn)镵afkaProducer是不可序列化的(not serializable)。顯然這種做法是不靈活且低效的,因?yàn)槊織l記錄都需要建立一次連接。如何解決呢?
首先,我們需要將KafkaProducer利用lazy val的方式進(jìn)行包裝如下:
importjava.util.concurrent.Futureimportorg.apache.kafka.clients.producer.{KafkaProducer,ProducerRecord,RecordMetadata}classKafkaSink[K,V](createProducer: ()=>KafkaProducer[K,V])extendsSerializable{/* This is the key idea that allows us to work around running into
? ? NotSerializableExceptions. */lazyvalproducer = createProducer()defsend(topic:String, key:K, value:V):Future[RecordMetadata] =? ? producer.send(newProducerRecord[K,V](topic, key, value))defsend(topic:String, value:V):Future[RecordMetadata] =? ? producer.send(newProducerRecord[K,V](topic, value))}objectKafkaSink{importscala.collection.JavaConversions._defapply[K,V](config:Map[String,Object]):KafkaSink[K,V] = {valcreateProducerFunc = () => {valproducer =newKafkaProducer[K,V](config)? ? ? sys.addShutdownHook {// Ensure that, on executor JVM shutdown, the Kafka producer sends// any buffered messages to Kafka before shutting down.producer.close()? ? ? }? ? ? producer? ? }newKafkaSink(createProducerFunc)? }defapply[K,V](config: java.util.Properties):KafkaSink[K,V] = apply(config.toMap)}
之后我們利用廣播變量的形式,將KafkaProducer廣播到每一個(gè)executor,如下:
// 廣播KafkaSinkval kafkaProducer: Broadcast[KafkaSink[String, String]] = {? val kafkaProducerConfig = {? ? val p = new Properties()? ? p.setProperty("bootstrap.servers", Conf.brokers)? ? p.setProperty("key.serializer", classOf[StringSerializer].getName)? ? p.setProperty("value.serializer", classOf[StringSerializer].getName)? ? p}? log.warn("kafka producer init done!")? ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))}
這樣我們就能在每個(gè)executor中愉快的將數(shù)據(jù)輸入到kafka當(dāng)中:
//輸出到kafkasegmentedStream.foreachRDD(rdd => {if(!rdd.isEmpty) {? ? rdd.foreach(record => {? ? ? kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)// do something else})? }})
WeTest輿情監(jiān)控對(duì)于每天爬取的千萬(wàn)級(jí)游戲玩家評(píng)論信息都要實(shí)時(shí)的進(jìn)行詞頻統(tǒng)計(jì),對(duì)于爬取到的游戲玩家評(píng)論數(shù)據(jù),我們會(huì)生產(chǎn)到Kafka中,而另一端的消費(fèi)者我們采用了Spark Streaming來(lái)進(jìn)行流式處理,首先利用上文我們闡述的Direct方式從Kafka拉取batch,之后經(jīng)過(guò)分詞、統(tǒng)計(jì)等相關(guān)處理,回寫(xiě)到DB上(至于Spark中DB的回寫(xiě)方式可參考我之前總結(jié)的博文:Spark踩坑記——數(shù)據(jù)庫(kù)(Hbase+Mysql)),由此高效實(shí)時(shí)的完成每天大量數(shù)據(jù)的詞頻統(tǒng)計(jì)任務(wù)。
Spark streaming+Kafka調(diào)優(yōu)
Spark streaming+Kafka的使用中,當(dāng)數(shù)據(jù)量較小,很多時(shí)候默認(rèn)配置和使用便能夠滿足情況,但是當(dāng)數(shù)據(jù)量大的時(shí)候,就需要進(jìn)行一定的調(diào)整和優(yōu)化,而這種調(diào)整和優(yōu)化本身也是不同的場(chǎng)景需要不同的配置。
幾乎所有的Spark Streaming調(diào)優(yōu)文檔都會(huì)提及批處理時(shí)間的調(diào)整,在StreamingContext初始化的時(shí)候,有一個(gè)參數(shù)便是批處理時(shí)間的設(shè)定。如果這個(gè)值設(shè)置的過(guò)短,即個(gè)batchDuration所產(chǎn)生的Job并不能在這期間完成處理,那么就會(huì)造成數(shù)據(jù)不斷堆積,最終導(dǎo)致Spark Streaming發(fā)生阻塞。而且,一般對(duì)于batchDuration的設(shè)置不會(huì)小于500ms,因?yàn)檫^(guò)小會(huì)導(dǎo)致SparkStreaming頻繁的提交作業(yè),對(duì)整個(gè)streaming造成額外的負(fù)擔(dān)。在平時(shí)的應(yīng)用中,根據(jù)不同的應(yīng)用場(chǎng)景和硬件配置,我設(shè)在1~10s之間,我們可以根據(jù)SparkStreaming的可視化監(jiān)控界面,觀察Total Delay來(lái)進(jìn)行batchDuration的調(diào)整,如下圖:

合理的Kafka拉取量(maxRatePerPartition重要)
對(duì)于Spark Streaming消費(fèi)kafka中數(shù)據(jù)的應(yīng)用場(chǎng)景,這個(gè)配置是非常關(guān)鍵的,配置參數(shù)為:spark.streaming.kafka.maxRatePerPartition。這個(gè)參數(shù)默認(rèn)是沒(méi)有上線的,即kafka當(dāng)中有多少數(shù)據(jù)它就會(huì)直接全部拉出。而根據(jù)生產(chǎn)者寫(xiě)入Kafka的速率以及消費(fèi)者本身處理數(shù)據(jù)的速度,同時(shí)這個(gè)參數(shù)需要結(jié)合上面的batchDuration,使得每個(gè)partition拉取在每個(gè)batchDuration期間拉取的數(shù)據(jù)能夠順利的處理完畢,做到盡可能高的吞吐量,而這個(gè)參數(shù)的調(diào)整可以參考可視化監(jiān)控界面中的Input Rate和Processing Time,如下圖:


Spark中的RDD和SparkStreaming中的Dstream,如果被反復(fù)的使用,最好利用cache(),將該數(shù)據(jù)流緩存起來(lái),防止過(guò)度的調(diào)度資源造成的網(wǎng)絡(luò)開(kāi)銷(xiāo)??梢詤⒖加^察Scheduling Delay參數(shù),如下圖:

長(zhǎng)期使用Java的小伙伴都知道,JVM中的垃圾回收機(jī)制,可以讓我們不過(guò)多的關(guān)注與內(nèi)存的分配回收,更加專注于業(yè)務(wù)邏輯,JVM都會(huì)為我們搞定。對(duì)JVM有些了解的小伙伴應(yīng)該知道,在Java虛擬機(jī)中,將內(nèi)存分為了初生代(eden generation)、年輕代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗費(fèi)一定時(shí)間的,尤其是老年代的GC回收,需要對(duì)內(nèi)存碎片進(jìn)行整理,通常采用標(biāo)記-清楚的做法。同樣的在Spark程序中,JVM GC的頻率和時(shí)間也是影響整個(gè)Spark效率的關(guān)鍵因素。在通常的使用中建議:
--conf"spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"
CPU的core數(shù)量,每個(gè)executor可以占用一個(gè)或多個(gè)core,可以通過(guò)觀察CPU的使用率變化來(lái)了解計(jì)算資源的使用情況,例如,很常見(jiàn)的一種浪費(fèi)是一個(gè)executor占用了多個(gè)core,但是總的CPU使用率卻不高(因?yàn)橐粋€(gè)executor并不總能充分利用多核的能力),這個(gè)時(shí)候可以考慮讓么個(gè)executor占用更少的core,同時(shí)worker下面增加更多的executor,或者一臺(tái)host上面增加更多的worker來(lái)增加并行執(zhí)行的executor的數(shù)量,從而增加CPU利用率。但是增加executor的時(shí)候需要考慮好內(nèi)存消耗,因?yàn)橐慌_(tái)機(jī)器的內(nèi)存分配給越多的executor,每個(gè)executor的內(nèi)存就越小,以致出現(xiàn)過(guò)多的數(shù)據(jù)spill over甚至out of memory的情況。
partition和parallelism,partition指的就是數(shù)據(jù)分片的數(shù)量,每一次task只能處理一個(gè)partition的數(shù)據(jù),這個(gè)值太小了會(huì)導(dǎo)致每片數(shù)據(jù)量太大,導(dǎo)致內(nèi)存壓力,或者諸多executor的計(jì)算能力無(wú)法利用充分;但是如果太大了則會(huì)導(dǎo)致分片太多,執(zhí)行效率降低。在執(zhí)行action類型操作的時(shí)候(比如各種reduce操作),partition的數(shù)量會(huì)選擇parent RDD中最大的那一個(gè)。而parallelism則指的是在RDD進(jìn)行reduce類操作的時(shí)候,默認(rèn)返回?cái)?shù)據(jù)的paritition數(shù)量(而在進(jìn)行map類操作的時(shí)候,partition數(shù)量通常取自parent RDD中較大的一個(gè),而且也不會(huì)涉及shuffle,因此這個(gè)parallelism的參數(shù)沒(méi)有影響)。所以說(shuō),這兩個(gè)概念密切相關(guān),都是涉及到數(shù)據(jù)分片的,作用方式其實(shí)是統(tǒng)一的。通過(guò)spark.default.parallelism可以設(shè)置默認(rèn)的分片數(shù)量,而很多RDD的操作都可以指定一個(gè)partition參數(shù)來(lái)顯式控制具體的分片數(shù)量。
在SparkStreaming+kafka的使用中,我們采用了Direct連接方式,前文闡述過(guò)Spark中的partition和Kafka中的Partition是一一對(duì)應(yīng)的,我們一般默認(rèn)設(shè)置為Kafka中Partition的數(shù)量。
這里參考了美團(tuán)技術(shù)團(tuán)隊(duì)的博文,并沒(méi)有做過(guò)具體的性能測(cè)試,其建議如下:
使用reduceByKey/aggregateByKey替代groupByKey
使用mapPartitions替代普通map
使用foreachPartitions替代foreach
使用filter之后進(jìn)行coalesce操作
使用repartitionAndSortWithinPartitions替代repartition與sort類操作
這個(gè)優(yōu)化原則我本身也沒(méi)有經(jīng)過(guò)測(cè)試,但是好多優(yōu)化文檔有提到,這里也記錄下來(lái)。
在Spark中,主要有三個(gè)地方涉及到了序列化:
在算子函數(shù)中使用到外部變量時(shí),該變量會(huì)被序列化后進(jìn)行網(wǎng)絡(luò)傳輸(見(jiàn)“原則七:廣播大變量”中的講解)。
將自定義的類型作為RDD的泛型類型時(shí)(比如JavaRDD,Student是自定義類型),所有自定義類型對(duì)象,都會(huì)進(jìn)行序列化。因此這種情況下,也要求自定義的類必須實(shí)現(xiàn)Serializable接口。
使用可序列化的持久化策略時(shí)(比如MEMORY_ONLY_SER),Spark會(huì)將RDD中的每個(gè)partition都序列化成一個(gè)大的字節(jié)數(shù)組。
對(duì)于這三種出現(xiàn)序列化的地方,我們都可以通過(guò)使用Kryo序列化類庫(kù),來(lái)優(yōu)化序列化和反序列化的性能。Spark默認(rèn)使用的是Java的序列化機(jī)制,也就是ObjectOutputStream/ObjectInputStream API來(lái)進(jìn)行序列化和反序列化。但是Spark同時(shí)支持使用Kryo序列化庫(kù),Kryo序列化類庫(kù)的性能比Java序列化類庫(kù)的性能要高很多。官方介紹,Kryo序列化機(jī)制比Java序列化機(jī)制,性能高10倍左右。Spark之所以默認(rèn)沒(méi)有使用Kryo作為序列化類庫(kù),是因?yàn)镵ryo要求最好要注冊(cè)所有需要進(jìn)行序列化的自定義類型,因此對(duì)于開(kāi)發(fā)者來(lái)說(shuō),這種方式比較麻煩。
以下是使用Kryo的代碼示例,我們只要設(shè)置序列化類,再注冊(cè)要序列化的自定義類型即可(比如算子函數(shù)中使用到的外部變量類型、作為RDD泛型類型的自定義類型等):
// 創(chuàng)建SparkConf對(duì)象。val conf =newSparkConf().setMaster(...).setAppName(...)// 設(shè)置序列化器為KryoSerializer。conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")// 注冊(cè)要序列化的自定義類型。conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
經(jīng)過(guò)種種調(diào)試優(yōu)化,我們最終要達(dá)到的目的是,Spark Streaming能夠?qū)崟r(shí)的拉取Kafka當(dāng)中的數(shù)據(jù),并且能夠保持穩(wěn)定,如下圖所示:

當(dāng)然不同的應(yīng)用場(chǎng)景會(huì)有不同的圖形,這是本文詞頻統(tǒng)計(jì)優(yōu)化穩(wěn)定后的監(jiān)控圖,我們可以看到Processing Time這一柱形圖中有一Stable的虛線,而大多數(shù)Batch都能夠在這一虛線下處理完畢,說(shuō)明整體Spark Streaming是運(yùn)行穩(wěn)定的。