Spark The Definitive Guide(Spark權(quán)威指南) 中文版。本書詳細(xì)介紹了Spark2.x版本的各個模塊,目前市面上最好的Spark2.x學(xué)習(xí)書籍?。?!
關(guān)注:登峰大數(shù)據(jù),閱讀中文Spark權(quán)威指南(完整版),系統(tǒng)學(xué)習(xí)Spark大數(shù)據(jù)框架!
如果您覺得作者翻譯的內(nèi)容有幫助,請分享給更多人。您的分享,是作者翻譯的動力
本書前幾章已經(jīng)從用戶的角度介紹了結(jié)構(gòu)化流。這自然是應(yīng)用程序的核心。本章將介紹在開發(fā)應(yīng)用程序之后,在生產(chǎn)環(huán)境中穩(wěn)定運(yùn)行結(jié)構(gòu)化流所需的一些操作工具。在Apache Spark 2.2.0中,結(jié)構(gòu)化流被標(biāo)記為可生產(chǎn)的,這意味著該版本具有生產(chǎn)使用所需的所有特性和穩(wěn)定的API。許多組織已經(jīng)在生產(chǎn)中使用該系統(tǒng),因?yàn)樘孤实卣f,它與運(yùn)行其他生產(chǎn)Spark應(yīng)用程序沒有太大的不同。的確,通過諸如事務(wù)source/sink和精確的一次處理等特性,結(jié)構(gòu)化流設(shè)計人員試圖使其盡可能易于操作。本章將帶您了解一些特定于結(jié)構(gòu)化流的關(guān)鍵操作任務(wù)。這將補(bǔ)充我們在第2部分中看到和了解到的有關(guān)Spark操作的所有內(nèi)容。
23.1.容錯和檢查點(diǎn)
流應(yīng)用程序最重要的操作關(guān)注點(diǎn)是故障恢復(fù)。錯誤是不可避免的:例如丟失集群中的一臺機(jī)器,模式將在沒有適當(dāng)遷移的情況下意外更改,甚至可能故意重啟集群或應(yīng)用程序。在這些情況下,結(jié)構(gòu)化流允許您通過重新啟動應(yīng)用程序來恢復(fù)應(yīng)用程序。為此,必須將應(yīng)用程序配置為使用檢查點(diǎn)和WAL預(yù)寫日志,這兩個操作都由引擎自動處理。具體地說,您必須配置一個查詢來寫入可靠文件系統(tǒng)(例如,HDFS、S3或任何兼容的文件系統(tǒng))上的檢查點(diǎn)位置。然后,結(jié)構(gòu)化流將定期將所有相關(guān)的進(jìn)度信息(例如,給定觸發(fā)器中處理的偏移量范圍)以及當(dāng)前中間狀態(tài)值保存到檢查點(diǎn)位置。在失敗場景中,您只需重新啟動應(yīng)用程序,確保指向相同的檢查點(diǎn)位置,它就會自動恢復(fù)其狀態(tài),并從停止的地方開始處理數(shù)據(jù)。您不必代表應(yīng)用程序手動管理此狀態(tài)----結(jié)構(gòu)化流媒體為您做這些功能。
要使用檢查點(diǎn),請在通過writeStream上的checkpointLocation選項(xiàng)啟動應(yīng)用程序之前指定檢查點(diǎn)位置。你可以這樣做:
// in Scalaval static = spark.read.json("/data/activity-data")val streaming = spark? .readStream? .schema(static.schema)? .option("maxFilesPerTrigger", 10)? .json("/data/activity-data")? .groupBy("gt")? .count()val query = streaming? .writeStream? .outputMode("complete")? .option("checkpointLocation", "/some/location/")? .queryName("test_stream")? .format("memory")? .start()# in Pythonstatic = spark.read.json("/data/activity-data")streaming = spark\? .readStream\? .schema(static.schema)\? .option("maxFilesPerTrigger", 10)\.json("/data/activity-data")\? .groupBy("gt")\? .count()query = streaming\? .writeStream\? .outputMode("complete")\? .option("checkpointLocation", "/some/python/location/")\? .queryName("test_python_stream")\? .format("memory")\??.start()
如果丟失檢查點(diǎn)目錄或其中的信息,應(yīng)用程序?qū)o法從失敗中恢復(fù),您將不得不從頭開始重新啟動流。
23.2.更新程序
為了在生產(chǎn)環(huán)境中運(yùn)行應(yīng)用程序,啟用檢查點(diǎn)可能是最重要的。這是因?yàn)闄z查點(diǎn)將存儲到目前為止您的流所處理的所有信息,以及它可能存儲的中間狀態(tài)。然而,檢查點(diǎn)確實(shí)帶來了一個小陷阱——當(dāng)您更新流應(yīng)用程序時,您將不得不對舊檢查點(diǎn)數(shù)據(jù)進(jìn)行推理。當(dāng)您更新您的應(yīng)用程序時,您必須確保您的更新不是破壞性的更改。當(dāng)我們查看這兩種類型的更新時,我們將詳細(xì)討論這些內(nèi)容:對應(yīng)用程序代碼的更新或運(yùn)行新的Spark版本。
23.2.1.對應(yīng)用程序代碼的更新
結(jié)構(gòu)化流的設(shè)計允許在應(yīng)用程序重啟之間對應(yīng)用程序代碼進(jìn)行某些類型的更改。最重要的是,允許您更改用戶定義函數(shù)(udf),只要它們具有相同的類型簽名。這個特性對于bug修復(fù)非常有用。例如,假設(shè)應(yīng)用程序開始接收一種新類型的數(shù)據(jù),并且當(dāng)前邏輯中的數(shù)據(jù)解析函數(shù)之一崩潰。使用結(jié)構(gòu)化流,您可以使用該函數(shù)的新版本重新編譯應(yīng)用程序,并在流中它之前崩潰的同一位置重新編譯。
雖然添加新列或更改UDF之類的小調(diào)整不會破壞更改,也不需要新的檢查點(diǎn)目錄,但是較大的更改確實(shí)需要一個全新的檢查點(diǎn)目錄。例如,如果更新流應(yīng)用程序以添加新的聚合鍵或從根本上更改查詢本身,Spark就不能從舊檢查點(diǎn)目錄為新查詢構(gòu)造所需的狀態(tài)。在這些情況下,結(jié)構(gòu)化流將拋出一個異常,說它不能從檢查點(diǎn)目錄開始,您必須從頭開始使用一個新的(空的)目錄作為檢查點(diǎn)位置。
23.2.2.運(yùn)行新的Spark版本
結(jié)構(gòu)化的流處理應(yīng)用程序應(yīng)該能夠從一個舊的檢查點(diǎn)目錄重新啟動到Spark的補(bǔ)丁版本更新(例如,從Spark 2.2.0到2.2.1再到2.2.2)。檢查點(diǎn)格式被設(shè)計為向前兼容的,所以它可能被破壞的唯一方法是由于關(guān)鍵的錯誤修復(fù)。如果Spark發(fā)行版不能從舊檢查點(diǎn)恢復(fù),那么它的發(fā)行說明中將清楚地記錄這一點(diǎn)。結(jié)構(gòu)化流媒體開發(fā)人員還希望在較小的版本更新(如Spark 2.2)之間保持格式的兼容性。但是您應(yīng)該檢查發(fā)布說明,看看是否支持每次升級。在這兩種情況下,如果不能從檢查點(diǎn)啟動,都需要使用新的檢查點(diǎn)目錄重新啟動應(yīng)用程序。
23.2.3.調(diào)整應(yīng)用程序的規(guī)模大小
通常,集群的大小應(yīng)該能夠輕松地處理高于數(shù)據(jù)速率的突發(fā)事件。下面討論應(yīng)用程序和集群中應(yīng)該監(jiān)控的關(guān)鍵指標(biāo)。一般來說,如果您看到您的輸入速率遠(yuǎn)遠(yuǎn)高于您的處理速率(稍后將進(jìn)行詳細(xì)說明),那么就該擴(kuò)展您的集群或應(yīng)用程序了。根據(jù)您的資源管理器和部署,您可能只是能夠動態(tài)地將executor添加到應(yīng)用程序中。當(dāng)需要時,您可以用同樣的方法縮小應(yīng)用程序的規(guī)模——刪除執(zhí)行器(可能通過您的云提供商)或使用更低的資源計數(shù)重新啟動應(yīng)用程序。這些更改可能會導(dǎo)致一些處理延遲(當(dāng)刪除執(zhí)行器時,將重新計算數(shù)據(jù)或重新分配分區(qū))。最后,是否值得創(chuàng)建一個具有更復(fù)雜的資源管理功能的系統(tǒng)是一個業(yè)務(wù)決策。
雖然對集群或應(yīng)用程序進(jìn)行底層基礎(chǔ)設(shè)施更改有時是必要的,但有時更改可能只需要重新啟動應(yīng)用程序或使用新配置的流。例如在流處理程序運(yùn)行過程中,更改 spark.sql.shuffle.partitions參數(shù)是不會生效的。這需要重新啟動實(shí)際的流處理程序,而不一定是整個應(yīng)用程序。較重的更改(如更改任意Spark應(yīng)用程序配置)可能需要重新啟動應(yīng)用程序。
23.3.度量和監(jiān)控
流應(yīng)用程序中的度量和監(jiān)視與使用第18章中描述的工具的一般Spark應(yīng)用程序基本相同。不過,結(jié)構(gòu)化流確實(shí)添加了一些更具體的內(nèi)容,以幫助您更好地理解應(yīng)用程序的狀態(tài)。您可以使用兩個關(guān)鍵api來查詢流查詢的狀態(tài)并查看其最近的執(zhí)行進(jìn)度。使用這兩個api,您可以了解您的流是否按預(yù)期運(yùn)行。
23.3.1.Query Status 查詢狀態(tài)
查詢狀態(tài)是最基本的監(jiān)控API,因此它是一個很好的起點(diǎn)。它的目的是回答這個問題:“我的流現(xiàn)在正在執(zhí)行什么處理?” 此信息在startStream返回的查詢對象的status字段中報告。例如,您可能有一個簡單的計數(shù)流,它提供了由以下查詢定義的物聯(lián)網(wǎng)設(shè)備的計數(shù)(這里我們只是使用了與前一章相同的查詢,沒有初始化代碼):
query.status
要獲得給定查詢的狀態(tài),只需運(yùn)行命令query.status將返回流的當(dāng)前狀態(tài)。這為我們提供了關(guān)于流中在那個時間點(diǎn)上發(fā)生的事情的詳細(xì)信息。下面是查詢此狀態(tài)時將返回的示例:
{? "message" : "Getting offsets from ...",? "isDataAvailable" : true,? "isTriggerActive" : true}
上面的代碼片段描述了從結(jié)構(gòu)化流數(shù)據(jù)源獲取偏移量(因此描述獲取偏移量的消息)。有多種消息描述流的狀態(tài)。
注意我們以在Spark shell中調(diào)用的方式在這里內(nèi)聯(lián)顯示了status命令。但是,對于獨(dú)立應(yīng)用程序,您可能沒有附加shell來在流程中運(yùn)行任意代碼。在這種情況下,您可以通過實(shí)現(xiàn)監(jiān)控服務(wù)器來公開它的狀態(tài),例如一個小型HTTP服務(wù)器,它監(jiān)聽端口并返回查詢。獲取請求時的狀態(tài)。或者,您可以使用稍后描述的更豐富的StreamingQueryListener API來偵聽更多事件。
23.3.2.Recent Progress當(dāng)前進(jìn)展
雖然可以查看查詢的當(dāng)前狀態(tài),但是查看查詢過程的能力同樣重要。progress API允許我們回答諸如“我處理元組的速度是多少?”或者“元組從源到達(dá)的速度有多快?” 通過運(yùn)行query.recentProgress命令,流查詢過程還包括關(guān)于流內(nèi)部的輸入源和輸出接收器的信息。
query.recentProgress
這是我們運(yùn)行之前的代碼后Scala版本的結(jié)果;Python版本將是類似的:
Array({? "id" : "d9b5eac5-2b27-4655-8dd3-4be626b1b59b",? "runId" : "f8da8bc7-5d0a-4554-880d-d21fe43b983d",? "name" : "test_stream",? "timestamp" : "2017-08-06T21:11:21.141Z",? "numInputRows" : 780119,? "processedRowsPerSecond" : 19779.89350912779,? "durationMs" : {? ? "addBatch" : 38179,? ? "getBatch" : 235,? ? "getOffset" : 518,? ? "queryPlanning" : 138,? ? "triggerExecution" : 39440,? ? "walCommit" : 312? },? "stateOperators" : [ {? ? "numRowsTotal" : 7,? ? "numRowsUpdated" : 7? } ],? "sources" : [ {? ? "description" : "FileStreamSource[/some/stream/source/]",? ? "startOffset" : null,? ? "endOffset" : {? ? ? "logOffset" : 0? ? },? ? "numInputRows" : 780119,? ? "processedRowsPerSecond" : 19779.89350912779? } ],? "sink" : {? ? "description" : "MemorySink"? }})
正如您從剛才顯示的輸出中所看到的,這包括關(guān)于流狀態(tài)的許多細(xì)節(jié)。需要注意的是,這是一個實(shí)時快照(根據(jù)查詢進(jìn)度的時間)。為了一致地獲得關(guān)于流狀態(tài)的輸出,您需要反復(fù)查詢這個API以獲得更新后的狀態(tài)。前面輸出中的大多數(shù)字段應(yīng)該是不言自明的。但是,讓我們詳細(xì)回顧一些更重要的字段。
Input rate and processing rate輸入速率和處理速率
輸入速率指定有多少數(shù)據(jù)從輸入源流向結(jié)構(gòu)化流。處理速率是應(yīng)用程序分析數(shù)據(jù)的速度。在理想情況下,輸入和處理速率應(yīng)該同時變化。另一種情況可能是輸入速率遠(yuǎn)遠(yuǎn)大于處理速率。當(dāng)這種情況發(fā)生時,流就會落后,您需要將集群擴(kuò)展到更高的級別來處理更大的負(fù)載。
Batch duration批次間隔
幾乎所有的流系統(tǒng)都利用批處理以任何合理的吞吐量進(jìn)行操作(有些系統(tǒng)可以選擇高延遲來換取較低的吞吐量)。結(jié)構(gòu)化流實(shí)現(xiàn)了這兩種功能。當(dāng)它對數(shù)據(jù)進(jìn)行操作時,您可能會看到批處理持續(xù)時間隨著結(jié)構(gòu)化流處理事件數(shù)量的變化而振蕩。當(dāng)然,當(dāng)連續(xù)處理引擎成為執(zhí)行選項(xiàng)時,這個度量將幾乎沒有相關(guān)性。
提示通常,將批處理持續(xù)時間、輸入和處理速率的變化可視化是最佳實(shí)踐。它比簡單地報告隨時間的變化更有幫助。
23.3.3.Spark UI
Spark web UI(在第18章中詳細(xì)介紹)還顯示了結(jié)構(gòu)化流應(yīng)用程序的任務(wù)、作業(yè)和數(shù)據(jù)處理指標(biāo)。在Spark UI上,每個流應(yīng)用程序都將顯示為一系列短作業(yè),每個觸發(fā)器對應(yīng)一個短作業(yè)。但是,您可以使用相同的UI查看來自應(yīng)用程序的度量、查詢計劃、任務(wù)持續(xù)時間和日志。與DStream API不同的一點(diǎn)是,流選項(xiàng)卡不用于結(jié)構(gòu)化流。
23.4.預(yù)警
理解和查看結(jié)構(gòu)化流查詢的指標(biāo)是重要的第一步。然而,這需要不斷地監(jiān)控儀表板或度量,以發(fā)現(xiàn)潛在的問題。您將需要健壯的自動警報,以便在不手動監(jiān)控作業(yè)的情況下,在作業(yè)失敗或無法跟上輸入數(shù)據(jù)速率時通知您。有幾種方法可以將現(xiàn)有的警報工具集成到Spark中,通常是基于我們前面介紹的recent progress API。例如,您可以直接將度量數(shù)據(jù)提供給監(jiān)控系統(tǒng),比如開源Coda Hale度量庫或Prometheus,或者您可以簡單地對它們進(jìn)行日志記錄,并使用Splunk這樣的日志聚合系統(tǒng)。除了監(jiān)控和警告查詢之外,還需要監(jiān)控和警告集群和整個應(yīng)用程序的狀態(tài)(如果同時運(yùn)行多個查詢)。
23.5.使用流監(jiān)聽器進(jìn)行高級監(jiān)控
我們已經(jīng)討論了結(jié)構(gòu)化流中的一些高級監(jiān)控工具。使用一些粘合邏輯,您可以使用status和queryProgress api將監(jiān)控事件輸出到您選擇的監(jiān)控平臺(例如,日志聚合系統(tǒng)或Prometheus儀表板) 除了這些方法之外,還有一種更底層但更強(qiáng)大的方法來觀察應(yīng)用程序的執(zhí)行:StreamingQueryListener類。
StreamingQueryListener類允許您從流查詢接收異步更新,以便自動將此信息輸出到其他系統(tǒng),并實(shí)現(xiàn)健壯的監(jiān)控和警報機(jī)制。首先開發(fā)自己的對象來擴(kuò)展StreamingQueryListener,然后將其附加到正在運(yùn)行的SparkSession。一旦您使用sparkssession .streams. addlistener()附加自定義偵聽器,當(dāng)查詢啟動或停止時,或者在活動查詢上取得進(jìn)展時,您的類將收到通知。下面是結(jié)構(gòu)化流文檔中偵聽器的一個簡單示例:
val spark: SparkSession = ...spark.streams.addListener(new StreamingQueryListener() {? ? override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {? ? ? ? println("Query started: " + queryStarted.id)? ? }? ? override def onQueryTerminated(? ? ? queryTerminated: QueryTerminatedEvent): Unit = {? ? ? ? println("Query terminated: " + queryTerminated.id)? ? }? ? override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {? ? ? ? println("Query made progress: " + queryProgress.progress)? ? }})
流監(jiān)聽器允許您使用自定義代碼處理每個進(jìn)度更新或狀態(tài)更改,并將其傳遞給外部系統(tǒng)。例如,StreamingQueryListener的以下代碼將把所有查詢進(jìn)度信息轉(zhuǎn)發(fā)給Kafka。從Kafka讀取數(shù)據(jù)后,您必須解析這個JSON字符串,以便訪問實(shí)際的指標(biāo):
classKafkaMetrics(servers: String) extends StreamingQueryListener {valkafkaProperties = new Properties()? kafkaProperties.put("bootstrap.servers",? ? servers)? kafkaProperties.put("key.serializer","kafkashaded.org.apache.kafka.common.serialization.StringSerializer")? kafkaProperties.put("value.serializer","kafkashaded.org.apache.kafka.common.serialization.StringSerializer")valproducer = new KafkaProducer[String, String](kafkaProperties)importorg.apache.spark.sql.streaming.StreamingQueryListenerimportorg.apache.kafka.clients.producer.KafkaProduceroverridedef onQueryProgress(event:StreamingQueryListener.QueryProgressEvent):Unit= {producer.send(new ProducerRecord("streaming-metrics",? ? ? event.progress.json))? }overridedef onQueryStarted(event:StreamingQueryListener.QueryStartedEvent):Unit= {}overridedef onQueryTerminated(event:StreamingQueryListener.QueryTerminatedEvent):Unit= {}}
使用StreamingQueryListener接口,您甚至可以通過在同一個(或另一個)集群上運(yùn)行結(jié)構(gòu)化流應(yīng)用程序來監(jiān)控一個集群上的結(jié)構(gòu)化流應(yīng)用程序。您還可以用這種方式管理多個流。
23.6.結(jié)束語
在本章中,我們討論了在生產(chǎn)環(huán)境中運(yùn)行結(jié)構(gòu)化流所需的主要工具:容錯檢查點(diǎn)和各種監(jiān)控api,這些api允許您觀察應(yīng)用程序如何運(yùn)行。幸運(yùn)的是,如果您已經(jīng)在生產(chǎn)環(huán)境中運(yùn)行Spark,那么許多概念和工具都是類似的,因此您應(yīng)該能夠重用大量現(xiàn)有知識。請務(wù)必檢查第4部分,以查看監(jiān)控Spark應(yīng)用程序的其他一些有用工具。