[istio源碼分析][pilot] pilot之configController (mcp client)

1. 前言

轉(zhuǎn)載請(qǐng)說(shuō)明原文出處, 尊重他人勞動(dòng)成果!

源碼位置: https://github.com/nicktming/istio
分支: tming-v1.3.6 (基于1.3.6版本)

1. [istio源碼分析][galley] galley之上游(source)
2. [istio源碼分析][galley] galley之runtime
3. [istio源碼分析][galley] galley之下游(mcp)
在前面幾篇文章中已經(jīng)分析了galley的整個(gè)流程, galley中最終把從source(fs, k8s) 中獲得的數(shù)據(jù)會(huì)從mcp serverpushmcp client, 那本文將會(huì)分析pilotconfigController是如何使用mcp client來(lái)接收數(shù)據(jù)并如何處理的.

2. ConfigController

先看一下configControllerpilot是如何初始化的.

// pilot/cmd/pilot-discovery/main.go
var (
    serverArgs = bootstrap.PilotArgs{
        CtrlZOptions:     ctrlz.DefaultOptions(),
        KeepaliveOptions: keepalive.DefaultOption(),
    }
    ...
)
...
discoveryServer, err := bootstrap.NewServer(serverArgs)
...

// pilot/pkg/bootstrap/server.go
func NewServer(args PilotArgs) (*Server, error) {
    if err := s.initMesh(&args); err != nil {
        return nil, fmt.Errorf("mesh: %v", err)
    }
    ...
    if err := s.initConfigController(&args); err != nil {
        return nil, fmt.Errorf("config controller: %v", err)
    }
}

對(duì)分析不影響的代碼直接刪減了.

func (s *Server) initConfigController(args *PilotArgs) error {
    if len(s.mesh.ConfigSources) > 0 {
        // 如果有config source的配置 則配置mcp client
        if err := s.initMCPConfigController(args); err != nil {
            return err
        }
    } 
    ...
    // Create the config store.
    s.istioConfigStore = model.MakeIstioStore(s.configController)
    return nil
}

1. 可以看到s.istioConfigStore實(shí)質(zhì)上就是s.configController.
2. 主要關(guān)注mcp configuration, 關(guān)于mesh配置信息可以參考 [istio源碼分析] istio源碼開(kāi)發(fā)調(diào)試版簡(jiǎn)單安裝 .

2.1 initMCPConfigController

func (s *Server) initMCPConfigController(args *PilotArgs) error {
    clientNodeID := ""
    collections := make([]sink.CollectionOptions, len(model.IstioConfigTypes))
    for i, t := range model.IstioConfigTypes {
        // 都是istio crd資源 沒(méi)有原生的k8s資源 比如pod, service等
        collections[i] = sink.CollectionOptions{Name: t.Collection, Incremental: false}
    }

    options := coredatamodel.Options{
        DomainSuffix: args.Config.ControllerOptions.DomainSuffix,
        // 后面會(huì)用到
        ClearDiscoveryServerCache: func() {
            s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true})
        },
    }
    ...
    for _, configSource := range s.mesh.ConfigSources {
        if strings.Contains(configSource.Address, fsScheme+"://") {
            ...
        }
        // 設(shè)置安全訪問(wèn)的情況 在以后分析policy的時(shí)候會(huì)用到
        securityOption := grpc.WithInsecure()
        if configSource.TlsSettings != nil &&
            configSource.TlsSettings.Mode != istio_networking_v1alpha3.TLSSettings_DISABLE {
            ...
        }
        ...
        conn, err := grpc.DialContext(
            ctx, configSource.Address,
            securityOption, msgSizeOption, keepaliveOption, initialWindowSizeOption, initialConnWindowSizeOption)
        ...
        // 創(chuàng)建一個(gè)controller
        mcpController := coredatamodel.NewController(options)
        sinkOptions := &sink.Options{
            CollectionOptions: collections,
            Updater:           mcpController,
            ID:                clientNodeID,
            Reporter:          reporter,
        }
        // 創(chuàng)建mcp client
        cl := mcpapi.NewResourceSourceClient(conn)
        mcpClient := sink.NewClient(cl, sinkOptions)
        configz.Register(mcpClient)
        clients = append(clients, mcpClient)

        conns = append(conns, conn)
        // 將該controller加入到configStores
        configStores = append(configStores, mcpController)
    }
    ...
    // Wrap the config controller with a cache.
    aggregateMcpController, err := configaggregate.MakeCache(configStores)
    if err != nil {
        return err
    }
    s.configController = aggregateMcpController
    return nil
}

