Kuberneters源碼分析 - Ingress nginx 主流程

Ingress Nginx的系統(tǒng)架構(gòu)

nginx-ingress-controller.png

Ingress Nginx的主流程邏輯

  • 解析命令行參數(shù)
    一個常見的命令行如下所示
    /nginx-ingress-controller --default-backend-service=ingress-nginx/default-http-backend --configmap=ingress-nginx/nginx-configuration --tcp-services-configmap=ingress-nginx/tcp-services --udp-services-configmap=ingress-nginx/udp-services --publish-service=ingress-nginx/ingress-nginx --annotations-prefix=nginx.ingress.kubernetes.io
  • 顯示nginx的版本號
    實際調(diào)用命令為:nginx -v或者nginx -V
  • 創(chuàng)建 API Server客戶端
    有兩種模式來獲取APIServer的客戶端對象:
    第一種是指定配置APIServerHost、配置KubeConfigFie;
    第二種是InCluster模式,這種一般是基于k8s調(diào)用Ingress Nginx運行,作為容器啟動,會從環(huán)境變量中讀取相關(guān)參數(shù):
    1)KUBERNETES_SERVICE_HOST 例如:KUBERNETES_PORT=tcp://10.96.0.1:443
    2)/var/run/secrets/kubernetes.io/serviceaccount 目錄下的token, ca.crt文件
  • 解析并驗證是否存在指定的缺省后端服務(wù)的名字空間和Service名稱
    這個是ingress-nginx的默認后端,用來將未知請求全部負載到這個默認后端上,這個默認后端會返回404頁面。
  • 創(chuàng)建偽SSL證書
    TODO:具體用處待分析
  • 普羅米修斯監(jiān)控初始化
  • 創(chuàng)建并啟動NGINXController對象,后面
  • 啟動HTTP服務(wù),主要用于健康檢查、指標(biāo)查看、Profile功能

創(chuàng)建NGINXController對象

這個功能在NewNGINXController方法中完成,代碼的分布解析如下:

  • 創(chuàng)建并啟動事件廣播
    eventBroadcaster := record.NewBroadcaster()  // 創(chuàng)建事件廣播對象
    eventBroadcaster.StartLogging(glog.Infof)         // 啟動事件日志記錄功能
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{  // 啟動日志sink功能,同步到API Server
        Interface: config.Client.CoreV1().Events(config.Namespace),
    })

上面的代碼創(chuàng)建并啟動了事件廣播對象,事件產(chǎn)生器是在NginxController對象構(gòu)建中創(chuàng)建

    n := &NGINXController{
            ......
            recorder :=  eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
              Component: "nginx-ingress-controller",
            })
            ......
       }

       // 產(chǎn)生事件的代碼,譬如創(chuàng)建了一個Ingress
       recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
  • 接下來會獲取系統(tǒng)名字服務(wù)器的IP信息,具體調(diào)用
    h, err := dns.GetSystemNameServers()

從 /etc/resolv.conf 文件中讀取dns的resolve ip信息

  • 創(chuàng)建NGINXController對象
    n := &NGINXController{
        isIPV6Enabled: ing_net.IsIPv6Enabled(),
        resolver:        h,
        cfg:             config,
        syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1),
        recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{
            Component: "nginx-ingress-controller",
        }),
        stopCh:   make(chan struct{}),
        updateCh: channels.NewRingChannel(1024),
        stopLock: &sync.Mutex{},
        fileSystem: fs,
        runningConfig: new(ingress.Configuration), // 運行時配置,剛開始為空,會在同步Ingress信息的時候,填充完整,見syncIngress方法
        Proxy: &TCPProxy{},
        metricCollector: mc,
    }

syncRateLimiter 成員是流控對象,k8s流控依賴于golang.org/x/time/rate中的頻率限制模塊,流量控制的接口如下:

