2. daemon詳解——APIServer

APIServer

Server接收用戶通過client發(fā)來的請(qǐng)求,按照路由規(guī)則分發(fā),交給后端處理完畢后將結(jié)果返回至client

APIServer

之前說到start()中的NewDaemon()是初始化Daemon的核心邏輯,APIServer的創(chuàng)建在start()中。

  • 首先調(diào)用NewAPIServerConfig(cli),根據(jù)DaemonCli的配置生成APIServer的配置文件,比如log、Version、TLS,并將Cli.Config.Hosts的值置為1,然后將serverConfig返回。之后如果發(fā)現(xiàn)建立config失敗,則輸出錯(cuò)誤信息,反之進(jìn)入下個(gè)環(huán)節(jié)。
  • 第二步調(diào)用apiserver.New(serverConfig)新建server實(shí)例,server中包含了config、router、HTTPServer等,新建之后填充了config的部分,其他仍未初始化。
  • 第三步調(diào)用loadListeners(),在這個(gè)函數(shù)里根據(jù)DaemonCli里的HOSTS,對(duì)每一個(gè)可用的host,解析出protocol和address,調(diào)用listeners.Init()與Daemon建立socket連接,分配一個(gè)Daemon的端口監(jiān)聽socket。即與每個(gè)可用的Daemon建立連接(如果你嘗試一個(gè)APIServer連接到多個(gè)daemon的話,是可行的)。
  • 到此為止,已經(jīng)初始化了apiserver,之后會(huì)為APIServer初始化router。
func newAPIServerConfig(cli *DaemonCli) (*apiserver.Config, error) {
    serverConfig := &apiserver.Config{
        Logging:     true,
        SocketGroup: cli.Config.SocketGroup,
        Version:     dockerversion.Version,
        CorsHeaders: cli.Config.CorsHeaders,
    }

    if cli.Config.TLS {
        tlsOptions := tlsconfig.Options{
            CAFile:             cli.Config.CommonTLSOptions.CAFile,
            CertFile:           cli.Config.CommonTLSOptions.CertFile,
            KeyFile:            cli.Config.CommonTLSOptions.KeyFile,
            ExclusiveRootPools: true,
        }

        if cli.Config.TLSVerify {
            // server requires and verifies client's certificate
            tlsOptions.ClientAuth = tls.RequireAndVerifyClientCert
        }
        tlsConfig, err := tlsconfig.Server(tlsOptions)
        if err != nil {
            return nil, err
        }
        serverConfig.TLSConfig = tlsConfig
    }

    if len(cli.Config.Hosts) == 0 {
        cli.Config.Hosts = make([]string, 1)
    }

    return serverConfig, nil
}

func New(cfg *Config) *Server {
    return &Server{
        cfg: cfg,
    }
}

// Server contains instance details for the server
type Server struct {
    cfg           *Config
    servers       []*HTTPServer
    routers       []router.Router
    routerSwapper *routerSwapper
    middlewares   []middleware.Middleware
}

func loadListeners(cli *DaemonCli, serverConfig *apiserver.Config) ([]string, error) {
    var hosts []string
    for i := 0; i < len(cli.Config.Hosts); i++ {
        var err error
        if cli.Config.Hosts[i], err = dopts.ParseHost(cli.Config.TLS, cli.Config.Hosts[i]); err != nil {
            return nil, fmt.Errorf("error parsing -H %s : %v", cli.Config.Hosts[i], err)
        }

        protoAddr := cli.Config.Hosts[i]
        protoAddrParts := strings.SplitN(protoAddr, "://", 2)
        if len(protoAddrParts) != 2 {
            return nil, fmt.Errorf("bad format %s, expected PROTO://ADDR", protoAddr)
        }

        proto := protoAddrParts[0]
        addr := protoAddrParts[1]

        // It's a bad idea to bind to TCP without tlsverify.
        if proto == "tcp" && (serverConfig.TLSConfig == nil || serverConfig.TLSConfig.ClientAuth != tls.RequireAndVerifyClientCert) {
            logrus.Warn("[!] DON'T BIND ON ANY IP ADDRESS WITHOUT setting --tlsverify IF YOU DON'T KNOW WHAT YOU'RE DOING [!]")
        }
        ls, err := listeners.Init(proto, addr, serverConfig.SocketGroup, serverConfig.TLSConfig)
        if err != nil {
            return nil, err
        }
        ls = wrapListeners(proto, ls)
        // If we're binding to a TCP port, make sure that a container doesn't try to use it.
        if proto == "tcp" {
            if err := allocateDaemonPort(addr); err != nil {
                return nil, err
            }
        }
        logrus.Debugf("Listener created for HTTP on %s (%s)", proto, addr)
        hosts = append(hosts, protoAddrParts[1])
        cli.api.Accept(addr, ls...)
    }

    return hosts, nil
}

