pySpark API實操(3)

#if run in windows use this

import findspark

findspark.init()

# import

from pyspark import SparkContext

from pyspark.sql import SQLContext

from pyspark.sql import SparkSession

import IPython

# #version

# print("pyspark version:" + str(sc.version))

# print("Ipython version:" + str(IPython.__version__))

# #agg 聚合

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.agg({"amt": "avg"})

# x.show()

# y.show()

# # alias 返回這個列的新的別名或別名們

# from pyspark.sql.functions import col

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame(

#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.alias('transactions')

# x.show()

# y.show()

# y.select(col("transactions.to")).show()

# # cache

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame(

#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# x.cache()

# print(x.count())? # first action materializes x in memory

# print(x.count())? # later actions avoid IO overhead

# # coalesce 重分區(qū)函數(shù)

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x_rdd = sc.parallelize(

#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], 2)

# x = sqlContext.createDataFrame(x_rdd, ['from', 'to', 'amt'])

# y = x.coalesce(numPartitions=1)

# print(x.rdd.getNumPartitions())

# print(y.rdd.getNumPartitions())

# # collect

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame(

#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.collect()? # creates list of rows on driver

# x.show()

# print(y)

# # columns

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame(

#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.columns? # creates list of column names on driver

# x.show()

# print(y)

# # # corr

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1, 0.001), (

#? ? "Bob", "Carol", 0.2, 0.02), ("Carol", "Dave", 0.3, 0.02)], ['from', 'to', 'amt', 'fee'])

# y = x.corr(col1="amt", col2="fee")

# x.show()

# print(y)

# # count

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame(

#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# x.show()

# print(x.count())

# # cov

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1, 0.001), (

#? ? "Bob", "Carol", 0.2, 0.02), ("Carol", "Dave", 0.3, 0.02)], ['from', 'to', 'amt', 'fee'])

# y = x.cov(col1="amt", col2="fee")

# x.show()

# print(y)

sc = SparkContext('local')

sqlContext = SQLContext(sc)

# # crosstab

# x = sqlContext.createDataFrame(

#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.crosstab(col1='from', col2='to')

# x.show()

# y.show()

# # cube

# x = sqlContext.createDataFrame(

#? ? [("Alice", "Bob", 0.1), ("Alice", "Carol", 0.2)], ['from', 'to', 'amt'])

# y = x.cube('from', 'to')

# x.show()

# print(y)? # y is a grouped data object, aggregations will be applied to all numerical columns

# y.sum().show()

# y.max().show()

# # describe

'''計算數(shù)值列的統(tǒng)計信息。

包括計數(shù),平均,標準差,最小和最大。如果沒有指定任何列,這個函數(shù)計算統(tǒng)計所有數(shù)值列'''

# x = sqlContext.createDataFrame(

#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# x.show()

# x.describe().show()

# # distinct 返回行去重的新的DataFrame。

# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), (

#? ? "Carol", "Dave", 0.3), ("Bob", "Carol", 0.2)], ['from', 'to', 'amt'])

# y = x.distinct()

# x.show()

# y.show()

# # drop

# '''

# 返回刪除指定列的新的DataFrame。

# 參數(shù):●? col – 要刪除列的字符串類型名稱,或者要刪除的列。

# '''

# x = sqlContext.createDataFrame(

#? ? [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.drop('amt')

# x.show()

# y.show()

# # dropDuplicates / drop_duplicates

# '''

# 返回去掉重復行的一個新的DataFrame,通常只考慮某幾列。

# drop_duplicates()和dropDuplicates()類似。

# '''

# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), (

#? ? "Bob", "Carol", 0.3), ("Bob", "Carol", 0.2)], ['from', 'to', 'amt'])

# y = x.dropDuplicates(subset=['from', 'to'])

# x.show()

# y.show()

# # dropna

# '''

# 返回一個刪除null值行的新的DataFrame。dropna()和dataframenafunctions.drop()類似。

# 參數(shù):●? how – 'any'或者'all'。如果'any',刪除包含任何空值的行。如果'all',刪除所有值為null的行。

#    ●? thresh – int,默認為None,如果指定這個值,刪除小于閾值的非空值的行。這個會重寫'how'參數(shù)。

#    ●? subset – 選擇的列名稱列表。

# '''

# x = sqlContext.createDataFrame([(None, "Bob", 0.1), ("Bob", "Carol", None), (

#? ? "Carol", None, 0.3), ("Bob", "Carol", 0.2)], ['from', 'to', 'amt'])

# y = x.dropna(how='any', subset=['from', 'to'])

# x.show()

# y.show()

# # dtypes

# '''

# 返回所有列名及類型的列表。

# '''

# x = sqlContext.createDataFrame(

#? ? [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.dtypes

# x.show()

# print(y)

# # explain

# '''

# 將(邏輯和物理)計劃打印到控制臺以進行調試。

# 參數(shù):●? extended – boolean類型,默認為False。如果為False,只打印物理計劃。

# '''

# x = sqlContext.createDataFrame(

#? ? [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# x.show()

# x.agg({"amt": "avg"}).explain(extended=True)

# # fillna

# '''

# 替換空值,和na.fill()類似,DataFrame.fillna()和dataframenafunctions.fill()類似。

# 參數(shù):●? value - 要代替空值的值有int,long,float,string或dict.如果值是字典,subset參數(shù)將被忽略。值必須是要替換的列的映射,替換值必須是int,long,float或者string.

#? ?   ●? subset - 要替換的列名列表。在subset指定的列,沒有對應數(shù)據(jù)類型的會被忽略。例如,如果值是字符串,subset包含一個非字符串的列,這個非字符串的值會被忽略。

# '''

# x = sqlContext.createDataFrame(

#? ? [(None, "Bob", 0.1), ("Bob", "Carol", None), ("Carol", None, 0.3)], ['from', 'to', 'amt'])

# y = x.fillna(value='unknown', subset=['from', 'to'])

# x.show()

# y.show()

# # filter

# x = sqlContext.createDataFrame(

#? ? [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.filter("amt > 0.1")

# x.show()

# y.show()

# # first

# x = sqlContext.createDataFrame(

#? ? [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.first()

# x.show()

# print(y)

# flatMap

'''

返回在每行應用F函數(shù)后的新的RDD,然后將結果壓扁。

是df.rdd.flatMap()的簡寫。

'''

x = sqlContext.createDataFrame(

? ? [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

y = x.flatMap(lambda x: (x[0], x[2]))

print(y)? # implicit coversion to RDD

y.collect()

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • 案來案去又年終,歲考分低早震驚。 老朽常懷昔日景,青春難與舊時同。 臥思鄉(xiāng)下民還苦,醒怒官場路不平。 莫怨紅塵乘野...
    飛哥判案閱讀 793評論 0 5
  • 羅興 律師 知名律師 埃孚歐學院 聯(lián)合創(chuàng)始人、上海律師協(xié)會保險業(yè)務研究委員會委員、上海律師協(xié)會民事法律業(yè)務研究會委...
    劉麗敏A閱讀 2,079評論 0 0
  • 攝影大多講究光線和角度,那么,陰天又該如何拍攝? 鮮花和人像的光線要求較高,故不在我的考慮范圍,最終決定拍拍山水,...
    木旸閱讀 1,001評論 9 11
  • “喂,你不知道你這樣說話會嚇死人?。∮胁〉恼媸?。”夏微微頓時火了。 她最討厭他們在談論詭秘事件的時候忽然有人在背后...
    薔薇下的陽光閱讀 321評論 0 1

友情鏈接更多精彩內容