Sparklyr

Sparklyr是rstudio 社區(qū)維護(hù)的一個(gè)spark的接口。

文檔

Sparklyr 文檔:https://spark.rstudio.com

安裝

Sparklyr: sparklyr::spark_install(version = "2.3.0", hadoop_version = "2.7"),不依賴于 Spark 版本,spark 2.X 完美兼容 1.X。

Spark 環(huán)境配置需要注意的問(wèn)題:

  1. 下載和 Hadoop 對(duì)應(yīng)版本號(hào)的發(fā)行版,具體可以通過(guò) sparklyr::spark_available_versions() 查詢可用的 spark 版本
  2. JAVA_HOME/SPARK_HOME/HADOOP_HOME 是必須要指定的環(huán)境變量,建議使用 JDK8/spark2.x/hadoop2.7
  3. yarn-client/yarn-cluster 模式需要設(shè)置環(huán)境變量 Sys.setenv("HADOOP_CONF_DIR"="/etc/hadoop/conf")
  4. 連接 Hive 需要提供 Hive 鏈接配置, 在 spark-connection 初始化時(shí)指定對(duì)應(yīng) hive-site.xml 文件

初始化

sc <- sparklyr::spark_connect(master = "yarn-client",
                             spark_home = "/data/FinanceR/Spark",
                             version = "2.2.0",
                             config = sparklyr::spark_config())

數(shù)據(jù)輸入輸出

以寫(xiě) Parquet 文件為例, 同理你可以用 SparkR::write.()/sparklyr::spark_write_()等寫(xiě)入其他格式文件到HDFS 上, 比如csv/text

什么是 Parquet 文件? Parquet 是一種高性能列式存儲(chǔ)文件格式,比 CSV 文件強(qiáng)在內(nèi)建索引,可以快速查詢數(shù)據(jù),目前普遍應(yīng)用在模型訓(xùn)練過(guò)程。

df <- sparklyr::copy_to(sc,faithful,"df")

sparklyr::spark_write_parquet(df,path="/user/FinanceR",mode="overwrite",partition_by = "dt")

數(shù)據(jù)清洗

library(sparklyr)
library(dplyr)

# 在 mutate 中支持 Hive UDF

remote_df = dplyr::tbl(sc,from = "db.financer_tbl") # 定義數(shù)據(jù)源表 
# 或者 remote_df = dplyr::tbl(sc,from = dplyr::sql("select * from db.financer_tbl limit 10")) #

remote_df %>%
    mutate(a = b+2) %>%   # 在 mutate 中支持 Hive UDF
    filter(a > 2)%>%
    group_by(key)%>%
    summarize(count = n())%>%
    select(cnt = count)%>% 
    order_by(cnt)%>%
    arrange(desc(cnt))%>%
    na.omit() ->
    pipeline

pipeline %>% sdf_persist() # 大數(shù)據(jù)集 緩存在集群上
pipeline %>% head() %>% collect() # 小數(shù)據(jù) 加載到本地

SQL

df <- sc %>% 
      dplyr::tbl(dplyr::sql('SELECT * FROM financer_tbl WHERE dt = "20180318"'))

sc %>% DBI::dbGetQuery('SELECT * FROM financer_tbl WHERE dt = "20180318" limit 10') # 直接將數(shù)據(jù) collect 到本地, 與操作MySQL完全一樣
      
df %>% dbplyr::sql_render() # 將 pipeline 自動(dòng)翻譯為 SQL
# SELECT * FROM financer_tbl WHERE dt = "20180318"

分發(fā) R 代碼

分發(fā)機(jī)制:

系統(tǒng)會(huì)將本地依賴文件壓縮打包上傳到 HDFS 路徑上,通過(guò) Spark 動(dòng)態(tài)分發(fā)到執(zhí)行任務(wù)的機(jī)器上解壓縮。 執(zhí)行任務(wù)的機(jī)器本地獨(dú)立的線程、內(nèi)存中執(zhí)行代碼,最后匯總計(jì)算結(jié)果到主要節(jié)點(diǎn)機(jī)器上實(shí)現(xiàn) R 代碼的分發(fā)。

func <- function(x){x + runif(1) } # 原生 R代碼

sparklyr::spark_apply(x = df,packages=T,name = c("key","value"),func =func,group = "key")

流式計(jì)算

什么是流式計(jì)算? 流式計(jì)算是介于實(shí)時(shí)與離線計(jì)算之間的一種計(jì)算方式,以亞秒級(jí)準(zhǔn)實(shí)時(shí)的方式小批量計(jì)算數(shù)據(jù),廣泛應(yīng)用在互聯(lián)網(wǎng)廣告、推薦等場(chǎng)景。

Sparklyr: 暫時(shí)不支持流式計(jì)算,功能開(kāi)發(fā)中。

統(tǒng)計(jì)之都原文:
https://cosx.org/2018/05/sparkr-vs-sparklyr

學(xué)習(xí)資源

https://spark.rstudio.com/
https://github.com/rstudio/cheatsheets/raw/master/translations/chinese/sparklyr-cheatsheet_zh_CN.pdf

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容