type RateLimiter interface {
    // TryAccept returns true if a token is taken immediately. Otherwise,
    // it returns false.
    TryAccept() bool
    // Accept returns once a token becomes available.
    Accept()
    // Stop stops the rate limiter, subsequent calls to CanAccept will return false
    Stop()
    // QPS returns QPS of this rate limiter
    QPS() float32
}

實現(xiàn)流量控制,一般我們會設(shè)置流控的QPS,在執(zhí)行操作執(zhí)行,我們先調(diào)用一下流控的Accept函數(shù),等待令牌滿足,才接著往下執(zhí)行,這樣就達到了流控的效果。

updateCh成員是RingChannel實例對象,它實現(xiàn)了一個永遠不會阻塞寫操作的Channel接口。比如:如果當(dāng)我們寫入RingChannel時,RingChannel緩存滿了,那么Buffer中最老的數(shù)據(jù)就會被丟棄。

store.Storer

Storer 是一個接口,它封裝了一個獲取ingress、service、secrets和ingress annotations的方法。

type Storer interface {
    // GetBackendConfiguration returns the nginx configuration stored in a configmap
    GetBackendConfiguration() ngx_config.Configuration

    // GetConfigMap returns the ConfigMap matching key.
    GetConfigMap(key string) (*corev1.ConfigMap, error)

    // GetSecret returns the Secret matching key.
    GetSecret(key string) (*corev1.Secret, error)

    // GetService returns the Service matching key.
    GetService(key string) (*corev1.Service, error)

    // GetServiceEndpoints returns the Endpoints of a Service matching key.
    GetServiceEndpoints(key string) (*corev1.Endpoints, error)

    // GetIngress returns the Ingress matching key.
    GetIngress(key string) (*extensions.Ingress, error)

    // ListIngresses returns a list of all Ingresses in the store.
    ListIngresses() []*extensions.Ingress

    // GetIngressAnnotations returns the parsed annotations of an Ingress matching key.
    GetIngressAnnotations(key string) (*annotations.Ingress, error)

    // GetLocalSSLCert returns the local copy of a SSLCert
    GetLocalSSLCert(name string) (*ingress.SSLCert, error)

    // ListLocalSSLCerts returns the list of local SSLCerts
    ListLocalSSLCerts() []*ingress.SSLCert

    // GetAuthCertificate resolves a given secret name into an SSL certificate.
    // The secret must contain 3 keys named:
    //   ca.crt: contains the certificate chain used for authentication
    GetAuthCertificate(string) (*resolver.AuthSSLCert, error)

    // GetDefaultBackend returns the default backend configuration
    GetDefaultBackend() defaults.Backend

    // Run initiates the synchronization of the controllers
    Run(stopCh chan struct{})
}

Storer對象是與API Server溝通的入口,所以這塊是系統(tǒng)的關(guān)鍵,所有的數(shù)據(jù)變更都是從API Server過來,所以,Storer是驅(qū)動系統(tǒng)運行的關(guān)鍵模塊。Storer的具體的實現(xiàn)類為k8sStore,它的定義如下:

type k8sStore struct {
    isOCSPCheckEnabled bool

    // backendConfig contains the running configuration from the configmap
    // this is required because this rarely changes but is a very expensive
    // operation to execute in each OnUpdate invocation
    backendConfig ngx_config.Configuration

    // informer contains the cache Informers
    informers *Informer  // 封裝了所有關(guān)心的組件的通知機制

    // listers contains the cache.Store interfaces used in the ingress controller
    listers *Lister           // 從通知中獲取的對應(yīng)的只讀存儲信息

    // sslStore local store of SSL certificates (certificates used in ingress)
    // this is required because the certificates must be present in the
    // container filesystem
    sslStore *SSLCertTracker

    annotations annotations.Extractor // 提供了Ingress Annotations的名字與提取方法對

    // secretIngressMap contains information about which ingress references a
    // secret in the annotations.
    secretIngressMap ObjectRefMap  // 保存了每個Ingress引用了哪些secrets的信息

    filesystem file.Filesystem

    // updateCh
    updateCh *channels.RingChannel

    // mu protects against simultaneous invocations of syncSecret
    mu *sync.Mutex

    defaultSSLCertificate string
}

