[istio source code analysis][galley] Galley downstream (MCP)

1. Introduction

When reposting, please cite the original source and respect the author's work.

Source code: https://github.com/nicktming/istio
Branch: tming-v1.3.6 (based on version 1.3.6)

1. [istio source code analysis][galley] Galley upstream (source)
2. [istio source code analysis][galley] Galley runtime
3. [istio source code analysis][galley] Galley downstream (MCP)

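The previous article, [istio source code analysis][galley] Galley runtime, analyzed the runtime, the component that connects the upstream and downstream parts of Galley. This article analyzes the downstream side of that component. The MCP server is responsible for this part, and every connected MCP client (for example, Pilot) receives the information it publishes.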

2. Server

First, let's look at how the server side is initialized.

// galley/pkg/server/components/processing.go
func NewProcessing(a *settings.Args) *Processing {
    d := snapshot.New(groups.IndexFunction)
    return &Processing{
        args:         a,
        distributor:  d,
        configzTopic: configz.CreateTopic(d),
    }
}

p.distributor is an instance of the snapshot cache (snapshot is analyzed later). Next, look at the Start() method.

func (p *Processing) Start() (err error) {
    // TODO: cleanup

    ...
    types := p.getMCPTypes()
    processorCfg := runtime.Config{
        DomainSuffix:             p.args.DomainSuffix,
        Mesh:                     mesh,
        Schema:                   types,
        SynthesizeServiceEntries: p.args.EnableServiceDiscovery,
    }
    p.processor = runtime.NewProcessor(src, p.distributor, &processorCfg)

    grpcOptions := p.getServerGrpcOptions()

    p.stopCh = make(chan struct{})
    var checker source.AuthChecker = server.NewAllowAllChecker()
    ...
    grpc.EnableTracing = p.args.EnableGRPCTracing
    p.grpcServer = grpc.NewServer(grpcOptions...)

    p.reporter = mcpMetricReporter("galley")

    options := &source.Options{
        Watcher:            p.distributor,
        Reporter:           p.reporter,
        CollectionsOptions: source.CollectionOptionsFromSlice(types.Collections()),
        ConnRateLimiter:    mcprate.NewRateLimiter(time.Second, 100), // TODO(Nino-K): https://github.com/istio/istio/issues/12074
    }

    md := grpcMetadata.MD{
        versionMetadataKey: []string{version.Info.Version},
    }
    if err := parseSinkMeta(p.args.SinkMeta, md); err != nil {
        return err
    }
    ...
    serverOptions := &source.ServerOptions{
        AuthChecker: checker,
        RateLimiter: rate.NewLimiter(rate.Every(time.Second), 100), // TODO(Nino-K): https://github.com/istio/istio/issues/12074
        Metadata:    md,
    }
    p.mcpSource = source.NewServer(options, serverOptions)
    ...
}

Note the following points:
1. The Watcher in options is p.distributor.
2. p.mcpSource = source.NewServer(options, serverOptions) creates an MCP server.

// pkg/mcp/source/server_source.go
func NewServer(srcOptions *Options, serverOptions *ServerOptions) *Server {
    s := &Server{
        src:         New(srcOptions),
        authCheck:   serverOptions.AuthChecker,
        rateLimiter: serverOptions.RateLimiter,
        metadata:    serverOptions.Metadata,
    }
    return s
}
// pkg/mcp/source/source.go
func New(options *Options) *Source {
    s := &Source{
        watcher:        options.Watcher,
        collections:    options.CollectionsOptions,
        reporter:       options.Reporter,
        requestLimiter: options.ConnRateLimiter,
    }
    return s
}

As you can see, the server's src (a Source) is built entirely from the options assembled in Processing.Start().
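Because Source only keeps whatever Watcher it is given, any type with a matching Watch method can be plugged in, and Processing plugs in the snapshot cache. The sketch below illustrates that dependency. Its Watch signature mirrors Cache.Watch shown later in this article, but Request, WatchResponse, Options and fakeWatcher are simplified, hypothetical stand-ins, not the real istio.io/istio/pkg/mcp/source types.

```go
package main

import "fmt"

// Request is a simplified stand-in for source.Request.
type Request struct {
	Collection  string
	VersionInfo string
}

// WatchResponse is a simplified stand-in for source.WatchResponse.
type WatchResponse struct {
	Collection string
	Version    string
}

// PushResponseFunc and CancelWatchFunc mirror the callback types in the Watch signature.
type PushResponseFunc func(*WatchResponse)
type CancelWatchFunc func()

// Watcher is the only thing the source needs from its data provider.
type Watcher interface {
	Watch(req *Request, push PushResponseFunc, peerAddr string) CancelWatchFunc
}

// Options is a hypothetical, trimmed-down version of source.Options.
type Options struct {
	Watcher Watcher
}

// Source keeps the Watcher, just as New(options) copies options.Watcher.
type Source struct {
	watcher Watcher
}

func New(o *Options) *Source { return &Source{watcher: o.Watcher} }

// fakeWatcher is a hypothetical Watcher that always serves one fixed version.
type fakeWatcher struct{ version string }

func (f *fakeWatcher) Watch(req *Request, push PushResponseFunc, _ string) CancelWatchFunc {
	if req.VersionInfo != f.version {
		push(&WatchResponse{Collection: req.Collection, Version: f.version})
	}
	return nil
}

func main() {
	src := New(&Options{Watcher: &fakeWatcher{version: "v1"}})
	src.watcher.Watch(&Request{Collection: "example/collection"}, func(r *WatchResponse) {
		fmt.Println(r.Collection, r.Version)
	}, "127.0.0.1")
}
```

Swapping fakeWatcher for the snapshot cache changes nothing in Source, which is why the same server code can serve whatever Galley's runtime publishes.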

2.1 EstablishResourceStream

// pkg/mcp/source/server_source.go
func (s *Server) EstablishResourceStream(stream mcp.ResourceSource_EstablishResourceStreamServer) error {
    ...
    err := s.src.ProcessStream(stream)
    code := status.Code(err)
    if code == codes.OK || code == codes.Canceled || err == io.EOF {
        return nil
    }
    return err
}

The key part is the ProcessStream method.

2.2 ProcessStream

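1. newConnection creates a connection for the stream.
2. Requests are received asynchronously and passed through a channel (con.requestC) for processing.
3. A loop handles the requests: it takes each request from con.requestC and processes it with processClientRequest, and it reads from con.queue the responses to be returned to the client. It follows that processClientRequest (indirectly) assembles responses and puts them into con.queue.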

func (s *Source) ProcessStream(stream Stream) error {
    // create a connection for this client
    con := s.newConnection(stream)

    defer s.closeConnection(con)
    // receive requests asynchronously
    go con.receive()

    for {
        select {
        case <-con.queue.Ready():
            collection, item, ok := con.queue.Dequeue()
            if !ok {
                break
            }
            resp := item.(*WatchResponse)
            w, ok := con.watches[collection]
            if !ok {
                scope.Errorf("unknown collection in dequeued watch response: %v", collection)
                break // bug?
            }
            // the response may have been cleared before we got to it
            if resp != nil {
                if err := con.pushServerResponse(w, resp); err != nil {
                    return err
                }
            }
        case req, more := <-con.requestC:
            // a request arrived: receive() puts every incoming
            // request into con.requestC
            if !more {
                return con.reqError
            }
            if con.limiter != nil {
                if err := con.limiter.Wait(stream.Context()); err != nil {
                    return err
                }

            }
            // handle the request
            if err := con.processClientRequest(req); err != nil {
                return err
            }
        case <-con.queue.Done():
            // the queue has been closed
            scope.Debugf("MCP: connection %v: stream done", con)
            return status.Error(codes.Unavailable, "server canceled watch")
        }
    }
}

First, let's see how the connection is established.

// pkg/mcp/source/source.go
func (s *Source) newConnection(stream Stream) *connection {
    peerAddr := "0.0.0.0"

    peerInfo, ok := peer.FromContext(stream.Context())
    if ok {
        peerAddr = peerInfo.Addr.String()
    } else {
        scope.Warnf("No peer info found on the incoming stream.")
        peerInfo = nil
    }

    con := &connection{
        stream:   stream,
        peerAddr: peerAddr,
        requestC: make(chan *mcp.RequestResources),
        watches:  make(map[string]*watch),
        watcher:  s.watcher,
        id:       atomic.AddInt64(&s.nextStreamID, 1),
        reporter: s.reporter,
        limiter:  s.requestLimiter.Create(),
        queue:    internal.NewUniqueScheduledQueue(len(s.collections)),
    }

    // create a watch for each collection
    collections := make([]string, 0, len(s.collections))
    for i := range s.collections {
        collection := s.collections[i]
        w := &watch{
            ackedVersionMap: make(map[string]string),
            incremental:     collection.Incremental,
        }
        con.watches[collection.Name] = w
        collections = append(collections, collection.Name)
    }
    ...
    return con
}

Its main job is to create a watch object for each collection. Next, let's see how the receive function is implemented.

func (con *connection) receive() {
    defer close(con.requestC)
    for {
        // receive the next request from the client
        req, err := con.stream.Recv()
        if err != nil {
            if err == io.EOF {
                scope.Infof("MCP: connection %v: TERMINATED %q", con, err)
                return
            }
            con.reporter.RecordRecvError(err, status.Code(err))
            scope.Errorf("MCP: connection %v: TERMINATED with errors: %v", con, err)
            con.reqError = err
            return
        }
        select {
        // forward it to the con.requestC channel
        case con.requestC <- req:
        case <-con.queue.Done():
            scope.Debugf("MCP: connection %v: stream done", con)
            return
        case <-con.stream.Context().Done():
            scope.Debugf("MCP: connection %v: stream done, err=%v", con, con.stream.Context().Err())
            return
        }
    }
}

When a request arrives from the client, receive puts it into the con.requestC channel. Back in ProcessStream, each request taken from con.requestC is handled by processClientRequest.

2.3 processClientRequest

func (con *connection) processClientRequest(req *mcp.RequestResources) error {
    if isTriggerResponse(req) {
        return nil
    }

    collection := req.Collection

    con.reporter.RecordRequestSize(collection, con.id, internal.ProtoSize(req))
    // look up the watch for this collection
    w, ok := con.watches[collection]
    if !ok {
        return status.Errorf(codes.InvalidArgument, "unsupported collection %q", collection)
    }
    if req.ResponseNonce == "" || w.pending.GetNonce() == req.ResponseNonce {
        versionInfo := ""

        if w.pending == nil {
            // initial request for this collection (nothing sent yet)
            scope.Infof("MCP: connection %v: inc=%v WATCH for %v", con, req.Incremental, collection)
        } else {
            // ACK or NACK for the response recorded in w.pending
            versionInfo = w.pending.SystemVersionInfo
            if req.ErrorDetail != nil {
                scope.Warnf("MCP: connection %v: NACK collection=%v version=%q with nonce=%q error=%#v inc=%v", // nolint: lll
                    con, collection, versionInfo, req.ResponseNonce, req.ErrorDetail, req.Incremental)
                con.reporter.RecordRequestNack(collection, con.id, codes.Code(req.ErrorDetail.Code))
            } else {
                scope.Infof("MCP: connection %v ACK collection=%v with version=%q nonce=%q inc=%v",
                    con, collection, versionInfo, req.ResponseNonce, req.Incremental)
                con.reporter.RecordRequestAck(collection, con.id)

                internal.UpdateResourceVersionTracking(w.ackedVersionMap, w.pending)
            }

            // clear the pending request after we finished processing the corresponding response.
            w.pending = nil
        }

        if w.cancel != nil {
            w.cancel()
        }

        sr := &Request{
            SinkNode:    req.SinkNode,
            Collection:  collection,
            VersionInfo: versionInfo,
            incremental: req.Incremental,
        }
        // con.watcher is the snapshot cache; its Watch method assembles
        // the response and calls queueResponse to enqueue it
        w.cancel = con.watcher.Watch(sr, con.queueResponse, con.peerAddr)
    } else {
        ...
    }
    return nil
}

func (con *connection) queueResponse(resp *WatchResponse) {
    if resp == nil {
        con.queue.Close()
    } else {
        con.queue.Enqueue(resp.Collection, resp)
    }
}
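queueResponse hands the response to con.queue, which is created with internal.NewUniqueScheduledQueue. The sketch below assumes that this queue keeps at most one pending response per collection, so a newer response replaces an older one that has not been sent yet. It is a hypothetical, simplified stand-in (no Done/Close handling, so the nil-response case of queueResponse is not modeled), not the internal implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// uniqueQueue is a hypothetical keyed FIFO: one pending value per key, the latest value wins.
type uniqueQueue struct {
	mu     sync.Mutex
	order  []string               // keys in first-enqueued order
	values map[string]interface{} // pending value per key
	ready  chan struct{}          // signaled when there may be something to dequeue
}

func newUniqueQueue() *uniqueQueue {
	return &uniqueQueue{values: make(map[string]interface{}), ready: make(chan struct{}, 1)}
}

func (q *uniqueQueue) signal() {
	select {
	case q.ready <- struct{}{}:
	default: // a signal is already pending
	}
}

// Enqueue adds v under key, overwriting an older value that was not dequeued yet.
func (q *uniqueQueue) Enqueue(key string, v interface{}) {
	q.mu.Lock()
	if _, pending := q.values[key]; !pending {
		q.order = append(q.order, key)
	}
	q.values[key] = v
	q.mu.Unlock()
	q.signal()
}

// Dequeue removes the oldest key and returns its latest value.
func (q *uniqueQueue) Dequeue() (string, interface{}, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.order) == 0 {
		return "", nil, false
	}
	key := q.order[0]
	q.order = q.order[1:]
	v := q.values[key]
	delete(q.values, key)
	if len(q.order) > 0 {
		q.signal() // more work left: wake the consumer again
	}
	return key, v, true
}

func (q *uniqueQueue) Ready() <-chan struct{} { return q.ready }

func main() {
	q := newUniqueQueue()
	q.Enqueue("gateways", "v1")
	q.Enqueue("virtualservices", "v1")
	q.Enqueue("gateways", "v2") // replaces the unsent v1 response
	<-q.Ready()
	for {
		k, v, ok := q.Dequeue()
		if !ok {
			break
		}
		fmt.Println(k, v) // prints "gateways v2", then "virtualservices v1"
	}
}
```

With this design a slow client never receives a backlog of stale snapshots for one collection; it only receives the newest one.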

For background on MCP, see https://github.com/istio/api/tree/master/mcp. The diagram below helps illustrate the protocol.

[Figure: mcp.png, MCP request/response/ACK interaction]

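From the diagram above and processClientRequest we can see that:
1. The client sends the first request; after that, content is pushed from the server to the client.
2. processClientRequest replaces the cancel function stored in the watch object for the collection.
3. The w.pending variable plays a key role (see pushServerResponse below).
4. The response is assembled in the snapshot cache and put into con.queue.

We first analyze the flow in which the client sends a request, the server returns a response, and the client then sends an ACK. After that we analyze how the server actively pushes information to the client and how the client replies with an ACK.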
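The nonce check at the top of processClientRequest decides whether a request is an initial watch, an ACK/NACK of the last pushed message, or something else. The sketch below models only that per-collection bookkeeping. The types are hypothetical simplifications (ackedVersion stands in for ackedVersionMap, nack stands in for ErrorDetail != nil), and because the article elides the real else branch, the sketch simply reports a mismatched nonce as not handled.

```go
package main

import "fmt"

// sentMsg is a simplified stand-in for the last mcp.Resources sent on a collection.
type sentMsg struct {
	nonce   string
	version string
}

// watch mirrors the per-collection state kept by the connection.
type watch struct {
	pending      *sentMsg // sent but not yet ACKed/NACKed
	ackedVersion string   // hypothetical simplification of ackedVersionMap
}

// request is a stripped-down mcp.RequestResources.
type request struct {
	responseNonce string
	nack          bool // stands in for req.ErrorDetail != nil
}

// sent records a message right after a successful stream.Send, like pushServerResponse.
func (w *watch) sent(nonce, version string) {
	w.pending = &sentMsg{nonce: nonce, version: version}
}

// handle returns the VersionInfo to pass to the watcher, or ok=false for a mismatched nonce.
func (w *watch) handle(req request) (versionInfo string, ok bool) {
	pendingNonce := ""
	if w.pending != nil {
		pendingNonce = w.pending.nonce
	}
	if req.responseNonce != "" && req.responseNonce != pendingNonce {
		return "", false // the original elides this branch; this sketch just rejects it
	}
	if w.pending != nil {
		versionInfo = w.pending.version // set for both ACK and NACK
		if !req.nack {
			w.ackedVersion = w.pending.version // only an ACK advances the acked state
		}
		w.pending = nil // the response has been answered
	}
	return versionInfo, true
}

func main() {
	w := &watch{}
	v, _ := w.handle(request{}) // initial request
	fmt.Printf("initial watch at version %q\n", v)
	w.sent("1", "v1")
	v, _ = w.handle(request{responseNonce: "1"}) // ACK
	fmt.Printf("ACK: re-watch at %q, acked=%q\n", v, w.ackedVersion)
}
```

Note that versionInfo is set to the pending version even on a NACK, so the snapshot cache sees an up-to-date version and does not resend the rejected data; the client only receives something new once a newer snapshot is published.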

Now let's look at the snapshot cache's Watch method.

func (c *Cache) Watch(request *source.Request, pushResponse source.PushResponseFunc, peerAddr string) source.CancelWatchFunc { // nolint: lll
    group := c.groupIndex(request.Collection, request.SinkNode)
    c.mu.Lock()
    defer c.mu.Unlock()
    // update the status for this group
    info := c.fillStatus(group, request, peerAddr)
    collection := request.Collection
    // return an immediate response if a snapshot is available and the
    // requested version doesn't match.
    // c.snapshots is updated in SetSnapshot
    if snapshot, ok := c.snapshots[group]; ok {

        version := snapshot.Version(request.Collection)
        scope.Debugf("Found snapshot for group: %q for %v @ version: %q",
            group, request.Collection, version)

        if version != request.VersionInfo {
            scope.Debugf("Responding to group %q snapshot:\n%v\n", group, snapshot)
            response := &source.WatchResponse{
                Collection: request.Collection,
                Version:    version,
                Resources:  snapshot.Resources(request.Collection),
                Request:    request,
            }
            // enqueue the response into con.queue
            pushResponse(response)
            return nil
        }
        info.synced[request.Collection][peerAddr] = true
    }
    c.watchCount++
    watchID := c.watchCount
    ...
    info.mu.Lock()
    // register the watch so SetSnapshot can answer it later
    info.watches[watchID] = &responseWatch{request: request, pushResponse: pushResponse}
    info.mu.Unlock()
    ...
    return cancel
}

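1. If the version differs, pushResponse puts the response into con.queue, and it is then sent to the client.
2. If there is no snapshot for the group yet, or the version has not changed, the request is stored in info.watches; the server will push to the client later, from SetSnapshot.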
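The two cases of Watch can be captured in a small model. The sketch below is a hypothetical single-group reduction of snapshot.Cache: versions is nil until the first snapshot arrives, a missing collection reads as the empty version (so an empty VersionInfo counts as up to date), and the returned cancel func is an assumption, since the article elides the real one. As in the original, push is called while the lock is held, which is fine because it only enqueues.

```go
package main

import (
	"fmt"
	"sync"
)

type response struct{ collection, version string }

type watchReq struct {
	collection, versionInfo string
	push                    func(response)
}

// cache is a hypothetical single-group reduction of snapshot.Cache.
type cache struct {
	mu       sync.Mutex
	versions map[string]string // collection -> version in the current snapshot; nil before the first one
	watches  map[int]watchReq  // parked requests, answered later by SetSnapshot
	nextID   int
}

func newCache() *cache { return &cache{watches: make(map[int]watchReq)} }

// Watch answers immediately when the snapshot has a different version, otherwise parks the request.
func (c *cache) Watch(collection, versionInfo string, push func(response)) (cancel func()) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.versions != nil {
		if v := c.versions[collection]; v != versionInfo {
			push(response{collection, v})
			return nil // nothing parked, nothing to cancel
		}
	}
	c.nextID++
	id := c.nextID
	c.watches[id] = watchReq{collection, versionInfo, push}
	return func() {
		c.mu.Lock()
		delete(c.watches, id)
		c.mu.Unlock()
	}
}

func main() {
	c := newCache()
	push := func(r response) { fmt.Println("push", r.collection, r.version) }
	c.Watch("gateways", "", push)                     // parked: no snapshot yet
	c.versions = map[string]string{"gateways": "v1"} // a snapshot arrives (SetSnapshot omitted)
	c.Watch("gateways", "", push)                     // answered immediately with v1
	fmt.Println("parked watches:", len(c.watches))
}
```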

2.4 pushServerResponse

func (con *connection) pushServerResponse(w *watch, resp *WatchResponse) error {
    ...
    if incremental {
        added, removed = calculateDelta(resp.Resources, w.ackedVersionMap)
    } else {
        // resp.Resources are the resources from the snapshot
        for _, resource := range resp.Resources {
            added = append(added, *resource)
        }
    }
    msg := &mcp.Resources{
        SystemVersionInfo: resp.Version,
        Collection:        resp.Collection,
        Resources:         added,
        RemovedResources:  removed,
        Incremental:       incremental,
    }
    // increment nonce
    con.streamNonce++
    msg.Nonce = strconv.FormatInt(con.streamNonce, 10)
    if err := con.stream.Send(msg); err != nil {
        con.reporter.RecordSendError(err, status.Code(err))
        return err
    }
    scope.Debugf("MCP: connection %v: SEND collection=%v version=%v nonce=%v inc=%v",
        con, resp.Collection, resp.Version, msg.Nonce, msg.Incremental)
    // after a successful send, record the message in w.pending;
    // it is used to validate the client's ACK/NACK
    w.pending = msg
    return nil
}

1. The response is assembled into an mcp.Resources message and sent to the client.
2. After a successful send, w.pending is set; when the client replies with an ACK/NACK, processClientRequest uses it to match the reply.
On an ACK, UpdateResourceVersionTracking(w.ackedVersionMap, w.pending) updates w.ackedVersionMap, which records the resources the client currently holds.
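In incremental mode, w.ackedVersionMap is what makes deltas possible. The article does not show calculateDelta, so the sketch below assumes it compares the snapshot against the acked name-to-version map: a resource is "added" if the client does not have it or has an older version, and a name is "removed" if the client has it but the snapshot does not. The resource type is a hypothetical stand-in for mcp.Resource, and the sort is added only for stable output.

```go
package main

import (
	"fmt"
	"sort"
)

// resource is a simplified stand-in for mcp.Resource (only name and version metadata).
type resource struct {
	Name    string
	Version string
}

// calculateDelta compares the snapshot with what the client has acked (name -> version).
func calculateDelta(current []resource, acked map[string]string) (added []resource, removed []string) {
	desired := make(map[string]bool, len(current))
	for _, r := range current {
		if prev, ok := acked[r.Name]; !ok || prev != r.Version {
			added = append(added, r) // new or updated resource
		}
		desired[r.Name] = true
	}
	for name := range acked {
		if !desired[name] {
			removed = append(removed, name) // the client still has it, the snapshot does not
		}
	}
	sort.Strings(removed) // map iteration order is random; sort for stable output
	return added, removed
}

func main() {
	acked := map[string]string{"default/a": "1", "default/b": "1", "default/c": "1"}
	current := []resource{{"default/a", "1"}, {"default/b", "2"}, {"default/d", "1"}}
	added, removed := calculateDelta(current, acked)
	fmt.Println(added, removed) // prints [{default/b 2} {default/d 1}] [default/c]
}
```

In non-incremental mode this step is skipped and every resource in the snapshot is sent, as the else branch of pushServerResponse shows.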

2.5 Summary

Now let's walk through the whole flow.

[Figure: mcp.png, MCP request/response/ACK interaction]

1. The client sends the first request.
2. The server then sends data to the client.
3. The client replies with an ACK/NACK, and the server acts on it; for example, on an ACK it updates w.ackedVersionMap.

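After that, the server pushes data to the client on its own initiative. When does it send? This ties back to [istio source code analysis][galley] Galley runtime: there we saw that the upstream source delivers data as events to the runtime, which processes them and hands the result to p.distributor. From NewProcessing() and Start() we know that p.distributor is the snapshot cache.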

func NewProcessing(a *settings.Args) *Processing {
    d := snapshot.New(groups.IndexFunction)
    return &Processing{
        args:         a,
        distributor:  d,
        configzTopic: configz.CreateTopic(d),
    }
}
func (p *Processing) Start() (err error) {
    ...
    p.processor = runtime.NewProcessor(src, p.distributor, &processorCfg)
    ...
    return nil
}

p.distributor receives each new snapshot through SetSnapshot:

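// galley/pkg/runtime/processor.go
// excerpt of the processing loop, which Start() launches in a goroutine
func (p *Processor) process() {
    ...
    for {
        select {
        ...
        case <-p.stateStrategy.Publish:
            scope.Debug("Processor.process: publish")
            // build a snapshot of the objects currently held in memory by p.state
            s := p.state.buildSnapshot()
            // hand the snapshot to the distributor
            p.distributor.SetSnapshot(groups.Default, s)
        }
    }
}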
// pkg/mcp/snapshot/snapshot.go
func (c *Cache) SetSnapshot(group string, snapshot Snapshot) {
    c.mu.Lock()
    defer c.mu.Unlock()

    // update the existing entry
    c.snapshots[group] = snapshot

    // trigger existing watches for which version changed
    if info, ok := c.status[group]; ok {
        info.mu.Lock()
        defer info.mu.Unlock()
        // iterate over all registered watches
        for id, watch := range info.watches {
            version := snapshot.Version(watch.request.Collection)
            if version != watch.request.VersionInfo {
                scope.Infof("SetSnapshot(): respond to watch %d for %v @ version %q",
                    id, watch.request.Collection, version)

                response := &source.WatchResponse{
                    Collection: watch.request.Collection,
                    Version:    version,
                    Resources:  snapshot.Resources(watch.request.Collection),
                    Request:    watch.request,
                }
                // call the push function: it puts the response
                // into con.queue to be sent to the client
                watch.pushResponse(response)

                // discard the responseWatch
                delete(info.watches, id)

                scope.Debugf("SetSnapshot(): watch %d for %v @ version %q complete",
                    id, watch.request.Collection, version)
            }
        }
    }
}

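Where do the entries in info.watches come from? The snapshot cache's Watch method registers them. Because SetSnapshot deletes a watch once it has fired, a new one is registered each time the client sends an ACK/NACK. So whenever an upstream event occurs, SetSnapshot is triggered and pushes the new data to the client.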

3. References

1. istio 1.3.6 source code
2. https://cloud.tencent.com/developer/article/1409159
