[k8s源碼分析][client-go] k8s選舉leaderelection (分布式資源鎖實(shí)現(xiàn))

1. 前言

轉(zhuǎn)載請說明原文出處, 尊重他人勞動(dòng)成果!

本文將分析用于kube-scheduler和kube-controller-manager高可用的leaderelection, 相當(dāng)于實(shí)現(xiàn)一個(gè)分布式資源鎖.

源碼位置: https://github.com/nicktming/client-go
分支: tming-v1.13 (基于v1.13版本)

2. 例子

architecture.png

例子中會啟動(dòng)三個(gè)進(jìn)程來競爭leadership.

2.1 創(chuàng)建一個(gè)namespace

[root@master kubectl]# ./kubectl get ns
NAME          STATUS   AGE
default       Active   21h
kube-public   Active   21h
kube-system   Active   21h
[root@master kubectl]# ./kubectl create ns nicktming
namespace/nicktming created
[root@master kubectl]# 
[root@master kubectl]# ./kubectl get ns
NAME          STATUS   AGE
default       Active   21h
kube-public   Active   21h
kube-system   Active   21h
nicktming     Active   3s
[root@master kubectl]# ./kubectl get endpoints -n nicktming
No resources found.
[root@master kubectl]# 

2.2 代碼

package main

import (
    "context"
    "flag"
    "k8s.io/client-go/rest"
    "log"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
    "k8s.io/klog"
    // NOTE(review): "fmt" is only referenced by the commented-out
    // config.Wrap(transport.ContextCanceller(...)) line in main; as shown
    // here the file would fail to compile with "fmt imported and not
    // used" — confirm against the runnable original.
    "fmt"
)

// main runs a leader-election demo: it builds an EndpointsLock in the
// given namespace and competes for leadership against any other process
// started with a different --id.
func main() {
    klog.InitFlags(nil)

    var endpointLockName string
    var endpointLockNamespace string
    var id string

    // id must be unique per process: it is the identity written into the
    // lock's annotation when this process becomes the leader.
    flag.StringVar(&id, "id", "", "the holder identity name")
    flag.StringVar(&endpointLockName, "lease-lock-name", "example", "the lease lock resource name")
    flag.StringVar(&endpointLockNamespace, "lease-lock-namespace", "nicktming", "the lease lock resource namespace")
    flag.Parse()

    if id == "" {
        klog.Fatal("unable to get id (missing id flag).")
    }
    
    // Plain-HTTP, unauthenticated apiserver at a hard-coded address;
    // fine for a demo, not for production use.
    config := &rest.Config{
        Host: "http://172.21.0.16:8080",
    }
    client := clientset.NewForConfigOrDie(config)

    // The lock: an Endpoints object <namespace>/<name> whose annotation
    // stores the shared LeaderElectionRecord.
    lock := &resourcelock.EndpointsLock{
        EndpointsMeta: metav1.ObjectMeta{
            Name:      endpointLockName,
            Namespace: endpointLockNamespace,
        },
        Client: client.CoreV1(),
        LockConfig: resourcelock.ResourceLockConfig{
            Identity: id,
        },
    }

    // use a Go context so we can tell the leaderelection code when we
    // want to step down
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // use a client that will stop allowing new requests once the context ends
//  config.Wrap(transport.ContextCanceller(ctx, fmt.Errorf("the leader is shutting down")))

    // listen for interrupts or the Linux SIGTERM signal and cancel
    // our context, which the leader election code will observe and
    // step down
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-ch
        log.Printf("Received termination, signaling shutdown")
        cancel()
    }()

    // start the leader election code loop
    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock: lock,
        // IMPORTANT: you MUST ensure that any code you have that
        // is protected by the lease must terminate **before**
        // you call cancel. Otherwise, you could have a background
        // loop still running and another process could
        // get elected before your background loop finished, violating
        // the stated goal of the lease.
