CS190 Scalable Machine Learning: Spark Introduction

Tags (space-separated): Spark ML


RDDs

- Two types of operations: transformations and actions
- Transformations are lazy (not computed immediately)
- A transformed RDD is executed only when an action runs on it
- RDDs can be persisted (cached) in memory or on disk
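Laziness can be illustrated without Spark using a plain-Python generator expression (a sketch for intuition only; the `calls` list is just a hypothetical trace to show when work actually happens):

```python
# Plain-Python sketch of lazy evaluation: a generator expression,
# like an RDD transformation, describes work without performing it
# until a result is demanded.
calls = []

def double(x):
    calls.append(x)          # record when the function actually runs
    return x * 2

doubled = (double(x) for x in [1, 2, 3, 4])   # "transformation": nothing runs yet
assert calls == []                            # still lazy

result = list(doubled)                        # "action": forces the computation
assert result == [2, 4, 6, 8]
assert calls == [1, 2, 3, 4]
```

Spark's laziness works the same way at a high level: building the pipeline is cheap, and the recipe only executes when a result is needed.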

Working with RDDs

- Create an RDD from a data source
- Apply transformations to an RDD: map, filter
- Apply actions to an RDD: collect, count

Some Transformations

Transformation Description
map(func) return a new distributed dataset formed by passing each element of the source through a function func
filter(func) return a new dataset formed by selecting those elements of the source on which func returns true
distinct([numTasks]) return a new dataset that contains the distinct elements of the source dataset
flatMap(func) similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item)

Example:

>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd.map(lambda x: x * 2)
RDD: [1, 2, 3, 4] → [2, 4, 6, 8]
>>> rdd.filter(lambda x: x % 2 == 0)
RDD: [1, 2, 3, 4] → [2, 4]
>>> rdd2 = sc.parallelize([1, 4, 2, 2, 3])
>>> rdd2.distinct()
RDD: [1, 4, 2, 2, 3] → [1, 4, 2, 3]

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.map(lambda x: [x, x+5])
RDD: [1, 2, 3] → [[1, 6], [2, 7], [3, 8]]
>>> rdd.flatMap(lambda x: [x, x+5])
RDD: [1, 2, 3] → [1, 6, 2, 7, 3, 8]
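The REPL snippets above need a SparkContext, but the same semantics can be checked in plain Python (an illustration, not Spark itself): `flatMap` corresponds to mapping and then flattening one level.

```python
from itertools import chain

data = [1, 2, 3, 4]

# map: exactly one output element per input element
assert [x * 2 for x in data] == [2, 4, 6, 8]

# filter: keep elements where the predicate is true
assert [x for x in data if x % 2 == 0] == [2, 4]

# distinct: Spark does not guarantee output order, so compare as sets
assert set([1, 4, 2, 2, 3]) == {1, 2, 3, 4}

# map vs flatMap: map keeps the nesting, flatMap flattens one level
nested = [[x, x + 5] for x in [1, 2, 3]]
assert nested == [[1, 6], [2, 7], [3, 8]]
assert list(chain.from_iterable(nested)) == [1, 6, 2, 7, 3, 8]
```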

Spark Actions

- Cause Spark to execute the recipe that transforms the source data
- The mechanism for getting results out of Spark

Action Description
reduce(func) aggregate the dataset's elements using function func; func takes two arguments and returns one, and must be commutative and associative so it can be computed correctly in parallel
take(n) return an array with the first n elements
collect() return all the elements as an array. WARNING: make sure the result will fit in the driver program's memory
takeOrdered(n, key=func) return the first n elements in ascending order, or as specified by the optional key function

Example:

>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.reduce(lambda a, b: a * b)
Value: 6  # (1 * 2 * 3)
>>> rdd.take(2)
Value: [1, 2]  # as list
>>> rdd.collect()
Value: [1, 2, 3]  # as list
>>> rdd = sc.parallelize([5, 3, 1, 2])
>>> rdd.takeOrdered(3, lambda s: -1 * s)
Value: [5, 3, 2]  # as list
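These actions have close plain-Python analogues (shown here as an illustration, not as Spark code): `functools.reduce` for `reduce`, slicing for `take`, and `heapq.nsmallest` for `takeOrdered`.

```python
from functools import reduce
import heapq

data = [1, 2, 3]

# reduce: fold the elements with a commutative, associative function
assert reduce(lambda a, b: a * b, data) == 6

# take(n): the first n elements
assert data[:2] == [1, 2]

# takeOrdered(n, key=func): the n smallest elements under the key.
# With key = -s this yields the 3 largest values, matching the
# takeOrdered example above.
assert heapq.nsmallest(3, [5, 3, 1, 2], key=lambda s: -s) == [5, 3, 2]
```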

Two other frequently used calls are count() and cache():

lines = sc.textFile("...", 4)
lines.cache()  # save, don't recompute!
comments = lines.filter(isComment)
print(lines.count(), comments.count())

Spark Program Lifecycle

  1. Create RDDs from external data or parallelize a collection in your driver program
  2. Lazily transform them into new RDDs
  3. cache() some RDDs for reuse
  4. Perform actions to execute parallel computation and produce results
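The lifecycle above can be mimicked with a tiny lazy-pipeline class (a toy sketch for intuition, not Spark's actual implementation): transformations only record a recipe, and actions walk it.

```python
class ToyRDD:
    """Toy sketch of the transform-lazily / act-eagerly lifecycle."""

    def __init__(self, data, ops=()):
        self._data = data
        self._ops = ops          # recorded recipe; nothing computed yet

    def map(self, f):            # transformation: returns a new ToyRDD
        return ToyRDD(self._data, self._ops + (("map", f),))

    def filter(self, f):         # transformation: also lazy
        return ToyRDD(self._data, self._ops + (("filter", f),))

    def collect(self):           # action: executes the recorded recipe
        out = list(self._data)
        for kind, f in self._ops:
            if kind == "map":
                out = [f(x) for x in out]
            else:
                out = [x for x in out if f(x)]
        return out

    def count(self):             # action: also triggers execution
        return len(self.collect())

rdd = ToyRDD(range(1, 5))                   # 1. create from a collection
evens = rdd.filter(lambda x: x % 2 == 0)    # 2. transform lazily
doubled = evens.map(lambda x: x * 2)        #    (still nothing computed)
assert doubled.collect() == [4, 8]          # 4. action produces results
assert doubled.count() == 2
```

Real Spark additionally partitions the data across workers and, with cache(), keeps intermediate results in memory instead of replaying the recipe.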

Key-Value RDDs

Key-Value Transformation Description
reduceByKey(func) return a new distributed dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V
sortByKey() return a new dataset of (K, V) pairs sorted by keys in ascending order
groupByKey() return a new dataset of (K, Iterable<V>) pairs

! Be careful with groupByKey(): it may move large amounts of data across the network, and the resulting lists can be very large, potentially exhausting worker memory.

>>> rdd = sc.parallelize([(1,2), (3,4), (3,6)])
>>> rdd.reduceByKey(lambda a, b: a + b)
RDD: [(1,2), (3,4), (3,6)] → [(1,2), (3,10)]
>>> rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')])
>>> rdd2.sortByKey()
RDD: [(1,'a'), (2,'c'), (1,'b')] → [(1,'a'), (1,'b'), (2,'c')]
>>> rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')])
>>> rdd2.groupByKey()
RDD: [(1,'a'), (2,'c'), (1,'b')] → [(1, ['a','b']), (2, ['c'])]
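The key-value semantics can be reproduced in plain Python (an illustration of the semantics only, not of Spark's distributed execution), which also makes the memory argument concrete: groupByKey materializes every value per key, while reduceByKey keeps only one running value per key.

```python
from collections import defaultdict

pairs = [(1, 'a'), (2, 'c'), (1, 'b')]

# groupByKey: collect all values per key (potentially memory-hungry)
groups = defaultdict(list)
for k, v in pairs:
    groups[k].append(v)
assert dict(groups) == {1: ['a', 'b'], 2: ['c']}

# reduceByKey: combine values per key as they arrive, so only one
# running value per key is ever kept
sums = {}
for k, v in [(1, 2), (3, 4), (3, 6)]:
    sums[k] = sums[k] + v if k in sums else v
assert sums == {1: 2, 3: 10}

# sortByKey: sort the pairs by key
assert sorted(pairs) == [(1, 'a'), (1, 'b'), (2, 'c')]
```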

pySpark Shared Variables

Broadcast Variables

- Efficiently send a large, read-only value to all workers
- Saved at the workers for use in one or more Spark operations
- Like sending a large, read-only lookup table to all the nodes

# Country code lookup for HAM radio call signs.
# Look up the locations of the call signs in the RDD
# contactCounts. We load a table mapping call sign
# prefixes to country codes to support this lookup.
signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts
                        .map(processSignCount)
                        .reduceByKey(lambda x, y: x + y))

Accumulators

- Aggregate values from workers back to the driver
- Only the driver can access the value of an accumulator
- For tasks, accumulators are write-only
- Used, for example, to count errors seen across workers while processing an RDD

# Counting empty lines
file = sc.textFile(inputFile)
# Create an Accumulator[Int] initialized to 0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines  # make the global variable accessible
    if line == "":
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
print("Blank lines: %d" % blankLines.value)

--
Further reading:
Introduction to Big Data with Apache Spark
pySpark documentation
pyspark-pictures

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容