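A Job is used to run one-off tasks in batch inside a k8s cluster, and it provides a retry mechanism for failed runs. The workload can be as simple as a script inside a container.
Below is a worked example of the full process of creating a Job.
Create an image that runs the task
Business script testFunc.py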
import argparse
import json

def concat(firstName, n):
    # Repeat firstName n times and return the result as a JSON string
    return json.dumps({'status': 200, 'msg': firstName*n})

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--firstName', type=str)
    parser.add_argument('--n', type=int)
    args = parser.parse_args()
    result = concat(args.firstName, args.n)
    print(result)
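Before packaging the script into an image, it can help to check the function on its own. The sketch below repeats the `concat` definition so that it runs standalone and calls it directly, bypassing argparse. The second call with `n` set to 0 is an extra case added here for illustration.

```python
import json

def concat(firstName, n):
    # Same logic as testFunc.py: repeat firstName n times, wrap it in a JSON string
    return json.dumps({'status': 200, 'msg': firstName * n})

# Call the function directly instead of going through the command line
print(concat("Tommy", 2))  # {"status": 200, "msg": "TommyTommy"}
print(concat("Test", 0))   # {"status": 200, "msg": ""}
```

Since `firstName * n` is plain string repetition, an `n` of 0 gives an empty `msg` rather than an error.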
Dockerfile
FROM python:3.9
WORKDIR /app
COPY testFunc.py /app
CMD ["python", "testFunc.py", "--firstName", "Tommy", "--n", "2"]
Build the local image lyudmilalala/job-test with docker build -t lyudmilalala/job-test .
Test the image
$ docker run -it --rm lyudmilalala/job-test
{"status": 200, "msg": "TommyTommy"}
Overriding the startup command (cmd) produces a different result
$ docker run -it --rm lyudmilalala/job-test python testFunc.py --firstName "Test" --n 5
{"status": 200, "msg": "TestTestTestTestTest"}
Create a k8s Job with yaml
The yaml config file used to start the k8s Job, testFunc.yml
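The contents of testFunc.yml are not shown here. The following is a hypothetical reconstruction inferred from the output further down (a Job named job-test in the default namespace, 4 completions, three pods started at once, and a log line showing "Helen" repeated four times). The container name, `imagePullPolicy`, `backoffLimit`, and `restartPolicy` values are assumptions, not taken from the original file.

```yaml
# Hypothetical reconstruction of testFunc.yml; the original file is not shown
apiVersion: batch/v1
kind: Job
metadata:
  name: job-test
  namespace: default
spec:
  completions: 4        # total successful pods required (the 4/4 in the output)
  parallelism: 3        # assumed from the three pods that start together
  backoffLimit: 4       # assumed retry budget for failed pods
  template:
    spec:
      containers:
        - name: job-test-container        # assumed name
          image: lyudmilalala/job-test
          imagePullPolicy: IfNotPresent   # assumed, so the locally built image is used
          command: ["python", "testFunc.py", "--firstName", "Helen", "--n", "4"]
      restartPolicy: Never                # assumed; a Job pod must use Never or OnFailure
```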
Start the Job from the yaml (make sure the k8s cluster is already running)
kubectl apply -f .\testFunc.yml
At first the task has only just started and the pods are still being created
$ kubectl get job -A
NAMESPACE   NAME       COMPLETIONS   DURATION   AGE
default     job-test   0/4           1s         1s
$ kubectl get pods -n default
NAME             READY   STATUS              RESTARTS   AGE
job-test-q9cdr   0/1     ContainerCreating   0          1s
job-test-w2lr7   0/1     ContainerCreating   0          1s
job-test-xbd8d   0/1     ContainerCreating   0          1s
A little later the pods have finished and the Job is marked as complete
$ kubectl get pods -n default
NAME             READY   STATUS      RESTARTS   AGE
job-test-7vgqz   0/1     Completed   0          6s
job-test-q9cdr   0/1     Completed   0          9s
job-test-w2lr7   0/1     Completed   0          9s
job-test-xbd8d   0/1     Completed   0          9s
$ kubectl get job -A
NAMESPACE   NAME       COMPLETIONS   DURATION   AGE
default     job-test   4/4           6s         18s
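When polling from a script, the COMPLETIONS column is the field to watch. The helper below is a minimal sketch that parses one data row of `kubectl get job -A` output. The function names `parse_completions` and `is_complete` are hypothetical, and the sketch assumes the Job has a fixed completion count, so the column has the form succeeded/desired.

```python
def parse_completions(line):
    """Parse one data row of `kubectl get job -A` into (name, succeeded, desired)."""
    fields = line.split()
    # With -A the columns are: NAMESPACE NAME COMPLETIONS DURATION AGE
    name, completions = fields[1], fields[2]
    succeeded, desired = (int(x) for x in completions.split("/"))
    return name, succeeded, desired

def is_complete(line):
    # The Job is done once the succeeded count reaches the desired count
    _, succeeded, desired = parse_completions(line)
    return succeeded >= desired

print(is_complete("default job-test 0/4 1s 1s"))   # False
print(is_complete("default job-test 4/4 6s 18s"))  # True
```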
Check the log of a pod that ran the task to see the result
$ kubectl logs job-test-867x8
{"status": 200, "msg": "HelenHelenHelenHelen"}
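By default a Job is not deleted automatically (unless spec.ttlSecondsAfterFinished is set), so it has to be deleted manually once it has finished
The pods used by the Job are not deleted automatically either; they are removed together with the Job when it is deleted manually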
kubectl delete job job-test
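As an alternative to manual deletion, the Job spec has a `ttlSecondsAfterFinished` field that lets the cluster remove a finished Job and its pods after a delay, and a `backoffLimit` field that caps the retries. The sketch below builds such a manifest as a Python dict and prints it as JSON. The helper name `build_job_manifest`, the container naming scheme, and the values 60 and 4 are illustrative assumptions.

```python
import json

def build_job_manifest(name, image, command, completions, parallelism,
                       ttl_seconds=None, backoff_limit=None):
    """Build a batch/v1 Job manifest as a plain dict (hypothetical helper)."""
    spec = {
        "completions": completions,
        "parallelism": parallelism,
        "template": {"spec": {
            "containers": [{"name": name + "-container",  # assumed naming scheme
                            "image": image,
                            "command": command}],
            "restartPolicy": "Never",
        }},
    }
    if ttl_seconds is not None:
        # Delete the finished Job (and its pods) this many seconds after it ends
        spec["ttlSecondsAfterFinished"] = ttl_seconds
    if backoff_limit is not None:
        # Maximum number of retries before the Job is marked as failed
        spec["backoffLimit"] = backoff_limit
    return {"apiVersion": "batch/v1", "kind": "Job",
            "metadata": {"name": name}, "spec": spec}

manifest = build_job_manifest(
    "job-test", "lyudmilalala/job-test",
    ["python", "testFunc.py", "--firstName", "Helen", "--n", "4"],
    completions=4, parallelism=3, ttl_seconds=60, backoff_limit=4)
print(json.dumps(manifest, indent=2))
```

Because kubectl accepts JSON manifests as well as YAML, the printed output can be saved to a file and passed to `kubectl apply -f`.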
Create a k8s Job from a Java program
Add the io.fabric8.kubernetes.client package as a dependency
Java program
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;

import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        // Set up config
        Config config = new ConfigBuilder()
                .withMasterUrl("https://kubernetes.docker.internal:6443/")
                .withOauthToken("my-jwt-token-in-secret")
                // .withUsername("username").withPassword("password") if you're using basic auth
                // .withTrustCerts(false) if you want to disable certification checks
                .build();
        // Initialize client
        try (KubernetesClient client = new DefaultKubernetesClient(config)) {
            final String namespace = "default";
            final Job job = new JobBuilder()
                    .withApiVersion("batch/v1")
                    .withNewMetadata()
                        .withName("java-job-test")
                        // .withLabels()
                        // .withAnnotations()
                    .endMetadata()
                    .withNewSpec()
                        .withNewTemplate()
                            .withNewSpec()
                                .addNewContainer()
                                    .withName("java-job-test-container")
                                    .withImagePullPolicy("IfNotPresent")
                                    .withImage("lyudmilalala/job-test")
                                    .withCommand("python", "testFunc.py", "--firstName", "Alice", "--n", "3")
                                .endContainer()
                                .withRestartPolicy("Never")
                            .endSpec()
                        .endTemplate()
                    .endSpec()
                    .build();
            job.getSpec().setParallelism(3);
            job.getSpec().setCompletions(4);
            client.batch().v1().jobs().inNamespace(namespace).resource(job).create();

            // Wait for the Job itself to report all completions. Waiting on the first
            // pod right after create() can fail, because the pod list may still be empty.
            client.batch().v1().jobs().inNamespace(namespace).withName(job.getMetadata().getName())
                    .waitUntilCondition(j -> j != null && j.getStatus() != null
                                    && j.getStatus().getSucceeded() != null
                                    && j.getStatus().getSucceeded() >= job.getSpec().getCompletions(),
                            1, TimeUnit.MINUTES);

            // Get all pods created by the job and print their names
            PodList podList = client.pods().inNamespace(namespace)
                    .withLabel("job-name", job.getMetadata().getName()).list();
            podList.getItems().forEach(pod -> System.out.println(pod.getMetadata().getName()));

            // Print Job's log
            String joblog = client.batch().v1().jobs().inNamespace(namespace).withName("java-job-test").getLog();
            System.out.println(joblog);
        } catch (KubernetesClientException e) {
            e.printStackTrace();
        }
    }
}