//      ReleaseOnCancel: true,
        LeaseDuration:   60 * time.Second,
        RenewDeadline:   15 * time.Second,
        RetryPeriod:     5 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                // we're notified when we start - this is where you would
                // usually put your code
                klog.Infof("%s: leading", id)
            },
            OnStoppedLeading: func() {
                // we can do cleanup here, or after the RunOrDie method
                // returns
                klog.Infof("%s: lost", id)
            },
            OnNewLeader: func(identity string) {
                // we're notified when new leader elected
                if identity == id {
                    // I just got the lock
                    return
                }
                klog.Infof("new leader elected: %v", identity)
            },
        },
    })

    // because the context is closed, the client should report errors
    // NOTE(review): since the config.Wrap line above is commented out, the
    // client keeps working after cancel, err stays nil here and this
    // Fatalf fires — the demo output shows "...client call: <nil>".
    _, err := client.CoreV1().Endpoints(endpointLockNamespace).Get(endpointLockName, metav1.GetOptions{})
    if err == nil || !strings.Contains(err.Error(), "the leader is shutting down") {
        log.Fatalf("%s: expected to get an error when trying to make a client call: %v", id, err)
    }

    // we no longer hold the lease, so perform any cleanup and then
    // exit
    log.Printf("%s: done", id)
}

然后啟動(dòng)三個(gè)進(jìn)程來競爭一個(gè)資源.

go run test.go --id=1
go run test.go --id=2
go run test.go --id=3

運(yùn)行查看nicktming這個(gè)namespace.

[root@master kubectl]# ./kubectl get endpoints -n nicktming
NAME      ENDPOINTS   AGE
example   <none>      5s
[root@master kubectl]# ./kubectl get endpoints -o yaml -n nicktming
apiVersion: v1
items:
- apiVersion: v1
  kind: Endpoints
  metadata:
    annotations:
      control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"1","leaseDurationSeconds":60,"acquireTime":"2019-10-16T13:15:07Z","renewTime":"2019-10-16T13:23:22Z","leaderTransitions":0}'
    creationTimestamp: "2019-10-16T13:15:07Z"
    name: example
    namespace: nicktming
    resourceVersion: "45526"
    selfLink: /api/v1/namespaces/nicktming/endpoints/example
    uid: f988a420-f016-11e9-a4ad-525400d54f7e
kind: List
metadata:
  resourceVersion: ""
  selfLink: ""
[root@master kubectl]# 

可以看到該endpoint中的annotations中保存了所有進(jìn)程要競爭的資源以及誰是當(dāng)前的leader.

可以看到id=1的進(jìn)程獲得了該資源, 并且已經(jīng)啟動(dòng)了自己的程序, 而id=2和id=3的進(jìn)程一直在等待. 通過查看它們的日志就可以看到.

// 進(jìn)程1
[root@worker leaderelection]# go run test.go --id=1
1: leading

// 進(jìn)程2
[root@worker leaderelection]# go run test.go --id=2
new leader elected: 1

// 進(jìn)程3
[root@worker leaderelection]# go run test.go --id=3
new leader elected: 1

此時(shí)殺死進(jìn)程1, 然后分別查看進(jìn)程2和進(jìn)程3的日志

// 進(jìn)程1
[root@worker leaderelection]# go run test.go --id=1
1: leading
^CReceived termination, signaling shutdown
1: lost
1: expected to get an error when trying to make a client call: <nil>
[root@worker leaderelection]# 

// 進(jìn)程2
[root@worker leaderelection]# go run test.go --id=2
new leader elected: 1
new leader elected: 3

// 進(jìn)程3
[root@worker leaderelection]# go run test.go --id=3
new leader elected: 1
3: leading

然后查看資源

[root@master kubectl]# ./kubectl get endpoints -n nicktming
NAME      ENDPOINTS   AGE
example   <none>      14m
[root@master kubectl]# ./kubectl get endpoints -o yaml -n nicktming
apiVersion: v1
items:
- apiVersion: v1
  kind: Endpoints
  metadata:
    annotations:
      control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"3","leaseDurationSeconds":60,"acquireTime":"2019-10-16T13:27:16Z","renewTime":"2019-10-16T13:29:56Z","leaderTransitions":1}'
    creationTimestamp: "2019-10-16T13:15:07Z"
    name: example
    namespace: nicktming
    resourceVersion: "46112"
    selfLink: /api/v1/namespaces/nicktming/endpoints/example
    uid: f988a420-f016-11e9-a4ad-525400d54f7e
kind: List
metadata:
  resourceVersion: ""
  selfLink: ""

鎖已經(jīng)被進(jìn)程3獲得, 此時(shí)如果進(jìn)程1再啟動(dòng)的話, 也只能一直等待.

