- 代碼
results = res.rdd\
.map(lambda word: (word[0].replace(u"(", u"(").replace(u")", u")"), word[1], word[2]))\
.filter(lambda word: word[0] in companys_list)\
.map(lambda word: (companys_dic.get(word[0], word[0]),word[1], word[2]))\
.filter(lambda word: word[1] != None and word[1] != u'None')
schemaString = "company_name,query,keyword"
fields = list(map(lambda fieldName : StructField(fieldName, StringType(), nullable = True), schemaString.split(",")))
schema = StructType(fields)
results_df = self.spark.createDataFrame(results, schema).distinct()
#results_df.show(100)
output_file = "file:///home/spark/query_20200520_uniq.csv"
results_df.write.mode("overwrite").options(header="true").csv(output_file, sep='\t')
但是運(yùn)行結(jié)束只得到一個(gè)文件夾,并沒有得到期望的結(jié)果,查閱資料發(fā)現(xiàn)保存到本地文件系統(tǒng)(file:///)只有再local模式下才能生效,在cluster模式下(不論是yarn-client還是yarn-cluster)都無法使用。
因此需要修改運(yùn)行腳本,設(shè)置--master local[*] 。最終得到了期望的數(shù)據(jù)。