SparkML中的transformer和estimator

一、DF轉(zhuǎn)換器

  1. Transformer:SparkML中有很多直接對(duì)DF進(jìn)行變換的類,如TF-IDF,PCA等,它們統(tǒng)稱Transformer;
    DF \rightarrow Transformer.transform() \rightarrow DF
    子類主要需要實(shí)現(xiàn)_transform方法。
from pyspark.ml import Estimator, Model
class PytorchClassifyModel(Model):
    def _transform(self, dataset: DataFrame):
        # ...
        return spark.createDataFrame(rdd)
  1. Estimator:還有很多需要訓(xùn)練后才能對(duì)DF進(jìn)行變換的類,如LR,GBDT以及NaiveBayes,它們統(tǒng)稱Estimator,訓(xùn)練后產(chǎn)出Model,Model也是一種Transformer,因?yàn)樗材苤苯訉?duì)DF進(jìn)行變換;
    \begin{aligned} &DF \rightarrow Estimator.fit() \rightarrow Model(Transformer) \\ &DF \rightarrow Model.transform() \rightarrow DF \end{aligned}
    子類主要需要實(shí)現(xiàn)_fit方法,并返回一個(gè)Model對(duì)象。
class PytorchLocalGPUClassifier(Estimator):
    def _fit(self, dataset) -> PytorchClassifyModel:
        # ...
        return PytorchClassifyModel(model, input_cols, batch_size)
  1. Pipeline:因?yàn)門ransformer對(duì)DF變換后仍產(chǎn)出DF,于是串聯(lián)多個(gè)Transformer可以對(duì)DF進(jìn)行流式處理。Pipeline就是專門處理流式DF的類。它可以串聯(lián)多個(gè)Estimator和Transformer,經(jīng)過訓(xùn)練后,產(chǎn)出PipelineModel,就是一連串Model和Transformer。因此,Pipeline繼承自Estimator,PipelineModel繼承自Model。
    \begin{aligned} &p = Pipline(stages=[Estimators|Transformers]) \\ &DF \rightarrow p.fit() \rightarrow PiplineModel(Model) \\ &DF \rightarrow PiplineModel.transform() \rightarrow DF \end{aligned}
    繼承了Transformer和Estimator后,自然可以用在Pipeline和PipelineModel中。

二、模型參數(shù)

  1. Param:模型Model或者Transformer和Estimator都有一些參數(shù)可供調(diào)整,如輸入列名、輸出列名、PCA的主成分?jǐn)?shù)量、LR的迭代次數(shù)等。SparkML中的Param只是變量的一種名字,包含了變量名稱、說明和轉(zhuǎn)換方式,不包含具體變量的內(nèi)容。
  2. Params:是變量的容器;一堆參數(shù)才是所有模型的共性,因此,Transformer和Estimator都繼承自Params。
Params:
    _paramMap={}
    _defaultParamMap={}

    _set(**kwargs)
    _setDefault(**kwargs)
    getOrDefault(param)

Foo(Params):
    inputCols = Param(Params._dummy(), "inputCols", "input cols", typeConverter=TypeConverters.toListString)
    batchSize = Param(Params._dummy(), "batchSize", "batch size", typeConverter=TypeConverters.toInt)
    inputColsType = Param(Params._dummy(), "inputColsType", "types of input cols")
    def __init__(self, batch_size: int = 100):
        self._setDefault(inputCols=['data'], inputColsType=input_cols_type)
        self._set(batchSize=batch_size)
    def setInputCols(self, value):
        self._set(inputCols=value)

三、模型的存取

from pyspark.ml.util import MLReadable, MLWritable, DefaultParamsWriter, DefaultParamsReader
class PytorchClassifyModel(Model, MLWritable, MLReadable):

    def write(self):
        '''MLWritable的方法,返回一個(gè)有save方法的類,被稱為Writer'''
        return self

    def save(self, path):
        '''實(shí)際存儲(chǔ)代碼'''
        DefaultParamsWriter(self).save(path)
        sc = SparkSession.builder.getOrCreate().sparkContext
        buffer = io.BytesIO()
        torch.save(self.model, buffer)
        sc.parallelize([buffer.getvalue()], 1).saveAsPickleFile(f'{path}/model.pk')

    @classmethod
    def read(cls):
        '''MLReadable的方法,返回一個(gè)有l(wèi)oad方法的類,被稱為Reader'''
        return cls

    @classmethod
    def load(cls, path):
        '''實(shí)際讀取代碼'''
        m: PytorchClassifyModel = DefaultParamsReader(cls).load(path)
        sc = SparkSession.builder.getOrCreate().sparkContext
        model_pk_str = sc.pickleFile(f'{path}/model.pk', 1).collect()[0]
        buffer = io.BytesIO(model_pk_str)
        m.model = torch.load(buffer)
        return m
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容