kubernetes的webhook開發(fā)(一篇搭好開發(fā)架構)

webhook

對kubernetes的webhook開發(fā)實例

介紹

Webhook就是一種HTTP回調,用于在某種情況下執(zhí)行某些動作,Webhook不是K8S獨有的,很多場景下都可以進行Webhook,比如在提交完代碼后調用一個Webhook自動構建docker鏡像

K8S中提供了自定義資源類型自定義控制器來擴展功能,還提供了動態(tài)準入控制,其實就是通過Webhook來實現(xiàn)準入控制,分為兩種:驗證性質的準入 Webhook (Validating Admission Webhook)修改性質的準入 Webhook (Mutating Admission Webhook)

Admission Webhook使用較多的場景如下

  • 在資源持久化到ETCD之前進行修改(Mutating Webhook),比如增加init Container或者sidecar Container
  • 在資源持久化到ETCD之前進行校驗(Validating Webhook),不滿足條件的資源直接拒絕并給出相應信息

現(xiàn)在非?;馃岬牡?Service Mesh 應用istio就是通過 mutating webhooks 來自動將Envoy這個 sidecar 容器注入到 Pod 中去的:https://istio.io/docs/setup/kubernetes/sidecar-injection/。

更多詳情介紹可參考:https://kubernetes.io/zh/docs/reference/access-authn-authz/extensible-admission-controllers/

Admission Webhook

上面提到K8S的動態(tài)準入控制是通過Webhook來實現(xiàn)的,請看下圖

k8s-api-request-lifecycle.png

Webhook可以理解成Java Web開發(fā)中的Filter,每個請求都會經過Filter處理,從圖中可以看到,先執(zhí)行的是Mutating Webhook,它可以對資源進行修改,然后執(zhí)行的是Validating Webhook,它可以拒絕或者接受請求,但是它不能修改請求

K8S中有已經實現(xiàn)了的Admission Webhook列表,詳情參考每個準入控制器的作用是什么?

webhook使用

檢查是否開啟了動態(tài)準入控制

一般k8s會默認開啟,可以跳過此步驟。(如果部署后,查看kube-apiserver日志沒有沒有準入日志,按照下面方式開啟)

查看APIServer是否開啟了MutatingAdmissionWebhookValidatingAdmissionWebhook

# 獲取apiserver pod名字
apiserver_pod_name=`kubectl get --no-headers=true po -n kube-system | grep kube-apiserver | awk '{ print $1 }'`
# 查看api server的啟動參數(shù)plugin
kubectl get po $apiserver_pod_name -n kube-system -o yaml | grep plugin

如果輸出如下,說明已經開啟

- --enable-admission-plugins=NodeRestriction,MutatingAdmissionWebhook,ValidatingAdmissionWebhook

否則,需要修改啟動參數(shù),請不然直接修改Pod的參數(shù),這樣修改不會成功,請修改配置文件/etc/kubernetes/manifests/kube-apiserver.yaml,加上相應的插件參數(shù)后保存,APIServer的Pod會監(jiān)控該文件的變化,然后重新啟動。

webhook動態(tài)準入控制說明

可查看官網
https://kubernetes.io/zh-cn/docs/reference/access-authn-authz/extensible-admission-controllers/#side-effects

webhooks:
  - name: webhook-example.github.com
    clientConfig:
      service:
        name: webhook-example
        namespace: default
        path: "/mutate"                     #與代碼邏輯相同
      caBundle: ${CA_BUNDLE}
    admissionReviewVersions: [ "v1beta1" ]
    sideEffects: None
    rules:                                  # 資源攔截規(guī)則
      - operations: [ "CREATE" ]
        apiGroups: ["apps", ""]
        apiVersions: ["v1"]
        resources: ["deployments"]
    namespaceSelector:                      # 生效的namespace
      matchLabels:
        webhook-example: enabled

webhook簡單實例

實例說明

實例將給原服務增加label、Annotation和sidecar

下載代碼:https://github.com/yuenandi/webhookExample

項目結構:

.
├── Dockerfile
├── build                          # 鏡像構建
├── debug                          # debug啟動腳本(認證與資源創(chuàng)建)
├── deploy                         # 部署啟動腳本(認證與資源創(chuàng)建)
├── k8s                            # 服務啟動前k8s資源創(chuàng)建(主要是認證)
│   ├── run.go
│   └── utils.go
├── main.go                        # 啟動入口
├── options
│   └── WhsvrParameters.go         # 服務啟動參數(shù)
├── pki
└── webhook
    └── webhook.go                 # 主要代碼邏輯