1. 關(guān)注options.ClearDiscoveryServerCache, 后面會(huì)用到.
2. coredatamodel.NewController(options)創(chuàng)建一個(gè)controller.
3. sink.NewClient(cl, sinkOptions)創(chuàng)建一個(gè)mcp client, 注意sinkOptions.Updater就是2.中創(chuàng)建的controller. 另外mcp server端在galley.
4. configaggregate.MakeCache(configStores) 是將所有的controller按照其支持的collection(比如virtualService對(duì)應(yīng)了哪些controller)進(jìn)行分類起來(lái).

2.2 mcp client

// pkg/mcp/sink/client_sink.go
func NewClient(client mcp.ResourceSourceClient, options *Options) *Client {
    return &Client{
        Sink:     New(options),
        reporter: options.Reporter,
        client:   client,
    }
}
// pkg/mcp/sink/sink.go
func New(options *Options) *Sink { // nolint: lll
    nodeInfo := &mcp.SinkNode{
        Id:          options.ID,
        Annotations: options.Metadata,
    }
    state := make(map[string]*perCollectionState)
    // state來(lái)自options.CollectionOptions
    for _, collection := range options.CollectionOptions {
        state[collection.Name] = &perCollectionState{
            versions:           make(map[string]string),
            requestIncremental: collection.Incremental,
        }
    }
    return &Sink{
        ...
    }
}

這里需要關(guān)注如下:
1. 可以看到Sink中的state來(lái)自options.CollectionOptions, 往上追溯到initMCPConfigControllermodel.IstioConfigTypes.

IstioConfigTypes = ConfigDescriptor{
        VirtualService,
        Gateway,
        ServiceEntry,
        DestinationRule,
        EnvoyFilter,
        Sidecar,
        HTTPAPISpec,
        HTTPAPISpecBinding,
        QuotaSpec,
        QuotaSpecBinding,
        AuthenticationPolicy,
        AuthenticationMeshPolicy,
        ServiceRole,
        ServiceRoleBinding,
        RbacConfig,
        ClusterRbacConfig,
    }

可以看到model.IstioConfigTypes中看到的都是istio中的一些crd資源, 也就是說(shuō)從galley中得到的config resource都是這些資源, 沒(méi)有k8s中的原生資源, 比如Pod等.

2.3 mcp client Start

// pkg/mcp/sink/client_sink.go
func (c *Client) Run(ctx context.Context) {
    ...
    for {
        // 建立連接
        for {
            ...
            stream, err := c.client.EstablishResourceStream(ctx)
            ...
        }
        // 處理
        err := c.ProcessStream(c.stream)
        ...
    }
}
// pkg/mcp/sink/sink.go
func (sink *Sink) ProcessStream(stream Stream) error {
    // send initial requests for each supported type
    // 為每一個(gè)支持的類型發(fā)送一個(gè)初始的請(qǐng)求
    initialRequests := sink.createInitialRequests()
    for {
        var req *mcp.RequestResources
        if len(initialRequests) > 0 {
            // 發(fā)送初始request
            req = initialRequests[0]
            initialRequests = initialRequests[1:]
        } else {
            // 從server端接收response
            resources, err := stream.Recv()
            if err != nil {
                if err != io.EOF {
                    sink.reporter.RecordRecvError(err, status.Code(err))
                    scope.Errorf("Error receiving MCP resource: %v", err)
                }
                return err
            }
            // client端處理后需要發(fā)送ACK/NACK
            // 所以處理response后組裝了一個(gè)request
            req = sink.handleResponse(resources)
        }

        sink.journal.RecordRequestResources(req)
        // 向server端發(fā)送request
        if err := stream.Send(req); err != nil {
            sink.reporter.RecordSendError(err, status.Code(err))
            scope.Errorf("Error sending MCP request: %v", err)
            return err
        }
    }
}

關(guān)于mcp中的clientserver之間的交互在 [istio源碼分析][galley] galley之下游(mcp) 中已經(jīng)有介紹, 這里再次說(shuō)明一下.

