CS190 Scalable Machine Learning: Spark word count in practice

ML Spark Pyspark


Word count in practice

Create a base RDD with sc.parallelize

wordsList = ['cat', 'elephant', 'rat', 'rat', 'cat']
wordsRDD = sc.parallelize(wordsList, 4)
# Print out the type of wordsRDD
print(type(wordsRDD))
# Pluralize each word with a lambda function
pluralLambdaRDD = wordsRDD.map(lambda x: x + 's')
print(pluralLambdaRDD.collect())

#Out:['cats', 'elephants', 'rats', 'rats', 'cats']
# Use map to compute the length of each pluralized word
pluralLengths = (pluralLambdaRDD
                 .map(lambda x: len(x))
                 .collect())
print(pluralLengths)
#Out:[4, 9, 4, 4, 4]

Create a pair RDD whose elements are (k, v) tuples, where k is the key and v is the value

wordPairs = wordsRDD.map(lambda x: (x, 1))
print(wordPairs.collect())

#Out: [('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]

Count occurrences with reduceByKey

# Note that reduceByKey takes in a function that accepts two values and returns a single value
wordCounts = wordPairs.reduceByKey(lambda x, y: x + y)
print(wordCounts.collect())
#Out:[('rat', 2), ('elephant', 1), ('cat', 2)]
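
To make the reduceByKey step concrete, here is a minimal plain-Python sketch of the same per-key fold, run locally without Spark. The helper name reduce_by_key is hypothetical and only illustrates the semantics; Spark itself combines values inside each partition first and then merges the partial results across partitions.

```python
def reduce_by_key(pairs, func):
    """Fold the values of each key with a two-argument function (local sketch, not Spark)."""
    result = {}
    for key, value in pairs:
        if key in result:
            # Key seen before: merge the running value with the new one
            result[key] = func(result[key], value)
        else:
            # First occurrence of the key: start from its value
            result[key] = value
    return result

word_pairs = [('cat', 1), ('elephant', 1), ('rat', 1), ('rat', 1), ('cat', 1)]
local_counts = reduce_by_key(word_pairs, lambda x, y: x + y)
print(sorted(local_counts.items()))
# [('cat', 2), ('elephant', 1), ('rat', 2)]
```

The result is sorted before printing because the order of the pairs returned by collect() after a reduceByKey is not something to rely on.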

word count

# Combine the steps above into a single chain
wordCountsCollected = (wordsRDD
                       .map(lambda x: (x, 1))
                       .reduceByKey(lambda x, y: x + y)
                       .collect())
print(wordCountsCollected)
#Out:[('rat', 2), ('elephant', 1), ('cat', 2)]

Compute the mean

from operator import add
# Number of distinct words
uniqueWords = wordCounts.count()
# Total number of words
totalCount = (wordCounts
              .map(lambda kv: kv[1])
              .reduce(add))
# Mean number of occurrences per distinct word
average = totalCount / float(uniqueWords)
print(totalCount)
#Out: 5
print(round(average, 2))
#Out: 1.67
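
The two numbers above can be checked by hand from the (word, count) pairs produced earlier. The snippet below is a plain-Python sketch of that arithmetic, not Spark code, and its variable names are illustrative. The float() call matters under Python 2, where dividing two integers would truncate 5 / 3 to 1.

```python
counts = [('rat', 2), ('elephant', 1), ('cat', 2)]

unique_words = len(counts)                 # 3 distinct words
total_count = sum(c for _, c in counts)    # 2 + 1 + 2 = 5 words in total
average = total_count / float(unique_words)

print(total_count)         # 5
print(round(average, 2))   # 1.67
```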

Hands-on

1. Define a wordCount function that takes wordListRDD and returns an RDD of word counts.

def wordCount(wordListRDD):
    """Creates a pair RDD with word counts from an RDD of words.
    Args:
        wordListRDD (RDD of str): An RDD consisting of words.
    Returns:
        RDD of (str, int): An RDD consisting of (word, count) tuples.
    """
    return (wordListRDD
            .map(lambda x :(x,1))
            .reduceByKey(lambda x,y:x+y)
            )
            
print(wordCount(wordsRDD).collect())
#Out:[('rat', 2), ('elephant', 1), ('cat', 2)]
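
One way to sanity-check a function like wordCount on small inputs is to compare it against collections.Counter from the standard library. The sketch below only builds the expected result locally; the comparison against the RDD is left as a comment because it assumes a live SparkContext.

```python
from collections import Counter

words = ['cat', 'elephant', 'rat', 'rat', 'cat']
# Sort the pairs so the comparison does not depend on ordering
expected = sorted(Counter(words).items())
print(expected)
# [('cat', 2), ('elephant', 1), ('rat', 2)]

# With Spark available, one could then check:
#   sorted(wordCount(sc.parallelize(words)).collect()) == expected
```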

2. Define a function that removes special characters

import re
def removePunctuation(text):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.
    Note:
        Only spaces, letters, and numbers should be retained.  Other characters should be
        eliminated (e.g. it's becomes its).  Leading and trailing spaces should be removed after
        punctuation is removed.
    Args:
        text (str): A string.
    Returns:
        str: The cleaned up string.
    """
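    # Lowercase first, then drop everything except letters, digits, and whitespace.
    # (\w would also keep underscores, which the docstring rules out.)
    return re.sub(r'[^a-z0-9\s]', '', text.lower()).strip()


print(removePunctuation('Hi, you!'))
print(removePunctuation(' No under_score!'))
print(removePunctuation(' *      Remove punctuation then spaces  * '))

#Out:hi you
#Out:no underscore
#Out:remove punctuation then spaces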
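
A detail that is easy to miss when writing this kind of cleaner: in Python's re module, \w matches letters, digits, and the underscore, so a pattern such as [^\w\s] leaves underscores in place, while an explicit class such as [^a-z0-9\s] removes them. The comparison below is a minimal sketch; the variable names are illustrative and not part of the lab.

```python
import re

text = ' No under_score!'

# \w includes '_', so the underscore survives
keeps_underscore = re.sub(r'[^\w\s]', '', text).strip().lower()
# An explicit class of lowercase letters and digits removes it
drops_underscore = re.sub(r'[^a-z0-9\s]', '', text.lower()).strip()

print(keeps_underscore)   # no under_score
print(drops_underscore)   # no underscore
```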

3. Load the file

import os.path
baseDir = os.path.join('data')
inputPath = os.path.join('cs100', 'lab1', 'shakespeare.txt')
fileName = os.path.join(baseDir, inputPath)

# Load the file and clean each line with removePunctuation
shakespeareRDD = (sc
                  .textFile(fileName, 8)
                  .map(removePunctuation))
                  
                  
print('\n'.join(shakespeareRDD
                .zipWithIndex()  # to (line, lineNum)
                .map(lambda pair: '{0}: {1}'.format(pair[1], pair[0]))  # to 'lineNum: line'
                .take(11)))  # 11 elements, matching lines 0 to 10 shown below
"""
Out:
0: 1609
1: 
2: the sonnets
3: 
4: by william shakespeare
5: 
6: 
7: 
8: 1
9: from fairest creatures we desire increase
10: that thereby beautys rose might never die
"""

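zipWithIndex pairs each element with its position as (element, index), which is the reverse of the (index, element) order that Python's enumerate produces. The following local sketch imitates that on a three-line list; it is plain Python rather than Spark, and the sample lines are taken from the output above.

```python
lines = ['1609', '', 'the sonnets']

# enumerate yields (index, element); swap to get the (element, index) shape of zipWithIndex
zipped = [(line, num) for num, line in enumerate(lines)]
formatted = ['{0}: {1}'.format(num, line) for line, num in zipped]

print('\n'.join(formatted))
```
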
4. Split lines into words

shakespeareWordsRDD = shakespeareRDD.flatMap(lambda x: x.split(' '))
shakespeareWordCount = shakespeareWordsRDD.count()
print(shakespeareWordsRDD.top(5))
print(shakespeareWordCount)
#Out:[u'zwaggerd', u'zounds', u'zounds', u'zounds', u'zounds']
#Out:927631
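
flatMap is used here instead of map because split returns a list per line, and the word count needs one flat collection of words. The sketch below shows the difference locally with itertools; it is plain Python standing in for the two RDD operations, and the sample lines are illustrative.

```python
from itertools import chain

lines = ['the sonnets', 'by william shakespeare']

# Like map: one list of words per input line
mapped = [line.split(' ') for line in lines]
# Like flatMap: the per-line lists are concatenated into one sequence of words
flat_mapped = list(chain.from_iterable(mapped))

print(mapped)        # [['the', 'sonnets'], ['by', 'william', 'shakespeare']]
print(flat_mapped)   # ['the', 'sonnets', 'by', 'william', 'shakespeare']
```

Since top(5) returns the five largest elements and strings compare lexicographically, the result above starts with words beginning with 'z'.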

5. Filter out empty strings

shakeWordsRDD = shakespeareWordsRDD.filter(lambda x: x != '')

shakeWordCount = shakeWordsRDD.count()
print(shakeWordCount)
#Out:882996
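
The empty strings come from how split(' ') treats empty lines and runs of spaces. A small local sketch, with illustrative variable names, shows both cases and the effect of the filter:

```python
empty_line = ''.split(' ')          # [''] : an empty line still yields one empty token
double_space = 'a  b'.split(' ')    # ['a', '', 'b'] : consecutive spaces yield an empty token

tokens = empty_line + double_space
non_empty = [t for t in tokens if t != '']

print(tokens)      # ['', 'a', '', 'b']
print(non_empty)   # ['a', 'b']
```

Calling split() with no argument would skip empty tokens altogether, but the steps here keep split(' ') and filter afterwards.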

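6. Compute the top 15 words

# takeOrdered returns elements in ascending key order, so negate the count to get the most frequent words first
top15WordsAndCounts = (wordCount(shakeWordsRDD)
                       .takeOrdered(15, key=lambda x: -1 * x[1]))
print('\n'.join('{0}: {1}'.format(w, c) for w, c in top15WordsAndCounts))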

"""
Out:
the: 27361
and: 26028
i: 20681
to: 19150
of: 17463
a: 14593
you: 13615
my: 12481
in: 10956
that: 10890
is: 9134
not: 8497
with: 7771
me: 7769
it: 7678
"""