kubeedge源碼分析系列之cloudcore

本系列的源碼分析是在 commit da92692baa660359bb314d89dfa3a80bffb1d26c 之上進(jìn)行的。

cloudcore部分的源碼分析是在kubeedge源碼分析系列之整體架構(gòu)基礎(chǔ)上展開(kāi)的,如果沒(méi)有閱讀過(guò)kubeedge源碼分析系列之整體架構(gòu),直接閱讀本文,會(huì)感覺(jué)比較突兀。

本文概述

本文對(duì)cloudcore進(jìn)行了展開(kāi),對(duì)cloudcore組件中功能模塊共用的消息框架和各功能模塊的具體功能進(jìn)行深入剖析,具體如下:

  1. cloudcore功能模塊之間通信的消息框架
  2. cloudhub剖析
  3. edgecontroller剖析
  4. devicecontroller剖析

cloudcore功能模塊之間通信的消息框架

cloudcore組件中各個(gè)功能模塊之間是通過(guò)Beehive來(lái)組織和管理的,Beehive是一個(gè)基于go-channels的消息框架,但本文不對(duì)Beehive做全面的分析,只會(huì)分析cloudcore組件中用到的Beehive的相關(guān)功能。

Beehive的消息框架是在cloudcore的功能模塊啟動(dòng)的時(shí)候一如的,具體如下:

kubeedge/beehive/pkg/core/core.go

import (
    ...
    "github.com/kubeedge/beehive/pkg/core/context"
)

// StartModules starts modules that are registered
func StartModules() {
    coreContext := context.GetContext(context.MsgCtxTypeChannel)

    modules := GetModules()
    for name, module := range modules {
        //Init the module
        coreContext.AddModule(name)
        //Assemble typeChannels for sendToGroup
        coreContext.AddModuleGroup(name, module.Group())
        go module.Start(coreContext)
        klog.Infof("Starting module %v", name)
    }
}

從上面的函數(shù)定義中可以看到,在cloudcore的功能模塊(model)啟動(dòng)之前,首先實(shí)例化了一個(gè)beehive的context,然后再獲得各個(gè)功能模塊,最后用一個(gè)for循環(huán)逐個(gè)啟動(dòng)功能模塊,并將已實(shí)例化的beehive的context作為參數(shù),傳入每個(gè)功能模塊的啟動(dòng)函數(shù)(Start(...))中。這樣cloudcore中獨(dú)立的功能模塊就被beehive的context給組織成了一個(gè)統(tǒng)一的整體,至于beehive的context是怎么做到的,還需要進(jìn)入beehive的context的實(shí)例化函數(shù)context.GetContext(...)一探究竟,具體如下:

kubeedge/beehive/pkg/core/context/contex_factory.go

// GetContext gets global context instance
func GetContext(contextType string) *Context {
    once.Do(func() {
        context = &Context{}
        switch contextType {
        case MsgCtxTypeChannel:
            channelContext := NewChannelContext()
            context.messageContext = channelContext
            context.moduleContext = channelContext
        default:
            klog.Warningf("Do not support context type:%s", contextType)
        }
    })
    return context
}

上面的函數(shù)定義可以中的第3行context = &Context{}實(shí)例化了一個(gè)空context,進(jìn)入該實(shí)例化context的定義,探究一下它的具體內(nèi)容:

kubeedge/beehive/pkg/core/context/context.go

// Context is global context object
type Context struct {
    moduleContext  ModuleContext
    messageContext MessageContext
}

//ModuleContext is interface for context module management
type ModuleContext interface {
    AddModule(module string)
    AddModuleGroup(module, group string)
    Cleanup(module string)
}

//MessageContext is interface for message syncing
type MessageContext interface {
    // async mode
    Send(module string, message model.Message)
    Receive(module string) (model.Message, error)
    // sync mode
    SendSync(module string, message model.Message, timeout time.Duration) (     model.Message, error)
    SendResp(message model.Message)
    // group broadcast
    SendToGroup(moduleType string, message model.Message)
    SendToGroupSync(moduleType string, message model.Message, timeout       time.Duration) error
}