其中main.gowebhook.go是整個webhook的核心,前者用于啟動Server,監(jiān)聽端口,后者用于實現(xiàn)核心業(yè)務邏輯

main.go

服務啟動,監(jiān)聽端口

func main() {
    parameters := options.Parameters
    
    pair, err := tls.LoadX509KeyPair(parameters.CertFile, parameters.KeyFile)
    if err != nil {
        log.Errorf("Failed to load key pair: %v", err)
    }

    whsvr := &webhook.WebhookServer{
        Server: &http.Server{
            Addr:      fmt.Sprintf(":%v", parameters.Port),
            TLSConfig: &tls.Config{Certificates: []tls.Certificate{pair}},
        },
    }

    // define http server and server handler
    mux := http.NewServeMux()
    mux.HandleFunc(options.MutatePath, whsvr.Serve)
    whsvr.Server.Handler = mux

    // start webhook server in new routine
    go func() {
        if err := whsvr.Server.ListenAndServeTLS("", ""); err != nil {
            log.Errorf("Failed to listen and serve webhook server: %v", err)
        }
    }()

    log.Infof("Server started, Listening to the port %d", parameters.Port)

    // listening OS shutdown singal
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
    <-signalChan

    log.Infof("Got OS shutdown signal, shutting down webhook server gracefully...")
    //whsvr.Server.Shutdown(context.Background())

}

webhook.go

其核心在serve方法,根據傳進來的path mutate,然后執(zhí)行相應的操作,這個path是自己在MutatingWebhookConfiguration中定義的

// Serve method for webhook server
func (whsvr *WebhookServer) Serve(w http.ResponseWriter, r *http.Request) {

    //讀取從ApiServer過來的數(shù)據放到body
    var body []byte
    if r.Body != nil {
        if data, err := ioutil.ReadAll(r.Body); err == nil {
            body = data
        }
    }
    ....

    var admissionResponse *v1beta1.AdmissionResponse
    ar := v1beta1.AdmissionReview{}

    if _, _, err := deserializer.Decode(body, nil, &ar); err != nil {
        ...
    } else {
        if r.URL.Path == options.MutatePath {
            // mutate 業(yè)務邏輯
            admissionResponse = whsvr.mutate(&ar)

            admissionReview := v1beta1.AdmissionReview{}
            if admissionResponse != nil {
                admissionReview.Response = admissionResponse
                if ar.Request != nil {
                    admissionReview.Response.UID = ar.Request.UID
                }
            }

            resp, err := json.Marshal(admissionReview)
            if err != nil {
                ...
            }
            
            if _, err := w.Write(resp); err != nil {
                ...
            }

        }
    }
}

mutate方法,發(fā)往apiserver的patch

func (whsvr *WebhookServer) mutate(ar *v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
    req := ar.Request
    var (
        objectMeta                      *metav1.ObjectMeta
        resourceNamespace, resourceName string
        deployment                      appsv1.Deployment
    )

    switch req.Kind.Kind {
    // 支持Deployment
    case "Deployment":
        if err := json.Unmarshal(req.Object.Raw, &deployment); err != nil {
            log.Errorln(fmt.Sprintf("\nCould not unmarshal raw object: %v", err))
            return &v1beta1.AdmissionResponse{
                Result: &metav1.Status{
                    Message: err.Error(),
                },
            }
        }
        resourceName, resourceNamespace, objectMeta, deployment = deployment.Name, deployment.Namespace, &deployment.ObjectMeta, deployment
    //其他不支持的類型
    default:
        msg := fmt.Sprintf("\nNot support for this Kind of resource  %v", req.Kind.Kind)
        log.Warnf(msg)
        return &v1beta1.AdmissionResponse{
            Result: &metav1.Status{
                Message: msg,
            },
        }
    }

    //跳過不進行處理的情況
    if !mutationRequired(ignoredNamespaces, objectMeta) {
        log.Infoln(fmt.Sprintf("Skipping validation for %s/%s due to policy check", resourceNamespace, resourceName))
        return &v1beta1.AdmissionResponse{
            Allowed: true,
        }
    }
    //開始處理,主要處理方法
    patchBytes, err := createPatch(deployment, addAnnotations, addLabels)
    ...

    log.Debugf(fmt.Sprintf("AdmissionResponse: patch=%v\n", string(patchBytes)))
    return &v1beta1.AdmissionResponse{
        Allowed: true,
        Patch:   patchBytes,
        PatchType: func() *v1beta1.PatchType {
            pt := v1beta1.PatchTypeJSONPatch
            return &pt
        }(),
    }
}

