In a production environment, clusters need a highly available deployment to keep workloads stable. In Kubernetes the apiserver is stateless and can be scaled horizontally to make it highly available, while the kube-controller-manager and kube-scheduler components rely on leader election for high availability: under normal conditions, out of the multiple replicas of kube-scheduler or kube-controller-manager only one actually runs the business logic, while the other replicas keep trying to grab the lock and compete for leadership until one of them becomes the leader. If the running leader exits for some reason or loses the lock, the remaining replicas compete for a new leader, and whoever acquires leadership takes over and runs the business logic.
Kubernetes version: v1.12
Using the components' high availability
Kubernetes already implements high availability for kube-controller-manager and kube-scheduler; it only needs to be enabled by adding the --leader-elect=true flag to each component's configuration. The default values of the HA-related flags can be seen in each component's logs:
I0306 19:17:14.109511 161798 flags.go:33] FLAG: --leader-elect="true"
I0306 19:17:14.109513 161798 flags.go:33] FLAG: --leader-elect-lease-duration="15s"
I0306 19:17:14.109516 161798 flags.go:33] FLAG: --leader-elect-renew-deadline="10s"
I0306 19:17:14.109518 161798 flags.go:33] FLAG: --leader-elect-resource-lock="endpoints"
I0306 19:17:14.109520 161798 flags.go:33] FLAG: --leader-elect-retry-period="2s"
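Roughly speaking, leader-elect-lease-duration is how long a non-leader candidate waits after the last observed renewal before it tries to take over a lock that has not been renewed; leader-elect-renew-deadline is how long the acting leader keeps retrying to refresh the lock before giving up leadership; leader-elect-retry-period is the interval between individual acquire/renew attempts; and leader-elect-resource-lock selects the resource type used as the lock (endpoints by default; configmaps can also be used in this version).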
How to find the current leader of a component in Kubernetes:
$ kubectl get endpoints kube-controller-manager --namespace=kube-system -o yaml &&
kubectl get endpoints kube-scheduler --namespace=kube-system -o yaml
The hostname of the component's current leader is written in the control-plane.alpha.kubernetes.io/leader field of the annotations.
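The same annotation can also be read with client-go. Below is a minimal sketch (the kubeconfig path is a placeholder; this helper is not part of any component and only illustrates where the leader record lives):

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// build a client from a local kubeconfig (path is illustrative)
	cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	// the leader record is stored as JSON in this annotation on the endpoints object
	ep, err := client.CoreV1().Endpoints("kube-system").Get("kube-controller-manager", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println(ep.Annotations["control-plane.alpha.kubernetes.io/leader"])
}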
The implementation of Leader Election
Leader election is essentially a competition for a distributed lock. In Kubernetes this distributed lock takes the form of creating an Endpoints resource: whoever creates the resource first obtains the lock, and then keeps updating that resource to retain ownership of the lock.
The following walks through the leader election in kube-controller-manager. After kube-controller-manager has loaded and validated its configuration it executes the Run method. The code is in k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go:
// Run runs the KubeControllerManagerOptions. This should never exit.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
...
// the core of kube-controller-manager
run := func(ctx context.Context) {
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig,
}
var clientBuilder controller.ControllerClientBuilder
if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
// It's possible another controller process is creating the tokens for us.
// If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
glog.Warningf("--use-service-account-credentials was specified without providing a --service-account-private-key-file")
}
clientBuilder = controller.SAControllerClientBuilder{
ClientConfig: restclient.AnonymousClientConfig(c.Kubeconfig),
CoreClient: c.Client.CoreV1(),
AuthenticationClient: c.Client.AuthenticationV1(),
Namespace: "kube-system",
}
} else {
clientBuilder = rootClientBuilder
}
controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
if err != nil {
glog.Fatalf("error building controller context: %v", err)
}
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
// initialize and start all controllers
if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
glog.Fatalf("error starting controllers: %v", err)
}
controllerContext.InformerFactory.Start(controllerContext.Stop)
close(controllerContext.InformersStarted)
select {}
}
// If the LeaderElect flag is not set, controller-manager runs as a single instance,
// so just call run directly to start the required controllers.
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
run(context.TODO())
panic("unreachable")
}
// If LeaderElect is true, controller-manager is started in HA mode;
// the code below performs the leader election, and the elected leader invokes the run callback.
id, err := os.Hostname()
if err != nil {
return err
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id = id + "_" + string(uuid.NewUUID())
// initialize the resource lock
rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
"kube-system",
"kube-controller-manager",
c.LeaderElectionClient.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: c.EventRecorder,
})
if err != nil {
glog.Fatalf("error creating lock: %v", err)
}
// enter the election flow
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leaderelection lost")
},
},
WatchDog: electionChecker,
Name: "kube-controller-manager",
})
panic("unreachable")
}
- 1. Initialize the resource lock. The default resource lock in Kubernetes is endpoints, i.e. the value of c.ComponentConfig.Generic.LeaderElection.ResourceLock is "endpoints". I did not find where ResourceLock is initialized in the code; there is only the flag's description and the default value printed in the logs shown earlier.
When the resource lock is initialized, an EventRecorder is also passed in; its purpose is to send events to the apiserver whenever the leader changes.
- 2. The rl resource lock is what controller-manager uses for the leader election; the RunOrDie method contains the actual election process.
- 3. Callbacks defines what to do after a state transition. When this instance becomes the leader, the run method in OnStartedLeading is executed; run is the core of controller-manager and initializes and starts the controllers for all the resources it manages. The following are all of the controllers in kube-controller-manager:
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
controllers := map[string]InitFunc{}
controllers["endpoint"] = startEndpointController
controllers["replicationcontroller"] = startReplicationController
controllers["podgc"] = startPodGCController
controllers["resourcequota"] = startResourceQuotaController
controllers["namespace"] = startNamespaceController
controllers["serviceaccount"] = startServiceAccountController
controllers["garbagecollector"] = startGarbageCollectorController
controllers["daemonset"] = startDaemonSetController
controllers["job"] = startJobController
controllers["deployment"] = startDeploymentController
controllers["replicaset"] = startReplicaSetController
controllers["horizontalpodautoscaling"] = startHPAController
controllers["disruption"] = startDisruptionController
controllers["statefulset"] = startStatefulSetController
controllers["cronjob"] = startCronJobController
controllers["csrsigning"] = startCSRSigningController
controllers["csrapproving"] = startCSRApprovingController
controllers["csrcleaner"] = startCSRCleanerController
controllers["ttl"] = startTTLController
controllers["bootstrapsigner"] = startBootstrapSignerController
controllers["tokencleaner"] = startTokenCleanerController
controllers["nodeipam"] = startNodeIpamController
if loopMode == IncludeCloudLoops {
controllers["service"] = startServiceController
controllers["route"] = startRouteController
}
controllers["nodelifecycle"] = startNodeLifecycleController
controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
controllers["attachdetach"] = startAttachDetachController
controllers["persistentvolume-expander"] = startVolumeExpandController
controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
controllers["pvc-protection"] = startPVCProtectionController
controllers["pv-protection"] = startPVProtectionController
controllers["ttl-after-finished"] = startTTLAfterFinishedController
return controllers
}
OnStoppedLeading is what runs when the instance switches from leader to follower; in kube-controller-manager it simply logs via glog.Fatalf, which also terminates the process. The entry point of the election is RunOrDie:
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
le, err := NewLeaderElector(lec)
if err != nil {
panic(err)
}
if lec.WatchDog != nil {
lec.WatchDog.SetLeaderElection(le)
}
le.Run(ctx)
}
RunOrDie first calls NewLeaderElector to create a LeaderElector object, then calls the LeaderElector's Run method to carry out the election.
func (le *LeaderElector) Run(ctx context.Context) {
defer func() {
runtime.HandleCrash()
le.config.Callbacks.OnStoppedLeading()
}()
if !le.acquire(ctx) {
return // ctx signalled done
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go le.config.Callbacks.OnStartedLeading(ctx)
le.renew(ctx)
}
Run first calls acquire to try to obtain the lock; once the lock is acquired it invokes the OnStartedLeading callback to start the required controllers, and then calls renew to refresh the lock periodically and keep the leader status.
func (le *LeaderElector) acquire(ctx context.Context) bool {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
succeeded := false
desc := le.config.Lock.Describe()
glog.Infof("attempting to acquire leader lease %v...", desc)
wait.JitterUntil(func() {
// try to create or renew the resource lock
succeeded = le.tryAcquireOrRenew()
// the leader may have changed; maybeReportTransition invokes the
// OnNewLeader() callback if one is set (it is not initialized in this code)
le.maybeReportTransition()
if !succeeded {
glog.V(4).Infof("failed to acquire lease %v", desc)
return
}
le.config.Lock.RecordEvent("became leader")
glog.Infof("successfully acquired lease %v", desc)
cancel()
}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
return succeeded
}
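The retry loop in acquire is driven by wait.JitterUntil from k8s.io/apimachinery/pkg/util/wait. The following standalone sketch shows only that pattern; the tryOnce function, the 2-second period and the jitter factor of 1.2 are made up for illustration:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stop := make(chan struct{})
	attempts := 0
	// tryOnce stands in for tryAcquireOrRenew: pretend the lock becomes free on the third attempt
	tryOnce := func() bool {
		attempts++
		return attempts >= 3
	}
	// run the function roughly every 2s (plus jitter) until the stop channel is closed
	wait.JitterUntil(func() {
		if !tryOnce() {
			fmt.Println("failed to acquire lease, retrying")
			return
		}
		fmt.Println("successfully acquired lease")
		close(stop) // same role as cancel() in acquire: stop the retry loop
	}, 2*time.Second, 1.2, true, stop)
}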
In acquire a cancellable ctx is created first, and wait.JitterUntil is used to call le.tryAcquireOrRenew periodically until the lock is obtained. If the lock cannot be acquired, it keeps retrying every RetryPeriod; once the lock is acquired, cancel() closes the ctx and tells wait.JitterUntil to stop retrying. tryAcquireOrRenew is the core method:
func (le *LeaderElector) tryAcquireOrRenew() bool {
now := metav1.Now()
leaderElectionRecord := rl.LeaderElectionRecord{
HolderIdentity: le.config.Lock.Identity(),
LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
RenewTime: now,
AcquireTime: now,
}
// 1. get the current resource lock
oldLeaderElectionRecord, err := le.config.Lock.Get()
if err != nil {
if !errors.IsNotFound(err) {
glog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
return false
}
// the resource lock does not exist yet, so create it; if creation succeeds, this instance becomes the leader
if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
glog.Errorf("error initially creating leader election record: %v", err)
return false
}
le.observedRecord = leaderElectionRecord
le.observedTime = le.clock.Now()
return true
}
// 2. the resource lock was fetched; check whether the current id is the leader
if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
le.observedRecord = *oldLeaderElectionRecord
le.observedTime = le.clock.Now()
}
// if the lock has not expired and the current id is not the leader, return directly
if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
!le.IsLeader() {
glog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
return false
}
// 3. if the current id is the leader, set the corresponding timestamps to now to prepare the renewal;
// if it is not the leader, try to take over the resource lock
if le.IsLeader() {
leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
} else {
leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
}
// update the resource
// for the leader this is a lease renewal
// for a non-leader (only when the previous lock has already expired) this transfers ownership of the lock
if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
glog.Errorf("Failed to update lock: %v", err)
return false
}
le.observedRecord = leaderElectionRecord
le.observedTime = le.clock.Now()
return true
}
The main logic of this function:
- 1. Fetch the ElectionRecord; if it does not exist, create a new one. If creation succeeds, the lock has been acquired and this instance is now the leader.
- 2. If the resource lock was fetched, inspect it: check whether the current id is the leader and whether the lock has expired. If the lock has not expired and the current id is not the leader, return directly.
- 3. If the current id is the leader, set the corresponding timestamps to the current time and update the resource lock to renew the lease.
- 4. If the current id is not the leader but the lock has already expired, try to take over the lock; on success this instance becomes the leader, otherwise it returns. (A worked timing example follows below.)
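As a worked example with the default timings (LeaseDuration=15s, RenewDeadline=10s, RetryPeriod=2s): a healthy leader refreshes the record roughly every 2 seconds, so non-leaders always observe a recent renewal and keep returning at step 2. If the leader dies, the record stops being renewed; roughly 15 seconds after the last observed renewal the lock counts as expired, and the next candidate that reaches step 4 on its next 2-second retry takes over. Failover therefore typically takes on the order of LeaseDuration plus one RetryPeriod after the old leader's last renewal.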
Finally, the renew method:
func (le *LeaderElector) renew(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wait.Until(func() {
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
defer timeoutCancel()
// tryAcquireOrRenew() is executed every RetryPeriod;
// if tryAcquireOrRenew() returns false, the renewal failed
err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
done := make(chan bool, 1)
go func() {
defer close(done)
done <- le.tryAcquireOrRenew()
}()
select {
case <-timeoutCtx.Done():
return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
case result := <-done:
return result, nil
}
}, timeoutCtx.Done())
le.maybeReportTransition()
desc := le.config.Lock.Describe()
if err == nil {
glog.V(4).Infof("successfully renewed lease %v", desc)
return
}
// renewal failed, so this instance is no longer the leader; the process then exits via the OnStoppedLeading callback
le.config.Lock.RecordEvent("stopped leading")
glog.Infof("failed to renew lease %v: %v", desc, err)
cancel()
}, le.config.RetryPeriod, ctx.Done())
}
renew is only called after the lock has been acquired: it keeps updating the resource lock's data to make sure the instance continues to hold the lock it obtained and keeps its leader status.
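With the default timings this means the leader calls tryAcquireOrRenew about every 2 seconds (RetryPeriod) and keeps retrying failed renewals for up to 10 seconds (RenewDeadline); only when no renewal succeeds within that deadline does renew give up, which triggers the OnStoppedLeading callback from Run's defer and, in kube-controller-manager, terminates the process via glog.Fatalf.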
Using the Leader Election package
The following demo uses k8s.io/client-go/tools/leaderelection from Kubernetes to demonstrate the election:
package main
import (
"context"
"flag"
"fmt"
"os"
"time"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
)
var (
masterURL string
kubeconfig string
)
func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
flag.Set("logtostderr", "true")
}
func main() {
flag.Parse()
defer glog.Flush()
id, err := os.Hostname()
if err != nil {
panic(err)
}
// load the kubeconfig
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
glog.Fatalf("Error building kubeconfig: %s", err.Error())
}
// create the kube client
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}
// initialize the eventRecorder
eventBroadcaster := record.NewBroadcaster()
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "test-1"})
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
run := func(ctx context.Context) {
fmt.Println("run.........")
select {}
}
id = id + "_" + "1"
rl, err := resourcelock.New("endpoints",
"kube-system",
"test",
kubeClient.CoreV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: eventRecorder,
})
if err != nil {
glog.Fatalf("error creating lock: %v", err)
}
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Info("leaderelection lost")
},
},
Name: "test-1",
})
}
Run the demo with several different hostnames at the same time and then trigger a leader switch; the switches can be seen in the events:
# kubectl describe endpoints test -n kube-system
Name: test
Namespace: kube-system
Labels: <none>
Annotations: control-plane.alpha.kubernetes.io/leader={"holderIdentity":"localhost_2","leaseDurationSeconds":15,"acquireTime":"2019-03-10T08:47:42Z","renewTime":"2019-03-10T08:47:44Z","leaderTransitions":2}
Subsets:
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal LeaderElection 50s test-1 localhost_1 became leader
Normal LeaderElection 5s test-2 localhost_2 became leader
Summary
This article described how leader election works when kube-controller-manager is started in HA mode: Kubernetes implements the lock by creating an endpoints resource and continuously updating it, passing ownership of the lock around as needed. Most other distributed-lock implementations are built directly on existing middleware such as redis, zookeeper or etcd, where every lock operation is atomic. So how are the operations in the Kubernetes election made atomic? They too ultimately go through etcd: the update of the lock uses optimistic concurrency, implemented by comparing the object's resourceVersion. The detailed implementation will be covered in the next article.
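To illustrate what that optimistic update looks like from a client's point of view, here is a minimal sketch using the client-go API of this era; the package name, the updateLock helper, the endpoints name "test" and the annotation payload are all made up for illustration and are not the leaderelection package's own code:

package lockexample

import (
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// updateLock re-reads the lock object and writes it back; the write carries the
// resourceVersion returned by the read, and the apiserver (backed by etcd's
// compare-and-swap) only accepts it if the object has not changed in between.
func updateLock(kubeClient kubernetes.Interface) error {
	ep, err := kubeClient.CoreV1().Endpoints("kube-system").Get("test", metav1.GetOptions{})
	if err != nil {
		return err
	}
	if ep.Annotations == nil {
		ep.Annotations = map[string]string{}
	}
	ep.Annotations["control-plane.alpha.kubernetes.io/leader"] = `{"holderIdentity":"candidate-2"}`
	_, err = kubeClient.CoreV1().Endpoints("kube-system").Update(ep)
	if errors.IsConflict(err) {
		// another candidate updated the lock first; this write was rejected instead of silently overwriting it
		return err
	}
	return err
}

Because every candidate's Update carries the resourceVersion it last read, many candidates can race to update the lock concurrently while the apiserver guarantees that only one of them wins.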

References:
API OVERVIEW
Simple leader election with Kubernetes and Docker