3. 源碼分析

代碼路徑在client-go/tools/leaderelection下.

leaderElection.png

LeaderElectionRecord: 保存leader的信息.
Interface: 客戶端.
LeaderCallbacks: 回調(diào)函數(shù).
LeaderElectionConfig: 定義了一些競爭資源的參數(shù).
LeaderElector: 定義了一些競爭資源的方法.

3.1 Interface接口

LeaderElectionRecord: 是資源占有者的描述信息, 如上面例子中看到的control-plane.alpha.kubernetes.io/leader: '{"holderIdentity":"3","leaseDurationSeconds":60,"acquireTime":"2019-10-16T13:27:16Z","renewTime":"2019-10-16T13:29:56Z","leaderTransitions":1}', 其中control-plane.alpha.kubernetes.io/leader為資源, 而資源的占有者信息就是其對應(yīng)的value, 也就是一個(gè)LeaderElectionRecord.

Interface: 中定義了一系列方法, 包括增加修改獲取一個(gè)LeaderElectionRecord, 說白了就是一個(gè)客戶端, 而且每個(gè)客戶端實(shí)例都要有自己分布式唯一的id.

// tools/leaderelection/resourcelock/interface.go

// LeaderElectionRecord describes the current holder of the lock; every
// candidate reads and writes this same record via the lock's annotation.
type LeaderElectionRecord struct {
    // Identity of the holder, i.e. the leader's id.
    HolderIdentity       string      `json:"holderIdentity"`
    // Length of a lease, in seconds.
    LeaseDurationSeconds int         `json:"leaseDurationSeconds"`
    // When leadership was acquired.
    AcquireTime          metav1.Time `json:"acquireTime"`
    // When the lease was last renewed.
    RenewTime            metav1.Time `json:"renewTime"`
    // How many times leadership has changed hands.
    LeaderTransitions    int         `json:"leaderTransitions"`
}

// Interface is the client-side abstraction over the lock resource: it
// creates/reads/updates a LeaderElectionRecord, and each implementation
// carries its own globally-unique identity.
type Interface interface {
    // Get returns the resource's current LeaderElectionRecord.
    Get() (*LeaderElectionRecord, error)
    // Create stores a brand-new LeaderElectionRecord in the resource.
    Create(ler LeaderElectionRecord) error
    // Update overwrites the stored record.
    Update(ler LeaderElectionRecord) error
    // RecordEvent records an event (e.g. "became leader").
    RecordEvent(string)
    // Identity returns this client's unique id.
    Identity() string
    // Describe returns a human-readable description of the lock.
    Describe() string
}

它有三個(gè)實(shí)現(xiàn)類, 分別為EndpointLock, ConfigMapLock和LeaseLock, 分別可以操作k8s中的endpoint, configmap和lease. 也就是提供了這三種資源類型. 這里以EndpointLock為例子說明.

// tools/leaderelection/resourcelock/endpointslock.go
// EndpointsLock implements Interface on top of an Endpoints object.
type EndpointsLock struct {
    // Namespace and Name of the Endpoints object used as the lock.
    EndpointsMeta metav1.ObjectMeta
    // Client for talking to the apiserver's Endpoints API.
    Client        corev1client.EndpointsGetter
    // Carries this EndpointsLock's globally-unique identity.
    LockConfig    ResourceLockConfig
    // The Endpoints object currently being operated on.
    e             *v1.Endpoints
}
// tools/leaderelection/resourcelock/interface.go
// ResourceLockConfig holds common configuration for a resource lock.
type ResourceLockConfig struct {
    // Identity is this candidate's globally-unique id.
    Identity string
    EventRecorder EventRecorder
}

Create, Update, Get方法都是利用client去訪問k8s的api-server, 通過這里可以看得更明白: 操作的就是EndpointsLock.e, 其Annotations中的control-plane.alpha.kubernetes.io/leader就是在上面的例子中看到的.

// tools/leaderelection/resourcelock/endpointslock.go