主要業(yè)務處理createPatch

func createPatch(deployment appsv1.Deployment, addAnnotations map[string]string, addLabels map[string]string) ([]byte, error) {
    ...
    labelsPatch := updateLabels(labels, addLabels)
    annotationsPatch := updateAnnotation(annotations, addAnnotations)
    containersPatch := updateContainers(addContainer, deployment)
    ...
}

// 手動拼接patch,簡單改動可用
func updateLabels(target map[string]string, added map[string]string) (patch []patchOperation) {
    values := make(map[string]string)
    for key, value := range added {
        if target == nil || target[key] == "" {
            values[key] = value
        }
    }
    patch = append(patch, patchOperation{
        Op:    "add",
        Path:  "/metadata/labels",
        Value: values,
    })
    return patch
}

// 復雜的改動,可定義出新的deployment對象與原deployment做jsondiff.Compare操作
var addContainer = []corev1.Container{
    {
        Name:    "side-car",
        Image:   "busybox",
        Command: []string{"/bin/sleep", "infinity"},
    },
}

func updateContainers(addContainer []corev1.Container, deployment appsv1.Deployment) (patch []patchOperation) {
    currentDeployment := deployment.DeepCopy()
    containers := currentDeployment.Spec.Template.Spec.Containers
    containers = append(containers, addContainer...)
    currentDeployment.Spec.Template.Spec.Containers = containers
    diffPatch, err := jsondiff.Compare(deployment, currentDeployment)
    if err != nil {
        log.Error("")
    }
    for _, v := range diffPatch {
        addPatch := patchOperation{
            Op:    v.Type,
            Value: v.Value,
            Path:  string(v.Path),
        }
        patch = append(patch, addPatch)
    }
    return patch
}

webhook部署

腳本部署

修改install.sh腳本,如下部分,kube_config集群本地執(zhí)行需修改為空kube_config=''

#集群命名空間
ns='webhook-example'
kubectl_ns='--namespace webhook-example'
#集群遠程證書
kube_config='--kubeconfig config'

執(zhí)行腳本

腳本詳情

#!/bin/bash
# 修改serviceaccount的namespace字段
sed -e "s/\${namespace}/${ns}/g" rbac.yaml > current_rbac.yaml
# 部署rbac
kubectl apply -f current_rbac.yaml  ${kubectl_ns} ${kube_config}
# 認證: 或者kubernetes集群證書
./webhook-create-signed-cert.sh  ${kubectl_ns} ${kube_config}
# 部署service
kubectl apply -f service.yaml
# 部署webhook應用
kubectl apply -f webhook-example.yaml
# 部署MutatingWebhookConfiguration
cat ./mutatingwebhook.yaml | ./webhook-patch-ca-bundle.sh > current_mutatingwebhook.yaml ${kube_config} && kubectl apply -f current_mutatingwebhook.yaml ${kubectl_ns}

# 為namespace添加label
kubectl label ns ${ns} webhook-example=enabled ${kube_config}

部署webhook

kubectl apply -f deploy/webhookExample.yaml

不使用邊車

為應用添加如下label

labels:
  webhook-example.github.com/app: "false"

webhook調試

遠程調試,需要做本地與k8s集群的認證

主要腳本,webhook-create-signed-cert.sh

cat <<EOF >> ${tmpdir}/csr.conf
[req]
req_extensions = v3_req
distinguished_name = req_distinguished_name
[req_distinguished_name]
[ v3_req ]
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth
subjectAltName = @alt_names
[alt_names]
#修改為Debug本機Ip
IP.1  = ${currentIp}
EOF

mutatingwebhook.yaml

webhooks:
  - name: webhook-example-debug.github.com
    clientConfig:
      # 修改為本地ip
      url: https://10.8.1.90:6444/mutate/

腳本部署

修改debug/create-debug.sh如下參數(shù)

