kubeflow pipelines--實(shí)現(xiàn)自己的實(shí)例

一直以為這篇文章2月份的時(shí)候就發(fā)出去了,今天才發(fā)現(xiàn) 不知道什么原因居然沒(méi)發(fā)出去 (QAQ)

上篇文章實(shí)現(xiàn)了運(yùn)行系統(tǒng)給出的pipelines實(shí)例。通過(guò)掛載自己的卷,以及使用自己的鏡像總算可以運(yùn)行官方給的ML實(shí)例了。但是我們的最終目的是運(yùn)行自己的ML程序。

接下來(lái)是實(shí)現(xiàn)自己的pipelines實(shí)例。

根據(jù)官方文檔的建議:
https://www.kubeflow.org/docs/pipelines/sdk/build-component/
實(shí)現(xiàn)自己的ML實(shí)例,總共有兩點(diǎn):
(1)Client code (在kubeflow上運(yùn)行你真正任務(wù)所需要的額外代碼)
(2)Runtime code (真正的ML代碼,其實(shí)就是鏡像)

接下來(lái)用我設(shè)置的一個(gè)自定義實(shí)例來(lái)說(shuō)明:


image.png

說(shuō)明:這里我自定義了一個(gè)任務(wù)。任務(wù)很簡(jiǎn)單:
(1)先驗(yàn)證一個(gè)文件(FileInput1)中的數(shù)是否大于0. 將大于0,和小于0的數(shù)保存為倆個(gè)文件。
(2)對(duì)倆個(gè)文件執(zhí)行不同的操作,保存大于0的文件,將里面所有的數(shù)全加起來(lái),然后輸出(fileoutput1)
(3)對(duì)小于0的那個(gè)文件,將里面的數(shù)平方。
eg:
如果FileInput1文件中的數(shù)是:0,-1,-2,-3 ,4,5,6
那么最后fileOutput1文件中的數(shù)為:15
fileOutput2文件中的數(shù)為:1,4,9
(不用糾結(jié)這個(gè)例子,主要是為了能了解如何在pipelines上寫自己的實(shí)例并運(yùn)行。)

步驟1 - 準(zhǔn)備Runtime code (其實(shí)就是鏡像)

可以看出來(lái)這里總共三個(gè)節(jié)點(diǎn):validateOp,MoreThanZeroOp和LessThanZeroOp
三個(gè)節(jié)點(diǎn)的代碼如下:
validate.py (validateOp)

import argparse
import os

def validate(output_dir,filename):
    f = open(filename, 'rb+')
    lessOutput_path = os.path.join(output_dir, 'lessThanZero.txt')
    f1 = open(lessOutput_path, 'w')
    
    moreOutput_path = os.path.join(output_dir, 'moreThanZero.txt')
    f2 = open(moreOutput_path, 'w')
    lines = f.readlines()
    for line in lines:
        num = int(line)
        if num > 0:
            f1.write(str(num) + "\n")
        elif num < 0:
            f2.write(str(num) + "\n")
    f.close()
    f1.close()
    f2.close()
    with open('/lessFilePath.txt', 'w+') as f:
        f.write(lessOutput_path)
    with open('/moreFilePath.txt', 'w+') as f:
        f.write(moreOutput_path)

def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument( '--inputFilename',type=str,required=True,help='local file to be input')
    parser.add_argument('--output_dir',type=str,required=True,help='local file to be input')
    args = parser.parse_args()
    return args

def main():
    args = parse_arguments()
    validate(args.output_dir,args.inputFilename)

if __name__ == '__main__':
    main()

MoreThanZero.py (MoreThanZeroOp)

import argparse
import os

def MoreThanZero(output_dir,filename):
    f = open(filename, 'rb+')
    output_path=os.path.join(output_dir, 'sum.txt')
    f1 = open(output_path,'w')
    lines = f.readlines()
    sum = 0
    for line in lines:
       sum += int(line)
    f1.write(str(sum) + "\n")
    f.close()
    f1.close()

def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument( '--output_dir',type=str,required=True,help='local file to be input')
    parser.add_argument('--data',type=str,required=True,help='local file to be input')
    args = parser.parse_args()
    return args

def main():
    args = parse_arguments()
    MoreThanZero(args.output_dir,args.data)

if __name__ == '__main__':
    main()

LessThanZero.py (MoreThanZeroOp)

import argparse
import os
def LessThanZero(output_dir,filename):
    f = open(filename, 'rb+')
    output_path=os.path.join(output_dir, 'square.txt')
    f1 = open(output_path,'w')
    lines = f.readlines()
    for line in lines:
       sum = int(line) * int(line)
       f1.write(str(sum) + "\n")
    f.close()
    f1.close()

def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument( '--output_dir',type=str,required=True,help='local file to be input')
    parser.add_argument('--data',type=str,required=True, help='local file to be input')
    args = parser.parse_args()
    return args

