import sys
reload(sys)
sys.setdefaultencoding("utf-8")
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
import datetime
from pyspark.sql.types import *
def transform_time(timestamp):
    """Convert a Unix timestamp into a ``YYYYMM`` string, evaluated in UTC.

    Args:
        timestamp: Seconds since the epoch as a number or numeric string
            (anything ``float()`` accepts), or ``None``.

    Returns:
        The year and month formatted with ``"%Y%m"``, or ``None`` when
        ``timestamp`` is ``None``.

    Raises:
        ValueError: If ``timestamp`` is a string that ``float()`` cannot parse.
    """
    # This function is wrapped as a Spark UDF, and a null column value reaches
    # a Python UDF as None; pass it through rather than failing in float().
    if timestamp is None:
        return None
    moment = datetime.datetime.utcfromtimestamp(float(timestamp))
    return moment.strftime("%Y%m")
def filter_func(row):
    """Keep a ``(genre, movie)`` pair when the movie's type mentions the genre.

    The match is a case-insensitive substring test of the genre name against
    the movie's ``FILM_TPYE`` attribute (the attribute name is spelled this
    way in the data and must not be "corrected" here).

    Args:
        row: A two-item sequence ``(genre, movie)`` where ``genre`` is a
            string and ``movie`` exposes a ``FILM_TPYE`` attribute.

    Returns:
        ``row`` itself (truthy) when the genre occurs in ``FILM_TPYE``,
        otherwise ``None`` (falsy), so the function can be used directly as
        a ``filter`` predicate.
    """
    film_type = row[1].FILM_TPYE
    # A movie with no type information cannot belong to any genre.
    if film_type is None:
        return None
    # Compare text with text: encoding one side to bytes first makes the
    # ``in`` test a bytes-vs-text comparison, which fails on Python 3.
    if row[0].lower() in film_type.lower():
        return row
    return None
def sort_func(k, v, top_n=10):
    """Return the highest-scoring records of one group, best first.

    Args:
        k: The group key. Unused; accepted so the function can be called
            with a ``(key, values)`` pair.
        v: An iterable of indexable records whose item at index 2 is the
            score to rank by.
        top_n: Maximum number of records to return. Defaults to 10.

    Returns:
        A new list holding at most ``top_n`` records from ``v``, ordered by
        descending score. Records with equal scores keep their input order.
    """
    # sorted() accepts any iterable, so no intermediate copy is needed.
    ranked = sorted(v, key=lambda record: record[2], reverse=True)
    return ranked[:top_n]
def _read_collection(spark, uri, collection):
    """Load one MongoDB collection as a DataFrame via the Mongo Spark connector."""
    return (
        spark.read
        .option("uri", uri)
        .option("collection", collection)
        .format("com.mongodb.spark.sql")
        .load()
    )


def _write_collection(df, uri, collection):
    """Write ``df`` to one MongoDB collection in overwrite mode."""
    (
        df.write
        .format("com.mongodb.spark.sql")
        .option("uri", uri)
        .option("collection", collection)
        .mode("overwrite")
        .save()
    )


def main():
    """Compute rating statistics from MongoDB and write them back.

    Reads the ``ratings`` and ``movies`` collections from the ``raw_data``
    database and writes four collections to ``Statistic_rec``:
    ``RateMoreMovies``, ``RateMoreRecentlyMovies``, ``AverageMovies`` and
    ``GenresTopMovies``.
    """
    spark = (
        SparkSession.builder
        .master("local")
        .appName("Dataloader")
        .getOrCreate()
    )
    # Stop the session even when a job fails, so it is never left running.
    try:
        mongo_read_uri = "mongodb://127.0.0.1:27017/raw_data"
        mongo_Statistic_uri = "mongodb://127.0.0.1:27017/Statistic_rec"
        rating_table = "ratings"
        movies_table = "movies"

        ratings_df = _read_collection(spark, mongo_read_uri, rating_table)
        transform_time_udf = func.udf(transform_time, StringType())
        ratings_df = ratings_df.withColumn("YearMonth", transform_time_udf("TIMESTAMP"))
        movies_df = _read_collection(spark, mongo_read_uri, movies_table)

        ratings_df.createOrReplaceTempView("ratings")
        movies_df.createOrReplaceTempView("movies")

        # Number of ratings per movie.
        RateMoreMovies = spark.sql("select MOVIE_ID, count(MOVIE_ID) as Count from ratings group by MOVIE_ID")
        _write_collection(RateMoreMovies, mongo_Statistic_uri, "RateMoreMovies")

        # Number of ratings per movie per month.
        # NOTE(review): the count column has no alias here (unlike the query
        # above), so its name is whatever Spark generates. Left unchanged to
        # keep the stored field name; confirm what consumers expect.
        RateMoreRecentlyMovies = spark.sql("select MOVIE_ID,YearMonth,count(MOVIE_ID) from ratings group by YearMonth,MOVIE_ID")
        _write_collection(RateMoreRecentlyMovies, mongo_Statistic_uri, "RateMoreRecentlyMovies")

        # Average rating score per movie.
        AverageMovies = spark.sql("select MOVIE_ID,avg(RATING_SCORE) as Avg_score from ratings group by MOVIE_ID")
        _write_collection(AverageMovies, mongo_Statistic_uri, "AverageMovies")

        # Top movies per genre: pair every genre with every scored movie,
        # keep the pairs whose movie type mentions the genre, then rank
        # within each genre.
        movieswithScore = movies_df.join(AverageMovies, "MOVIE_ID")
        genres = ["Action","Adventure","Animals","Comedy","Crime","Documentary","Drama","Family","Romance","Science","Tv","Thriller","War","Western"]
        genres_rdd = spark.sparkContext.parallelize(genres)
        re_rdd = (
            genres_rdd.cartesian(movieswithScore.rdd)
            .filter(filter_func)
            .map(lambda pair: (pair[0], pair[1].MOVIE_ID, pair[1].Avg_score))
            .groupBy(lambda record: record[0])
            # One flatMap replaces map(sort_func) + identity flatMap, and a
            # plain one-argument lambda avoids tuple-parameter unpacking,
            # which is Python-2-only syntax.
            .flatMap(lambda group: sort_func(group[0], group[1]))
        )

        # NOTE(review): Avg_score is produced by avg() above but declared as
        # StringType here. Left unchanged to keep the stored document layout;
        # confirm whether a numeric type is wanted.
        schema_label = [
            ("Genres", StringType()),
            ("MOVIE_ID", StringType()),
            ("Avg_score", StringType()),
        ]
        schema = StructType([StructField(e[0], e[1], False) for e in schema_label])
        GenresTopMovies = spark.createDataFrame(re_rdd, schema)
        _write_collection(GenresTopMovies, mongo_Statistic_uri, "GenresTopMovies")
    finally:
        spark.stop()


if __name__ == "__main__":
    main()
Spark讀寫MongoDB數據并統計
最后編輯于 :
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。
相關閱讀更多精彩內(nèi)容
- python——時間與時間戳之間的轉換 對于時間數據,如2016-05-05 20:28:54,有時需要與時間戳進...
- 原文地址:RFC3550 RTP 中文版 英文版原文:RFC3550英文版 - RTP: A Transport ...
- Spark RDD(Resilient Distributed Datasets)論文 概要 1: 介紹 2: R...
- 提到編程,我們會馬上想到一些通用的編程語言,比如C、C++、Java、Python、Go等。但是,對于絕大部分軟件...