PySpark API Walkthrough, Part 1

PySpark is the Python API for Spark.

Public classes:

SparkContext:

Main entry point for Spark functionality.

RDD:

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.

Broadcast:

A broadcast variable that can be reused across tasks.

Accumulator:

A shared variable that tasks can only add to.

SparkConf:

Used to configure Spark.

SparkFiles:

Access files shipped with jobs.

StorageLevel:

Fine-grained persistence levels.

TaskContext:

Information about the task currently running on a worker node. Currently experimental.

class pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)

Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.

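Most of the time, you create a SparkConf object with SparkConf(), which also loads values from the spark.* Java system properties. Any parameters you set directly on the SparkConf object therefore take priority over the system properties.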

For unit tests, you can also call SparkConf(False) to skip loading external settings and get the same configuration no matter what the system properties are.

All setter methods of SparkConf support chaining. For example, you can write:

conf.setMaster("local").setAppName("My app")

Note:

Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified by the user.

contains(key)

Does this configuration contain a given key?

get(key, defaultValue=None)

Get the configured value for a key, or return the default value otherwise.

getAll()

Get all values as a list of key-value pairs.

set(key, value)

Set a configuration property.

setAll(pairs)

Set multiple parameters, passed as a list of key-value pairs.

Parameters:

pairs – list of key-value pairs to set

setAppName(value)

Set application name.

setExecutorEnv(key=None, value=None, pairs=None)

Set an environment variable to be passed to executors.

setIfMissing(key, value)

Set a configuration property, if it is not already set.

setMaster(value)

Set the master URL to connect to.

setSparkHome(value)

Set the path where Spark is installed on worker nodes.

toDebugString()

Returns a printable version of the configuration, as a list of key=value pairs, one per line.
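The setter behavior described in this section (chaining, setting only when missing, and the key=value debug output) can be modeled without a Spark installation. The sketch below is a hypothetical stand-in named MiniConf, not the real pyspark.SparkConf. It mirrors the method names from this section on top of a plain dict, assuming each setter returns the object itself and that the master URL and application name are stored under the keys spark.master and spark.app.name.

```python
class MiniConf:
    """Hypothetical dict-backed stand-in for SparkConf (not the real class)."""

    def __init__(self):
        self._conf = {}

    def set(self, key, value):
        self._conf[key] = str(value)
        return self  # returning self is what makes chaining possible

    def setIfMissing(self, key, value):
        # Only write the value when the key has not been set yet.
        if key not in self._conf:
            self.set(key, value)
        return self

    def setMaster(self, value):
        return self.set("spark.master", value)

    def setAppName(self, value):
        return self.set("spark.app.name", value)

    def contains(self, key):
        return key in self._conf

    def get(self, key, defaultValue=None):
        return self._conf.get(key, defaultValue)

    def getAll(self):
        return list(self._conf.items())

    def toDebugString(self):
        # One key=value pair per line.
        return "\n".join("%s=%s" % (k, v) for k, v in self._conf.items())


conf = MiniConf().setMaster("local").setAppName("My app")
conf.setIfMissing("spark.master", "yarn")  # ignored: the key is already set
print(conf.toDebugString())
```

With the real class, the same chained call would be written against a SparkConf() instance instead of MiniConf().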


class pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=)

Main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs and broadcast variables on that cluster.

PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')

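accumulator(value, accum_param=None)

Create an Accumulator with the given initial value, using a given AccumulatorParam helper object to define how values of that data type are added. If you do not provide one, a default AccumulatorParam is used for integers and floating-point numbers. For other types, you can supply a custom AccumulatorParam.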
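To make the role of the helper object concrete, here is a minimal plain-Python sketch of a custom accumulator parameter for lists of numbers. The class name VectorParam and the sample data are made up for illustration. The two method names, zero and addInPlace, follow the AccumulatorParam interface, but the code does not import Spark and only models how per-task contributions could be folded into one total.

```python
from functools import reduce


class VectorParam:
    """Hypothetical helper for accumulating fixed-length lists of numbers.

    Mirrors the zero() and addInPlace() methods of the AccumulatorParam
    interface, but does not depend on Spark.
    """

    def zero(self, value):
        # A neutral starting value with the same shape as `value`.
        return [0.0] * len(value)

    def addInPlace(self, v1, v2):
        # Merge v2 into v1 element by element and return the result.
        for i, x in enumerate(v2):
            v1[i] += x
        return v1


param = VectorParam()
initial = [1.0, 2.0, 3.0]
# Updates that individual tasks might contribute (made-up data).
task_updates = [[1.0, 0.0, 0.0], [0.0, 2.0, 0.0], [0.5, 0.5, 0.5]]

# Model of folding every task's contribution into the running total.
total = reduce(param.addInPlace, task_updates, list(initial))
print(param.zero(initial))  # [0.0, 0.0, 0.0]
print(total)                # [2.5, 4.5, 3.5]
```

In real PySpark code, an object like this would be passed as accum_param to sc.accumulator, and tasks would contribute through the accumulator rather than through a local reduce.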

addFile(path, recursive=False)

Add a file to be downloaded with this Spark job on every node. The path passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.

To access the file in Spark jobs, use SparkFiles.get(fileName) with the filename to find its download location.

A directory can be given if the recursive option is set to True. Currently directories are only supported for Hadoop-supported filesystems.

>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "test.txt")
>>> with open(path, "w") as testFile:
...     _ = testFile.write("100")
>>> sc.addFile(path)
>>> def func(iterator):
...     with open(SparkFiles.get("test.txt")) as testFile:
...         fileVal = int(testFile.readline())
...         return [x * fileVal for x in iterator]
>>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
[100, 200, 300, 400]

addPyFile(path)

Add a .py or .zip dependency for all tasks to be executed on this SparkContext in the future. The path passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.

applicationId

A unique identifier for the Spark application. Its format depends on the scheduler implementation.

For a local Spark application, it might look like 'local-1433865536131'.

For a YARN application, it might look like 'application_1433865536131_34483'.

binaryFiles(path, minPartitions=None)

Note: Experimental

Read a directory of binary files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI as a byte array. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.

Note: Small files are preferred. Large files are also allowed, but may cause bad performance.

binaryRecords(path, recordLength)

Note: Experimental

Load data from a flat binary file, assuming each record is a set of numbers with the specified numerical format (see ByteBuffer), and the number of bytes per record is constant.

Parameters:

path – Directory to the input data files

recordLength – The length at which to split the records
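The idea of a flat binary file with a constant record length can be illustrated with the standard library alone. The sketch below does not call binaryRecords. It assumes, purely for illustration, that each record holds two little-endian 32-bit integers, and uses a hypothetical helper split_records to cut the bytes at every recordLength boundary before decoding them with struct.

```python
import struct

# Assumption for this sketch: every record holds two little-endian
# 32-bit integers, so each record is exactly 8 bytes long.
RECORD_FORMAT = "<ii"
record_length = struct.calcsize(RECORD_FORMAT)

# Build a flat binary blob of three records (made-up data).
blob = b"".join(struct.pack(RECORD_FORMAT, a, b)
                for a, b in [(1, 10), (2, 20), (3, 30)])


def split_records(data, length):
    """Cut `data` into consecutive chunks of `length` bytes."""
    if len(data) % length != 0:
        raise ValueError("data size is not a multiple of the record length")
    return [data[i:i + length] for i in range(0, len(data), length)]


records = split_records(blob, record_length)
decoded = [struct.unpack(RECORD_FORMAT, r) for r in records]
print(decoded)  # [(1, 10), (2, 20), (3, 30)]
```

With Spark, each element of the RDD returned by binaryRecords would play the role of one chunk in records, and the decoding step would typically run inside a map over that RDD.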

broadcast(value)

Broadcast a read-only variable to the cluster, returning a Broadcast object for reading it in distributed functions. The variable is sent to each node only once.

cancelAllJobs()

Cancel all jobs that have been scheduled or are running.

cancelJobGroup(groupId)

Cancel active jobs for the specified group. See SparkContext.setJobGroup for more information.

defaultMinPartitions

Default min number of partitions for Hadoop RDDs when not given by user

defaultParallelism

Default level of parallelism to use when not given by user (e.g. for reduce tasks)

dump_profiles(path)

Dump the profile stats into directory path

emptyRDD()

Create an RDD that has no partitions or elements.

getConf()

getLocalProperty(key)

Get a local property set in this thread, or None if it is missing. See setLocalProperty

classmethod getOrCreate(conf=None)

Get or instantiate a SparkContext and register it as a singleton object.

Parameters:

conf – SparkConf (optional)

hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

Read an ‘old’ Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile.

A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java.

Parameters:

path – path to Hadoop file

inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapred.TextInputFormat”)

keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)

valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

keyConverter – (None by default)

valueConverter – (None by default)

conf – Hadoop configuration, passed in as a dict (None by default)

batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)

hadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

Read an ‘old’ Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.

Parameters:

inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapred.TextInputFormat”)

keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)

valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

keyConverter – (None by default)

valueConverter – (None by default)

conf – Hadoop configuration, passed in as a dict (None by default)

batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)

newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

Read a ‘new API’ Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is the same as for sc.sequenceFile.

A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java

Parameters:

path – path to Hadoop file

inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)

keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)

valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

keyConverter – (None by default)

valueConverter – (None by default)

conf – Hadoop configuration, passed in as a dict (None by default)

batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)

newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)

Read a ‘new API’ Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile.

Parameters:

inputFormatClass – fully qualified classname of Hadoop InputFormat (e.g. “org.apache.hadoop.mapreduce.lib.input.TextInputFormat”)

keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)

valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

keyConverter – (None by default)

valueConverter – (None by default)

conf – Hadoop configuration, passed in as a dict (None by default)

batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)

parallelize(c, numSlices=None)

Distribute a local Python collection to form an RDD. Using xrange is recommended if the input represents a range for performance.

>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]
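The second output above contains empty partitions because three elements are spread over five slices. The following plain-Python model shows one way such a split can arise. The helper slice_collection is hypothetical, and its index arithmetic is an assumption chosen because it reproduces both outputs shown above; Spark's own partitioning code may differ in detail.

```python
def slice_collection(items, num_slices):
    """Split `items` into `num_slices` contiguous, near-equal chunks."""
    items = list(items)
    n = len(items)
    # Slice i covers the index range [i*n/num_slices, (i+1)*n/num_slices).
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


five_elements = slice_collection([0, 2, 3, 4, 6], 5)
three_elements = slice_collection(range(0, 6, 2), 5)
print(five_elements)   # [[0], [2], [3], [4], [6]]
print(three_elements)  # [[], [0], [], [2], [4]]
```

When the number of slices exceeds the number of elements, some index ranges are empty, which is why some partitions hold no data.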

pickleFile(name, minPartitions=None)

Load an RDD previously saved using RDD.saveAsPickleFile method.

>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
>>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

range(start, end=None, step=1, numSlices=None)

Create a new RDD of int containing elements from start to end (exclusive), increased by step every element. Can be called the same way as Python’s built-in range() function. If called with a single argument, the argument is interpreted as end, and start is set to 0.

Parameters:

start – the start value

end – the end value (exclusive)

step – the incremental step (default: 1)

numSlices – the number of partitions of the new RDD

Returns: An RDD of int

>>> sc.range(5).collect()
[0, 1, 2, 3, 4]
>>> sc.range(2, 4).collect()
[2, 3]
>>> sc.range(1, 7, 2).collect()
[1, 3, 5]

runJob(rdd, partitionFunc, partitions=None, allowLocal=False)

Executes the given partitionFunc on the specified set of partitions, returning the result as an array of elements.

If ‘partitions’ is not specified, this will run over all partitions.

>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part])
[0, 1, 4, 9, 16, 25]

>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]
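The effect of the partitions argument can be reproduced with a small plain-Python model. The functions split_into_partitions and run_job below are hypothetical stand-ins, not Spark calls. They assume range(6) is laid out over three partitions as [0, 1], [2, 3], [4, 5], which is consistent with the two results shown above.

```python
def split_into_partitions(items, num_slices):
    """Split `items` into `num_slices` contiguous chunks (model only)."""
    items = list(items)
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


def run_job(partitions_data, partition_func, partitions=None):
    """Apply partition_func to the selected partitions and flatten the results."""
    if partitions is None:
        # No selection given: run over every partition.
        partitions = range(len(partitions_data))
    result = []
    for index in partitions:
        result.extend(partition_func(iter(partitions_data[index])))
    return result


data = split_into_partitions(range(6), 3)  # [[0, 1], [2, 3], [4, 5]]
all_squares = run_job(data, lambda part: [x * x for x in part])
some_squares = run_job(data, lambda part: [x * x for x in part], [0, 2])
print(all_squares)   # [0, 1, 4, 9, 16, 25]
print(some_squares)  # [0, 1, 16, 25]
```

Selecting partitions 0 and 2 skips the middle chunk [2, 3], so the squares 4 and 9 are absent from the second result.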

sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)

Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is as follows:

A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes

Serialization is attempted via Pyrolite pickling

If this fails, the fallback is to call ‘toString’ on each key and value

PickleSerializer is used to deserialize pickled objects on the Python side

Parameters:

path – path to the SequenceFile

keyClass – fully qualified classname of key Writable class (e.g. “org.apache.hadoop.io.Text”)

valueClass – fully qualified classname of value Writable class (e.g. “org.apache.hadoop.io.LongWritable”)

keyConverter –

valueConverter –

minSplits – minimum splits in dataset (default min(2, sc.defaultParallelism))

batchSize – The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically)
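The "try pickling, otherwise fall back to a string" part of the mechanism above can be loosely modeled in plain Python. In Spark the pickling attempt happens on the JVM side via Pyrolite, so this sketch is only an analogy: the helper to_python_friendly is hypothetical, and a thread lock is used simply because it is a convenient object that Python's pickle refuses to serialize.

```python
import pickle
import threading


def to_python_friendly(obj):
    """Return pickled bytes for obj, or its string form if pickling fails."""
    try:
        return pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        # Fallback, analogous to calling toString on the key or value.
        return str(obj)


key = to_python_friendly("a key")             # picklable, so we get bytes
value = to_python_friendly(threading.Lock())  # not picklable, so we get a str

print(pickle.loads(key))     # a key
print(type(value).__name__)  # str
```

Values that survive pickling are restored as real Python objects on the reading side, while values that take the fallback path arrive as plain strings.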

setCheckpointDir(dirName)

Set the directory under which RDDs are going to be checkpointed. The directory must be an HDFS path if running on a cluster.

setJobDescription(value)

Set a human readable description of the current job.

setJobGroup(groupId, description, interruptOnCancel=False)

Assigns a group ID to all the jobs started by this thread until the group ID is set to a different value or cleared.

Often, a unit of execution in an application consists of multiple Spark actions or jobs. Application programmers can use this method to group all those jobs together and give a group description. Once set, the Spark web UI will associate such jobs with this group.

The application can use SparkContext.cancelJobGroup to cancel all running jobs in this group.

>>> import threading
>>> from time import sleep
>>> result = "Not Set"
>>> lock = threading.Lock()
>>> def map_func(x):
...     sleep(100)
...     raise Exception("Task should have been cancelled")
>>> def start_job(x):
...     global result
...     try:
...         sc.setJobGroup("job_to_cancel", "some description")
...         result = sc.parallelize(range(x)).map(map_func).collect()
...     except Exception as e:
...         result = "Cancelled"
...     lock.release()
>>> def stop_job():
...     sleep(5)
...     sc.cancelJobGroup("job_to_cancel")
>>> supress = lock.acquire()
>>> supress = threading.Thread(target=start_job, args=(10,)).start()
>>> supress = threading.Thread(target=stop_job).start()
>>> supress = lock.acquire()
>>> print(result)
Cancelled

If interruptOnCancel is set to true for the job group, then job cancellation will result in Thread.interrupt() being called on the job’s executor threads. This is useful to help ensure that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.

setLocalProperty(key, value)

Set a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool.

setLogLevel(logLevel)

Control our logLevel. This overrides any user-defined log settings. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN

classmethod setSystemProperty(key, value)

Set a Java system property, such as spark.executor.memory. This must be invoked before instantiating SparkContext.

show_profiles()

Print the profile stats to stdout

sparkUser()

Get SPARK_USER for user who is running SparkContext.

startTime

Return the epoch time when the Spark Context was started.

statusTracker()

Return StatusTracker object

stop()

Shut down the SparkContext.

textFile(name, minPartitions=None, use_unicode=True)

Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

If use_unicode is False, the strings will be kept as str (encoded as utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)

>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
...     _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello world!']

uiWebUrl

Return the URL of the SparkUI instance started by this SparkContext

union(rdds)

Build the union of a list of RDDs.

This supports unions() of RDDs with different serialized formats, although this forces them to be reserialized using the default serializer:

>>> path = os.path.join(tempdir, "union-text.txt")
>>> with open(path, "w") as testFile:
...     _ = testFile.write("Hello")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello']
>>> parallelized = sc.parallelize(["World!"])
>>> sorted(sc.union([textFile, parallelized]).collect())
['Hello', 'World!']

version

The version of Spark on which this application is running.

wholeTextFiles(path, minPartitions=None, use_unicode=True)

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.

If use_unicode is False, the strings will be kept as str (encoded as utf-8), which is faster and smaller than unicode. (Added in Spark 1.2)

For example, if you have the following files:

hdfs://a-hdfs-path/part-00000
hdfs://a-hdfs-path/part-00001
...
hdfs://a-hdfs-path/part-nnnnn

Do rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path"), then rdd contains:

(a-hdfs-path/part-00000, its content)
(a-hdfs-path/part-00001, its content)
...
(a-hdfs-path/part-nnnnn, its content)

Note: Small files are preferred, as each file will be loaded fully in memory.

>>> dirPath = os.path.join(tempdir, "files")
>>> os.mkdir(dirPath)
>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
...     _ = file1.write("1")
>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
...     _ = file2.write("2")
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[('.../1.txt', '1'), ('.../2.txt', '2')]
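The shape of the result, one (path, content) pair per file, can be reproduced locally with the standard library. The function whole_text_files below is a hypothetical single-machine model and not the Spark method. It reads each file fully into memory, which also shows why small files are preferred.

```python
import os
import tempfile


def whole_text_files(dir_path):
    """Return sorted (path, content) pairs, one per file in dir_path."""
    pairs = []
    for name in sorted(os.listdir(dir_path)):
        full_path = os.path.join(dir_path, name)
        if os.path.isfile(full_path):
            with open(full_path) as handle:
                # The whole file becomes a single record.
                pairs.append((full_path, handle.read()))
    return pairs


with tempfile.TemporaryDirectory() as tmp:
    for name, text in [("1.txt", "1"), ("2.txt", "2")]:
        with open(os.path.join(tmp, name), "w") as handle:
            handle.write(text)
    pairs = whole_text_files(tmp)

summary = [(os.path.basename(p), c) for p, c in pairs]
print(summary)  # [('1.txt', '1'), ('2.txt', '2')]
```

The keys here are full local paths, mirroring how the keys in the Spark result are the paths of the files that were read.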

class pyspark.SparkFiles

Resolves paths to files added through SparkContext.addFile().

SparkFiles contains only classmethods; users should not create SparkFiles instances.

classmethod get(filename)

Get the absolute path of a file added through SparkContext.addFile().

classmethod getRootDirectory()

Get the root directory that contains files added through SparkContext.addFile().

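SparkFiles mainly solves the problem of locating files that were added to Spark for use on every node. Presumably Spark keeps these files in a temporary directory of its own.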