// Get fetches the Endpoints object backing the lock and decodes the
// LeaderElectionRecord stored in its annotation; when the annotation is
// absent an empty record is returned.
func (el *EndpointsLock) Get() (*LeaderElectionRecord, error) {
    var err error
    el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(el.EndpointsMeta.Name, metav1.GetOptions{})
    if err != nil {
        return nil, err
    }
    if el.e.Annotations == nil {
        el.e.Annotations = make(map[string]string)
    }
    record := LeaderElectionRecord{}
    raw, ok := el.e.Annotations[LeaderElectionRecordAnnotationKey]
    if !ok {
        // No record yet: hand back the zero value.
        return &record, nil
    }
    if err := json.Unmarshal([]byte(raw), &record); err != nil {
        return nil, err
    }
    return &record, nil
}

// Create attempts to create the Endpoints object with the given
// LeaderElectionRecord serialized into its leader-election annotation.
func (el *EndpointsLock) Create(ler LeaderElectionRecord) error {
    payload, err := json.Marshal(ler)
    if err != nil {
        return err
    }
    endpoints := &v1.Endpoints{
        ObjectMeta: metav1.ObjectMeta{
            Name:      el.EndpointsMeta.Name,
            Namespace: el.EndpointsMeta.Namespace,
            Annotations: map[string]string{
                LeaderElectionRecordAnnotationKey: string(payload),
            },
        },
    }
    el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(endpoints)
    return err
}

另外interface還提供了生成各個(gè)子類的方法

// New builds the concrete lock implementation selected by lockType
// (endpoints, configmaps or leases), wired with the given namespace,
// name, clients and ResourceLockConfig.
func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) {
    switch lockType {
    case EndpointsResourceLock:
        return &EndpointsLock{
            EndpointsMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     coreClient,
            LockConfig: rlc,
        }, nil
    // (configmap and lease cases elided in this excerpt)
    ...
    default:
        return nil, fmt.Errorf("Invalid lock-type %s", lockType)
    }
}

3.2 LeaderElector

LeaderElector是一個(gè)競爭資源的實(shí)體.

// LeaderElectionConfig bundles everything a candidate needs: the lock
// client, the timing parameters and the user-supplied callbacks.
type LeaderElectionConfig struct {
    // The lock client (endpoints/configmap/lease).
    Lock rl.Interface
    // How long a non-leader must wait after the last observed renewal
    // before it may take over leadership.
    LeaseDuration time.Duration
    // Total time the leader keeps retrying a renewal before giving up
    // leadership.
    RenewDeadline time.Duration
    // Interval between acquire/renew attempts.
    RetryPeriod time.Duration
    // Callbacks the user must configure.
    Callbacks LeaderCallbacks
    WatchDog *HealthzAdaptor
    // Whether a leader releases the lock when its context is cancelled.
    ReleaseOnCancel bool
    Name string
}

LeaderElectionConfig擁有一個(gè)Interface對象, 以及用戶需要配置的回調(diào)函數(shù)LeaderCallbacks對象.
關(guān)于LeaseDuration, RenewDeadlineRetryPeriod會在方法中介紹.

// LeaderElector is one candidate competing for the lock.
type LeaderElector struct {
    // This candidate's configuration, including its identity.
    config LeaderElectionConfig
    // The record last fetched from the apiserver; it is the same shared
    // record for every candidate and does not imply this one is leader.
    observedRecord rl.LeaderElectionRecord
    // When observedRecord was last seen to change.
    observedTime   time.Time
    reportedLeader string
    clock clock.Clock
    metrics leaderMetricsAdapter
    name string
}

這里著重要關(guān)注以下幾個(gè)屬性:
config: LeaderElectionConfig對象配置了當(dāng)前應(yīng)用的客戶端, 以及此客戶端的唯一id等等.
observedRecord: LeaderElectionRecord就是保存著從api-server中獲得的leader的信息.
observedTime: 獲得的時(shí)間.

很明顯判斷當(dāng)前進(jìn)程是不是leader只需要判斷config中的idobservedRecord中的id是不是一致即可.

// GetLeader returns the identity of the last observed leader.
func (le *LeaderElector) GetLeader() string {
    return le.observedRecord.HolderIdentity
}

// IsLeader returns true if the last observed leader was this client else returns false.
func (le *LeaderElector) IsLeader() bool {
    // Leadership == the identity in the shared record matches our own id.
    return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
}

3.3 Run

