加載 讀取數(shù)據(jù)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('data_processing').getOrCreate()
df = spark.read_csv('data.csv',inferSchema=True,header=True) # inferSchema用于自行推斷數(shù)據(jù)類型
-------------
df.columns #查看列名
(df.count,len(df.columns))#查看形狀
df.printSchema() #類似info
df.show(5) #類似head
df.select('age','name').show()
df.describe().show()
-------------
添加一列新列
df = df.withColumn('age_after_10_years',(df['age']+10))
轉(zhuǎn)換數(shù)據(jù)類型
from pyspark.sql.types import StringType,DoubleType
df['age'].cast(DoubleType())
-------------
篩選數(shù)據(jù)
df.filter(df['age']==20).show()
df.filter((df['age']==20) & (df['name']=='biob')).show()
求unique
df.select('name').distinct().show()
-------------
數(shù)據(jù)分組
df.groupBy('age').count().orderBy('age',ascending=False).show()
df.groupBy('age').mean() #返回 所有除去age的其他列的mean值
聚合
df.groupBy('age').agg({'salary':'sum'})
------------
自定義函數(shù)
from pyspark.sql.function import udf
def func():
***
***
return **
UDF = udf(func,StringType()) #func的返回指的類型 可以是 StringType 也可以是 IntegerType() 看具體情況
df.withColumn('salary_new',UDF(df['salary'])) #新加一列 對df['salary'] 進(jìn)行定義的函數(shù)操作
-----------
UDF = udf(lambda x:....,StringType())
df.withColumn('salary_new',UDF(df['salary']))
-----------
使用pandas_udf速度更快
from pyspark.sql.function import pandas_udf
def func():
...
UDF = pandas_udf(func,IntegerType())
df.withColumn(df['name'],UDF(df['name']))
函數(shù)的方法也可適用于多列
-----------
去重
df = df.dropDuplicates()
-----------
刪除列
df = df.drop('salary')
寫入數(shù)據(jù) csv格式
path = 'result/train'
df.coalesce(1).write.format('csv').option('header','true').save(path)
-----------
寫入數(shù)據(jù) 嵌套結(jié)構(gòu) 適用于數(shù)據(jù)集特別大 可以對其進(jìn)行壓縮
path = 'result/train'
df.write.format('parquet').save(path)
------------
多列特征合并
from pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler #VectorAssembler可將所有的特征匯成一列特征
vec_assembler = VectorAssembler(inputCols=['city','career','age','gender'],outputCol='people_feature')
df = vec_assembler.transform(df)
-----------
劃分?jǐn)?shù)據(jù)集
train,test = df.randomSplit([0.8,0.2])
-----------
from pyspark.ml.regression import LinearRegression
linear_reg = LinearRegression(labelCol='label')
model = linear_reg.fit(train)
model_ = model.evaluate(train)
r2 = model_.r2 #還可以取更多的指標(biāo)
res = model.evaluate(test).predictions
-----------
特征工程
from spark.ml.feature import StringIndexer #類似于sklearn 的 label encoder
from spark.ml.feature import OneHotEncoder
from spark.ml.feature import VectorAssembler
df = StringIndexer(inputCol='city',outputCol='city_').fit_transform(df)
-----------
查看數(shù)據(jù)分布 這里用不了value_counts
df.groupBy('label').count().show()
-----------
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
model = RandomForestClassifier(labelCol='label',numTrees=50).fit(train)
-----------
查看準(zhǔn)確率 精度 auc
from spark.ml.evaluation import MulticlassClassificationEvaluator
from spark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.transform(test)
accuracy = MulticlassClassificationEvaluator(labelCol='label',metricName='accuracy').evaluate(predictions) # metricName = 'weightedPrecision' 精度
auc = BinaryClassificationEvaluator(labelCol='label').evaluate(predictions)
----------
推薦算法 ALS(基于矩陣分解的一種方法)
from pyspark.ml.recommendation import ALS
rec = ALS(maxIter=10, regParam=0.01, userCol='userID', itemCol='itemID', ratingCol='rating', nonnegative=True, coldStartStrategy='drop')
其中 nonnegative 表示 不會在推薦系統(tǒng)中創(chuàng)建負(fù)數(shù)評分 coldStartStrategy可以防止生成任何nan評分預(yù)測
-----------
rec_model = rec.fit(train)
predicted_ratings = rec_model.transfrom(train)
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')
rmase = evaluator.evaluate(predicted_ratings)
進(jìn)行到這一步還沒有完成推薦 只是完成了對測試集用戶評分的預(yù)測
-----------
接下來
推薦用戶可能喜歡的排名靠前的電影
先創(chuàng)建一系列獨立的電影
unique_movies = df.select('movie').distinct()
假如我們對一位特定的userid 推薦電影 先過濾掉 他看過的電影
watched_movies = df.filter(df['userid']==userid).select('movie').distinct()
-----------
然后通過合并兩張表 過濾空值 找出可以推薦的電影
total_movies = unique_movies.join(watched_movies,unique_movies.movie == watched_movies.movie)
remaining_movies = total_movies.where(col('watched_movies.movie').isNull()).select(unique.movies).distinct()
-----------
然后再用之前的模型對其進(jìn)行評分預(yù)測
recommendations = rec_model.transform(remaing_movies).orderBy('prediction',ascending=False)
之后還可以用 IndexToString 反變換 把推薦的電影數(shù)字 映射為 電影名字
pyspark常用操作
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。
相關(guān)閱讀更多精彩內(nèi)容
- 1.提交任務(wù)spark-submit --driver-memory 10g --queue queue.name...
- pySpark DataFrames常用操作指南 前1, 2步是環(huán)境數(shù)據(jù)集操作,如果只想看常用操作請?zhí)? 1. ...
- dataframe字段操作 打印權(quán)重 解析概率 模型調(diào)參 初始化spark 常用缺失值填充 StringIndex...