關(guān)于mcp可以參考 https://github.com/istio/api/tree/master/mcp, 這里用此圖可以增加理解

mcp.png

對(duì)比此圖和ProcessStream來(lái)進(jìn)行說(shuō)明:
1. client端為每一個(gè)支持的類型initialRequests發(fā)送一個(gè)初始的請(qǐng)求.
2. server端會(huì)返回一個(gè)response.
3. client端需要返回一個(gè)ACK/NACK, 所以ProcessStream中的sink.handleResponse(resources)中處理完response又構(gòu)造了一個(gè)新的request來(lái)返回給server端.

所以先看一下都發(fā)了哪些類型:

func (sink *Sink) createInitialRequests() []*mcp.RequestResources {
    sink.mu.Lock()

    initialRequests := make([]*mcp.RequestResources, 0, len(sink.state))
    // sink.state 來(lái)源自 initMCPConfigController中的model.IstioConfigTypes
    for collection, state := range sink.state {
        var initialResourceVersions map[string]string
        if state.requestIncremental {
            ...
        }
        req := &mcp.RequestResources{
            SinkNode:                sink.nodeInfo,
            Collection:              collection,
            InitialResourceVersions: initialResourceVersions,
            Incremental:             state.requestIncremental,
        }
        initialRequests = append(initialRequests, req)
    }
    sink.mu.Unlock()
    return initialRequests
}

可以看到發(fā)送的類型就是initMCPConfigController中的model.IstioConfigTypes.

2.4 handleResponse

func (sink *Sink) handleResponse(resources *mcp.Resources) *mcp.RequestResources {
    if handleResponseDoneProbe != nil {
        defer handleResponseDoneProbe()
    }
    // 必須是支持的類型
    state, ok := sink.state[resources.Collection]
    if !ok {
        errDetails := status.Errorf(codes.Unimplemented, "unsupported collection %v", resources.Collection)
        return sink.sendNACKRequest(resources, errDetails)
    }

    change := &Change{
        Collection:        resources.Collection,
        Objects:           make([]*Object, 0, len(resources.Resources)),
        Removed:           resources.RemovedResources,
        Incremental:       resources.Incremental,
        SystemVersionInfo: resources.SystemVersionInfo,
    }
    
    for _, resource := range resources.Resources {
        var dynamicAny types.DynamicAny
        if err := types.UnmarshalAny(resource.Body, &dynamicAny); err != nil {
            return sink.sendNACKRequest(resources, err)
        }

        // TODO - use galley metadata to verify collection and type_url match?
        object := &Object{
            TypeURL:  resource.Body.TypeUrl,
            Metadata: resource.Metadata,
            Body:     dynamicAny.Message,
        }
        change.Objects = append(change.Objects, object)
    }

    if err := sink.updater.Apply(change); err != nil {
        // 發(fā)送NACK
        errDetails := status.Error(codes.InvalidArgument, err.Error())
        return sink.sendNACKRequest(resources, errDetails)
    }
    ...
    // ACK
    sink.reporter.RecordRequestAck(resources.Collection, 0)
    req := &mcp.RequestResources{
        SinkNode:      sink.nodeInfo,
        Collection:    resources.Collection,
        ResponseNonce: resources.Nonce,
        Incremental:   useIncremental,
    }
    return req
}

1. 根據(jù)response發(fā)送回來(lái)的數(shù)據(jù)組裝成change, 并將該change作為參數(shù)調(diào)用sink.updater.Apply方法.
2. 有任何錯(cuò)誤會(huì)發(fā)送NACKserver端, 如果沒(méi)有錯(cuò)誤就發(fā)送ACKserver端.

sink.updater是什么呢? 在initMCPConfigController中可以看到:

        options := coredatamodel.Options{
            DomainSuffix: args.Config.ControllerOptions.DomainSuffix,
             // 后面會(huì)用到
            ClearDiscoveryServerCache: func() {
                s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true})
             },
        }
        ...
        mcpController := coredatamodel.NewController(options)
        sinkOptions := &sink.Options{
            CollectionOptions: collections,
            Updater:           mcpController,
            ID:                clientNodeID,
            Reporter:          reporter,
        }
        // 創(chuàng)建mcp client
        cl := mcpapi.NewResourceSourceClient(conn)
        mcpClient := sink.NewClient(cl, sinkOptions)

