一直以為這篇文章2月份的時(shí)候就發(fā)出去了,今天才發(fā)現(xiàn) 不知道什么原因居然沒(méi)發(fā)出去 (QAQ)
上篇文章實(shí)現(xiàn)了運(yùn)行系統(tǒng)給出的pipelines實(shí)例。通過(guò)掛載自己的卷,以及使用自己的鏡像總算可以運(yùn)行官方給的ML實(shí)例了。但是我們的最終目的是運(yùn)行自己的ML程序。
接下來(lái)是實(shí)現(xiàn)自己的pipelines實(shí)例。
根據(jù)官方文檔的建議:
https://www.kubeflow.org/docs/pipelines/sdk/build-component/
實(shí)現(xiàn)自己的ML實(shí)例,總共有兩點(diǎn):
(1)Client code (在kubeflow上運(yùn)行你真正任務(wù)所需要的額外代碼)
(2)Runtime code (真正的ML代碼,其實(shí)就是鏡像)
接下來(lái)用我設(shè)置的一個(gè)自定義實(shí)例來(lái)說(shuō)明:

說(shuō)明:這里我自定義了一個(gè)任務(wù)。任務(wù)很簡(jiǎn)單:
(1)先驗(yàn)證一個(gè)文件(FileInput1)中的數(shù)是否大于0. 將大于0,和小于0的數(shù)保存為倆個(gè)文件。
(2)對(duì)倆個(gè)文件執(zhí)行不同的操作,保存大于0的文件,將里面所有的數(shù)全加起來(lái),然后輸出(fileoutput1)
(3)對(duì)小于0的那個(gè)文件,將里面的數(shù)平方。
eg:
如果FileInput1文件中的數(shù)是:0,-1,-2,-3 ,4,5,6
那么最后fileOutput1文件中的數(shù)為:15
fileOutput2文件中的數(shù)為:1,4,9
(不用糾結(jié)這個(gè)例子,主要是為了能了解如何在pipelines上寫自己的實(shí)例并運(yùn)行。)
步驟1 - 準(zhǔn)備Runtime code (其實(shí)就是鏡像)
可以看出來(lái)這里總共三個(gè)節(jié)點(diǎn):validateOp,MoreThanZeroOp和LessThanZeroOp
三個(gè)節(jié)點(diǎn)的代碼如下:
validate.py (validateOp)
import argparse
import os
def validate(output_dir,filename):
f = open(filename, 'rb+')
lessOutput_path = os.path.join(output_dir, 'lessThanZero.txt')
f1 = open(lessOutput_path, 'w')
moreOutput_path = os.path.join(output_dir, 'moreThanZero.txt')
f2 = open(moreOutput_path, 'w')
lines = f.readlines()
for line in lines:
num = int(line)
if num > 0:
f1.write(str(num) + "\n")
elif num < 0:
f2.write(str(num) + "\n")
f.close()
f1.close()
f2.close()
with open('/lessFilePath.txt', 'w+') as f:
f.write(lessOutput_path)
with open('/moreFilePath.txt', 'w+') as f:
f.write(moreOutput_path)
def parse_arguments():
"""Parse command line arguments."""
parser = argparse.ArgumentParser()
parser.add_argument( '--inputFilename',type=str,required=True,help='local file to be input')
parser.add_argument('--output_dir',type=str,required=True,help='local file to be input')
args = parser.parse_args()
return args
def main():
args = parse_arguments()
validate(args.output_dir,args.inputFilename)
if __name__ == '__main__':
main()
MoreThanZero.py (MoreThanZeroOp)
import argparse
import os
def MoreThanZero(output_dir,filename):
f = open(filename, 'rb+')
output_path=os.path.join(output_dir, 'sum.txt')
f1 = open(output_path,'w')
lines = f.readlines()
sum = 0
for line in lines:
sum += int(line)
f1.write(str(sum) + "\n")
f.close()
f1.close()
def parse_arguments():
"""Parse command line arguments."""
parser = argparse.ArgumentParser()
parser.add_argument( '--output_dir',type=str,required=True,help='local file to be input')
parser.add_argument('--data',type=str,required=True,help='local file to be input')
args = parser.parse_args()
return args
def main():
args = parse_arguments()
MoreThanZero(args.output_dir,args.data)
if __name__ == '__main__':
main()
LessThanZero.py (MoreThanZeroOp)
import argparse
import os
def LessThanZero(output_dir,filename):
f = open(filename, 'rb+')
output_path=os.path.join(output_dir, 'square.txt')
f1 = open(output_path,'w')
lines = f.readlines()
for line in lines:
sum = int(line) * int(line)
f1.write(str(sum) + "\n")
f.close()
f1.close()
def parse_arguments():
"""Parse command line arguments."""
parser = argparse.ArgumentParser()
parser.add_argument( '--output_dir',type=str,required=True,help='local file to be input')
parser.add_argument('--data',type=str,required=True, help='local file to be input')
args = parser.parse_args()
return args
def main():
args = parse_arguments()
LessThanZero(args.output_dir,args.data)
if __name__ == '__main__':
main()
接下來(lái)要將這些py打包成自己的任務(wù)鏡像,拿Validate.py為例。
dockerfile
FROM python:3.6-slim
RUN mkdir -p /app
ENV APP_HOME /app
COPY src $APP_HOME
WORKDIR $APP_HOME
ENTRYPOINT ["python", "validate.py"]
這里我將上面的三個(gè)py分別打包成以下鏡像:
192.168.14.54:5000/user-validate:v8
192.168.14.54:5000/user-morethanzero:v8
192.168.14.54:5000/user-lessthanzero:v8
到這里第一步(RunTime code)就完成了
步驟2-準(zhǔn)備client code (實(shí)際就是在kubeflow pipelines上運(yùn)行自己的實(shí)例)
打開(kāi)jupyter(參考前面的文章)
輸入client.py文件內(nèi)容即可
(1)
import kfp
from kfp import compiler
import kfp.dsl as dsl
import kfp.notebook
import kfp.gcp as gcp
client = kfp.Client()
from kubernetes import client as k8s_client
EXPERIMENT_NAME = 'usr1'
exp = client.create_experiment(name=EXPERIMENT_NAME)
(2)
class validateOp(dsl.ContainerOp):
"""對(duì)文件中的數(shù)據(jù)進(jìn)行驗(yàn)證"""
def __init__(self, output_dir,inputFilename):
super(validateOp, self).__init__(
name='validate_number',
image='192.168.14.54:5000/user-validate:v8',
arguments = [
'--inputFilename', inputFilename,
'--output_dir', output_dir,
],
file_outputs={
'more': '/moreFilePath.txt',
'less': '/lessFilePath.txt',
})
class MoreThanZeroOp(dsl.ContainerOp):
"""handle the number more than zero"""
def __init__(self,output_dir,data):
super(MoreThanZeroOp, self).__init__(
name='MoreThanZero',
image='192.168.14.54:5000/user-morethanzero:v8',
arguments = [
'--output_dir',output_dir,
'--data', data,
])
class LessThanZeroOp(dsl.ContainerOp):
"""handle the number less than zero"""
def __init__(self,output_dir,data):
super(LessThanZeroOp, self).__init__(
name='LessThanZero',
image='192.168.14.54:5000/user-lessthanzero:v8',
arguments = [
'--output_dir',output_dir,
'--data', data,
])
@dsl.pipeline(
name='Testpipelines',
description='shows how to define dsl.Condition.'
)
def ValidateTest():
output_dir = '/nfs-pv/usr-pv'
inputFilename = 'a.txt'
validate = validateOp(output_dir,inputFilename).add_volume(k8s_client.V1Volume(name='usr-pv',nfs=k8s_client.V1NFSVolumeSource(path= '/nfs-pv/usr-pv',server='192.168.14.54'))).add_volume_mount(k8s_client.V1VolumeMount(mount_path='/nfs-pv/usr-pv',name='usr-pv'))
moreThanZero = MoreThanZeroOp(output_dir,validate.outputs['more']).add_volume(k8s_client.V1Volume(name='usr-pv',nfs=k8s_client. V1NFSVolumeSource(path='/nfs-pv/usr-pv',server='192.168.14.54'))).add_volume_mount(k8s_client.V1VolumeMount(mount_path='/nfs-pv/usr-pv',name='usr-pv'))
lessThanZero = LessThanZeroOp(output_dir,validate.outputs['less']).add_volume(k8s_client.V1Volume(name='usr-pv',nfs=k8s_client. V1NFSVolumeSource(path='/nfs-pv/usr-pv',server='192.168.14.54'))).add_volume_mount(k8s_client.V1VolumeMount(mount_path='/nfs-pv/usr-pv',name='usr-pv'))
(3)# Submit a run.
compiler.Compiler().compile(ValidateTest, 'test.tar.gz')
run = client.run_pipeline(exp.id, 'usr', 'test.tar.gz')
這里要注意的是,我自己建立一個(gè)共享存儲(chǔ)卷(NFS)。
這樣我們將輸入文件(a.txt)放入這個(gè)存儲(chǔ)卷下面,容器可以直接去拿,并且最后產(chǎn)生的輸出文件會(huì)保留在 存儲(chǔ)卷里,不會(huì)因?yàn)槿蝿?wù)完成了,容器就消失了,找不到文件。
所有,上面的client.py 內(nèi)容要根據(jù)自己的情況修改。
kubeflow pipelines目前先琢磨到這里。
接下來(lái)會(huì)關(guān)注kubeflow實(shí)現(xiàn)gang scheduler這一方面。