Tags (space-separated): Spark ML
RDDs
- Two types of operations: transformations and actions
- Transformations are lazy (not computed immediately)
- A transformed RDD is computed only when an action runs on it
- RDDs can be persisted (cached) in memory or on disk
Working with RDDs
- Create an RDD from a data source (e.g. sc.parallelize, sc.textFile)
- Apply transformations to an RDD: map, filter
- Apply actions to an RDD: collect, count
Some Transformations
| Transformation | Description |
|---|---|
| map(func) | return a new distributed dataset formed by passing each element of the source through a function func |
| filter(func) | return a new dataset formed by selecting those elements of the source on which func returns true |
| distinct([numTasks]) | return a new dataset that contains the distinct elements of the source dataset |
| flatMap(func) | similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item) |
Example:
>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd.map(lambda x: x * 2)
RDD: [1, 2, 3, 4] → [2, 4, 6, 8]
>>> rdd.filter(lambda x: x % 2 == 0)
RDD: [1, 2, 3, 4] → [2, 4]
>>> rdd2 = sc.parallelize([1, 4, 2, 2, 3])
>>> rdd2.distinct()
RDD: [1, 4, 2, 2, 3] → [1, 4, 2, 3]
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.map(lambda x: [x, x+5])
RDD: [1, 2, 3] → [[1, 6], [2, 7], [3, 8]]
>>> rdd.flatMap(lambda x: [x, x+5])
RDD: [1, 2, 3] → [1, 6, 2, 7, 3, 8]
Spark Actions
- Cause Spark to execute the recipe that transforms the source data
- The mechanism for getting results out of Spark
| Action | Description |
|---|---|
| reduce(func) | aggregate the dataset's elements using function func. func takes two arguments and returns one, and must be commutative and associative so that it can be computed correctly in parallel |
| take(n) | return an array with the first n elements |
| collect() | return all the elements as an array. WARNING: make sure the result will fit in the driver program's memory |
| takeOrdered(n, key=func) | return n elements ordered in ascending order or as specified by the optional key function |
Example:
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.reduce(lambda a, b: a * b)
Value: 6
#(1 * 2 * 3)
>>> rdd.take(2)
Value: [1,2] # as list
>>> rdd.collect()
Value: [1,2,3] # as list
>>> rdd = sc.parallelize([5,3,1,2])
>>> rdd.takeOrdered(3, lambda s: -1 * s)
Value: [5,3,2] # as list
count() and cache() example: caching lines means the two count() actions do not both re-read and recompute the file.
lines = sc.textFile("...", 4)
lines.cache()  # save, don't recompute!
comments = lines.filter(isComment)
print(lines.count(), comments.count())
Spark Program Lifecycle
- Create RDDs from external data or parallelize a collection in your driver program
- Lazily transform them into new RDDs
- cache() some RDDs for reuse
- Perform actions to execute parallel computation and produce results
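The "lazily transform" step of the lifecycle can be illustrated in plain Python: a generator pipeline, like a chain of transformations, does no work until a terminal call (the analogue of an action) consumes it. This is a local sketch of the concept, not Spark code:

```python
# Spark-free sketch of lazy evaluation: nothing runs at "transformation"
# time; the work happens only when the "action" (list) consumes the pipeline.
calls = []

def loud_double(x):
    calls.append(x)      # record when the function actually executes
    return x * 2

data = [1, 2, 3, 4]
pipeline = (loud_double(x) for x in data)  # "transformation": nothing runs yet
assert calls == []                         # still lazy

result = list(pipeline)                    # "action": forces the computation
print(result)                              # [2, 4, 6, 8]
print(calls)                               # [1, 2, 3, 4]
```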
Key-Value RDDs
| Key-Value Transformation | Description |
|---|---|
| reduceByKey(func) | return a new distributed dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V |
| sortByKey() | return a new dataset of (K, V) pairs sorted by keys in ascending order |
| groupByKey() | return a new dataset of (K, Iterable<V>) pairs |
! Note: when using groupByKey(), a large amount of data may have to move across the network, and the resulting per-key lists can be very large, which can exhaust worker memory.
>>> rdd = sc.parallelize([(1,2), (3,4), (3,6)])
>>> rdd.reduceByKey(lambda a, b: a + b)
RDD: [(1,2), (3,4), (3,6)] → [(1,2), (3,10)]
>>> rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')])
>>> rdd2.sortByKey()
RDD: [(1,'a'), (2,'c'), (1,'b')] → [(1,'a'), (1,'b'), (2,'c')]
>>> rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')])
>>> rdd2.groupByKey()
RDD: [(1,'a'), (1,'b'), (2,'c')] → [(1,['a','b']), (2,['c'])]
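The difference behind the groupByKey() warning can be sketched locally with a plain dict: reduceByKey keeps only one running value per key, while groupByKey materializes the full list of values per key. A Spark-free illustration of the semantics:

```python
# Spark-free sketch of reduceByKey vs groupByKey semantics.
pairs = [(1, 2), (3, 4), (3, 6)]

# reduceByKey(lambda a, b: a + b): one combined value per key
reduced = {}
for k, v in pairs:
    reduced[k] = reduced[k] + v if k in reduced else v
print(sorted(reduced.items()))   # [(1, 2), (3, 10)]

# groupByKey(): the whole list of values per key is kept in memory
grouped = {}
for k, v in pairs:
    grouped.setdefault(k, []).append(v)
print(sorted(grouped.items()))   # [(1, [2]), (3, [4, 6])]
```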
PySpark Shared Variables
Broadcast Variables
- Efficiently send a large, read-only value to all workers
- Saved at workers for use in one or more Spark operations
- Like sending a large, read-only lookup table to all the nodes
# Country code lookup for HAM radio call signs
# Lookup the locations of the call signs on the
# RDD contactCounts. We load a list of call sign
# prefixes to country code to support this lookup
signPrefixes = sc.broadcast(loadCallSignTable())
def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)
countryContactCounts = (contactCounts
                        .map(processSignCount)
                        .reduceByKey(lambda x, y: x + y))
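The lookup itself can be demonstrated locally. In this Spark-free sketch, the tiny sign_prefixes dict and the lookup_country helper are hypothetical stand-ins for the loadCallSignTable() / lookupCountry() helpers above, which are not defined in these notes:

```python
# Spark-free sketch of the broadcast-lookup pattern: a read-only table
# (here a dict) shared by every record's processing step.
sign_prefixes = {"W": "USA", "VE": "Canada"}   # hypothetical prefix table

def lookup_country(sign, prefixes):
    # longest matching prefix wins
    for p in sorted(prefixes, key=len, reverse=True):
        if sign.startswith(p):
            return prefixes[p]
    return "unknown"

contact_counts = [("W6BB", 3), ("VE3UOW", 2), ("W1AW", 1)]

# local analogue of .map(processSignCount).reduceByKey(add)
country_counts = {}
for sign, count in contact_counts:
    country = lookup_country(sign, sign_prefixes)
    country_counts[country] = country_counts.get(country, 0) + count

print(sorted(country_counts.items()))  # [('Canada', 2), ('USA', 4)]
```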
Accumulators
- Aggregate values from workers back to the driver
- Only the driver can access the value of an accumulator
- For tasks, accumulators are write-only
- Use to count errors seen in an RDD across workers
# Counting empty lines
# Counting empty lines
file = sc.textFile(inputFile)
# Create an Accumulator[int] initialized to 0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines  # make the global accumulator accessible
    if line == "":
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.count()  # flatMap is lazy; an action must run before the accumulator is updated
print("Blank lines: %d" % blankLines.value)
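The same side-effect-counting pattern can be run locally without Spark. In this sketch a plain global counter stands in for the accumulator, and itertools.chain stands in for flatMap; the sample call signs are made up for illustration:

```python
# Spark-free sketch of the accumulator example: count blank lines as a
# side effect while splitting each line into call signs.
from itertools import chain

lines = ["KK6JKQ W6BB", "", "VE3UOW", ""]   # hypothetical input lines
blank_lines = 0                              # plays the accumulator's role

def extract_call_signs(line):
    global blank_lines
    if line == "":
        blank_lines += 1
    return line.split(" ")

# forcing the lazy chain with list() is the analogue of running an action
call_signs = list(chain.from_iterable(extract_call_signs(l) for l in lines))
print("Blank lines: %d" % blank_lines)   # Blank lines: 2
```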
--
Further reading:
Introduction to Big Data with Apache Spark
pySpark documentation
pyspark-pictures