在k8sStore中封裝了幾個關(guān)鍵的對象Informers、Listener、updateCh, 其中Informers是關(guān)鍵驅(qū)動邏輯,它實時從API Server獲取各種資源的變化信息,針對不同類型的資源變化進行相應(yīng)的回調(diào)處理,最終回調(diào)函數(shù)都會形成事件放入updateCh隊列中去處理。

Informer

Informer封裝了ingress需要的SharedIndexInformers,用于與API Server交互,它是在Storer構(gòu)建時創(chuàng)建的,由于篇幅比較中,所以這里單獨拉出來探討。

SharedIndexInformer是基于一種共享的數(shù)據(jù)通知機制,共享數(shù)據(jù)通知對象的構(gòu)建是基于一個Factory來創(chuàng)建和返回。Factory會緩存創(chuàng)建過的對象,下次再次獲取同樣的對象時,會從緩存中換回。SharedIndexInformer基于兩項:底層數(shù)據(jù)(一般是API Server)和緩存數(shù)據(jù),當(dāng)數(shù)據(jù)發(fā)生變更時,在更新緩存的同時,可以同時向多個偵聽器發(fā)送通知回調(diào)處理。

type Informer struct {
    Ingress   cache.SharedIndexInformer
    Endpoint  cache.SharedIndexInformer
    Service   cache.SharedIndexInformer
    Secret    cache.SharedIndexInformer
    ConfigMap cache.SharedIndexInformer
}

從Informer的定義我們可以看出,系統(tǒng)關(guān)心的資源有:Ingress、EndPoint、Service、Secret、ConfigMap。

SharedIndexInformer的創(chuàng)建

所有的SharedIndexInformer都是基于factory創(chuàng)建,如下所示:

    // create informers factory, enable and assign required informers
    infFactory := informers.NewFilteredSharedInformerFactory(client, resyncPeriod, namespace, func(*metav1.ListOptions) {})

    store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
    store.listers.Ingress.Store = store.informers.Ingress.GetStore()

    store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
    store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()

    store.informers.Secret = infFactory.Core().V1().Secrets().Informer()
    store.listers.Secret.Store = store.informers.Secret.GetStore()

    store.informers.ConfigMap = infFactory.Core().V1().ConfigMaps().Informer()
    store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()

    store.informers.Service = infFactory.Core().V1().Services().Informer()
    store.listers.Service.Store = store.informers.Service.GetStore()

