kubernete apiserver 代碼分析

K8s所有的模塊的入口函數(shù)都位于kubernetes/cmd/ 目錄下。

kube-apiserver下的包結(jié)構(gòu)

main包下是apiserver
app目錄下是app包,子目錄options下是options包。


屏幕快照 2017-08-22 上午11.33.59.png

入口main 函數(shù)

  • main函數(shù)
    //設(shè)置隨之?dāng)?shù)生成器的seed值
    rand.Seed(time.Now().UTC().UnixNano())
    //生成一個(gè)ServerRunOptions實(shí)例
    s := options.NewServerRunOptions()
    s.AddFlags(pflag.CommandLine)

    flag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()
    //初始化log模塊
    verflag.PrintAndExitIfRequested()
    //Run函數(shù)會(huì)調(diào)用CreateKubeAPIServer 創(chuàng)建一個(gè)kubeAPIServer結(jié)構(gòu)體實(shí)例
    //NeverStop is a <-chan struct{}
    if err := app.Run(s, wait.NeverStop); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

main 函數(shù)中最前面一堆都是初始化,真正的入口是app.Run
func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {}
上面是Run函數(shù)在app package中的聲明。第一個(gè)參數(shù)是一個(gè)ServerRunOptions結(jié)構(gòu)的指針。第二個(gè)參數(shù)是一個(gè)只讀的struct chanel。

  • ServerRunOptions結(jié)構(gòu)體解讀
type ServerRunOptions struct {
    GenericServerRunOptions *genericoptions.ServerRunOptions
    Etcd                    *genericoptions.EtcdOptions
    SecureServing           *genericoptions.SecureServingOptions
    InsecureServing         *kubeoptions.InsecureServingOptions
    Audit                   *genericoptions.AuditOptions
    Features                *genericoptions.FeatureOptions
    Admission               *genericoptions.AdmissionOptions
    Authentication          *kubeoptions.BuiltInAuthenticationOptions
    Authorization           *kubeoptions.BuiltInAuthorizationOptions
    CloudProvider           *kubeoptions.CloudProviderOptions
    StorageSerialization    *kubeoptions.StorageSerializationOptions
    APIEnablement           *kubeoptions.APIEnablementOptions
    AllowPrivileged           bool
    EnableLogsHandler         bool
    EventTTL                  time.Duration
    KubeletConfig             kubeletclient.KubeletClientConfig
    KubernetesServiceNodePort int
    MasterCount               int
    MaxConnectionBytesPerSec  int64
    ServiceClusterIPRange     net.IPNet // TODO: make this a list
    ServiceNodePortRange      utilnet.PortRange
    SSHKeyfile                string
    SSHUser                   string
    ProxyClientCertFile string
    ProxyClientKeyFile  string
    EnableAggregatorRouting bool
} //ServerRunOptions 的結(jié)構(gòu)定義.

genericoptiions是options.go 中import的vendor下的option包的別名
genericoptions.ServerRunOptions

type ServerRunOptions struct {
    AdvertiseAddress net.IP

    CorsAllowedOriginList       []string
    ExternalHost                string
    MaxRequestsInFlight         int
    MaxMutatingRequestsInFlight int
    MinRequestTimeout           int
    TargetRAMMB                 int
    WatchCacheSizes             []string
}

AdvertiseAddress apiserver 監(jiān)聽(tīng)的ip地址默認(rèn)為hostip
CoreAllowedOriginlist 允許訪問(wèn)的域可以使用正則限定
...
具體的各個(gè)參數(shù)的含義可以通過(guò)apiserver --help 查看
etcd etcd集群相關(guān)的參數(shù)
SecureServing 安全監(jiān)聽(tīng)的相關(guān)參數(shù)默認(rèn)初始化為0.0.0.0:6443
InsecureServing 默認(rèn)127.0.0.1:8080
...

  • wait 中定義的包級(jí)變量NerverStop
    var NeverStop <-chan struct{} = make(chan struct{})
    在main函數(shù)中通過(guò)NewServerRunOptions使用默認(rèn)參數(shù)初始化一個(gè)ServerRunOptions實(shí)例的指針并傳遞給appPackage的Run函數(shù)

app.Run