從上面的Context Struct定義可以看出該Context的兩個(gè)屬性moduleContext,messageContext都是interface類型的,所以可以斷定該Context肯定不是真身,在函數(shù)GetContext(...)(kubeedge/beehive/pkg/core/context/context.go)繼續(xù)往下看,在第6行channelContext := NewChannelContext(),又一個(gè)context實(shí)例化函數(shù)NewChannelContext(),進(jìn)入該函數(shù)的定義去看一下,它是不是真身:

kubeedge/beehive/pkg/core/context/context.go

// ChannelContext is object for Context channel
type ChannelContext struct {
    //ConfigFactory goarchaius.ConfigurationFactory
    channels     map[string]chan model.Message
    chsLock      sync.RWMutex
    typeChannels map[string]map[string]chan model.Message
    typeChsLock  sync.RWMutex
    anonChannels map[string]chan model.Message
    anonChsLock  sync.RWMutex
}

在該struct的定義文件中,會(huì)看到ChannelContext struct實(shí)現(xiàn)了Context struct(kubeedge/beehive/pkg/core/context/context.go)屬性包含的所有interface(ModuleContext,MessageContext),毫無(wú)疑問(wèn)ChannelContext struct就是cloudcore中各功能模塊相互通信的消息隊(duì)列框架的真身了。

至于ChannelContext struct(kubeedge/beehive/pkg/core/context/context.go)具體是通過(guò)什么手段來(lái)實(shí)現(xiàn)cloudcore中各功能模塊相互通信的,感興趣的同學(xué)可以根據(jù)本文的梳理自己去研究一下,如果想讓我們繼續(xù)深入分析,可以在評(píng)論區(qū)留言。

cloudhub剖析

從“cloudcore功能模塊之間通信的消息框架”已經(jīng)分析到各功能模塊的啟動(dòng)了,cloudhub功能模塊啟動(dòng)函數(shù)的具體內(nèi)容如下:

kubeedge/cloud/edge/pkg/cloudhub/cloudhub.go

func (a *cloudHub) Start(c *beehiveContext.Context) {
    var ctx context.Context
    a.context = c
    ctx, a.cancel = context.WithCancel(context.Background())

    initHubConfig()

    messageq := channelq.NewChannelMessageQueue(c)

    // start dispatch message from the cloud to edge node
    go messageq.DispatchMessage(ctx)

    // start the cloudhub server
    if util.HubConfig.ProtocolWebsocket {
        go servers.StartCloudHub(servers.ProtocolWebsocket, messageq, c)
    }

    if util.HubConfig.ProtocolQuic {
        go servers.StartCloudHub(servers.ProtocolQuic, messageq, c)
    }

    if util.HubConfig.ProtocolUDS {
        // The uds server is only used to communicate with csi driver from kubeedge on cloud.
        // It is not used to communicate between cloud and edge.
        go udsserver.StartServer(util.HubConfig, c)
    }

}

從以上cloudhub的啟動(dòng)函數(shù)Start(...)定義中,可以清晰地看出cloudhub在啟動(dòng)時(shí)主要做了如下幾件事:

  1. 接受beehiveContext.Context的通信框架實(shí)例,并保存(a.context = c);
  2. 初始化cloudhub的配置(initHubConfig());
  3. 將接受到的beehiveContext.Context的通信框架實(shí)例進(jìn)行修飾(messageq := channelq.NewChannelMessageQueue(c)),在原用通信框架實(shí)例的基礎(chǔ)上加入了緩存功能;
  4. 啟動(dòng)一個(gè)消息分發(fā)go routine(go messageq.DispatchMessage(ctx)),監(jiān)聽(tīng)云端的事件下發(fā)到edge端去;
  5. 如果設(shè)置了websocket啟動(dòng),就啟動(dòng)websocket服務(wù)器的go routine(go servers.StartCloudHub(servers.ProtocolWebsocket, messageq, c)
    );
  6. 如果設(shè)置了quic啟動(dòng),就啟動(dòng)quic服務(wù)器的go routine(go servers.StartCloudHub(servers.ProtocolQuic, messageq, c));
  7. 如果設(shè)置了unix domain socket啟動(dòng),就啟動(dòng)unix domain socket服務(wù)器的go routine(go udsserver.StartServer(util.HubConfig, c));

