概述
在Kubernetes 中,系統(tǒng)和應(yīng)用程序的健康檢查任務(wù)是由 kubelet 來(lái)完成的,本文主要討論kubelet中 probemanager 相關(guān)的實(shí)現(xiàn)原理。
如果你對(duì)k8s的各種probe如何使用還不了解,可以看下我之前寫(xiě)的這篇K8S 中的健康檢查機(jī)制,是從實(shí)踐的角度介紹的。
statusManager
在 kubelet 初始化的時(shí)候,會(huì)創(chuàng)建 statusManager 和 probeManager,這兩個(gè)都是和 pod 狀態(tài)相關(guān)的邏輯,在kubelet 原理解析一:pod管理文章中有提到,statusManager 負(fù)責(zé)維護(hù)狀態(tài)信息,并把Pod狀態(tài)及時(shí)更新到Api-Server,
但是它并不負(fù)責(zé)監(jiān)控 pod 狀態(tài)的變化,而是提供對(duì)應(yīng)的接口供其他組件調(diào)用,比如 probeManager。probeManager 會(huì)定時(shí)去監(jiān)控 pod 中容器的健康狀況,一旦發(fā)現(xiàn)狀態(tài)發(fā)生變化,就調(diào)用 statusManager 提供的方法更新 pod 的狀態(tài)。
klet.statusManager = status.NewManager(kubeClient, klet.podManager)
klet.probeManager = prober.NewManager(
klet.statusManager,
klet.livenessManager,
klet.runner,
containerRefManager,
kubeDeps.Recorder)
statusManager代碼位于:pkg/kubelet/status/status_manager.go
type PodStatusProvider interface {
GetPodStatus(uid types.UID) (api.PodStatus, bool)
}
type Manager interface {
PodStatusProvider
Start()
SetPodStatus(pod *api.Pod, status api.PodStatus)
SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
TerminatePod(pod *api.Pod)
RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
}
SetPodStatus:如果 pod 的狀態(tài)發(fā)生了變化,會(huì)調(diào)用這個(gè)方法,把新?tīng)顟B(tài)更新到 apiserver,一般在 kubelet 維護(hù) pod 生命周期的時(shí)候會(huì)調(diào)用
SetContainerReadiness:如果健康檢查發(fā)現(xiàn) pod 中容器的健康狀態(tài)發(fā)生變化,會(huì)調(diào)用這個(gè)方法,修改 pod 的健康狀態(tài)
TerminatePod:kubelet 在刪除 pod 的時(shí)候,會(huì)調(diào)用這個(gè)方法,把 pod 中所有的容器設(shè)置為 terminated 狀態(tài)
RemoveOrphanedStatuses:刪除孤兒 pod,直接把對(duì)應(yīng)的狀態(tài)數(shù)據(jù)從緩存中刪除即可
Start() 方法是在 kubelet 運(yùn)行的時(shí)候調(diào)用的,它會(huì)啟動(dòng)一個(gè) goroutine 執(zhí)行更新操作:
const syncPeriod = 10 * time.Second
func (m *manager) Start() {
......
glog.Info("Starting to sync pod status with apiserver")
syncTicker := time.Tick(syncPeriod)
// syncPod and syncBatch share the same go routine to avoid sync races.
go wait.Forever(func() {
select {
case syncRequest := <-m.podStatusChannel:
m.syncPod(syncRequest.podUID, syncRequest.status)
case <-syncTicker:
m.syncBatch()
}
}, 0)
}
這個(gè) goroutine 就能不斷地從兩個(gè) channel 監(jiān)聽(tīng)數(shù)據(jù)進(jìn)行處理:syncTicker 是個(gè)定時(shí)器,也就是說(shuō)它會(huì)定時(shí)保證 apiserver 和自己緩存的最新 pod 狀態(tài)保持一致;podStatusChannel 是所有 pod 狀態(tài)更新發(fā)送到的地方,調(diào)用方不會(huì)直接操作這個(gè) channel,而是通過(guò)調(diào)用上面提到的修改狀態(tài)的各種方法,這些方法內(nèi)部會(huì)往這個(gè) channel 寫(xiě)數(shù)據(jù)。
m.syncPod 根據(jù)參數(shù)中的 pod 和它的狀態(tài)信息對(duì) apiserver 中的數(shù)據(jù)進(jìn)行更新,如果發(fā)現(xiàn) pod 已經(jīng)被刪除也會(huì)把它從內(nèi)部數(shù)據(jù)結(jié)構(gòu)中刪除。
probeManager
probeManager負(fù)責(zé) 檢測(cè) pod 中容器的健康狀態(tài),目前有三種 probe:
- liveness: 讓Kubernetes知道你的應(yīng)用程序是否健康,如果你的應(yīng)用程序不健康,Kubernetes將刪除Pod并啟動(dòng)一個(gè)新的替換它(與RestartPolicy有關(guān))。Liveness 探測(cè)可以告訴 Kubernetes 什么時(shí)候通過(guò)重啟容器實(shí)現(xiàn)自愈。
- readiness: readiness與liveness原理相同,不過(guò)Readiness探針是告訴 Kubernetes 什么時(shí)候可以將容器加入到 Service 負(fù)載均衡中,對(duì)外提供服務(wù)。
- startupProbe:1.16開(kāi)始支持的新特性,檢測(cè)慢啟動(dòng)容器的狀態(tài),具體參考startup-probes
并不是所有的 pod 中的容器都有健康檢查的探針,如果沒(méi)有,則不對(duì)容器進(jìn)行檢測(cè),默認(rèn)認(rèn)為容器是正常的。在每次創(chuàng)建新 pod 的時(shí)候,kubelet 都會(huì)調(diào)用 probeManager.AddPod(pod) 方法,它對(duì)應(yīng)的實(shí)現(xiàn)在 pkg/kubelet/prober/prober_manager.go 文件中:
func (m *manager) AddPod(pod *v1.Pod) {
m.workerLock.Lock()
defer m.workerLock.Unlock()
key := probeKey{podUID: pod.UID}
for _, c := range pod.Spec.Containers {
key.containerName = c.Name
if c.ReadinessProbe != nil {
key.probeType = readiness
if _, ok := m.workers[key]; ok {
klog.Errorf("Readiness probe already exists! %v - %v",
format.Pod(pod), c.Name)
return
}
w := newWorker(m, readiness, pod, c)
m.workers[key] = w
go w.run()
}
if c.LivenessProbe != nil {
key.probeType = liveness
if _, ok := m.workers[key]; ok {
klog.Errorf("Liveness probe already exists! %v - %v",
format.Pod(pod), c.Name)
return
}
w := newWorker(m, liveness, pod, c)
m.workers[key] = w
go w.run()
}
}
}
在這個(gè)方法里,kubelet 會(huì)遍歷pod 中所有的 container,如果配置了 probe,就創(chuàng)建一個(gè) worker,并異步處理這次探測(cè)
// Creates and starts a new probe worker.
func newWorker(
m *manager,
probeType probeType,
pod *v1.Pod,
container v1.Container) *worker {
w := &worker{
stopCh: make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
pod: pod,
container: container,
probeType: probeType,
probeManager: m,
}
switch probeType {
case readiness:
w.spec = container.ReadinessProbe
w.resultsManager = m.readinessManager
w.initialValue = results.Failure
case liveness:
w.spec = container.LivenessProbe
w.resultsManager = m.livenessManager
w.initialValue = results.Success
}
w.proberResultsMetricLabels = prometheus.Labels{
"probe_type": w.probeType.String(),
"container_name": w.container.Name,
"pod_name": w.pod.Name,
"namespace": w.pod.Namespace,
"pod_uid": string(w.pod.UID),
}
return w
}
worker 開(kāi)始run之后,會(huì)調(diào)用doProbe方法
func (w *worker) doProbe() (keepGoing bool) {
defer func() { recover() }()
defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })
// pod 沒(méi)有被創(chuàng)建,或者已經(jīng)被刪除了,直接跳過(guò)檢測(cè),但是會(huì)繼續(xù)檢測(cè)
status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
if !ok {
glog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
return true
}
// pod 已經(jīng)退出(不管是成功還是失?。?,直接返回,并終止 worker
if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
glog.V(3).Infof("Pod %v %v, exiting probe worker",
format.Pod(w.pod), status.Phase)
return false
}
// 容器沒(méi)有創(chuàng)建,或者已經(jīng)刪除了,直接返回,并繼續(xù)檢測(cè),等待更多的信息
c, ok := api.GetContainerStatus(status.ContainerStatuses, w.container.Name)
if !ok || len(c.ContainerID) == 0 {
glog.V(3).Infof("Probe target container not found: %v - %v",
format.Pod(w.pod), w.container.Name)
return true
}
// pod 更新了容器,使用最新的容器信息
if w.containerID.String() != c.ContainerID {
if !w.containerID.IsEmpty() {
w.resultsManager.Remove(w.containerID)
}
w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
w.onHold = false
}
if w.onHold {
return true
}
if c.State.Running == nil {
glog.V(3).Infof("Non-running container probed: %v - %v",
format.Pod(w.pod), w.container.Name)
if !w.containerID.IsEmpty() {
w.resultsManager.Set(w.containerID, results.Failure, w.pod)
}
// 容器失敗退出,并且不會(huì)再重啟,終止 worker
return c.State.Terminated == nil ||
w.pod.Spec.RestartPolicy != api.RestartPolicyNever
}
// 容器啟動(dòng)時(shí)間太短,沒(méi)有超過(guò)配置的初始化等待時(shí)間 InitialDelaySeconds
if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
return true
}
// 調(diào)用 prober 進(jìn)行檢測(cè)容器的狀態(tài)
result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
if err != nil {
return true
}
if w.lastResult == result {
w.resultRun++
} else {
w.lastResult = result
w.resultRun = 1
}
// 如果容器退出,并且沒(méi)有超過(guò)最大的失敗次數(shù),則繼續(xù)檢測(cè)
if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
(result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
return true
}
// 保存最新的檢測(cè)結(jié)果
w.resultsManager.Set(w.containerID, result, w.pod)
if w.probeType == liveness && result == results.Failure {
// 容器 liveness 檢測(cè)失敗,需要?jiǎng)h除容器并重新創(chuàng)建,在新容器成功創(chuàng)建出來(lái)之前,暫停檢測(cè)
w.onHold = true
}
return true
}
liveness檢測(cè)結(jié)果會(huì)存放在resultsManager,它把結(jié)果保存在緩存中,并發(fā)送到 m.updates 管道。而管道消費(fèi)者是 kubelet 中的主循環(huán)syncLoopIteration。
case update := <-kl.livenessManager.Updates():
if update.Result == proberesults.Failure {
// The liveness manager detected a failure; sync the pod.
pod, ok := kl.podManager.GetPodByUID(update.PodUID)
if !ok {
// If the pod no longer exists, ignore the update.
glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
break
}
glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
handler.HandlePodSyncs([]*api.Pod{pod})
}
liveness檢測(cè)如果不通過(guò),pod就會(huì)重啟,由 kubelet 的 sync 循環(huán)處理即可。但 readness檢測(cè)失敗不能重啟 pod,因此readness的邏輯是:
func (m *manager) updateReadiness() {
update := <-m.readinessManager.Updates()
ready := update.Result == results.Success
m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}
proberManager 啟動(dòng)的時(shí)候,會(huì)運(yùn)行一個(gè) goroutine 定時(shí)讀取 readinessManager 管道中的數(shù)據(jù),并根據(jù)數(shù)據(jù)調(diào)用 statusManager 去更新 apiserver 中 pod 的狀態(tài)信息。
負(fù)責(zé) Service 邏輯的組件獲取到了這個(gè)狀態(tài),就能根據(jù)不同的值來(lái)決定是否需要更新 endpoints 的內(nèi)容,也就是 service 的請(qǐng)求是否發(fā)送到這個(gè) pod。
Probe 方法
上面是 probemanager 的主要邏輯,我們接下來(lái)看下真正執(zhí)行探測(cè)任務(wù)的 probe方法
// probe probes the container.
func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
var probeSpec *v1.Probe
switch probeType {
case readiness:
probeSpec = container.ReadinessProbe
case liveness:
probeSpec = container.LivenessProbe
default:
return results.Failure, fmt.Errorf("Unknown probe type: %q", probeType)
}
...
result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
...
probe主方法調(diào)用pb.runProbeWithRetries 方法,傳入containerid、類(lèi)型、重試次數(shù)等。
exec 方法
調(diào)用runtimeService的ExecSync方法進(jìn)入容器執(zhí)行命令,回收結(jié)果,如果退出碼為 0 ,就認(rèn)為探測(cè)成功。
command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
....
func (pb *prober) newExecInContainer(container v1.Container, containerID kubecontainer.ContainerID, cmd []string, timeout time.Duration) exec.Cmd {
return execInContainer{func() ([]byte, error) {
return pb.runner.RunInContainer(containerID, cmd, timeout)
}}
}
...
func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID, cmd []string, timeout time.Duration) ([]byte, error) {
stdout, stderr, err := m.runtimeService.ExecSync(id.ID, cmd, timeout)
return append(stdout, stderr...), err
}
func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) {
data, err := e.CombinedOutput()
klog.V(4).Infof("Exec probe response: %q", string(data))
if err != nil {
exit, ok := err.(exec.ExitError)
if ok {
if exit.ExitStatus() == 0 {
return probe.Success, string(data), nil
}
return probe.Failure, string(data), nil
}
return probe.Unknown, "", err
}
return probe.Success, string(data), nil
}
HTTP 方法
標(biāo)準(zhǔn)的 http 探測(cè)模板,如果400 > code >= 200,則認(rèn)為成功。不支持 https
func DoHTTPProbe(url *url.URL, headers http.Header, client GetHTTPInterface) (probe.Result, string, error) {
req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
// Convert errors into failures to catch timeouts.
return probe.Failure, err.Error(), nil
}
if _, ok := headers["User-Agent"]; !ok {
if headers == nil {
headers = http.Header{}
}
// explicitly set User-Agent so it's not set to default Go value
v := version.Get()
headers.Set("User-Agent", fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor))
}
req.Header = headers
if headers.Get("Host") != "" {
req.Host = headers.Get("Host")
}
res, err := client.Do(req)
if err != nil {
// Convert errors into failures to catch timeouts.
return probe.Failure, err.Error(), nil
}
defer res.Body.Close()
b, err := ioutil.ReadAll(res.Body)
if err != nil {
return probe.Failure, "", err
}
body := string(b)
if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest {
klog.V(4).Infof("Probe succeeded for %s, Response: %v", url.String(), *res)
return probe.Success, body, nil
}
klog.V(4).Infof("Probe failed for %s with request headers %v, response body: %v", url.String(), headers, body)
return probe.Failure, fmt.Sprintf("HTTP probe failed with statuscode: %d", res.StatusCode), nil
}
TCP 方法
gRPC或FTP服務(wù)一般會(huì)使用 TCP 探測(cè),嘗試在指定端口上建立TCP連接。
如果socket連接能成功,則返回成功。
func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {
conn, err := net.DialTimeout("tcp", addr, timeout)
if err != nil {
// Convert errors to failures to handle timeouts.
return probe.Failure, err.Error(), nil
}
err = conn.Close()
if err != nil {
klog.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", err, err)
}
return probe.Success, "", nil
}