#本機地址
currentIp=10.8.1.90
#本地服務端口
currentPort=6444
#集群命名空間
ns='webhook-example'
kubectl_ns='--namespace webhook-example'
#遠程集群證書
kube_config='--kubeconfig config'

運行webhook

IDEA修改啟動參數(shù),注意地址修改,如下圖:

--tlsCertFile=pki/cert.pem
--tlsKeyFile=pki/key.pem
--log-v=5
--automatic-authentication=false

驗證

  1. 給webhook-example namespace添加label

    kubectl label namespace webhook-example webhook-example-debug=enabled
    
  2. 部署sleep.yaml

    kubectl apply -f deploy/sleep.yaml
    

自動認證,資源創(chuàng)建部分

以上部署在腳本中進行認證和資源創(chuàng)建

也可將認證和一些資源創(chuàng)建,例如csr、MutatingWebhookConfiguration,在程序啟動前進行創(chuàng)建

可擴展,做認證失效監(jiān)控,進行證書自動更新

部署編排文件為deploy/all/webhookExample-all.yaml
主要代碼如下

自動認證參數(shù),DEBUG模式為了方便本地開發(fā)調試

type WhSvrParameters struct {
    Port               int    // webhook server port
    CertFile           string // path to the x509 certificate for https
    KeyFile            string // path to the x509 private key matching `CertFile`
    Logv               int32  // 日志級別,默認4
    AutoAuthentication bool   // 是否自動認證,默認true
    Service            string // 服務的service,默認webhook-example
    Namespace          string // 命名空間
    KubeConfig         string // 集群證書
    IsDebug            bool   // 是否為DEBUG模式,默認false
    Url                string // 本地機器URL,DEBUG模式用到
}
--tlsCertFile=pki/cert.pem
--tlsKeyFile=pki/key.pem
--log-v=5
--kubeconfig=pki/config   
--namespace=webhook-example
--debug=true
--url=10.8.1.90
--automatic-authentication=true

k8s客戶端認證

func NewKubernetsClient(options *options.WhSvrParameters) (k *K8s, err error) {
    k = &K8s{}
    config, err := clientcmd.BuildConfigFromFlags("", options.KubeConfig)
    if err != nil {
        log.Error(err)
        return nil, err
    }
    k.config = config
    k.kubernetesClient, err = kubernetes.NewForConfig(config)
    if err != nil {
        log.Error(err)
        return nil, err
    }
    return k, nil
}

webhook啟動前準備代碼

func (k *K8s) Run() (err error) {
    
    // 獲取證書key,和CSR
    csr, key, err := genKubernetesCSR()
    
    // 創(chuàng)建CSR
    csr, err = k.kubernetesClient.CertificatesV1beta1().CertificateSigningRequests().Create(context.Background(), csr, metav1.CreateOptions{})
    // CSR審批
    cert, err := k.Approve(csr)
    
    // 寫證書
    keyBuf := x509.MarshalPKCS1PrivateKey(key)
    err = writePki(k.parameters.KeyFile, "RSA PRIVATE KEY", keyBuf)
    
    err = writeCert(k.parameters.CertFile, cert)
    
    // 刪除CSR
    err = k.kubernetesClient.CertificatesV1beta1().CertificateSigningRequests().Delete(context.Background(), csr.Name, metav1.DeleteOptions{})
    
    var (
        path    = options.MutatePath
        url     string
        service *admissionV1.ServiceReference
    )
    // 判斷是否為DEBUG模式
    // 創(chuàng)建mutat
    logrus.Debugf("DEBUG模式:%t", k.parameters.IsDebug)
    if k.parameters.IsDebug {
        url = fmt.Sprintf("https://%s:%d%s", k.parameters.Url, k.parameters.Port, path)
        err = k.CreateMutationWebhook(mutationWebhookConfigurationName, mutatingWebhookName, nil, &url)
    } else {
        service = &admissionV1.ServiceReference{
            Name:      k.parameters.Service,
            Namespace: k.parameters.Namespace,
            Path:      &path,
        }
        logMU, _ := yaml.Marshal(service)
        logrus.Debugf(string(logMU))
        err = k.CreateMutationWebhook(mutationWebhookConfigurationName, mutatingWebhookName, service, nil)
    }

    return err

}

參考資料:
https://zhuanlan.zhihu.com/p/404764407

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容