從本地的 data frames 來創(chuàng)建 SparkDataFrames
從 Data Sources(數(shù)據(jù)源)創(chuàng)建 SparkDataFrame
從 Hive tables 來創(chuàng)建 SparkDataFrame
應(yīng)用 User-Defined Function(UDF 用戶自定義函數(shù))
Run a given function on a large dataset usingdapplyordapplyCollect
使用spark.lapply分發(fā)運行一個本地的 R 函數(shù)
SparkR 是一個 R package, 它提供了一個輕量級的前端以從 R 中使用 Apache Spark. 在 Spark 2.2.0 中, SparkR 提供了一個分布式的 data frame, 它實現(xiàn)了像 selection, filtering, aggregation etc 一系列所支持的操作.(dplyr與 R data frames 相似) ), 除了可用于海量數(shù)據(jù)上之外. SparkR 還支持使用 MLlib 來進行分布式的 machine learning(機器學(xué)習(xí)).
SparkDataFrame 是一個分布式的, 將數(shù)據(jù)映射到有名稱的 colums(列)的集合. 在概念上 相當(dāng)于關(guān)系數(shù)據(jù)庫中的table表或 R 中的 data frame,但在該引擎下有更多的優(yōu)化. SparkDataFrames 可以從各種來源構(gòu)造,例如: 結(jié)構(gòu)化的數(shù)據(jù)文件,Hive 中的表,外部數(shù)據(jù)庫或現(xiàn)有的本地 R data frames.
All of the examples on this page use sample data included in R or the Spark distribution and can be run using the./bin/sparkRshell.
SparkR 的入口點是SparkSession, 它會連接您的 R 程序到 Spark 集群中. 您可以使用sparkR.session來創(chuàng)建SparkSession, 并傳遞諸如應(yīng)用程序名稱, 依賴的任何 spark 軟件包等選項, 等等. 此外,還可以通過SparkSession來與SparkDataFrames一起工作。 如果您正在使用sparkRshell,那么SparkSession應(yīng)該已經(jīng)被創(chuàng)建了,你不需要再調(diào)用sparkR.session.
sparkR.session()
您可以從 RStudio 中來啟動 SparkR. 您可以從 RStudio, R shell, Rscript 或者 R IDEs 中連接你的 R 程序到 Spark 集群中去. 要開始, 確保已經(jīng)在環(huán)境變量中設(shè)置好 SPARK_HOME (您可以檢測下Sys.getenv), 加載 SparkR package, 并且像下面一樣調(diào)用sparkR.session. 它將檢測 Spark 的安裝, 并且, 如果沒有發(fā)現(xiàn), 它將自動的下載并且緩存起來. 當(dāng)然,您也可以手動的運行install.spark.
為了調(diào)用sparkR.session, 您也可以指定某些 Spark driver 的屬性. 通常哪些應(yīng)用程序?qū)傩?/a>和運行時環(huán)境不能以編程的方式來設(shè)置, 這是因為 driver 的 JVM 進程早就已經(jīng)啟動了, 在這種情況下 SparkR 會幫你做好準(zhǔn)備. 要設(shè)置它們, 可以像在sparkConfig參數(shù)中的其它屬性一樣傳遞它們到sparkR.session()中去.
if(nchar(Sys.getenv("SPARK_HOME"))<1){Sys.setenv(SPARK_HOME="/home/spark")}library(SparkR,lib.loc=c(file.path(Sys.getenv("SPARK_HOME"),"R","lib")))sparkR.session(master="local[*]",sparkConfig=list(spark.driver.memory="2g"))
下面的 Spark driver 屬性可以 從 RStudio 的 sparkR.session 的 sparkConfig 中進行設(shè)置:
Property Name<(屬性名稱)Property group(屬性分組)spark-submitequivalent
spark.masterApplication Properties--master
spark.yarn.keytabApplication Properties--keytab
spark.yarn.principalApplication Properties--principal
spark.driver.memoryApplication Properties--driver-memory
spark.driver.extraClassPathRuntime Environment--driver-class-path
spark.driver.extraJavaOptionsRuntime Environment--driver-java-options
spark.driver.extraLibraryPathRuntime Environment--driver-library-path
有了一個SparkSession之后, 可以從一個本地的 R data frame,Hive 表, 或者其它的data sources中來創(chuàng)建SparkDataFrame應(yīng)用程序.
從本地的 data frames 來創(chuàng)建 SparkDataFrames
要創(chuàng)建一個 data frame 最簡單的方式是去轉(zhuǎn)換一個本地的 R data frame 成為一個 SparkDataFrame. 我們明確的使用as.DataFrame或createDataFrame并且經(jīng)過本地的 R data frame 中以創(chuàng)建一個 SparkDataFrame. 例如, 下面的例子基于 R 中已有的faithful來創(chuàng)建一個SparkDataFrame.
df<-as.DataFrame(faithful)# 展示第一個 SparkDataFrame 的內(nèi)容head(df)##? eruptions waiting##1? ? 3.600? ? ? 79##2? ? 1.800? ? ? 54##3? ? 3.333? ? ? 74
從 Data Sources(數(shù)據(jù)源)創(chuàng)建 SparkDataFrame
SparkR 支持通過SparkDataFrame接口對各種 data sources(數(shù)據(jù)源)進行操作. 本節(jié)介紹使用數(shù)據(jù)源加載和保存數(shù)據(jù)的常見方法. 您可以查看 Spark Sql 編程指南的specific options部分以了解更多可用于內(nèi)置的 data sources(數(shù)據(jù)源)內(nèi)容.
從數(shù)據(jù)源創(chuàng)建 SparkDataFrames 常見的方法是read.df. 此方法將加載文件的路徑和數(shù)據(jù)源的類型,并且將自動使用當(dāng)前活動的 SparkSession. SparkR 天生就支持讀取 JSON, CSV 和 Parquet 文件, 并且通過可靠來源的軟件包第三方項目, 您可以找到 Avro 等流行文件格式的 data source connectors(數(shù)據(jù)源連接器). 可以用spark-submit或sparkR命令指定--packages來添加這些包, 或者在交互式 R shell 或從 RStudio 中使用sparkPackages參數(shù)初始化SparkSession.
sparkR.session(sparkPackages="com.databricks:spark-avro_2.11:3.0.0")
We can see how to use data sources using an example JSON input file. Note that the file that is used here isnota typical JSON file. Each line in the file must contain a separate, self-contained valid JSON object. For more information, please seeJSON Lines text format, also called newline-delimited JSON. As a consequence, a regular multi-line JSON file will most often fail.
我們可以看看如何使用 JSON input file 的例子來使用數(shù)據(jù)源. 注意, 這里使用的文件是not一個經(jīng)典的 JSON 文件. 文件中的每行都必須包含一個單獨的,獨立的有效的JSON對象
people<-read.df("./examples/src/main/resources/people.json","json")head(people)##? age? ? name##1? NA Michael##2? 30? ? Andy##3? 19? Justin# SparkR 自動從 JSON 文件推斷出 schema(模式)printSchema(people)# root#? |-- age: long (nullable = true)#? |-- name: string (nullable = true)# 同樣, 使用? read.json 讀取多個文件people<-read.json(c("./examples/src/main/resources/people.json","./examples/src/main/resources/people2.json"))
該 data sources API 原生支持 CSV 格式的 input files(輸入文件). 要了解更多信息請參閱 SparkRread.dfAPI 文檔.
df<-read.df(csvPath,"csv",header="true",inferSchema="true",na.strings="NA")
該 data sources API 也可用于將 SparkDataFrames 存儲為多個 file formats(文件格式). 例如, 我們可以使用write.df把先前的示例的 SparkDataFrame 存儲為一個 Parquet 文件.
write.df(people,path="people.parquet",source="parquet",mode="overwrite")
從 Hive tables 來創(chuàng)建 SparkDataFrame
您也可以從 Hive tables(表)來創(chuàng)建 SparkDataFrames. 為此,我們需要創(chuàng)建一個具有 Hive 支持的 SparkSession,它可以訪問 Hive MetaStore 中的 tables(表). 請注意, Spark 應(yīng)該使用Hive support來構(gòu)建,更多細(xì)節(jié)可以在SQL 編程指南中查閱.
sparkR.session()sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")# Queries can be expressed in HiveQL.results<-sql("FROM src SELECT key, value")# results is now a SparkDataFramehead(results)##? key? value## 1 238 val_238## 2? 86? val_86## 3 311 val_311
SparkDataFrames 支持一些用于結(jié)構(gòu)化數(shù)據(jù)處理的 functions(函數(shù)). 這里我們包括一些基本的例子,一個完整的列表可以在API文檔中找到:
# Create the SparkDataFramedf<-as.DataFrame(faithful)# 獲取關(guān)于 SparkDataFrame 基礎(chǔ)信息df## SparkDataFrame[eruptions:double, waiting:double]# Select only the "eruptions" columnhead(select(df,df$eruptions))##? eruptions##1? ? 3.600##2? ? 1.800##3? ? 3.333# You can also pass in column name as stringshead(select(df,"eruptions"))# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 minshead(filter(df,df$waiting<50))##? eruptions waiting##1? ? 1.750? ? ? 47##2? ? 1.750? ? ? 47##3? ? 1.867? ? ? 48
SparkR data frames 支持一些常見的, 用于在 grouping(分組)數(shù)據(jù)后進行 aggregate(聚合)的函數(shù). 例如, 我們可以在faithfuldataset 中計算waiting時間的直方圖, 如下所示.
# We use the `n` operator to count the number of times each waiting time appearshead(summarize(groupBy(df,df$waiting),count=n(df$waiting)))##? waiting count##1? ? ? 70? ? 4##2? ? ? 67? ? 1##3? ? ? 69? ? 2# We can also sort the output from the aggregation to get the most common waiting timeswaiting_counts<-summarize(groupBy(df,df$waiting),count=n(df$waiting))head(arrange(waiting_counts,desc(waiting_counts$count)))##? waiting count##1? ? ? 78? ? 15##2? ? ? 83? ? 14##3? ? ? 81? ? 13
SparkR 還提供了一些可以直接應(yīng)用于列進行數(shù)據(jù)處理和 aggregatation(聚合)的函數(shù). 下面的例子展示了使用基本的算術(shù)函數(shù).
# Convert waiting time from hours to seconds.# Note that we can assign this to a new column in the same SparkDataFramedf$waiting_secs<-df$waiting*60head(df)##? eruptions waiting waiting_secs##1? ? 3.600? ? ? 79? ? ? ? 4740##2? ? 1.800? ? ? 54? ? ? ? 3240##3? ? 3.333? ? ? 74? ? ? ? 4440
應(yīng)用 User-Defined Function(UDF 用戶自定義函數(shù))
在 SparkR 中, 我們支持幾種 User-Defined Functions:
Run a given function on a large dataset usingdapplyordapplyCollect
應(yīng)用一個 function(函數(shù))到SparkDataFrame的每個 partition(分區(qū)). 應(yīng)用于SparkDataFrame每個 partition(分區(qū))的 function(函數(shù))應(yīng)該只有一個參數(shù), 它中的data.frame對應(yīng)傳遞的每個分區(qū). 函數(shù)的輸出應(yīng)該是一個data.frame. Schema 指定生成的SparkDataFramerow format. 它必須匹配返回值的data types.
# Convert waiting time from hours to seconds.# Note that we can apply UDF to DataFrame.schema<-structType(structField("eruptions","double"),structField("waiting","double"),structField("waiting_secs","double"))df1<-dapply(df,function(x){x<-cbind(x,x$waiting*60)},schema)head(collect(df1))##? eruptions waiting waiting_secs##1? ? 3.600? ? ? 79? ? ? ? 4740##2? ? 1.800? ? ? 54? ? ? ? 3240##3? ? 3.333? ? ? 74? ? ? ? 4440##4? ? 2.283? ? ? 62? ? ? ? 3720##5? ? 4.533? ? ? 85? ? ? ? 5100##6? ? 2.883? ? ? 55? ? ? ? 3300
像dapply那樣, 應(yīng)用一個函數(shù)到SparkDataFrame的每個分區(qū)并且手機返回結(jié)果. 函數(shù)的輸出應(yīng)該是一個data.frame. 但是, 不需要傳遞 Schema. 注意, 如果運行在所有分區(qū)上的函數(shù)的輸出不能 pulled(拉)到 driver 的內(nèi)存中過去, 則dapplyCollect會失敗.
# Convert waiting time from hours to seconds.# Note that we can apply UDF to DataFrame and return a R's data.frameldf<-dapplyCollect(df,function(x){x<-cbind(x,"waiting_secs"=x$waiting*60)})head(ldf,3)##? eruptions waiting waiting_secs##1? ? 3.600? ? ? 79? ? ? ? 4740##2? ? 1.800? ? ? 54? ? ? ? 3240##3? ? 3.333? ? ? 74? ? ? ? 4440
Run a given function on a large dataset grouping by input column(s) and usinggapplyorgapplyCollect(在一個大的 dataset 上通過 input colums(輸入列)來進行 grouping(分組)并且使用gapplyorgapplyCollect來運行一個指定的函數(shù))
應(yīng)用給一個函數(shù)到SparkDataFrame的每個 group. 該函數(shù)被應(yīng)用到SparkDataFrame的每個 group, 并且應(yīng)該只有兩個參數(shù): grouping key 和 Rdata.frame對應(yīng)的 key. 該 groups 從SparkDataFrame的 columns(列)中選擇. 函數(shù)的輸出應(yīng)該是data.frame. Schema 指定生成的SparkDataFramerow format. 它必須在 Sparkdata types 數(shù)據(jù)類型的基礎(chǔ)上表示 R 函數(shù)的輸出 schema(模式). 用戶可以設(shè)置返回的data.frame列名.
# Determine six waiting times with the largest eruption time in minutes.schema<-structType(structField("waiting","double"),structField("max_eruption","double"))result<-gapply(df,"waiting",function(key,x){y<-data.frame(key,max(x$eruptions))},schema)head(collect(arrange(result,"max_eruption",decreasing=TRUE)))##? ? waiting? max_eruption##1? ? ? 64? ? ? 5.100##2? ? ? 69? ? ? 5.067##3? ? ? 71? ? ? 5.033##4? ? ? 87? ? ? 5.000##5? ? ? 63? ? ? 4.933##6? ? ? 89? ? ? 4.900
像gapply那樣, 將函數(shù)應(yīng)用于SparkDataFrame的每個分區(qū),并將結(jié)果收集回 R data.frame. 函數(shù)的輸出應(yīng)該是一個data.frame. 但是,不需要傳遞 schema(模式). 請注意,如果在所有分區(qū)上運行的 UDF 的輸出無法 pull(拉)到 driver 的內(nèi)存, 那么gapplyCollect可能會失敗.
# Determine six waiting times with the largest eruption time in minutes.result<-gapplyCollect(df,"waiting",function(key,x){y<-data.frame(key,max(x$eruptions))colnames(y)<-c("waiting","max_eruption")y})head(result[order(result$max_eruption,decreasing=TRUE),])##? ? waiting? max_eruption##1? ? ? 64? ? ? 5.100##2? ? ? 69? ? ? 5.067##3? ? ? 71? ? ? 5.033##4? ? ? 87? ? ? 5.000##5? ? ? 63? ? ? 4.933##6? ? ? 89? ? ? 4.900
使用spark.lapply分發(fā)運行一個本地的 R 函數(shù)
類似于本地 R 中的lapply,spark.lapply在元素列表中運行一個函數(shù),并使用 Spark 分發(fā)計算. 以類似于doParallel或lapply的方式應(yīng)用于列表的元素. 所有計算的結(jié)果應(yīng)該放在一臺機器上. 如果不是這樣, 他們可以像df < - createDataFrame(list)這樣做, 然后使用dapply.
# Perform distributed training of multiple models with spark.lapply. Here, we pass# a read-only list of arguments which specifies family the generalized linear model should be.families<-c("gaussian","poisson")train<-function(family){model<-glm(Sepal.Length~Sepal.Width+Species,iris,family=family)summary(model)}# Return a list of model's summariesmodel.summaries<-spark.lapply(families,train)# Print the summary of each modelprint(model.summaries)
A SparkDataFrame can also be registered as a temporary view in Spark SQL and that allows you to run SQL queries over its data. Thesqlfunction enables applications to run SQL queries programmatically and returns the result as aSparkDataFrame.
# Load a JSON filepeople<-read.df("./examples/src/main/resources/people.json","json")# Register this SparkDataFrame as a temporary view.createOrReplaceTempView(people,"people")# SQL statements can be run by using the sql methodteenagers<-sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")head(teenagers)##? ? name##1 Justin
SparkR 現(xiàn)支持下列機器學(xué)習(xí)算法:
spark.logit:邏輯回歸 Logistic Regression
spark.survreg:加速失敗時間生存模型 Accelerated Failure Time (AFT) Survival Model
spark.glmorglm:廣義線性模型 Generalized Linear Model (GLM)
spark.randomForest:隨機森林 for回歸and分類
spark.gaussianMixture:高斯混合模型 (GMM)
spark.kstest:柯爾莫哥洛夫-斯米爾諾夫檢驗
SparkR 底層實現(xiàn)使用 MLlib 來訓(xùn)練模型. 有關(guān)示例代碼,請參閱MLlib用戶指南的相應(yīng)章節(jié). 用戶可以調(diào)用summary輸出擬合模型的摘要, 利用模型對數(shù)據(jù)進行預(yù)測, 并且使用write.ml/read.ml來 保存/加載擬合的模型 . SparkR 支持對模型擬合使用部分R的公式運算符, 包括 ‘~’, ‘.’, ‘:’, ‘+’, 和 ‘-‘.
下面的例子展示了SparkR如何 保存/加載 機器學(xué)習(xí)模型.
training<-read.df("data/mllib/sample_multiclass_classification_data.txt",source="libsvm")# Fit a generalized linear model of family "gaussian" with spark.glmdf_list<-randomSplit(training,c(7,3),2)gaussianDF<-df_list[[1]]gaussianTestDF<-df_list[[2]]gaussianGLM<-spark.glm(gaussianDF,label~features,family="gaussian")# Save and then load a fitted MLlib modelmodelPath<-tempfile(pattern="ml",fileext=".tmp")write.ml(gaussianGLM,modelPath)gaussianGLM2<-read.ml(modelPath)# Check model summarysummary(gaussianGLM2)# Check model predictiongaussianPredictions<-predict(gaussianGLM2,gaussianTestDF)head(gaussianPredictions)unlink(modelPath)
Find full example code at "examples/src/main/r/ml/ml.R" in the Spark repo.
RSpark
bytebyte
integerinteger
floatfloat
doubledouble
numericdouble
characterstring
stringstring
binarybinary
rawbinary
logicalboolean
POSIXcttimestamp
POSIXlttimestamp
Datedate
arrayarray
listarray
envmap
SparkR 支持 Structured Streaming API (測試階段). Structured Streaming 是一個 構(gòu)建于SparkSQL引擎之上的易拓展、可容錯的流式處理引擎. 更多信息請參考 R APIStructured Streaming Programming Guide
當(dāng)在R中加載或引入(attach)一個新package時, 可能會發(fā)生函數(shù)名沖突,一個函數(shù)掩蓋了另一個函數(shù)
下列函數(shù)是被SparkR所掩蓋的:
被掩蓋函數(shù)如何獲取
covinpackage:statsstats::cov(x, y = NULL, use = "everything",
method = c("pearson", "kendall", "spearman"))
filterinpackage:statsstats::filter(x, filter, method = c("convolution", "recursive"),
sides = 2, circular = FALSE, init)
sampleinpackage:basebase::sample(x, size, replace = FALSE, prob = NULL)
由于SparkR的一部分是在dplyr軟件包上建模的,因此SparkR中的某些函數(shù)與dplyr中同名. 根據(jù)兩個包的加載順序, 后加載的包會掩蓋先加載的包的部分函數(shù). 在這種情況下, 可以在函數(shù)名前指定包名前綴, 例如:SparkR::cume_dist(x)ordplyr::cume_dist(x).
你可以在 R 中使用search()檢查搜索路徑
在Spark 1.6.0 之前, 寫入模式默認(rèn)值為append. 在 Spark 1.6.0 改為error匹配 Scala API.
SparkSQL 將R 中的NA轉(zhuǎn)換為null,反之亦然.
table方法已經(jīng)移除并替換為tableToDF.
類DataFrame已改名為SparkDataFrame避免名稱沖突.
Spark的SQLContext和HiveContext已經(jīng)過時并替換為SparkSession. 相應(yīng)的摒棄sparkR.init()而通過調(diào)用sparkR.session()來實例化SparkSession. 一旦實例化完成, 當(dāng)前的SparkSession即可用于SparkDataFrame 操作(注釋:spark2.0開始所有的driver實例通過sparkSession來進行構(gòu)建).
sparkR.session不支持sparkExecutorEnv參數(shù).要為executors設(shè)置環(huán)境,請使用前綴”spark.executorEnv.VAR_NAME”設(shè)置Spark配置屬性,例如”spark.executorEnv.PATH”, -sqlContext不再需要下列函數(shù):createDataFrame,as.DataFrame,read.json,jsonFile,read.parquet,parquetFile,read.text,sql,tables,tableNames,cacheTable,uncacheTable,clearCache,dropTempTable,read.df,loadDF,createExternalTable.
registerTempTable方法已經(jīng)過期并且替換為createOrReplaceTempView.
dropTempTable方法已經(jīng)過期并且替換為dropTempView.
scSparkContext 參數(shù)不再需要下列函數(shù):setJobGroup,clearJobGroup,cancelJobGroup
join不再執(zhí)行笛卡爾積計算, 使用crossJoin來進行笛卡爾積計算.
createDataFrame和as.DataFrame添加numPartitions參數(shù). 數(shù)據(jù)分割時, 分區(qū)位置計算已經(jīng)與scala計算相一致.
方法createExternalTable已經(jīng)過期并且替換為createTable. 可以調(diào)用這兩種方法來創(chuàng)建外部或托管表. 已經(jīng)添加額外的 catalog 方法.
默認(rèn)情況下,derby.log現(xiàn)在已保存到tempdir()目錄中. 當(dāng)實例化SparkSession且選項enableHiveSupport 為TRUE,會創(chuàng)建derby.log .
更正spark.lda錯誤設(shè)置優(yōu)化器的bug.
更新模型概況輸出coefficientsasmatrix. 更新的模型概況包括spark.logit,spark.kmeans,spark.glm.spark.gaussianMixture的模型概況已經(jīng)添加對數(shù)概度(log-likelihood)loglik.

原文地址: http://spark.apachecn.org/docs/cn/2.2.0/sparkr.html
網(wǎng)頁地址: http://spark.apachecn.org/
github: https://github.com/apachecn/spark-doc-zh(覺得不錯麻煩給個 Star,謝謝!~)