router

初始化router的過程同樣在start()里。

  • 調(diào)用newRouterOptions()進(jìn)行參數(shù)的設(shè)置
  • 調(diào)用initRouter。建立了decoder進(jìn)行命令的解析,之后建立了多個(gè)router分別為不同的模塊服務(wù),比如checkpoint router, container router, image router, volume router, swarm router等等。接收消息后會(huì)根據(jù)消息的信令分配不同的router進(jìn)行處理。對(duì)于router的初始化都調(diào)用了NewRouter函數(shù)。比如container.NewRouter()里調(diào)用了r.initRoutes(),其中定義了router.NewGetRoute("/containers/json", r.getContainersJSON)這樣的配置,就為處理命令尋找到了相應(yīng)的文件。
  • 把所有的router初始化好,調(diào)用InitRouter。吐槽一下命名,只是i大小寫的區(qū)別都可以嗎!很容易看不懂的啊!大寫的比較牛逼,定義了API Server使用的main router,它通過調(diào)用CreateMux()中的makeHTTPHandler()來建立main router。這個(gè)函數(shù)的形參是responseWriter和Request,就非常好理解是干啥的了,它定義了context這個(gè)概念,這是一個(gè)全局的變量,然后通過handlerWithGlobalMiddlewares()定義了handlerFunc來處理request,通過WrapHandler()把回調(diào)函數(shù)注冊(cè)進(jìn)去了,之后通過mux.Vars()獲取了request相關(guān)的router的信息,于是請(qǐng)求的處理過程真正是通過handlerFunc(ctx, w, r, vars)實(shí)現(xiàn)的。
func (s *Server) makeHTTPHandler(handler httputils.APIFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // Define the context that we'll pass around to share info
        // like the docker-request-id.
        //
        // The 'context' will be used for global data that should
        // apply to all requests. Data that is specific to the
        // immediate function being called should still be passed
        // as 'args' on the function call.

        // use intermediate variable to prevent "should not use basic type
        // string as key in context.WithValue" golint errors
        var ki interface{} = dockerversion.UAStringKey
        ctx := context.WithValue(context.Background(), ki, r.Header.Get("User-Agent"))
        handlerFunc := s.handlerWithGlobalMiddlewares(handler)

        vars := mux.Vars(r)
        if vars == nil {
            vars = make(map[string]string)
        }

        if err := handlerFunc(ctx, w, r, vars); err != nil {
            statusCode := httputils.GetHTTPErrorStatusCode(err)
            if statusCode >= 500 {
                logrus.Errorf("Handler for %s %s returned error: %v", r.Method, r.URL.Path, err)
            }
            httputils.MakeErrorHandler(err)(w, r)
        }
    }
}