每個資源都添加了一個事件處理器,負責(zé)處理資源變更事件,這里限于篇幅,只舉例Ingress的資源進行說明:

          ingEventHandler := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            ing := obj.(*extensions.Ingress)
            if !class.IsValid(ing) {
                a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
                glog.Infof("ignoring add for ingress %v based on annotation %v with value %v", ing.Name, class.IngressKey, a)
                return
            }
            recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))  // 產(chǎn)生一個創(chuàng)建Ingress的事件通知

            store.extractAnnotations(ing)  // 提取Annotation信息
            store.updateSecretIngressMap(ing)  // 注意這里,保存的是該Ingress引用的Secrets信息
            store.syncSecrets(ing)  // 把Ingress相應(yīng)的Secrets信息同步到對應(yīng)的文件系統(tǒng)中,主要是TLS Secrets(包括證書和Key)
            updateCh.In() <- Event{ // 放入隊列,用于更新配置文件
                Type: CreateEvent,
                Obj:  obj,
            }
        },
        DeleteFunc: func(obj interface{}) {
            ing, ok := obj.(*extensions.Ingress)
            if !ok {
                // If we reached here it means the ingress was deleted but its final state is unrecorded.
                tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
                if !ok {
                    glog.Errorf("couldn't get object from tombstone %#v", obj)
                    return
                }
                ing, ok = tombstone.Obj.(*extensions.Ingress)
                if !ok {
                    glog.Errorf("Tombstone contained object that is not an Ingress: %#v", obj)
                    return
                }
            }
            if !class.IsValid(ing) {
                glog.Infof("ignoring delete for ingress %v based on annotation %v", ing.Name, class.IngressKey)
                return
            }
            recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))

            store.listers.IngressAnnotation.Delete(ing)

            key := k8s.MetaNamespaceKey(ing)
            store.secretIngressMap.Delete(key)

            updateCh.In() <- Event{
                Type: DeleteEvent,
                Obj:  obj,
            }
        },
        UpdateFunc: func(old, cur interface{}) {
            oldIng := old.(*extensions.Ingress)
            curIng := cur.(*extensions.Ingress)
            validOld := class.IsValid(oldIng)
            validCur := class.IsValid(curIng)
            if !validOld && validCur {
                glog.Infof("creating ingress %v based on annotation %v", curIng.Name, class.IngressKey)
                recorder.Eventf(curIng, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
            } else if validOld && !validCur {
                glog.Infof("removing ingress %v based on annotation %v", curIng.Name, class.IngressKey)
                recorder.Eventf(curIng, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
            } else if validCur && !reflect.DeepEqual(old, cur) {
                recorder.Eventf(curIng, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
            }

            store.extractAnnotations(curIng)
            store.updateSecretIngressMap(curIng)
            store.syncSecrets(curIng)

            updateCh.In() <- Event{
                Type: UpdateEvent,
                Obj:  cur,
            }
        },
    }

      // 添加事件處理器
      store.informers.Ingress.AddEventHandler(ingEventHandler)

資源事件變化處理是驅(qū)動Ingress Nginx運行的關(guān)鍵,這里先不打算在本篇文章中詳細描述了,如果有必要,將會另外寫一篇文章去講述。

啟動Informer

Informer的啟動在k8sStore的Run方法中驅(qū)動,啟動。

// Run initiates the synchronization of the informers and the initial
// synchronization of the secrets.
func (s k8sStore) Run(stopCh chan struct{}) {
    // start informers
    s.informers.Run(stopCh)

    if s.isOCSPCheckEnabled {
        go wait.Until(s.checkSSLChainIssues, 60*time.Second, stopCh)
    }
}

// Run initiates the synchronization of the informers against the API server.
func (i *Informer) Run(stopCh chan struct{}) {
    go i.Endpoint.Run(stopCh)
    go i.Service.Run(stopCh)
    go i.Secret.Run(stopCh)
    go i.ConfigMap.Run(stopCh)

    // wait for all involved caches to be synced before processing items
    // from the queue
    if !cache.WaitForCacheSync(stopCh,  // 需要等待除了Ingress之外的各種資源同步完成采取處理里面的
        i.Endpoint.HasSynced,
        i.Service.HasSynced,
        i.Secret.HasSynced,
        i.ConfigMap.HasSynced,
    ) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
    }

    // in big clusters, deltas can keep arriving even after HasSynced
    // functions have returned 'true'
    time.Sleep(1 * time.Second)

    // we can start syncing ingress objects only after other caches are
    // ready, because ingress rules require content from other listers, and
    // 'add' events get triggered in the handlers during caches population.
    go i.Ingress.Run(stopCh) // 啟動Ingress Informer
    if !cache.WaitForCacheSync(stopCh,  
        i.Ingress.HasSynced,
    ) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
    }
}

Informers的啟動代碼比較清晰,首先啟動除了Ingress Informer之外的其他資源的Informer,并且等待cache同步完成后,采取啟動Ingress Informer,因為Ingress的規(guī)則需要其他資源的內(nèi)容。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容