Client-go客戶端源碼解析--EventRecorder

EventRecorder

Kubernetes的事件是一種資源對(duì)像,用于展示集群內(nèi)發(fā)生的情況,Kubernetes 中的各個(gè)組件都會(huì)將運(yùn)行時(shí)的各種事件上報(bào)給Kubernetes API Server,并存儲(chǔ)到Etcd集群中,為了避免磁盤(pán)空間被填滿,對(duì)事件的保存強(qiáng)制執(zhí)行保留策略:在最后一次事件發(fā)生后,刪除1小時(shí)之前的事件。

示例代碼

    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)//注冊(cè)事件消費(fèi)者,將事件打印日志
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})//注冊(cè)事件消費(fèi)者,將事件記錄到Kubernetes API Server,保存到Etcd中
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "myController"})
......
//記錄事件(事件生產(chǎn)者)
recorder.Event(obj,corev1.EventTypeWarning,"test reason","test msg")

eventRecorder機(jī)制

eventRecorder機(jī)制
  1. EventRecorder: 事件生產(chǎn)者,各個(gè)組件(包括用戶自定義的組件)會(huì)通過(guò)EventRecorder記錄事件。

  2. EventBroadcaster: 事件消費(fèi)者,也稱為事件廣播器。消費(fèi)EventRecorder記錄的事件并將事件分發(fā)給注冊(cè)的所有的Watcher。分發(fā)過(guò)程有兩種機(jī)制----阻塞和非阻塞兩種分法機(jī)制。

  3. BroadcasterWatcher: Watcher管理器,用于定義事件的具體處理方式。

EventRecorder

//接口定義
type EventRecorder interface {
    //對(duì)剛發(fā)生的事件進(jìn)行記錄
    Event(object runtime.Object, eventtype, reason, message string)

    // Eventf is just like Event, but with Sprintf for the message field.
  //格式化輸出事件格式
    Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

    // PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
  //允許自定義事件發(fā)生的事件,以記錄過(guò)去發(fā)生的事件
    PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})

    // AnnotatedEventf is just like eventf, but with annotations attached
  //同Eventf,附加了Annotations字段
    AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
......
......
......
//核心代碼
// NewRecorder returns an EventRecorder that records events with the given event source.
func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
   return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}}
}

type recorderImpl struct {
   scheme *runtime.Scheme
   source v1.EventSource
   *watch.Broadcaster
   clock clock.Clock
}

func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
   ref, err := ref.GetReference(recorder.scheme, object)
   if err != nil {
      klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
      return
   }

   if !util.ValidateEventType(eventtype) {
      klog.Errorf("Unsupported event type: '%v'", eventtype)
      return
   }

   event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
   event.Source = recorder.source

   go func() {
      // NOTE: events should be a non-blocking operation
     //新建goroutine執(zhí)行recorder.Action,以達(dá)到非阻塞效果(避免c.incoming channel阻塞,導(dǎo)致后續(xù)事件無(wú)法正常發(fā)送)
      defer utilruntime.HandleCrash()
      recorder.Action(watch.Added, event)
   }()
}

func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
   recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
}


......
//vendor/k8s.io/apimachinery/pkg/watch/mux.go
// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
    m.incoming <- Event{action, obj} //incoming 默認(rèn)緩沖25條消息,后續(xù)的會(huì)阻塞
}

EventBroadcaster

// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
   return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}
}

///vendor/k8s.io/apimachinery/pkg/watch/mux.go
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
    m := &Broadcaster{
        watchers:            map[int64]*broadcasterWatcher{},
        incoming:            make(chan Event, incomingQueueLength),//25
        watchQueueLength:    queueLength,//1000
        fullChannelBehavior: fullChannelBehavior,//channel滿則丟棄消息
    }
    m.distributing.Add(1)
    go m.loop() //新建goroutine,等待消息到來(lái)
    return m
}
......
// loop receives from m.incoming and distributes to all watchers.
func (m *Broadcaster) loop() {
    // Deliberately not catching crashes here. Yes, bring down the process if there's a
    // bug in watch.Broadcaster.
    for event := range m.incoming {
        if event.Type == internalRunFunctionMarker { //添加watcher的時(shí)候會(huì)生成該類型事件,此事件不分發(fā)給watcher,只是內(nèi)部處理
            event.Object.(functionFakeRuntimeObject)()//執(zhí)行定義好的函數(shù),生成并注冊(cè)watcher
            continue
        }
        m.distribute(event)//分發(fā)消息到所有的watcher
    }
    m.closeAll()
    m.distributing.Done()
}

// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
    m.lock.Lock()
    defer m.lock.Unlock()
    if m.fullChannelBehavior == DropIfChannelFull { //若當(dāng)前watcher的result channel緩沖已滿,則丟棄后續(xù)消息
        for _, w := range m.watchers {
            select {
            case w.result <- event: //發(fā)送消息到watcher的result channel
            case <-w.stopped:
            default: // Don't block if the event can't be queued. 不阻塞,直接返回,丟棄后續(xù)的消息
            }
        }
    } else {
        for _, w := range m.watchers { //若當(dāng)前watcher的result channel緩沖已滿,則阻塞,直到watcher有從result內(nèi)取出消息
            select {
            case w.result <- event:
            case <-w.stopped:
            }
        }
    }
}

BroacasterWatcher

func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
   // The default math/rand package functions aren't thread safe, so create a
   // new Rand object for each StartRecording call.
   randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
   eventCorrelator := NewEventCorrelator(clock.RealClock{})
   return eventBroadcaster.StartEventWatcher(
      func(event *v1.Event) {
         recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
      })
}
......
func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
    return eventBroadcaster.StartEventWatcher(
        func(e *v1.Event) {
            logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
        })
}

......
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
    watcher := eventBroadcaster.Watch() //生成watcher并注冊(cè)
    //新建goroutine啟動(dòng)watcher,等待事件到來(lái)并處理
    go func() {
        defer utilruntime.HandleCrash()
        for watchEvent := range watcher.ResultChan() { //從watcher的result channel中讀取事件
            event, ok := watchEvent.Object.(*v1.Event)
            if !ok {
                // This is all local, so there's no reason this should
                // ever happen.
                continue
            }
            eventHandler(event)//調(diào)用定義好的事件處理函數(shù)
        }
    }()
    return watcher
}

......
//vendor/k8s.io/apimachinery/pkg/watch/mux.go
//生成并注冊(cè)watcher
func (m *Broadcaster) Watch() Interface {
    var w *broadcasterWatcher
    m.blockQueue(func() {
        m.lock.Lock()
        defer m.lock.Unlock()
        id := m.nextWatcher
        m.nextWatcher++
    //生成watcher
        w = &broadcasterWatcher{
            result:  make(chan Event, m.watchQueueLength),
            stopped: make(chan struct{}),
            id:      id,
            m:       m,
        }
    //注冊(cè)watcher到map中,后續(xù)分發(fā)事件的時(shí)候要用
        m.watchers[id] = w
    })
    return w
}

//生成并注冊(cè)watcher的時(shí)候,阻塞incoming channel.
//這樣做的目的是,確保watcher在某個(gè)事件之后被添加,看不到該事件。但是能夠看到watcher被添加之后的所有事件。
func (b *Broadcaster) blockQueue(f func()) {
    var wg sync.WaitGroup
    wg.Add(1)
    b.incoming <- Event{
        Type: internalRunFunctionMarker,
        Object: functionFakeRuntimeObject(func() {
            defer wg.Done()
            f()
        }),
    }
    wg.Wait()
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容