#if run in windows use this
import findspark
findspark.init()
# import
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
import IPython
# #version
# print("pyspark version:" + str(sc.version))
# print("Ipython version:" + str(IPython.__version__))
# #agg 聚合
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.agg({"amt": "avg"})
# x.show()
# y.show()
# # alias 返回這個列的新的別名或別名們
# from pyspark.sql.functions import col
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame(
#    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.alias('transactions')
# x.show()
# y.show()
# y.select(col("transactions.to")).show()
# # cache
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame(
#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# x.cache()
# print(x.count())  # first action materializes x in memory
# print(x.count())  # later actions avoid IO overhead
# # coalesce 重分區(qū)函數(shù)
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x_rdd = sc.parallelize(
#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], 2)
# x = sqlContext.createDataFrame(x_rdd, ['from', 'to', 'amt'])
# y = x.coalesce(numPartitions=1)
# print(x.rdd.getNumPartitions())
# print(y.rdd.getNumPartitions())
# # collect
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame(
#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.collect()? # creates list of rows on driver
# x.show()
# print(y)
# # columns
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame(
#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.columns? # creates list of column names on driver
# x.show()
# print(y)
# # # corr
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1, 0.001), (
#? ? "Bob", "Carol", 0.2, 0.02), ("Carol", "Dave", 0.3, 0.02)], ['from', 'to', 'amt', 'fee'])
# y = x.corr(col1="amt", col2="fee")
# x.show()
# print(y)
# # count
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame(
#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# x.show()
# print(x.count())
# # cov
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1, 0.001), (
#? ? "Bob", "Carol", 0.2, 0.02), ("Carol", "Dave", 0.3, 0.02)], ['from', 'to', 'amt', 'fee'])
# y = x.cov(col1="amt", col2="fee")
# x.show()
# print(y)
# Spin up a local Spark context plus the (legacy) SQLContext facade that the
# examples in this file use to build DataFrames.
# NOTE(review): constructing a second SparkContext in the same process raises —
# re-running interactively may require SparkContext.getOrCreate(); confirm.
sc = SparkContext(master='local')
sqlContext = SQLContext(sc)
# # crosstab
# x = sqlContext.createDataFrame(
#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.crosstab(col1='from', col2='to')
# x.show()
# y.show()
# # cube
# x = sqlContext.createDataFrame(
#? ? [("Alice", "Bob", 0.1), ("Alice", "Carol", 0.2)], ['from', 'to', 'amt'])
# y = x.cube('from', 'to')
# x.show()
# print(y)  # y is a grouped data object, aggregations will be applied to all numerical columns
# y.sum().show()
# y.max().show()
# # describe
'''計算數(shù)值列的統(tǒng)計信息。
包括計數(shù),平均,標準差,最小和最大。如果沒有指定任何列,這個函數(shù)計算統(tǒng)計所有數(shù)值列'''
# x = sqlContext.createDataFrame(
#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# x.show()
# x.describe().show()
# # distinct 返回行去重的新的DataFrame。
# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), (
#? ? "Carol", "Dave", 0.3), ("Bob", "Carol", 0.2)], ['from', 'to', 'amt'])
# y = x.distinct()
# x.show()
# y.show()
# # drop
# '''
# 返回刪除指定列的新的DataFrame。
# 參數(shù):●? col – 要刪除列的字符串類型名稱,或者要刪除的列。
# '''
# x = sqlContext.createDataFrame(
#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.drop('amt')
# x.show()
# y.show()
# # dropDuplicates / drop_duplicates
# '''
# 返回去掉重復行的一個新的DataFrame,通常只考慮某幾列。
# drop_duplicates()和dropDuplicates()類似。
# '''
# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), (
#? ? "Bob", "Carol", 0.3), ("Bob", "Carol", 0.2)], ['from', 'to', 'amt'])
# y = x.dropDuplicates(subset=['from', 'to'])
# x.show()
# y.show()
# # dropna
# '''
# 返回一個刪除null值行的新的DataFrame。dropna()和dataframenafunctions.drop()類似。
# 參數(shù):● how – 'any'或者'all'。如果'any',刪除包含任何空值的行。如果'all',刪除所有值為null的行。
# ● thresh – int,默認為None,如果指定這個值,刪除小于閾值的非空值的行。這個會重寫'how'參數(shù)。
# ● subset – 選擇的列名稱列表。
# '''
# x = sqlContext.createDataFrame([(None, "Bob", 0.1), ("Bob", "Carol", None), (
#? ? "Carol", None, 0.3), ("Bob", "Carol", 0.2)], ['from', 'to', 'amt'])
# y = x.dropna(how='any', subset=['from', 'to'])
# x.show()
# y.show()
# # dtypes
# '''
# 返回所有列名及類型的列表。
# '''
# x = sqlContext.createDataFrame(
#? ? [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.dtypes
# x.show()
# print(y)
# # explain
# '''
# 將(邏輯和物理)計劃打印到控制臺以進行調試。
# 參數(shù):●? extended – boolean類型,默認為False。如果為False,只打印物理計劃。
# '''
# x = sqlContext.createDataFrame(
#? ? [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# x.show()
# x.agg({"amt": "avg"}).explain(extended=True)
# # fillna
# '''
# 替換空值,和na.fill()類似,DataFrame.fillna()和dataframenafunctions.fill()類似。
# 參數(shù):● value - 要代替空值的值有int,long,float,string或dict.如果值是字典,subset參數(shù)將被忽略。值必須是要替換的列的映射,替換值必須是int,long,float或者string.
#    ● subset - 要替換的列名列表。在subset指定的列,沒有對應數(shù)據(jù)類型的會被忽略。例如,如果值是字符串,subset包含一個非字符串的列,這個非字符串的值會被忽略。
# '''
# x = sqlContext.createDataFrame(
#? ? [(None, "Bob", 0.1), ("Bob", "Carol", None), ("Carol", None, 0.3)], ['from', 'to', 'amt'])
# y = x.fillna(value='unknown', subset=['from', 'to'])
# x.show()
# y.show()
# # filter
# x = sqlContext.createDataFrame(
#? ? [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.filter("amt > 0.1")
# x.show()
# y.show()
# # first
# x = sqlContext.createDataFrame(
#? ? [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.first()
# x.show()
# print(y)
# flatMap
'''
Apply a function to every Row and flatten the results into one RDD.

NOTE: DataFrame.flatMap() was removed in Spark 2.0 — the supported spelling
is to drop down to the underlying RDD via df.rdd.flatMap().
'''
x = sqlContext.createDataFrame(
    [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)],
    ['from', 'to', 'amt'])
# Emit each row's 'from' name and 'amt' value as separate flattened elements.
# (lambda parameter renamed 'row' so it no longer shadows the DataFrame x)
y = x.rdd.flatMap(lambda row: (row[0], row[2]))
print(y)  # y is an RDD object; printing it shows the RDD, not its contents
print(y.collect())  # materialize on the driver: ['Alice', 0.1, 'Bob', 0.2, 'Carol', 0.3]