一、DF轉(zhuǎn)換器
- Transformer:SparkML中有很多直接對(duì)DF進(jìn)行變換的類,如TF-IDF,PCA等,它們統(tǒng)稱Transformer;
%20%5Crightarrow%20DF)
子類主要需要實(shí)現(xiàn)_transform方法。
from pyspark.ml import Estimator, Model
class PytorchClassifyModel(Model):
def _transform(self, dataset: DataFrame):
# ...
return spark.createDataFrame(rdd)
- Estimator:還有很多需要訓(xùn)練后才能對(duì)DF進(jìn)行變換的類,如LR,GBDT以及NaiveBayes,它們統(tǒng)稱Estimator,訓(xùn)練后產(chǎn)出Model,Model也是一種Transformer,因?yàn)樗材苤苯訉?duì)DF進(jìn)行變換;
%20%5Crightarrow%20Model(Transformer)%20%5C%5C%20%26DF%20%5Crightarrow%20Model.transform()%20%5Crightarrow%20DF%20%5Cend%7Baligned%7D)
子類主要需要實(shí)現(xiàn)_fit方法,并返回一個(gè)Model對(duì)象。
class PytorchLocalGPUClassifier(Estimator):
def _fit(self, dataset) -> PytorchClassifyModel:
# ...
return PytorchClassifyModel(model, input_cols, batch_size)
- Pipeline:因?yàn)門ransformer對(duì)DF變換后仍產(chǎn)出DF,于是串聯(lián)多個(gè)Transformer可以對(duì)DF進(jìn)行流式處理。Pipeline就是專門處理流式DF的類。它可以串聯(lián)多個(gè)Estimator和Transformer,經(jīng)過訓(xùn)練后,產(chǎn)出PipelineModel,就是一連串Model和Transformer。因此,Pipeline繼承自Estimator,PipelineModel繼承自Model。
![\begin{aligned} &p = Pipline(stages=[Estimators|Transformers]) \\ &DF \rightarrow p.fit() \rightarrow PiplineModel(Model) \\ &DF \rightarrow PiplineModel.transform() \rightarrow DF \end{aligned}](https://math.jianshu.com/math?formula=%5Cbegin%7Baligned%7D%20%26p%20%3D%20Pipline(stages%3D%5BEstimators%7CTransformers%5D)%20%5C%5C%20%26DF%20%5Crightarrow%20p.fit()%20%5Crightarrow%20PiplineModel(Model)%20%5C%5C%20%26DF%20%5Crightarrow%20PiplineModel.transform()%20%5Crightarrow%20DF%20%5Cend%7Baligned%7D)
繼承了Transformer和Estimator后,自然可以用在Pipeline和PipelineModel中。
二、模型參數(shù)
- Param:模型Model或者Transformer和Estimator都有一些參數(shù)可供調(diào)整,如輸入列名、輸出列名、PCA的主成分?jǐn)?shù)量、LR的迭代次數(shù)等。SparkML中的Param只是變量的一種名字,包含了變量名稱、說明和轉(zhuǎn)換方式,不包含具體變量的內(nèi)容。
- Params:是變量的容器;一堆參數(shù)才是所有模型的共性,因此,Transformer和Estimator都繼承自Params。
Params:
_paramMap={}
_defaultParamMap={}
_set(**kwargs)
_setDefault(**kwargs)
getOrDefault(param)
Foo(Params):
inputCols = Param(Params._dummy(), "inputCols", "input cols", typeConverter=TypeConverters.toListString)
batchSize = Param(Params._dummy(), "batchSize", "batch size", typeConverter=TypeConverters.toInt)
inputColsType = Param(Params._dummy(), "inputColsType", "types of input cols")
def __init__(self, batch_size: int = 100):
self._setDefault(inputCols=['data'], inputColsType=input_cols_type)
self._set(batchSize=batch_size)
def setInputCols(self, value):
self._set(inputCols=value)
三、模型的存取
from pyspark.ml.util import MLReadable, MLWritable, DefaultParamsWriter, DefaultParamsReader
class PytorchClassifyModel(Model, MLWritable, MLReadable):
def write(self):
'''MLWritable的方法,返回一個(gè)有save方法的類,被稱為Writer'''
return self
def save(self, path):
'''實(shí)際存儲(chǔ)代碼'''
DefaultParamsWriter(self).save(path)
sc = SparkSession.builder.getOrCreate().sparkContext
buffer = io.BytesIO()
torch.save(self.model, buffer)
sc.parallelize([buffer.getvalue()], 1).saveAsPickleFile(f'{path}/model.pk')
@classmethod
def read(cls):
'''MLReadable的方法,返回一個(gè)有l(wèi)oad方法的類,被稱為Reader'''
return cls
@classmethod
def load(cls, path):
'''實(shí)際讀取代碼'''
m: PytorchClassifyModel = DefaultParamsReader(cls).load(path)
sc = SparkSession.builder.getOrCreate().sparkContext
model_pk_str = sc.pickleFile(f'{path}/model.pk', 1).collect()[0]
buffer = io.BytesIO(model_pk_str)
m.model = torch.load(buffer)
return m
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。