sink.updater就是mcpController.

3. Controller

func NewController(options Options) CoreDataModel {
    descriptorsByMessageName := make(map[string]model.ProtoSchema, len(model.IstioConfigTypes))
    synced := make(map[string]bool)
    for _, descriptor := range model.IstioConfigTypes {
        // don't register duplicate descriptors for the same collection
        if _, ok := descriptorsByMessageName[descriptor.Collection]; !ok {
            descriptorsByMessageName[descriptor.Collection] = descriptor
            synced[descriptor.Collection] = false
        }
    }
    return &Controller{
        ...
    }
}

關(guān)注一下descriptorsByMessageName是如何生成的即可.

3.1 Apply

func (c *Controller) Apply(change *sink.Change) error {
    descriptor, ok := c.descriptorsByCollection[change.Collection]
    if !ok {
        return fmt.Errorf("apply type not supported %s", change.Collection)
    }

    schema, valid := c.ConfigDescriptor().GetByType(descriptor.Type)
    if !valid {
        return fmt.Errorf("descriptor type not supported %s", descriptor.Type)
    }

    c.syncedMu.Lock()
    c.synced[change.Collection] = true
    c.syncedMu.Unlock()

    // innerStore is [namespace][name]
    innerStore := make(map[string]map[string]*model.Config)
    // 根據(jù)change的信息生成以innerStore
    for _, obj := range change.Objects {
        //構(gòu)造innerStore
    }

    var prevStore map[string]map[string]*model.Config

    c.configStoreMu.Lock()
    prevStore = c.configStore[descriptor.Type]
    c.configStore[descriptor.Type] = innerStore
    c.configStoreMu.Unlock()

    if descriptor.Type == model.ServiceEntry.Type {
        c.serviceEntryEvents(innerStore, prevStore)
    } else {
        c.options.ClearDiscoveryServerCache()
    }

    return nil
}

1. 根據(jù)change構(gòu)造innerStore, 進(jìn)而更新該類型在c.configStore中的內(nèi)容.
2. 根據(jù)舊內(nèi)容prevStore和新內(nèi)容innerStore來(lái)做分發(fā)工作.

2.1 如果是ServiceEntry, 調(diào)用serviceEntryEvents方法.

func (c *Controller) serviceEntryEvents(currentStore, prevStore map[string]map[string]*model.Config) {
    dispatch := func(model model.Config, event model.Event) {}
    if handlers, ok := c.eventHandlers[model.ServiceEntry.Type]; ok {
        dispatch = func(model model.Config, event model.Event) {
            log.Debugf("MCP event dispatch: key=%v event=%v", model.Key(), event.String())
            for _, handler := range handlers {
                handler(model, event)
            }
        }
    }

    // add/update
    for namespace, byName := range currentStore {
        for name, config := range byName {
            if prevByNamespace, ok := prevStore[namespace]; ok {
                if prevConfig, ok := prevByNamespace[name]; ok {
                    if config.ResourceVersion != prevConfig.ResourceVersion {
                        dispatch(*config, model.EventUpdate)
                    }
                } else {
                    dispatch(*config, model.EventAdd)
                }
            } else {
                dispatch(*config, model.EventAdd)
            }
        }
    }
    ...
}
func (c *Controller) RegisterEventHandler(typ string, handler func(model.Config, model.Event)) {
    c.eventHandlers[typ] = append(c.eventHandlers[typ], handler)
}

通過(guò)注冊(cè)好了的handler來(lái)處理這些生成的事件.

2.2 如果不是ServiceEntry, 調(diào)用ClearDiscoveryServerCache方法.

s.EnvoyXdsServer.ConfigUpdate(&model.PushRequest{Full: true})

所以這個(gè)放到以后分析.

4. 總結(jié)

pilot.png

mcp server中接收數(shù)據(jù)后通過(guò)handleResponse調(diào)用controllerApply方法, 通過(guò)類型來(lái)進(jìn)行處理. 處理完向server端返回ACK/NACK.

5. 參考

1. istio 1.3.6源碼
2. https://cloud.tencent.com/developer/article/1409159

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容