EventRecorder
Kubernetes的事件是一種資源對(duì)像,用于展示集群內(nèi)發(fā)生的情況,Kubernetes 中的各個(gè)組件都會(huì)將運(yùn)行時(shí)的各種事件上報(bào)給Kubernetes API Server,并存儲(chǔ)到Etcd集群中,為了避免磁盤(pán)空間被填滿,對(duì)事件的保存強(qiáng)制執(zhí)行保留策略:在最后一次事件發(fā)生后,刪除1小時(shí)之前的事件。
示例代碼
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)//注冊(cè)事件消費(fèi)者,將事件打印日志
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})//注冊(cè)事件消費(fèi)者,將事件記錄到Kubernetes API Server,保存到Etcd中
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "myController"})
......
//記錄事件(事件生產(chǎn)者)
recorder.Event(obj,corev1.EventTypeWarning,"test reason","test msg")
eventRecorder機(jī)制

eventRecorder機(jī)制
EventRecorder: 事件生產(chǎn)者,各個(gè)組件(包括用戶自定義的組件)會(huì)通過(guò)EventRecorder記錄事件。
EventBroadcaster: 事件消費(fèi)者,也稱為事件廣播器。消費(fèi)EventRecorder記錄的事件并將事件分發(fā)給注冊(cè)的所有的Watcher。分發(fā)過(guò)程有兩種機(jī)制----阻塞和非阻塞兩種分法機(jī)制。
BroadcasterWatcher: Watcher管理器,用于定義事件的具體處理方式。
EventRecorder
//接口定義
type EventRecorder interface {
//對(duì)剛發(fā)生的事件進(jìn)行記錄
Event(object runtime.Object, eventtype, reason, message string)
// Eventf is just like Event, but with Sprintf for the message field.
//格式化輸出事件格式
Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})
// PastEventf is just like Eventf, but with an option to specify the event's 'timestamp' field.
//允許自定義事件發(fā)生的事件,以記錄過(guò)去發(fā)生的事件
PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})
// AnnotatedEventf is just like eventf, but with annotations attached
//同Eventf,附加了Annotations字段
AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}
......
......
......
//核心代碼
// NewRecorder returns an EventRecorder that records events with the given event source.
func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}}
}
type recorderImpl struct {
scheme *runtime.Scheme
source v1.EventSource
*watch.Broadcaster
clock clock.Clock
}
func (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
ref, err := ref.GetReference(recorder.scheme, object)
if err != nil {
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
return
}
if !util.ValidateEventType(eventtype) {
klog.Errorf("Unsupported event type: '%v'", eventtype)
return
}
event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
event.Source = recorder.source
go func() {
// NOTE: events should be a non-blocking operation
//新建goroutine執(zhí)行recorder.Action,以達(dá)到非阻塞效果(避免c.incoming channel阻塞,導(dǎo)致后續(xù)事件無(wú)法正常發(fā)送)
defer utilruntime.HandleCrash()
recorder.Action(watch.Added, event)
}()
}
func (recorder *recorderImpl) Event(object runtime.Object, eventtype, reason, message string) {
recorder.generateEvent(object, nil, metav1.Now(), eventtype, reason, message)
}
......
//vendor/k8s.io/apimachinery/pkg/watch/mux.go
// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
m.incoming <- Event{action, obj} //incoming 默認(rèn)緩沖25條消息,后續(xù)的會(huì)阻塞
}
EventBroadcaster
// Creates a new event broadcaster.
func NewBroadcaster() EventBroadcaster {
return &eventBroadcasterImpl{watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), defaultSleepDuration}
}
///vendor/k8s.io/apimachinery/pkg/watch/mux.go
func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event, incomingQueueLength),//25
watchQueueLength: queueLength,//1000
fullChannelBehavior: fullChannelBehavior,//channel滿則丟棄消息
}
m.distributing.Add(1)
go m.loop() //新建goroutine,等待消息到來(lái)
return m
}
......
// loop receives from m.incoming and distributes to all watchers.
func (m *Broadcaster) loop() {
// Deliberately not catching crashes here. Yes, bring down the process if there's a
// bug in watch.Broadcaster.
for event := range m.incoming {
if event.Type == internalRunFunctionMarker { //添加watcher的時(shí)候會(huì)生成該類型事件,此事件不分發(fā)給watcher,只是內(nèi)部處理
event.Object.(functionFakeRuntimeObject)()//執(zhí)行定義好的函數(shù),生成并注冊(cè)watcher
continue
}
m.distribute(event)//分發(fā)消息到所有的watcher
}
m.closeAll()
m.distributing.Done()
}
// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
m.lock.Lock()
defer m.lock.Unlock()
if m.fullChannelBehavior == DropIfChannelFull { //若當(dāng)前watcher的result channel緩沖已滿,則丟棄后續(xù)消息
for _, w := range m.watchers {
select {
case w.result <- event: //發(fā)送消息到watcher的result channel
case <-w.stopped:
default: // Don't block if the event can't be queued. 不阻塞,直接返回,丟棄后續(xù)的消息
}
}
} else {
for _, w := range m.watchers { //若當(dāng)前watcher的result channel緩沖已滿,則阻塞,直到watcher有從result內(nèi)取出消息
select {
case w.result <- event:
case <-w.stopped:
}
}
}
}
BroacasterWatcher
func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
// The default math/rand package functions aren't thread safe, so create a
// new Rand object for each StartRecording call.
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
eventCorrelator := NewEventCorrelator(clock.RealClock{})
return eventBroadcaster.StartEventWatcher(
func(event *v1.Event) {
recordToSink(sink, event, eventCorrelator, randGen, eventBroadcaster.sleepDuration)
})
}
......
func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
return eventBroadcaster.StartEventWatcher(
func(e *v1.Event) {
logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
})
}
......
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
watcher := eventBroadcaster.Watch() //生成watcher并注冊(cè)
//新建goroutine啟動(dòng)watcher,等待事件到來(lái)并處理
go func() {
defer utilruntime.HandleCrash()
for watchEvent := range watcher.ResultChan() { //從watcher的result channel中讀取事件
event, ok := watchEvent.Object.(*v1.Event)
if !ok {
// This is all local, so there's no reason this should
// ever happen.
continue
}
eventHandler(event)//調(diào)用定義好的事件處理函數(shù)
}
}()
return watcher
}
......
//vendor/k8s.io/apimachinery/pkg/watch/mux.go
//生成并注冊(cè)watcher
func (m *Broadcaster) Watch() Interface {
var w *broadcasterWatcher
m.blockQueue(func() {
m.lock.Lock()
defer m.lock.Unlock()
id := m.nextWatcher
m.nextWatcher++
//生成watcher
w = &broadcasterWatcher{
result: make(chan Event, m.watchQueueLength),
stopped: make(chan struct{}),
id: id,
m: m,
}
//注冊(cè)watcher到map中,后續(xù)分發(fā)事件的時(shí)候要用
m.watchers[id] = w
})
return w
}
//生成并注冊(cè)watcher的時(shí)候,阻塞incoming channel.
//這樣做的目的是,確保watcher在某個(gè)事件之后被添加,看不到該事件。但是能夠看到watcher被添加之后的所有事件。
func (b *Broadcaster) blockQueue(f func()) {
var wg sync.WaitGroup
wg.Add(1)
b.incoming <- Event{
Type: internalRunFunctionMarker,
Object: functionFakeRuntimeObject(func() {
defer wg.Done()
f()
}),
}
wg.Wait()
}