K8s 所有模块的入口函数都位于 kubernetes/cmd/ 目录下。
kube-apiserver 下的包结构:
main 包下是 apiserver
app 目录下是 app 包,子目录 options 下是 options 包。

入口 main 函数
- main 函数
// Seed the random number generator with the current UTC time in nanoseconds.
rand.Seed(time.Now().UTC().UnixNano())
// Create a ServerRunOptions instance.
s := options.NewServerRunOptions()
s.AddFlags(pflag.CommandLine)
flag.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()
// The log module is initialized above; flushing it is deferred until main returns.
verflag.PrintAndExitIfRequested()
// Run calls CreateKubeAPIServer to create a kubeAPIServer struct instance.
// NeverStop is a <-chan struct{}
if err := app.Run(s, wait.NeverStop); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
main 函数中前面的部分都是初始化工作,真正的入口是 app.Run。
// Run takes a pointer to options.ServerRunOptions and a receive-only stop
// channel and returns an error. Declaration only: the body is elided here.
func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {}
上面是 Run 函数在 app 包中的声明。第一个参数是指向 ServerRunOptions 结构体的指针,第二个参数是一个只读(只能接收)的 struct{} channel。
- ServerRunOptions 结构体解读
// ServerRunOptions is the options struct that main creates with
// options.NewServerRunOptions and passes to app.Run.
type ServerRunOptions struct {
// Option groups whose types come from the genericoptions and kubeoptions packages.
GenericServerRunOptions *genericoptions.ServerRunOptions
Etcd *genericoptions.EtcdOptions
SecureServing *genericoptions.SecureServingOptions
InsecureServing *kubeoptions.InsecureServingOptions
Audit *genericoptions.AuditOptions
Features *genericoptions.FeatureOptions
Admission *genericoptions.AdmissionOptions
Authentication *kubeoptions.BuiltInAuthenticationOptions
Authorization *kubeoptions.BuiltInAuthorizationOptions
CloudProvider *kubeoptions.CloudProviderOptions
StorageSerialization *kubeoptions.StorageSerializationOptions
APIEnablement *kubeoptions.APIEnablementOptions
// Individual settings held directly on this struct.
AllowPrivileged bool
EnableLogsHandler bool
EventTTL time.Duration
KubeletConfig kubeletclient.KubeletClientConfig
KubernetesServiceNodePort int
MasterCount int
MaxConnectionBytesPerSec int64
ServiceClusterIPRange net.IPNet // TODO: make this a list
ServiceNodePortRange utilnet.PortRange
SSHKeyfile string
SSHUser string
ProxyClientCertFile string
ProxyClientKeyFile string
EnableAggregatorRouting bool
} // End of the ServerRunOptions struct definition.
genericoptions 是 options.go 中 import 的 vendor 下的 option 包的别名。
genericoptions.ServerRunOptions
// ServerRunOptions is the struct referred to as genericoptions.ServerRunOptions.
type ServerRunOptions struct {
// AdvertiseAddress: per the accompanying notes, the IP address the apiserver
// listens on; it defaults to the host IP.
AdvertiseAddress net.IP
// CorsAllowedOriginList: per the accompanying notes, the origins allowed to
// access the server; entries may be restricted with regular expressions.
CorsAllowedOriginList []string
ExternalHost string
MaxRequestsInFlight int
MaxMutatingRequestsInFlight int
MinRequestTimeout int
TargetRAMMB int
WatchCacheSizes []string
}
AdvertiseAddress:apiserver 监听的 IP 地址,默认为主机 IP。
CorsAllowedOriginList:允许访问的域,可以使用正则表达式限定。
...
各个参数的具体含义可以通过 apiserver --help 查看。
Etcd:etcd 集群相关的参数。
SecureServing:安全监听的相关参数,默认初始化为 0.0.0.0:6443。
InsecureServing:非安全监听的相关参数,默认为 127.0.0.1:8080。
...
- wait 包中定义的包级变量 NeverStop
// NeverStop is a package-level, receive-only channel of struct{}. Only the
// receive-only end is kept, so nothing can send on or close it through this variable.
var NeverStop <-chan struct{} = make(chan struct{})
在 main 函数中,通过 NewServerRunOptions 使用默认参数初始化一个 ServerRunOptions 实例,并将其指针传递给 app 包的 Run 函数。
app.Run
// Run builds, in order, the node dialer, the kube-apiserver config, the API
// extensions server, the kube-apiserver and finally the aggregator server.
// It returns the first construction error it hits; otherwise it returns the
// result of GenericAPIServer.PrepareRun().Run(stopCh) on the last server in
// the chain. stopCh is passed through to every Run/NonBlockingRun call.
func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
// NOTE: this excerpt ends at the final return; Run's closing brace is not shown.
// To help debugging, immediately log version
glog.Infof("Version: %+v", version.Get())
nodeTunneler, proxyTransport, err := CreateNodeDialer(runOptions)
if err != nil {
return err
}
kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport)
if err != nil {
return err
}
// TPRs are enabled and not yet beta, since this these are the successor, they fall under the same enablement rule
// If additional API servers are added, they should be gated.
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, runOptions)
if err != nil {
return err
}
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.EmptyDelegate)
if err != nil {
return err
}
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, sharedInformers, apiExtensionsConfig.CRDRESTOptionsGetter)
if err != nil {
return err
}
// if we're starting up a hacked up version of this API server for a weird test case,
// just start the API server as is because clients don't get built correctly when you do this
if len(os.Getenv("KUBE_API_VERSIONS")) > 0 {
// With insecure serving configured, start the insecure handler chain via
// NonBlockingRun before running the kube-apiserver itself.
if insecureServingOptions != nil {
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(kubeAPIServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil {
return err
}
}
return kubeAPIServer.GenericAPIServer.PrepareRun().Run(stopCh)
}
// NOTE(review): the value returned by PrepareRun is discarded here; presumably
// it is called for its side effects. Confirm.
kubeAPIServer.GenericAPIServer.PrepareRun()
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions, versionedInformers, serviceResolver, proxyTransport)
if err != nil {
return err
}
// NOTE(review): proxyTransport and serviceResolver were already passed to
// createAggregatorConfig above, so these assignments may be redundant. Confirm.
aggregatorConfig.ProxyTransport = proxyTransport
aggregatorConfig.ServiceResolver = serviceResolver
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, sharedInformers, apiExtensionsServer.Informers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return err
}
// Same insecure-serving start as in the KUBE_API_VERSIONS branch, but the
// handler chain wraps the aggregator server's unprotected handler.
if insecureServingOptions != nil {
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
if err := kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh); err != nil {
return err
}
}
return aggregatorServer.GenericAPIServer.PrepareRun().Run(stopCh)
app.Run 函数通过一系列的调用,最终启动一个 sync loop。
在经过一系列初始化后,先获取环境变量 KUBE_API_VERSIONS。如果设置了这个环境变量且 insecureServingOptions 不为 nil,就先构造 insecureHandlerChain,再调用下面的 NonBlockingRun 函数。
- NonBlockingRun 函数
kubeserver.NonBlockingRun(insecureServingOptions, insecureHandlerChain, stopCh)
然后 NonBlockingRun 又调用了同一个文件中的 serveInsecurely(insecureServingInfo, insecureHandler, internalStopCh) 函数,这里的 insecureServingInfo 和 insecureHandler 这2个参数就是 NonBlockingRun 的前2个参数,internalStopCh 是在 NonBlockingRun 中自己定义的一个 struct{} channel,以只读 channel 的形式传递给 serveInsecurely。
然后 NonBlockingRun 又启动一个 goroutine:
// Block until stopCh is readable, then close internalStopCh.
go func() {
<-stopCh
close(internalStopCh)
}()
这里的 stopCh 是 wait 包里声明的那个 channel(NeverStop),由 Run 函数传入;被关闭的 internalStopCh 则是 NonBlockingRun 函数传递给下面的函数的 channel。
- serveInsecurely 函数
使用 insecureServingInfo.BindAddress 和 insecureHandler 实例化一个 http.Server 结构,然后调用 server.RunServer(insecureServer, insecureServingInfo.BindNetwork, stopCh)。RunServer 函数位于 k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/server/serve.go 文件中,注意这里的 serve.go 文件和 cmd 下的 server.go 文件不是同一个。stopCh 是在 NonBlockingRun 中定义的 chan struct{},不是 Run 函数传递进来的那个。 - RunServer 函数根据传递来的 http.Server 结构中的 Addr 和 network 调用 net.Listen 监听。如果 network 为空则设置为 tcp,Addr 为空则报错退出。在 RunServer 中还定义了2个 goroutine:
// Block until stopCh is readable, then close the listener ln.
go func() {
<-stopCh
ln.Close()
}()
上面这个 goroutine 中,一旦 stopCh 解除阻塞,就关闭已建立的监听(ln)。注意这里的 stopCh 是 NonBlockingRun 中 make 出来的 channel。RunServer 启动2个 goroutine 后直接返回到 serveInsecurely 函数,再返回到 NonBlockingRun 函数;在 NonBlockingRun 函数中,如果返回了错误,就会 close internalStopCh 这个 channel,这样就解除了 stopCh 这个 channel 的阻塞。
// Serve on the listener in the background and report why serving stopped.
go func() {
defer utilruntime.HandleCrash()
var listener net.Listener
// Wrap the listener (asserted to be a *net.TCPListener) in tcpKeepAliveListener.
listener = tcpKeepAliveListener{ln.(*net.TCPListener)}
// Add a TLS layer only when the server carries a TLS config.
if server.TLSConfig != nil {
listener = tls.NewListener(listener, server.TLSConfig)
}
err := server.Serve(listener)
msg := fmt.Sprintf("Stopped listening on %s", tcpAddr.String())
// Once Serve has returned: if stopCh is already readable, log the stop as
// informational; otherwise panic with the error from Serve.
select {
case <-stopCh:
glog.Info(msg)
default:
panic(fmt.Sprintf("%s due to error: %v", msg, err))
}
}()
这个 goroutine 利用 RunServer 中 net.Listen 返回的 Listener 生成一个 tcpKeepAliveListener(如果设置了 server.TLSConfig,还会再包装成 TLS listener),然后调用 net/http 包中 http.Server 的 Serve 方法开始监听服务。跟上一个 goroutine 一样,在返回到 NonBlockingRun 并 close channel 之后,这个 goroutine 就解除阻塞,并打印信息。