需要對(duì)以上另外說(shuō)明的是:

  1. websocket server和quic server的功能是相同,也就是說(shuō)兩者可以選其一,如果條件允許的話,建議選quic server,速度更快一些;
  2. unix domain socket是用來(lái)與kubeedge的csi(container storage interface) 通信的;

以上就是cloudcore中cloudhub功能模塊的剖析,如果有同學(xué)對(duì)于cloudhub具體都做了那些事,是怎么做的感興趣,可以在本文的基礎(chǔ)上自行剖析,如果想讓筆者分析,請(qǐng)?jiān)u論區(qū)留言。

edgecontroller剖析

從“cloudcore功能模塊之間通信的消息框架”已經(jīng)分析到各功能模塊的啟動(dòng)了,edgecontroller功能模塊啟動(dòng)函數(shù)的具體內(nèi)容如下:

kubeedge/cloud/pkg/edgecontroller/controller.go

// Start controller
func (ctl *Controller) Start(c *beehiveContext.Context) {
    var ctx context.Context

    config.Context = c
    ctx, ctl.cancel = context.WithCancel(context.Background())

    initConfig()

    upstream, err := controller.NewUpstreamController()
    if err != nil {
        klog.Errorf("new upstream controller failed with error: %s", err)
        os.Exit(1)
    }
    upstream.Start(ctx)

    downstream, err := controller.NewDownstreamController()
    if err != nil {
        klog.Warningf("new downstream controller failed with error: %s", err)
        os.Exit(1)
    }
    downstream.Start(ctx)

}

從以上edgecontroller的啟動(dòng)函數(shù)Start(...)定義中,可以清晰地看出cloudhub在啟動(dòng)時(shí)主要做了如下幾件事:

  1. 接受beehiveContext.Context的通信框架實(shí)例,并保存(config.Context = c);

  2. 初始化edgecontroller的配置(initHubConfig());

  3. 實(shí)例化并啟動(dòng)UpstreamController

     upstream, err := controller.NewUpstreamController()
     if err != nil {
         klog.Errorf("new upstream controller failed with error: %s", err)
         os.Exit(1)
     }
     upstream.Start(ctx)
    
  4. 實(shí)例化并啟動(dòng)DownstreamController

     downstream, err := controller.NewDownstreamController()
     if err != nil {
         klog.Warningf("new downstream controller failed with error: %s", err)
         os.Exit(1)
     }
     downstream.Start(ctx)   
    

下面深入分析UpstreamController和DownstreamController都具體做了哪些事:

UpstreamController

順著UpstreamController的實(shí)例化函數(shù)找到UpstreamController struct定義:

kubeedge/cloud/pkg/edgecontroller/upstream.go

// UpstreamController subscribe messages from edge and sync to k8s api server
type UpstreamController struct {
    kubeClient   *kubernetes.Clientset
    messageLayer messagelayer.MessageLayer

    // message channel
    nodeStatusChan            chan model.Message
    podStatusChan             chan model.Message
    secretChan                chan model.Message
    configMapChan             chan model.Message
    serviceChan               chan model.Message
    endpointsChan             chan model.Message
    persistentVolumeChan      chan model.Message
    persistentVolumeClaimChan chan model.Message
    volumeAttachmentChan      chan model.Message
    queryNodeChan             chan model.Message
    updateNodeChan            chan model.Message
}   

看到上面UpstreamController的定義,估計(jì)正在閱讀本文的同學(xué)已經(jīng)在猜,UpstreamController是不是負(fù)責(zé)處理edge節(jié)點(diǎn)上報(bào)的nodeStatus、podStatus、secret、configMap、service、endpoints、persistentVolume、persistentVolumeClaim、volumeAttachment等資源的信息的,恭喜你猜對(duì)了,UpstreamController就是干這個(gè)事的。

以上就是UpstreamController功能模塊的剖析,如果有同學(xué)對(duì)于UpstreamController具體怎么處理edge節(jié)點(diǎn)上報(bào)的nodeStatus、podStatus、secret、configMap、service、endpoints、persistentVolume、persistentVolumeClaim、volumeAttachment等資源的信息的的感興趣,可以在本文的基礎(chǔ)上自行剖析,如果想讓筆者分析,請(qǐng)?jiān)u論區(qū)留言。