func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {


    //
    // To help debugging, immediately log version
    glog.Infof("Version: %+v", version.Get())

    nodeTunneler, proxyTransport, err := CreateNodeDialer(runOptions)
    if err != nil {
        return err
    }

    kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport)
    if err != nil {
        return err
    }

    // TPRs are enabled and not yet beta, since this these are the successor, they fall under the same enablement rule
    // If additional API servers are added, they should be gated.
    apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, runOptions)
    if err != nil {
        return err
    }
    apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.EmptyDelegate)
    if err != nil {
        return err
    }

    kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, sharedInformers, apiExtensionsConfig.CRDRESTOptionsGetter)
    if err != nil {
        return err
    }

    // if we're starting up a hacked up version of this API server for a weird test case,
    // just start the API server as is because clients don't get built correctly when you do this
    if len(os.Getenv("KUBE_API_VERSIONS")) > 0 {
        if insecureServingOptions != nil {
            insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(kubeAPIServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
            if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil {
                return err
            }
        }

        return kubeAPIServer.GenericAPIServer.PrepareRun().Run(stopCh)
    }
    kubeAPIServer.GenericAPIServer.PrepareRun()

    // aggregator comes last in the chain
    aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions, versionedInformers, serviceResolver, proxyTransport)
    if err != nil {
        return err
    }
    aggregatorConfig.ProxyTransport = proxyTransport
    aggregatorConfig.ServiceResolver = serviceResolver
    aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers)
    if err != nil {
        // we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
        return err
    }

    if insecureServingOptions != nil {
        insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
        if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil {
            return err
        }
    }

    return aggregatorServer.GenericAPIServer.PrepareRun().Run(stopCh)

app.run 函數(shù)通過(guò)一系列的調(diào)用最終啟用一個(gè)sync loop 。
在經(jīng)過(guò)一系列初始化后先獲取環(huán)境變量KUBE_API_VERSIONS。如果設(shè)置了這個(gè)環(huán)境變量切insecureSeveringOptions不為nil。

  • NonBlockingRun函數(shù)
    kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh)
    然后NonBlockingRun又調(diào)用了同一個(gè)文件中的 serveInsecurely(insecureServingInfo, insecureHandler, internalStopCh)函數(shù),這里的insecureServingInfo和insecureHandle2個(gè)參數(shù)就是NonBlockingRun的前2參數(shù),internalStopCh是在NonBlockingRun中自己定義的一個(gè)struct{}channel,傳遞給insecureServingInfo只讀channel。
    然后NonBlockingRun又啟動(dòng)一個(gè)goroutine.
    go func() {
        <-stopCh
        close(internalStopCh)
    }()

這里的stopCh是wait包里聲明的那個(gè)channel,由Run函數(shù)傳入。關(guān)閉的是NonBlockingRun函數(shù)傳遞給下面的函數(shù)的channel。

  • serveInsecurely 函數(shù)
    使用insecureServingInfo.BindAddress和insecureHandler實(shí)例化一個(gè)http.server結(jié)構(gòu)。然后調(diào)用server.RunServer(insecureServer, insecureServingInfo.BindNetwork, stopCh) RunServer函數(shù)位于k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/serve.go文件中注意這里的server.go 文件和cmd下的server.go 文件不是同一個(gè)。stopCh是在NonBlockingRun中定義的chan struct{}不是run函數(shù)傳遞進(jìn)來(lái)那個(gè)。
  • RunServer 函數(shù)根據(jù)傳遞來(lái)的http.server 結(jié)構(gòu)中的Addr和network調(diào)用net.Listen 監(jiān)聽(tīng)。如果network為空設(shè)置為tcp,Addr空?qǐng)?bào)錯(cuò)退出。在RunServer中還定義了2個(gè)goroutine
    go func() {
        <-stopCh
        ln.Close()
    }()

上面這個(gè)go routine中如果stopCh解除阻塞后就關(guān)閉建立的tcp鏈接。注意這里的stopCh 是 NonBlockingRun make的 channel。RunServer 啟動(dòng)2個(gè)goroutine后直接返回serveInsecurely函數(shù)再返回NonBlockingRun函數(shù),在NonBlockingRun函數(shù)中如果返回錯(cuò)誤會(huì)close internalStopCh這channel,這樣就接觸了stopCh這個(gè)channel的阻塞。

    go func() {
        defer utilruntime.HandleCrash()

        var listener net.Listener
        listener = tcpKeepAliveListener{ln.(*net.TCPListener)}
        if server.TLSConfig != nil {
            listener = tls.NewListener(listener, server.TLSConfig)
        }

        err := server.Serve(listener)

        msg := fmt.Sprintf("Stopped listening on %s", tcpAddr.String())
        select {
        case <-stopCh:
            glog.Info(msg)
        default:
            panic(fmt.Sprintf("%s due to error: %v", msg, err))
        }
    }()

這個(gè)goroutine利用剛RunServer的返回的Listener,生成一個(gè)tcpKeepAliveListener,然后再利用net/http 下的server.Server 建立一個(gè)https監(jiān)聽(tīng)。跟上一個(gè)goroutine一樣,這個(gè)goroutine在返回到NonBlockingRun中然后close channel后就接觸阻塞。并打印信息。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容