def main():
    args = parse_arguments()
    LessThanZero(args.output_dir,args.data)

if __name__ == '__main__':
    main()

接下來(lái)要將這些py打包成自己的任務(wù)鏡像,拿Validate.py為例。
dockerfile

FROM python:3.6-slim
RUN mkdir -p /app
ENV APP_HOME /app
COPY src $APP_HOME
WORKDIR $APP_HOME
ENTRYPOINT ["python", "validate.py"]

這里我將上面的三個(gè)py分別打包成以下鏡像:
192.168.14.54:5000/user-validate:v8
192.168.14.54:5000/user-morethanzero:v8
192.168.14.54:5000/user-lessthanzero:v8

到這里第一步(RunTime code)就完成了

步驟2-準(zhǔn)備client code (實(shí)際就是在kubeflow pipelines上運(yùn)行自己的實(shí)例)

打開(kāi)jupyter(參考前面的文章)
輸入client.py文件內(nèi)容即可

(1)
import kfp
from kfp import compiler
import kfp.dsl as dsl
import kfp.notebook
import kfp.gcp as gcp
client = kfp.Client()
from kubernetes import client as k8s_client
EXPERIMENT_NAME = 'usr1'
exp = client.create_experiment(name=EXPERIMENT_NAME)


(2)
class validateOp(dsl.ContainerOp):
  """對(duì)文件中的數(shù)據(jù)進(jìn)行驗(yàn)證"""
  def __init__(self, output_dir,inputFilename):
    super(validateOp, self).__init__(
      name='validate_number',
      image='192.168.14.54:5000/user-validate:v8',
      arguments = [
            '--inputFilename', inputFilename,
            '--output_dir', output_dir,
        ],
      file_outputs={
            'more': '/moreFilePath.txt',
            'less': '/lessFilePath.txt',
            })

class MoreThanZeroOp(dsl.ContainerOp):
  """handle the number more than zero"""
  def __init__(self,output_dir,data):
    super(MoreThanZeroOp, self).__init__(
      name='MoreThanZero',
      image='192.168.14.54:5000/user-morethanzero:v8',
      arguments = [
            '--output_dir',output_dir,
            '--data', data,
        ])
    
class LessThanZeroOp(dsl.ContainerOp):
  """handle the number less than zero"""
  def __init__(self,output_dir,data):
    super(LessThanZeroOp, self).__init__(
      name='LessThanZero',
      image='192.168.14.54:5000/user-lessthanzero:v8',
      arguments = [
            '--output_dir',output_dir,
            '--data', data,
        ])
 
@dsl.pipeline(
    name='Testpipelines',
    description='shows how to define dsl.Condition.'
)
def ValidateTest():
    output_dir = '/nfs-pv/usr-pv'
    inputFilename = 'a.txt'
    validate = validateOp(output_dir,inputFilename).add_volume(k8s_client.V1Volume(name='usr-pv',nfs=k8s_client.V1NFSVolumeSource(path=     '/nfs-pv/usr-pv',server='192.168.14.54'))).add_volume_mount(k8s_client.V1VolumeMount(mount_path='/nfs-pv/usr-pv',name='usr-pv'))
    moreThanZero = MoreThanZeroOp(output_dir,validate.outputs['more']).add_volume(k8s_client.V1Volume(name='usr-pv',nfs=k8s_client.         V1NFSVolumeSource(path='/nfs-pv/usr-pv',server='192.168.14.54'))).add_volume_mount(k8s_client.V1VolumeMount(mount_path='/nfs-pv/usr-pv',name='usr-pv'))
    
    lessThanZero = LessThanZeroOp(output_dir,validate.outputs['less']).add_volume(k8s_client.V1Volume(name='usr-pv',nfs=k8s_client.         V1NFSVolumeSource(path='/nfs-pv/usr-pv',server='192.168.14.54'))).add_volume_mount(k8s_client.V1VolumeMount(mount_path='/nfs-pv/usr-pv',name='usr-pv'))


(3)# Submit a run.  
compiler.Compiler().compile(ValidateTest,  'test.tar.gz')
run = client.run_pipeline(exp.id, 'usr', 'test.tar.gz')

這里要注意的是,我自己建立一個(gè)共享存儲(chǔ)卷(NFS)。
這樣我們將輸入文件(a.txt)放入這個(gè)存儲(chǔ)卷下面,容器可以直接去拿,并且最后產(chǎn)生的輸出文件會(huì)保留在 存儲(chǔ)卷里,不會(huì)因?yàn)槿蝿?wù)完成了,容器就消失了,找不到文件。
所有,上面的client.py 內(nèi)容要根據(jù)自己的情況修改。

kubeflow pipelines目前先琢磨到這里。
接下來(lái)會(huì)關(guān)注kubeflow實(shí)現(xiàn)gang scheduler這一方面。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容