1.讀取csv
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('my_first_app_name') \
.getOrCreate()
file = r'C:\Users\Administrator\Desktop\kaggle泰坦尼克號(hào)獲救率預(yù)測(cè)數(shù)據(jù)集\train.csv'
df = spark.read.csv(file,header=True,inferSchema=True)
df.show(5)

2. 查看字段類型 同pandas
df.dtypes
[('PassengerId', 'int'),
('Survived', 'int'),
('Pclass', 'int'),
('Name', 'string'),
('Sex', 'string'),
('Age', 'double'),
('SibSp', 'int'),
('Parch', 'int'),
('Ticket', 'string'),
('Fare', 'double'),
('Cabin', 'string'),
('Embarked', 'string')]
3. 查看列名 同pandas
df.columns
['PassengerId',
'Survived',
'Pclass',
'Name',
'Sex',
'Age',
'SibSp',
'Parch',
'Ticket',
'Fare',
'Cabin',
'Embarked']
4. 查看行數(shù) pandas len(df)
df.count()
891
5. 重命名列名
pandas df.rename(columns={'Sex':sex})
#withColumnRenamed方法
df = df.withColumnRenamed('Age','age')\
.withColumnRenamed('Sex','sex')
df.columns
6.選擇和切片
6.1 選擇一列
df.select('Age').show(2)

6.2 選擇多列
df.select('Name','Age').show(2)

6.3 多列選擇和切片
df.select('Name','Age')\
.filter(df['Age']>70).show()

6.4 between 范圍選擇
df.filter(df['Age'].between(68,72))\
.select('Name','Age').show()

6.5 聯(lián)合篩選
df.filter(df['Age']>30)\
.filter(df['Sex']=='male')\
.select('Name','Sex','Age').show()

6.6 like 包含文字信息
df.filter("Name like '%Mrs%'").show()

6.7 sql方式選擇
# 首先dataframe注冊(cè)為臨時(shí)表,然后執(zhí)行SQL查詢
df.createOrReplaceTempView('df_sql')
spark.sql('select Name,Age,Sex from df_sql where Age>30 and Age<35').show()

7. 刪除某行
df.drop('PassengerId').show()

8. 增加某列
import pyspark.sql.functions as fn
df.withColumn('id',fn.lit(0)).show()

9. 排序
#pandas 排序
#df.sort_values()
#排序
df.sort('age',ascending=False).show()#ascending=False 倒序
#多列排序
df.sort('age','fare',ascending=False).show()
#混合排序
df.sort(df['Age'].desc(),df['Fare'].asc()).show()#age字段倒序,fare字段正序
# orderBy也是排序,返回的Row對(duì)象列表
df.orderBy('age','fare').take(3)

10. 重復(fù)值
#查看是否有重復(fù)項(xiàng)
print('去重前行數(shù)',df.count())
print('去重后行數(shù)',df.distinct().count())
#去除重復(fù)行,同pandas
df.drop_duplicates()
去重前行數(shù) 891
去重后行數(shù) 891
11. 缺失值
#查看各字段缺失率
df.agg(*[(1-(fn.count(c) /fn.count('*'))).alias(c) for c in df.columns]).show()
#其中age,Cabin,Embarked字段有缺失

#Cabin 缺失率較大刪除該字段
df = df.drop('cabin')
#age字段缺失率20%,填充均值
agg_mean = round(df.select(fn.mean('age')).collect()[0][0],0)
#Embarked缺失率較少,填充眾數(shù)
#查看Embarked字段各分類總計(jì)
df.groupby('embarked').count().sort('count',ascending=False).show()
df = df.fillna({'age':agg_mean,'embarked':'S'})
#去除缺失值大于閾值的行
#df.drop(thresh=)
#姓名為字符型,從字符變量中提取有效信息
str1 = df.select(fn.split('name',',')[1].alias('name'))
str2 = str1.select(fn.split('name','. ')[0].alias('name'))