DownstreamController

順著DownstreamController的實(shí)例化函數(shù)找到DownstreamController struct定義:

kubeedge/cloud/pkg/edgecontroller/downstream.go

// DownstreamController watch kubernetes api server and send change to edge
type DownstreamController struct {
    kubeClient   *kubernetes.Clientset
    messageLayer messagelayer.MessageLayer

    podManager *manager.PodManager

    configmapManager *manager.ConfigMapManager

    secretManager *manager.SecretManager

    nodeManager *manager.NodesManager

    serviceManager *manager.ServiceManager

    endpointsManager *manager.EndpointsManager

    lc *manager.LocationCache
}

看到上面DownstreamController的定義,和UpstreamController的功能,同學(xué)們應(yīng)該也可以對(duì)DownstreamController的具體功能猜到幾分了,對(duì)DownstreamController就是監(jiān)聽(tīng)cloud端pod、configmap、secret、node、service和endpoints等資源的事件,并下發(fā)到edge節(jié)點(diǎn)的。

以上就是DownstreamController功能模塊的剖析,如果有同學(xué)對(duì)于DownstreamController具體怎么處理edge節(jié)點(diǎn)上報(bào)的pod、configmap、secret、node、service和endpoints等資源的信息的的感興趣,可以在本文的基礎(chǔ)上自行剖析,如果想讓筆者分析,請(qǐng)?jiān)u論區(qū)留言。

devicecontroller剖析

從“cloudcore功能模塊之間通信的消息框架”已經(jīng)分析到各功能模塊的啟動(dòng)了,devicecontroller功能模塊啟動(dòng)函數(shù)的具體內(nèi)容如下:

kubeedge/cloud/pkg/devicecontroller/module.go

// Start controller
func (dctl *DeviceController) Start(c *beehiveContext.Context) {
    var ctx context.Context
    config.Context = c

    ctx, dctl.cancel = context.WithCancel(context.Background())

    initConfig()

    downstream, err := controller.NewDownstreamController()
    if err != nil {
        klog.Errorf("New downstream controller failed with error: %s", err)
        os.Exit(1)
    }
    upstream, err := controller.NewUpstreamController(downstream)
    if err != nil {
        klog.Errorf("new upstream controller failed with error: %s", err)
        os.Exit(1)
    }

    downstream.Start(ctx)
    // wait for downstream controller to start and load deviceModels and devices
    // TODO think about sync
    time.Sleep(1 * time.Second)
    upstream.Start(ctx)
}

看上面devicecontroller的啟動(dòng)函數(shù)是否似曾相識(shí)相識(shí),那是因?yàn)樗蚭dgecontroller的啟動(dòng)函數(shù)的邏輯基本相同,所以對(duì)于devicecontroller的剖析,同學(xué)可以參考“edgecontrller剖析“。

到此“kubeedge源碼分析系列之cloudcore”就全部結(jié)束了,大家在根據(jù)本文去閱讀kubeedge的源碼時(shí),一定要時(shí)刻提醒自己,cloudcore中的個(gè)model之間時(shí)可以通過(guò)“beehive的context消息通信框架”進(jìn)行相互通信的,否則,會(huì)產(chǎn)生一些困惑。

本文是“之江實(shí)驗(yàn)室端邊云操作系統(tǒng)團(tuán)隊(duì)” kubeedge源碼分析系列的第一篇,接下來(lái)會(huì)對(duì)各組件的源碼進(jìn)行系統(tǒng)分析。如果有機(jī)會(huì)我們團(tuán)隊(duì)也會(huì)積極解決kubeedge的issue和實(shí)現(xiàn)新的feature。

這是我們“之江實(shí)驗(yàn)室端邊云操作系統(tǒng)團(tuán)隊(duì)”維護(hù)的“之江實(shí)驗(yàn)室kubeedge源碼分析群“微信群,歡迎大家的參與?。?!

之江實(shí)驗(yàn)室kubeedge源碼分析群二維碼入口

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容