// Run acquires leadership, starts the user's OnStartedLeading, and keeps
// renewing until leadership is lost or ctx is done.
func (le *LeaderElector) Run(ctx context.Context) {
    defer func() {
        runtime.HandleCrash()
        le.config.Callbacks.OnStoppedLeading()
    }()
    // acquire only returns false when ctx is signalled done; otherwise
    // it keeps retrying until this client wins leadership.
    if !le.acquire(ctx) {
        return // ctx signalled done
    }
    // We are the leader: run the user's own logic (OnStartedLeading) as
    // a goroutine, cancelled when Run returns.
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    go le.config.Callbacks.OnStartedLeading(ctx)
    // Renew in a loop; renew only returns once leadership is lost (or
    // ctx is done), at which point Run itself returns.
    le.renew(ctx)
}

1.client(也就是le這個(gè)實(shí)例)首先會調(diào)用acquire方法一直嘗試去競爭leadership. (如果競爭失敗, 繼續(xù)競爭, 不會進(jìn)入2. 競爭成功, 進(jìn)入2)
2. 異步啟動(dòng)用戶自己的邏輯程序(OnStartedLeading). 進(jìn)入3
3. 通過調(diào)用renew方法續(xù)約自己的leadership. 續(xù)約成功, 繼續(xù)續(xù)約. 續(xù)約失敗, 整個(gè)Run就結(jié)束了.

3.3.1 acquire

// maybeReportTransition broadcasts a leader change via the OnNewLeader
// callback whenever the observed holder differs from the last reported one.
func (le *LeaderElector) maybeReportTransition() {
    current := le.observedRecord.HolderIdentity
    // Nothing changed since the last report: no update needed.
    if current == le.reportedLeader {
        return
    }
    // Remember the newest leader's id.
    le.reportedLeader = current
    if cb := le.config.Callbacks.OnNewLeader; cb != nil {
        // Announce the new leader asynchronously.
        go cb(le.reportedLeader)
    }
}

// acquire returns true as soon as leadership is obtained; the only way
// it returns false is when ctx signals done.
func (le *LeaderElector) acquire(ctx context.Context) bool {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    succeeded := false
    desc := le.config.Lock.Describe()
    klog.Infof("attempting to acquire leader lease  %v...", desc)
    wait.JitterUntil(func() {
        // Try to take (or refresh) the lock.
        succeeded = le.tryAcquireOrRenew()
        // A new leader may have appeared, so check whether the change
        // needs to be broadcast.
        le.maybeReportTransition()
        if !succeeded {
            // Lost this round; JitterUntil calls us again after
            // RetryPeriod (with jitter) and we keep competing.
            klog.V(4).Infof("failed to acquire lease %v", desc)
            return
        }
        // We became the leader: cancel() makes JitterUntil return,
        // which lets acquire return true.
        le.config.Lock.RecordEvent("became leader")
        le.metrics.leaderOn(le.config.Name)
        klog.Infof("successfully acquired lease %v", desc)
        cancel()
    }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
    return succeeded
}

acquire的作用如下:
1. 一旦獲得leadership, 立馬返回true. 否則會隔RetryPeriod時(shí)間嘗試一次
2. 一旦有ctx signals done, 會返回false

這里的邏輯比較簡單, 主要的邏輯是在tryAcquireOrRenew方法中.

3.3.2 renew

// renew keeps renewing the lease and only returns once leadership is
// lost or ctx is cancelled. (Example values: RenewDeadline=10s,
// RetryPeriod=2s.)
func (le *LeaderElector) renew(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    // Runs the closure every RetryPeriod until cancel() is called.
    wait.Until(func() {
        timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
        defer timeoutCancel()
        // Polls every RetryPeriod until tryAcquireOrRenew returns true,
        // or until timeoutCtx expires (in which case err is non-nil).
        err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
            done := make(chan bool, 1)
            go func() {
                defer close(done)
                done <- le.tryAcquireOrRenew()
            }()

            select {
            case <-timeoutCtx.Done():
                return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err())
            case result := <-done:
                return result, nil
            }
        }, timeoutCtx.Done())

        // A new leader may have appeared; broadcast the change if so.
        le.maybeReportTransition()
        desc := le.config.Lock.Describe()
        if err == nil {
            // err == nil means PollImmediateUntil returned true: renewal
            // succeeded and we are still the leader; return so wait.Until
            // runs us again.
            klog.V(5).Infof("successfully renewed lease %v", desc)
            return
        }
        // err != nil: retries exceeded RenewDeadline — leadership is
        // lost; cancel() makes wait.Until return.
        le.config.Lock.RecordEvent("stopped leading")
        le.metrics.leaderOff(le.config.Name)
        klog.Infof("failed to renew lease %v: %v", desc, err)
        cancel()
    }, le.config.RetryPeriod, ctx.Done())

    // if we hold the lease, give it up
    if le.config.ReleaseOnCancel {
        le.release()
    }
}