func initRouter(opts routerOptions) {
    decoder := runconfig.ContainerDecoder{}

    routers := []router.Router{
        // we need to add the checkpoint router before the container router or the DELETE gets masked
        checkpointrouter.NewRouter(opts.daemon, decoder),
        container.NewRouter(opts.daemon, decoder),
        image.NewRouter(opts.daemon.ImageService()),
        systemrouter.NewRouter(opts.daemon, opts.cluster, opts.buildCache, opts.buildkit),
        volume.NewRouter(opts.daemon.VolumesService()),
        build.NewRouter(opts.buildBackend, opts.daemon),
        sessionrouter.NewRouter(opts.sessionManager),
        swarmrouter.NewRouter(opts.cluster),
        pluginrouter.NewRouter(opts.daemon.PluginManager()),
        distributionrouter.NewRouter(opts.daemon.ImageService()),
    }

    if opts.daemon.NetworkControllerEnabled() {
        routers = append(routers, network.NewRouter(opts.daemon, opts.cluster))
    }

    if opts.daemon.HasExperimental() {
        for _, r := range routers {
            for _, route := range r.Routes() {
                if experimental, ok := route.(router.ExperimentalRoute); ok {
                    experimental.Enable()
                }
            }
        }
    }

    opts.api.InitRouter(routers...)
}

middleware

那么,還需要關(guān)注一下middleware。它的初始化在start中的initMiddlewares(),形參用到了cli.api、serverConfig、pluginStore。

pluginStore在上一行的plugin.NewStore()完成初始化,它返回一個(gè)Store對(duì)象,其中最重要的是定義了注冊(cè)回調(diào)函數(shù)的接口

/* handlers are necessary for transition path of legacy plugins
* to the new model. Legacy plugins use Handle() for registering an
* activation callback.*/
handlers map[string][]func(string, *plugins.Client)

initMiddlewares定義了不同的中間件,包括:

  • NewExperimentalMiddleware: 返回一個(gè)experimental string(true或者false)標(biāo)志實(shí)驗(yàn)性功能是否可用
  • NewVersionMiddleware: 返回version信息,包括server、default、minVersion
  • NewCORSMiddleware: 不清楚是什么
  • authzMiddleware: authorizationPlugin is an internal adapter to docker plugin system,有更具體的解釋嗎?
// TODO: remove this from cli and return the authzMiddleware
func (cli *DaemonCli) initMiddlewares(s *apiserver.Server, cfg *apiserver.Config, pluginStore plugingetter.PluginGetter) error {
    v := cfg.Version

    exp := middleware.NewExperimentalMiddleware(cli.Config.Experimental)
    s.UseMiddleware(exp)

    vm := middleware.NewVersionMiddleware(v, api.DefaultVersion, api.MinVersion)
    s.UseMiddleware(vm)

    if cfg.CorsHeaders != "" {
        c := middleware.NewCORSMiddleware(cfg.CorsHeaders)
        s.UseMiddleware(c)
    }

    cli.authzMiddleware = authorization.NewMiddleware(cli.Config.AuthorizationPlugins, pluginStore)
    cli.Config.AuthzMiddleware = cli.authzMiddleware
    s.UseMiddleware(cli.authzMiddleware)
    return nil
}

重新回到上面的handlerWithGlobalMiddlewares()函數(shù),它內(nèi)部有一個(gè)循環(huán),遍歷了使用的middlewares,對(duì)每個(gè)middleware調(diào)用WrapHandler()

Middleware的作用docker的解釋是:

// Middleware is an interface to allow the use of ordinary functions as Docker API filters.
// Any struct that has the appropriate signature can be registered as a middleware.

WrapHandler對(duì)于不同的middleware有不同的實(shí)現(xiàn)版本,它的原型是

type Middleware interface {
    WrapHandler(func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error) func(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error
}

開始提供服務(wù)

在創(chuàng)建好APIServer,連接上Daemon,創(chuàng)建、設(shè)置好Router之后,APIServer就可以向外提供服務(wù)了,它提供的是一個(gè)HTTP Server之上標(biāo)準(zhǔn)的RESTful-API,對(duì)用戶非常友好。
APIServer的真正運(yùn)行采用go routine的方式,go cli.api.wait()真正啟動(dòng)了APIServer。采用go routine的原因是:如果APIServer出錯(cuò),daemon會(huì)隨之退出,采用go routine可以在出錯(cuò)的時(shí)候嘗試重啟,保證docker的正常運(yùn)行。在wait函數(shù)中調(diào)用了serveAPI(),由于之前可能和多個(gè)Daemon建立了連接,所以對(duì)每個(gè)HTTP server,調(diào)用Serve()函數(shù)。
Serve函數(shù)為每個(gè)client的連接建立goroutine的服務(wù),服務(wù)會(huì)解析request,調(diào)用handler,返回結(jié)果。
該函數(shù)并不是docker開發(fā)者自己寫的,而是調(diào)用了GO語言的net/http庫。大概里面的函數(shù)有:readRequesthandler := sh.srv.Handler,ServeHTTP(ResponseWriter, *Request)等。由于是系統(tǒng)庫,所以先不看了。

// serveAPI loops through all initialized servers and spawns goroutine
// with Serve method for each. It sets createMux() as Handler also.
func (s *Server) serveAPI() error {
    var chErrors = make(chan error, len(s.servers))
    for _, srv := range s.servers {
        srv.srv.Handler = s.routerSwapper
        go func(srv *HTTPServer) {
            var err error
            logrus.Infof("API listen on %s", srv.l.Addr())
            if err = srv.Serve(); err != nil && strings.Contains(err.Error(), "use of closed network connection") {
                err = nil
            }
            chErrors <- err
        }(srv)
    }

    for range s.servers {
        err := <-chErrors
        if err != nil {
            return err
        }
    }
    return nil
}

總結(jié)

附上start中的高層流程:

func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
    serverConfig, err := newAPIServerConfig(cli)
    if err != nil {
        return fmt.Errorf("Failed to create API server: %v", err)
    }
    cli.api = apiserver.New(serverConfig)

    hosts, err := loadListeners(cli, serverConfig)
    if err != nil {
        return fmt.Errorf("Failed to load listeners: %v", err)
    }
...
    pluginStore := plugin.NewStore()
    if err := cli.initMiddlewares(cli.api, serverConfig, pluginStore); err != nil {
        logrus.Fatalf("Error creating middlewares: %v", err)
    }
...
    routerOptions, err := newRouterOptions(cli.Config, d)
    if err != nil {
        return err
    }
    routerOptions.api = cli.api
    routerOptions.cluster = c

    initRouter(routerOptions)
...
    serveAPIWait := make(chan error)
    go cli.api.Wait(serveAPIWait)
...
    errAPI := <-serveAPIWait

APIServer是daemon和docker client通信的接口,在daemon的初始化流程中優(yōu)先級(jí)非常高。通過初始化apiserver實(shí)例、router實(shí)例、middleware實(shí)例,系統(tǒng)已經(jīng)為APIServer提供了完整的運(yùn)行環(huán)境。Middleware為在docker和用戶之間做了一層隔離,為用戶提供標(biāo)準(zhǔn)的接口,通過middleware,回調(diào)函數(shù)被注冊(cè)進(jìn)router中。
真正運(yùn)行時(shí),采用goroutine的方式保證了保證了daemon的穩(wěn)定運(yùn)行,當(dāng)request到達(dá)APIServer中后,APIServer通過main router進(jìn)行查找,調(diào)用相應(yīng)的router,也就執(zhí)行了相應(yīng)的回調(diào)函數(shù)的調(diào)用,從而實(shí)現(xiàn)了對(duì)request的處理。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,564評(píng)論 19 139
  • Refer to: www.threemeal.com/blog/12/ 中間件 中間件是一個(gè)鉤子框架,它們可以介...
    蘭山小亭閱讀 16,784評(píng)論 9 164
  • 【公司】浙江康意潔具有限公司 【姓名】景桃桃 【組別】235期六項(xiàng)精進(jìn)【樂觀二組】 【日精進(jìn)打卡第013天】 【知...
    景桃桃閱讀 169評(píng)論 0 0
  • 榮辱兮禍福兮 幸得紅木一般的親人 亦得青瓷一般的友人 愿得白玉一般的戀人 與眾生共此生
    后來居士閱讀 296評(píng)論 0 1
  • 寫在前面:最近參加了好好花錢的年終總結(jié)征文比賽,經(jīng)過一個(gè)多月的角逐和評(píng)審,我最終獲得了一等獎(jiǎng),開心~撒花~ 現(xiàn)在將...
    查蘇bobo閱讀 504評(píng)論 0 2

友情鏈接更多精彩內(nèi)容