可以看到該client續(xù)約的前提條件是它自己是當(dāng)前的leader, 然后才進(jìn)行續(xù)約操作.

這里來說一下RenewDeadlineRetryPeriod的作用.
每隔RetryPeriod時(shí)間會通過tryAcquireOrRenew續(xù)約, 如果續(xù)約失敗, 還會進(jìn)行再次嘗試. 一直到嘗試的總時(shí)間超過RenewDeadline后該client就會失去leadership.

3.3.3 tryAcquireOrRenew

// tryAcquireOrRenew tries to acquire leadership, or renew it when this
// client already holds it. Returns true on success, false on failure.
func (le *LeaderElector) tryAcquireOrRenew() bool {
    now := metav1.Now()
    // The record we would like to store: ourselves as holder, acquired
    // and renewed right now.
    leaderElectionRecord := rl.LeaderElectionRecord{
        HolderIdentity:       le.config.Lock.Identity(),
        LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
        RenewTime:            now,
        AcquireTime:          now,
    }

    // 1. obtain or create the ElectionRecord
    oldLeaderElectionRecord, err := le.config.Lock.Get()
    if err != nil {
        if !errors.IsNotFound(err) {
            // Unexpected error: give up this attempt.
            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
            return false
        }
        // The record does not exist yet, so create it — we are leader.
        if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
            klog.Errorf("error initially creating leader election record: %v", err)
            return false
        }
        // Remember the record we just stored, and when.
        le.observedRecord = leaderElectionRecord
        le.observedTime = le.clock.Now()
        return true
    }

    // 2. Record obtained, check the Identity & Time
    // The remote record was fetched into oldLeaderElectionRecord. If it
    // differs from observedRecord, refresh observedRecord/observedTime,
    // since observedRecord mirrors what is stored remotely.
    //
    // Note every candidate races here while the leader keeps renewing
    // (bumping RenewTime), so each successful renewal makes every
    // non-leader refresh its observedTime and observedRecord.
    if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
        le.observedRecord = *oldLeaderElectionRecord
        le.observedTime = le.clock.Now()
    }
    // Someone else holds the lock and its lease has not expired yet:
    // we cannot take over, so this attempt fails.
    if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
        le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
        !le.IsLeader() {
        klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
        return false
    }

    // 3. We're going to try to update. The leaderElectionRecord is set to it's default
    // here. Let's correct it before updating.
    if le.IsLeader() {
        // We were already the holder: keep the original AcquireTime and
        // transition count.
        leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
    } else {
        // Leadership changes hands: bump LeaderTransitions by one.
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
    }

    // update the lock itself
    // On success this client now owns the resource and is the leader.
    if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
        klog.Errorf("Failed to update lock: %v", err)
        return false
    }
    le.observedRecord = leaderElectionRecord
    le.observedTime = le.clock.Now()
    return true
}

這里需要注意的是當(dāng)前client不是leader的時(shí)候, 如何去判斷一個(gè)leader是否已經(jīng)expired了?

le.observedTime.Add(le.config.LeaseDuration).After(now.Time)
le.observedTime: 代表的是獲得leader(截止當(dāng)前時(shí)間為止的最后一次renew)對象的時(shí)間.
le.config.LeaseDuration: 自己(當(dāng)前client)獲得leadership需要的等待時(shí)間.
le.observedTime.Add(le.config.LeaseDuration): 就是自己(當(dāng)前client)被允許獲得leadership的時(shí)間.

如果le.observedTime.Add(le.config.LeaseDuration).Before(now.Time)為true的話, 就表明leader過期了. 白話文的意思就是從leader上次續(xù)約完, 已經(jīng)超過le.config.LeaseDuration的時(shí)間沒有續(xù)約了, 所以被認(rèn)為該leader過期了. 把Before換成After就是表明沒有過期.

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容