Tornado request flow: a second analysis

First, a code example:

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web

from tornado.options import define, options
define("port", default=8000, help="run on the given port", type=int)

class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        greeting = self.get_argument('greeting', 'Hello')
        self.write(greeting + ', friendly user!')

if __name__ == "__main__":
    tornado.options.parse_command_line()
    app = tornado.web.Application(handlers=[(r"/", IndexHandler)])
    http_server = tornado.httpserver.HTTPServer(app)
    http_server.listen(options.port)
    # IOLoop.instance() is a deprecated alias for IOLoop.current() as of Tornado 5.0
    tornado.ioloop.IOLoop.current().start()

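Tornado is both a web server (think of it as filling nginx's role) and a web framework (think of it as a WSGI application). The official recommendation is to use the two together to get the most out of it. WSGI is a synchronous interface, so two setups lose the asynchronous model that gives Tornado its strength: pairing the Tornado server with some other WSGI application (such as Django), and running nginx + WSGI + the Tornado framework.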

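The code above runs top to bottom, and the call that actually drives everything is the last one, start(). Before looking at start(), we need to understand the setup work that precedes it. The setup step closest to start() is listen(), so what does listen() do?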

    def listen(self, port, address=""):
        """Starts accepting connections on the given port.

        This method may be called more than once to listen on multiple ports.
        `listen` takes effect immediately; it is not necessary to call
        `TCPServer.start` afterwards.  It is, however, necessary to start
        the `.IOLoop`.
        """
        sockets = bind_sockets(port, address=address)
        self.add_sockets(sockets)
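listen() starts by calling bind_sockets. The following simplified sketch uses only the standard library to show what that involves: creating the socket, setting options, binding it and listening on it. The function name bind_listening_socket and the backlog of 128 are assumptions for illustration. Tornado's real bind_sockets also handles multiple address families, so it may return several sockets, and it sets more options.

```python
import socket


def bind_listening_socket(port, address="127.0.0.1", backlog=128):
    """Hypothetical, simplified stand-in for tornado.netutil.bind_sockets.

    Creates a single IPv4 socket; the real function may return one socket
    per address returned by getaddrinfo (e.g. IPv4 and IPv6).
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow quick restarts without waiting for old TIME_WAIT sockets.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # Non-blocking, so accept() raises instead of stalling an event loop.
    sock.setblocking(False)
    sock.bind((address, port))
    sock.listen(backlog)
    return [sock]
```

Returning a list mirrors the fact that add_sockets expects a list of sockets.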

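bind_sockets bundles the first steps of classic socket programming. It creates the socket(s), sets options such as SO_REUSEADDR, switches them to non-blocking mode, binds them to the requested port and calls listen() on them. Next, look at self.add_sockets(sockets):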

    def add_sockets(self, sockets):
        """Makes this server start accepting connections on the given sockets.

        The ``sockets`` parameter is a list of socket objects such as
        those returned by `~tornado.netutil.bind_sockets`.
        `add_sockets` is typically used in combination with that
        method and `tornado.process.fork_processes` to provide greater
        control over the initialization of a multi-process server.
        """
        for sock in sockets:
            self._sockets[sock.fileno()] = sock
            self._handlers[sock.fileno()] = add_accept_handler(
                sock, self._handle_connection)

The line to focus on here is self._handlers[sock.fileno()] = add_accept_handler(sock, self._handle_connection). It registers the socket with the IOLoop. Whenever the IOLoop detects a new connection request on that socket (a READ event), it calls the callback self._handle_connection to deal with it. The source of add_accept_handler is as follows:

def add_accept_handler(sock, callback):
    """Adds an `.IOLoop` event handler to accept new connections on ``sock``.

    When a connection is accepted, ``callback(connection, address)`` will
    be run (``connection`` is a socket object, and ``address`` is the
    address of the other end of the connection).  Note that this signature
    is different from the ``callback(fd, events)`` signature used for
    `.IOLoop` handlers.

    A callable is returned which, when called, will remove the `.IOLoop`
    event handler and stop processing further incoming connections.

    .. versionchanged:: 5.0
       The ``io_loop`` argument (deprecated since version 4.1) has been removed.

    .. versionchanged:: 5.0
       A callable is returned (``None`` was returned before).
    """
    io_loop = IOLoop.current()
    removed = [False]

    def accept_handler(fd, events):
        # More connections may come in while we're handling callbacks;
        # to prevent starvation of other tasks we must limit the number
        # of connections we accept at a time.  Ideally we would accept
        # up to the number of connections that were waiting when we
        # entered this method, but this information is not available
        # (and rearranging this method to call accept() as many times
        # as possible before running any callbacks would have adverse
        # effects on load balancing in multiprocess configurations).
        # Instead, we use the (default) listen backlog as a rough
        # heuristic for the number of connections we can reasonably
        # accept at once.
        for i in xrange(_DEFAULT_BACKLOG):
            if removed[0]:
                # The socket was probably closed
                return
            try:
                connection, address = sock.accept()
            except socket.error as e:
                # _ERRNO_WOULDBLOCK indicate we have accepted every
                # connection that is available.
                if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
                    return
                # ECONNABORTED indicates that there was a connection
                # but it was closed while still in the accept queue.
                # (observed on FreeBSD).
                if errno_from_exception(e) == errno.ECONNABORTED:
                    continue
                raise
            set_close_exec(connection.fileno())
            callback(connection, address)

    def remove_handler():
        io_loop.remove_handler(sock)
        removed[0] = True

    io_loop.add_handler(sock, accept_handler, IOLoop.READ)
    return remove_handler

Start with this line: io_loop.add_handler(sock, accept_handler, IOLoop.READ). It adds the listening socket to the IOLoop with IOLoop.READ as the event to watch. A listening socket becomes readable when a new incoming connection is pending, for example a browser about to send an HTTP request, and at that point the IOLoop runs accept_handler. accept_handler calls sock.accept() to obtain the socket for each new connection and passes it to callback. And what is callback? It is simply the second parameter of add_accept_handler. Back in add_sockets, the call was add_accept_handler(sock, self._handle_connection), so callback is self._handle_connection. In other words, every connection arriving on this port is handled by self._handle_connection. Simplified, the flow is:
The listen method of the TCPServer class in tornado/tcpserver.py adds the listening socket to the IOLoop with readability as the watched event. (The HTTPServer in our example is a subclass of TCPServer.) When a read event occurs, the self._handle_connection callback is executed.
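The sketch below is a rough model of add_handler plus accept_handler. It uses Python's selectors module instead of Tornado's IOLoop. The names add_accept_handler (reused here for a toy version), run_once and BACKLOG are hypothetical. Like the original, it accepts at most a fixed number of connections per readiness event and stops as soon as accept() would block. Unlike Tornado, the registered handler takes no (fd, events) arguments.

```python
import selectors
import socket

BACKLOG = 128  # assumed per-event accept limit, analogous to _DEFAULT_BACKLOG


def add_accept_handler(sel, sock, callback):
    """Register a non-blocking listening socket; call callback(conn, addr) per new connection."""

    def accept_handler():
        # Accept at most BACKLOG connections per readiness event so that one
        # busy listener cannot starve other work in the loop.
        for _ in range(BACKLOG):
            try:
                connection, address = sock.accept()
            except BlockingIOError:
                return  # EWOULDBLOCK/EAGAIN: every pending connection was accepted
            except ConnectionAbortedError:
                continue  # ECONNABORTED: client hung up while still queued
            callback(connection, address)

    # Readability on a listening socket means "a connection is pending".
    sel.register(sock, selectors.EVENT_READ, accept_handler)


def run_once(sel, timeout=1.0):
    """One loop iteration: run the handler of every socket that is ready."""
    for key, _mask in sel.select(timeout):
        key.data()
```

A real event loop would call run_once repeatedly. Tornado's IOLoop does the equivalent inside start().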
Next, let's look at the callback self._handle_connection.

    def _handle_connection(self, connection, address):
        if self.ssl_options is not None:
            assert ssl, "Python 2.6+ and OpenSSL required for SSL"
            try:
                connection = ssl_wrap_socket(connection,
                                             self.ssl_options,
                                             server_side=True,
                                             do_handshake_on_connect=False)
            except ssl.SSLError as err:
                if err.args[0] == ssl.SSL_ERROR_EOF:
                    return connection.close()
                else:
                    raise
            except socket.error as err:
                # If the connection is closed immediately after it is created
                # (as in a port scan), we can get one of several errors.
                # wrap_socket makes an internal call to getpeername,
                # which may return either EINVAL (Mac OS X) or ENOTCONN
                # (Linux).  If it returns ENOTCONN, this error is
                # silently swallowed by the ssl module, so we need to
                # catch another error later on (AttributeError in
                # SSLIOStream._do_ssl_handshake).
                # To test this behavior, try nmap with the -sT flag.
                # https://github.com/tornadoweb/tornado/pull/750
                if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL):
                    return connection.close()
                else:
                    raise
        try:
            if self.ssl_options is not None:
                stream = SSLIOStream(connection,
                                     max_buffer_size=self.max_buffer_size,
                                     read_chunk_size=self.read_chunk_size)
            else:
                stream = IOStream(connection,
                                  max_buffer_size=self.max_buffer_size,
                                  read_chunk_size=self.read_chunk_size)

            future = self.handle_stream(stream, address)
            if future is not None:
                IOLoop.current().add_future(gen.convert_yielded(future),
                                            lambda f: f.result())
        except Exception:
            app_log.error("Error in connection callback", exc_info=True)

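_handle_connection first applies an SSL wrap if ssl_options is set. It then wraps the accepted socket in a stream object (an IOStream or SSLIOStream) used to read and write the connection's data. Finally it calls self.handle_stream to process that stream. In TCPServer, handle_stream is an abstract method (effectively a pure virtual function) that subclasses must implement. Its code is: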

    def handle_stream(self, stream, address):
        """Override to handle a new `.IOStream` from an incoming connection.

        This method may be a coroutine; if so any exceptions it raises
        asynchronously will be logged. Accepting of incoming connections
        will not be blocked by this coroutine.

        If this `TCPServer` is configured for SSL, ``handle_stream``
        may be called before the SSL handshake has completed. Use
        `.SSLIOStream.wait_for_handshake` if you need to verify the client's
        certificate or use NPN/ALPN.

        .. versionchanged:: 4.2
           Added the option for this method to be a coroutine.
        """
        raise NotImplementedError()
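The split between TCPServer._handle_connection and handle_stream follows the template-method design. The base class owns the connection plumbing and leaves the protocol step abstract. Below is a toy sketch with hypothetical class names, where a bytes object stands in for the IOStream:

```python
class BaseServer:
    """Hypothetical stand-in for TCPServer: generic connection plumbing."""

    def _handle_connection(self, stream, address):
        # The real TCPServer builds an IOStream first; here `stream` is
        # whatever object the subclass knows how to consume.
        return self.handle_stream(stream, address)

    def handle_stream(self, stream, address):
        # Protocol-specific step, left to subclasses.
        raise NotImplementedError()


class UpperCaseServer(BaseServer):
    """A toy 'protocol' that upper-cases whatever it receives."""

    def handle_stream(self, stream, address):
        return stream.upper()
```

Swapping in a different subclass changes the protocol without touching the accept and stream setup.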

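Leaving handle_stream to subclasses keeps TCPServer flexible, because each higher-level protocol, such as HTTP or FTP, can supply its own handling. The example at the top uses HTTPServer, so let's look at HTTPServer's handle_stream: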

    def handle_stream(self, stream, address):
        context = _HTTPRequestContext(stream, address,
                                      self.protocol,
                                      self.trusted_downstream)
        conn = HTTP1ServerConnection(
            stream, self.conn_params, context)
        self._connections.add(conn)
        conn.start_serving(self)

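This method does three things:
- It builds a context object (_HTTPRequestContext) from the stream and the client address.
- It creates a new HTTP connection object (HTTP1ServerConnection).
- It records the connection in the HTTPServer's _connections set, so the connection can be found again when the server or the connection is closed.

Everything that follows operates on this HTTP1ServerConnection object. There is one HTTP1ServerConnection per TCP connection, not per request: with keep-alive, one connection can carry several HTTP requests in turn. Next, let's see what conn.start_serving(self) does.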

    def start_serving(self, delegate):
        """Starts serving requests on this connection.

        :arg delegate: a `.HTTPServerConnectionDelegate`
        """
        assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
        self._serving_future = self._server_request_loop(delegate)
        # Register the future on the IOLoop so its errors get logged.
        self.stream.io_loop.add_future(self._serving_future,
                                       lambda f: f.result())

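Put simply, start_serving launches the coroutine _server_request_loop, which repeats the following steps until the connection ends:
- Create an HTTP1Connection from the HTTP1ServerConnection's stream, parameters and context.
- Read the next request from the stream.
- Process that request.

The future returned by the coroutine is registered on the IOLoop so that any error it raises is logged. Here is the code of self._server_request_loop: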

    @gen.coroutine
    def _server_request_loop(self, delegate):
        try:
            while True:
                conn = HTTP1Connection(self.stream, False,
                                       self.params, self.context)
                request_delegate = delegate.start_request(self, conn)
                try:
                    ret = yield conn.read_response(request_delegate)
                except (iostream.StreamClosedError,
                        iostream.UnsatisfiableReadError):
                    return
                except _QuietException:
                    # This exception was already logged.
                    conn.close()
                    return
                except Exception:
                    gen_log.error("Uncaught exception", exc_info=True)
                    conn.close()
                    return
                if not ret:
                    return
                yield gen.moment
        finally:
            delegate.on_close(self)
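_server_request_loop is a per-connection loop. It keeps reading requests until the peer disconnects or a handler reports that the connection should not continue. Its shape can be sketched with asyncio streams. The names serve_requests and handle_request are hypothetical. In this sketch, asyncio.IncompleteReadError plays the role of StreamClosedError, and await asyncio.sleep(0) plays the role of yield gen.moment. The sketch only looks for \r\n\r\n, whereas Tornado also accepts bare \n line endings.

```python
import asyncio


async def serve_requests(reader, handle_request):
    """Simplified asyncio analogue of _server_request_loop (hypothetical)."""
    while True:
        try:
            # Read one request's header block (ends at the blank line).
            header_block = await reader.readuntil(b"\r\n\r\n")
        except asyncio.IncompleteReadError:
            return  # peer closed the connection
        # A falsy result ends the loop, like `if not ret: return` in Tornado.
        if not handle_request(header_block):
            return
        # Give other tasks a turn between requests, like `yield gen.moment`.
        await asyncio.sleep(0)
```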

The key point in _server_request_loop is the delegate.start_request call. This delegate is the self passed in conn.start_serving(self) inside HTTPServer.handle_stream, i.e. the HTTPServer instance. So we only need to look at HTTPServer's start_request:

    def start_request(self, server_conn, request_conn):
        if isinstance(self.request_callback, httputil.HTTPServerConnectionDelegate):
            delegate = self.request_callback.start_request(server_conn, request_conn)
        else:
            delegate = _CallableAdapter(self.request_callback, request_conn)

        if self.xheaders:
            delegate = _ProxyAdapter(delegate, request_conn)

        return delegate

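In start_request, self.request_callback is the Application from our example; it is the first argument given to HTTPServer when the server is constructed. The class hierarchy is:
- Application is declared as class Application(ReversibleRouter).
- ReversibleRouter inherits from Router.
- Router is declared as class Router(httputil.HTTPServerConnectionDelegate).

The isinstance check therefore succeeds and the first branch, delegate = self.request_callback.start_request(server_conn, request_conn), is taken. This means Application's start_request gets called. The Application class in tornado/web.py does not define start_request itself, so Python looks it up in the parent classes and finds it in Router: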

class Router(httputil.HTTPServerConnectionDelegate):
    """Abstract router interface."""

    def find_handler(self, request, **kwargs):
        # type: (httputil.HTTPServerRequest, typing.Any)->httputil.HTTPMessageDelegate
        """Must be implemented to return an appropriate instance of `~.httputil.HTTPMessageDelegate`
        that can serve the request.
        Routing implementations may pass additional kwargs to extend the routing logic.

        :arg httputil.HTTPServerRequest request: current HTTP request.
        :arg kwargs: additional keyword arguments passed by routing implementation.
        :returns: an instance of `~.httputil.HTTPMessageDelegate` that will be used to
            process the request.
        """
        raise NotImplementedError()

    def start_request(self, server_conn, request_conn):
        return _RoutingDelegate(self, server_conn, request_conn)
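Toy classes can reproduce two pieces of this: the isinstance dispatch in HTTPServer.start_request, and the method lookup that finds start_request on Router. The class names below mirror Tornado's but are stand-ins defined here, not imports. The xheaders branch is left out.

```python
class HTTPServerConnectionDelegate:
    """Stand-in for httputil.HTTPServerConnectionDelegate."""

    def start_request(self, server_conn, request_conn):
        raise NotImplementedError()


class RoutingDelegate:
    """Stand-in for _RoutingDelegate: just remembers its arguments."""

    def __init__(self, router, server_conn, request_conn):
        self.router = router
        self.server_conn = server_conn
        self.request_conn = request_conn


class Router(HTTPServerConnectionDelegate):
    def start_request(self, server_conn, request_conn):
        return RoutingDelegate(self, server_conn, request_conn)


class ReversibleRouter(Router):
    pass


class Application(ReversibleRouter):
    pass  # no start_request of its own: lookup continues up the MRO to Router


class CallableAdapter:
    """Stand-in for _CallableAdapter (old-style callable request_callback)."""

    def __init__(self, callback, request_conn):
        self.callback = callback
        self.request_conn = request_conn


def server_start_request(request_callback, server_conn, request_conn):
    """Mirrors the branch in HTTPServer.start_request (xheaders omitted)."""
    if isinstance(request_callback, HTTPServerConnectionDelegate):
        return request_callback.start_request(server_conn, request_conn)
    return CallableAdapter(request_callback, request_conn)
```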

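Going back to _server_request_loop: the call chain behind request_delegate = delegate.start_request(self, conn) has now returned. request_delegate is an instance of _RoutingDelegate. If xheaders were enabled it would be wrapped in a _ProxyAdapter, but that is not the case in our example.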

    @gen.coroutine
    def _server_request_loop(self, delegate):
        try:
            while True:
                conn = HTTP1Connection(self.stream, False,
                                       self.params, self.context)
                request_delegate = delegate.start_request(self, conn)
                try:
                    ret = yield conn.read_response(request_delegate)
                except (iostream.StreamClosedError,
                        iostream.UnsatisfiableReadError):
                    return
                except _QuietException:
                    # This exception was already logged.
                    conn.close()
                    return
                except Exception:
                    gen_log.error("Uncaught exception", exc_info=True)
                    conn.close()
                    return
                if not ret:
                    return
                yield gen.moment
        finally:
            delegate.on_close(self)

In this function, request_delegate = delegate.start_request(self, conn) gives us a _RoutingDelegate object. Next comes ret = yield conn.read_response(request_delegate). Before analyzing it, here is the definition of the _RoutingDelegate class:

class _RoutingDelegate(httputil.HTTPMessageDelegate):
    def __init__(self, router, server_conn, request_conn):
        self.server_conn = server_conn
        self.request_conn = request_conn
        self.delegate = None
        self.router = router  # type: Router

    def headers_received(self, start_line, headers):
        request = httputil.HTTPServerRequest(
            connection=self.request_conn,
            server_connection=self.server_conn,
            start_line=start_line, headers=headers)

        self.delegate = self.router.find_handler(request)  # here the delegate is a _HandlerDelegate
        if self.delegate is None:
            app_log.debug("Delegate for %s %s request not found",
                          start_line.method, start_line.path)
            self.delegate = _DefaultMessageDelegate(self.request_conn)

        return self.delegate.headers_received(start_line, headers) 

    def data_received(self, chunk):
        return self.delegate.data_received(chunk)

    def finish(self):
        self.delegate.finish()

    def on_connection_close(self):
        self.delegate.on_connection_close()

Now that we know request_delegate is a _RoutingDelegate instance, let's see what conn.read_response does with it:

    def read_response(self, delegate):
        """Read a single HTTP response.

        Typical client-mode usage is to write a request using `write_headers`,
        `write`, and `finish`, and then call ``read_response``.

        :arg delegate: a `.HTTPMessageDelegate`

        Returns a `.Future` that resolves to None after the full response has
        been read.
        """
        if self.params.decompress:
            delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
        return self._read_message(delegate)

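On the server side, read_response reads an incoming request, even though its name and docstring are phrased for client mode. _GzipMessageDelegate is only a wrapper around the delegate (our _RoutingDelegate). It decompresses gzip-encoded bodies (Content-Encoding: gzip) before passing them on. Either way, what matters is what _read_message(delegate) does: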

    @gen.coroutine
    def _read_message(self, delegate):
        need_delegate_close = False
        try:
            header_future = self.stream.read_until_regex(
                b"\r?\n\r?\n",
                max_bytes=self.params.max_header_size)
            if self.params.header_timeout is None:
                header_data = yield header_future
            else:
                try:
                    header_data = yield gen.with_timeout(
                        self.stream.io_loop.time() + self.params.header_timeout,
                        header_future,
                        quiet_exceptions=iostream.StreamClosedError)
                except gen.TimeoutError:
                    self.close()
                    raise gen.Return(False)
            start_line, headers = self._parse_headers(header_data)
            if self.is_client:
                start_line = httputil.parse_response_start_line(start_line)
                self._response_start_line = start_line
            else:
                start_line = httputil.parse_request_start_line(start_line)
                self._request_start_line = start_line
                self._request_headers = headers

            self._disconnect_on_finish = not self._can_keep_alive(
                start_line, headers)
            need_delegate_close = True
            with _ExceptionLoggingContext(app_log):
                header_future = delegate.headers_received(start_line, headers)
                if header_future is not None:
                    yield header_future
            if self.stream is None:
                # We've been detached.
                need_delegate_close = False
                raise gen.Return(False)
            skip_body = False
            if self.is_client:
                if (self._request_start_line is not None and
                        self._request_start_line.method == 'HEAD'):
                    skip_body = True
                code = start_line.code
                if code == 304:
                    # 304 responses may include the content-length header
                    # but do not actually have a body.
                    # http://tools.ietf.org/html/rfc7230#section-3.3
                    skip_body = True
                if code >= 100 and code < 200:
                    # 1xx responses should never indicate the presence of
                    # a body.
                    if ('Content-Length' in headers or
                            'Transfer-Encoding' in headers):
                        raise httputil.HTTPInputError(
                            "Response code %d cannot have body" % code)
                    # TODO: client delegates will get headers_received twice
                    # in the case of a 100-continue.  Document or change?
                    yield self._read_message(delegate)
            else:
                if (headers.get("Expect") == "100-continue" and
                        not self._write_finished):
                    self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
            if not skip_body:
                body_future = self._read_body(
                    start_line.code if self.is_client else 0, headers, delegate)
                if body_future is not None:
                    if self._body_timeout is None:
                        yield body_future
                    else:
                        try:
                            yield gen.with_timeout(
                                self.stream.io_loop.time() + self._body_timeout,
                                body_future,
                                quiet_exceptions=iostream.StreamClosedError)
                        except gen.TimeoutError:
                            gen_log.info("Timeout reading body from %s",
                                         self.context)
                            self.stream.close()
                            raise gen.Return(False)
            self._read_finished = True
            if not self._write_finished or self.is_client:
                need_delegate_close = False
                with _ExceptionLoggingContext(app_log):
                    delegate.finish()
            # If we're waiting for the application to produce an asynchronous
            # response, and we're not detached, register a close callback
            # on the stream (we didn't need one while we were reading)
            if (not self._finish_future.done() and
                    self.stream is not None and
                    not self.stream.closed()):
                self.stream.set_close_callback(self._on_connection_close)
                yield self._finish_future
            if self.is_client and self._disconnect_on_finish:
                self.close()
            if self.stream is None:
                raise gen.Return(False)
        except httputil.HTTPInputError as e:
            gen_log.info("Malformed HTTP message from %s: %s",
                         self.context, e)
            if not self.is_client:
                yield self.stream.write(b'HTTP/1.1 400 Bad Request\r\n\r\n')
            self.close()
            raise gen.Return(False)
        finally:
            if need_delegate_close:
                with _ExceptionLoggingContext(app_log):
                    delegate.on_connection_close()
            header_future = None
            self._clear_callbacks()
        raise gen.Return(True)
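_read_message first waits for the blank line that ends the header block (the regex \r?\n\r?\n). It then parses the start line and the headers. The sketch below is a simplified, standard-library version of those two steps. split_header_block and parse_request_head are hypothetical helpers. A plain dict replaces Tornado's case-insensitive HTTPHeaders, and the start line is not validated.

```python
import re

# Same terminator pattern that _read_message passes to read_until_regex.
HEADER_END = re.compile(rb"\r?\n\r?\n")


def split_header_block(buffer):
    """Return (header_block, rest) once the blank line has arrived, else None."""
    m = HEADER_END.search(buffer)
    if m is None:
        return None  # need more data; Tornado's stream would keep reading
    return buffer[:m.end()], buffer[m.end():]


def parse_request_head(header_block):
    """Toy parser returning (method, path, version, headers)."""
    lines = header_block.decode("latin-1").splitlines()
    method, path, version = lines[0].split(" ")
    headers = {}
    for line in lines[1:]:
        if not line:
            continue  # the terminating blank line
        name, _, value = line.partition(":")
        headers[name.strip()] = value.strip()
    return method, path, version, headers
```

Whatever follows the header block (rest) is body data, which Tornado reads separately in _read_body.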

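_read_message does a lot. For our purposes, the important parts are its calls on the delegate:
- delegate.headers_received, once the header block has been read and parsed.
- delegate.data_received, made from _read_body as body data arrives.
- delegate.finish, once the whole message has been read.

As noted above, the delegate here is the _RoutingDelegate, so let's review its source once more: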

class _RoutingDelegate(httputil.HTTPMessageDelegate):
    def __init__(self, router, server_conn, request_conn):
        self.server_conn = server_conn
        self.request_conn = request_conn
        self.delegate = None
        self.router = router  # type: Router

    def headers_received(self, start_line, headers):
        request = httputil.HTTPServerRequest(
            connection=self.request_conn,
            server_connection=self.server_conn,
            start_line=start_line, headers=headers)

        self.delegate = self.router.find_handler(request)  # here the delegate is a _HandlerDelegate
        if self.delegate is None:
            app_log.debug("Delegate for %s %s request not found",
                          start_line.method, start_line.path)
            self.delegate = _DefaultMessageDelegate(self.request_conn)

        return self.delegate.headers_received(start_line, headers) 

    def data_received(self, chunk):
        return self.delegate.data_received(chunk)

    def finish(self):
        self.delegate.finish()

    def on_connection_close(self):
        self.delegate.on_connection_close()

_RoutingDelegate.headers_received wraps the received request information into an HTTPServerRequest object. It stores the object returned by self.router.find_handler(request) in self.delegate, falling back to _DefaultMessageDelegate if nothing is found. It then calls self.delegate.headers_received. The later data_received, finish and on_connection_close calls are forwarded to the same self.delegate. So what exactly is the self.delegate returned by self.router.find_handler? Read on.
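A stripped-down sketch can illustrate how _RoutingDelegate routes and then forwards. Here find_handler is any callable that returns a delegate or None. The fallback factory stands in for _DefaultMessageDelegate. Both classes are hypothetical simplifications, and the real code builds an HTTPServerRequest rather than passing the start line.

```python
class RecordingDelegate:
    """Hypothetical stand-in for a _HandlerDelegate: records the calls it gets."""

    def __init__(self):
        self.calls = []

    def headers_received(self, start_line, headers):
        self.calls.append(("headers", start_line))

    def data_received(self, chunk):
        self.calls.append(("data", chunk))

    def finish(self):
        self.calls.append(("finish",))


class RoutingDelegate:
    """Simplified _RoutingDelegate: choose a target on headers, then forward."""

    def __init__(self, find_handler, fallback):
        self.find_handler = find_handler  # plays the role of router.find_handler
        self.fallback = fallback          # plays the role of _DefaultMessageDelegate
        self.delegate = None

    def headers_received(self, start_line, headers):
        delegate = self.find_handler(start_line)
        if delegate is None:
            delegate = self.fallback()
        self.delegate = delegate
        return self.delegate.headers_received(start_line, headers)

    def data_received(self, chunk):
        return self.delegate.data_received(chunk)

    def finish(self):
        self.delegate.finish()
```

The routing decision happens only once, when the headers arrive; every later event goes straight to the chosen delegate.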
Focus on self.delegate = self.router.find_handler(request). Here self.router is the Application itself, because Router.start_request passed self when it created the _RoutingDelegate. So let's see what Application's find_handler does:

    def find_handler(self, request, **kwargs):
        route = self.default_router.find_handler(request)
        if route is not None:
            return route

        if self.settings.get('default_handler_class'):
            return self.get_handler_delegate(
                request,
                self.settings['default_handler_class'],
                self.settings.get('default_handler_args', {}))

        return self.get_handler_delegate(
            request, ErrorHandler, {'status_code': 404})

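It looks for a handler for the request in the Application's self.default_router. If nothing matches, it falls back to default_handler_class when that setting is configured. Otherwise it uses an ErrorHandler that responds with 404. self.default_router is set up in Application's __init__: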

        self.default_router = _ApplicationRouter(self, [
            Rule(AnyMatches(), self.wildcard_router)
        ])

_ApplicationRouter itself does not define find_handler, so we look in its parent classes. The definition is in class RuleRouter(Router):

    def find_handler(self, request, **kwargs):
        for rule in self.rules:
            target_params = rule.matcher.match(request)
            if target_params is not None:
                if rule.target_kwargs:
                    target_params['target_kwargs'] = rule.target_kwargs

                delegate = self.get_target_delegate(
                    rule.target, request, **target_params)

                if delegate is not None:
                    return delegate

        return None
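The lookup has two levels: default_router matches everything with AnyMatches, then delegates to the nested wildcard_router. This can be modelled with a small rule router. Three things are assumptions made for this sketch: the ExactPath matcher, the dict-based request, and the (target, params) tuple returned for leaf targets. Note the is not None check, which lets AnyMatches' empty dict count as a match.

```python
class AnyMatches:
    def match(self, request):
        return {}  # matches everything; {} is falsy but not None


class ExactPath:
    """Hypothetical matcher used only in this sketch."""

    def __init__(self, path):
        self.path = path

    def match(self, request):
        return {} if request["path"] == self.path else None


class Rule:
    def __init__(self, matcher, target):
        self.matcher = matcher
        self.target = target


class RuleRouter:
    def __init__(self, rules):
        self.rules = list(rules)

    def find_handler(self, request, **kwargs):
        # First matching rule that yields a delegate wins.
        for rule in self.rules:
            params = rule.matcher.match(request)
            if params is not None:  # `if params:` would wrongly skip {}
                delegate = self.get_target_delegate(rule.target, request, **params)
                if delegate is not None:
                    return delegate
        return None

    def get_target_delegate(self, target, request, **params):
        if isinstance(target, RuleRouter):
            return target.find_handler(request, **params)  # nested router
        return (target, params)  # leaf: stand-in for a handler delegate
```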

First, what is self.rules in RuleRouter.find_handler? It is filled by self.add_rules(rules) in the constructor. For default_router, self.rules = [Rule(AnyMatches(), self.wildcard_router)], so rule.target = self.wildcard_router. The matcher here is AnyMatches, whose match() returns an empty dict {}. The check is is not None, so this empty dict still counts as a match and delegate = self.get_target_delegate(...) is executed. The get_target_delegate called here is the one defined in class _ApplicationRouter(ReversibleRuleRouter):, not the one in the parent class RuleRouter, because a subclass method overrides a parent method of the same name. So let's look at get_target_delegate in _ApplicationRouter:

    def get_target_delegate(self, target, request, **target_params):
        if isclass(target) and issubclass(target, RequestHandler):
            return self.application.get_handler_delegate(request, target, **target_params)

        return super(_ApplicationRouter, self).get_target_delegate(target, request, **target_params)

Here target is self.wildcard_router, which is an instance of _ApplicationRouter rather than a RequestHandler subclass. So the call falls through to super(_ApplicationRouter, self).get_target_delegate(target, request, **target_params), i.e. RuleRouter's get_target_delegate:

    def get_target_delegate(self, target, request, **target_params):
        """Returns an instance of `~.httputil.HTTPMessageDelegate` for a
        Rule's target. This method is called by `~.find_handler` and can be
        extended to provide additional target types.

        :arg target: a Rule's target.
        :arg httputil.HTTPServerRequest request: current request.
        :arg target_params: additional parameters that can be useful
            for `~.httputil.HTTPMessageDelegate` creation.
        """
        if isinstance(target, Router):
            return target.find_handler(request, **target_params)

        elif isinstance(target, httputil.HTTPServerConnectionDelegate):
            return target.start_request(request.server_connection, request.connection)

        elif callable(target):
            return _CallableAdapter(
                partial(target, **target_params), request.connection
            )

        return None

Again, target is self.wildcard_router, created in Application.__init__ as self.wildcard_router = _ApplicationRouter(self, handlers). It is an _ApplicationRouter, which is a subclass of Router, so the first branch, target.find_handler(request, **target_params), is taken. That find_handler is the same one, inherited from RuleRouter, that we just went through. What differs is wildcard_router's rules. Instead of Rule objects, it receives the user's own route list, a list of (pattern, RequestHandler subclass) tuples. In RuleRouter, the parent class of _ApplicationRouter, self.add_rules(rules) turns these user-defined routes into self.rules. default_router's self.rules = [Rule(AnyMatches(), self.wildcard_router)] was produced the same way:

class RuleRouter(Router):
    """Rule-based router implementation."""

    def __init__(self, rules=None):
        """Constructs a router from an ordered list of rules::

            RuleRouter([
                Rule(PathMatches("/handler"), Target),
                # ... more rules
            ])

        You can also omit explicit `Rule` constructor and use tuples of arguments::

            RuleRouter([
                (PathMatches("/handler"), Target),
            ])

        `PathMatches` is a default matcher, so the example above can be simplified::

            RuleRouter([
                ("/handler", Target),
            ])

        In the examples above, ``Target`` can be a nested `Router` instance, an instance of
        `~.httputil.HTTPServerConnectionDelegate` or an old-style callable,
        accepting a request argument.

        :arg rules: a list of `Rule` instances or tuples of `Rule`
            constructor arguments.
        """
        self.rules = []  # type: typing.List[Rule]
        if rules:
            self.add_rules(rules)

    def add_rules(self, rules):
        """Appends new rules to the router.

        :arg rules: a list of Rule instances (or tuples of arguments, which are
            passed to Rule constructor).
        """
        for rule in rules:
            if isinstance(rule, (tuple, list)):
                assert len(rule) in (2, 3, 4)
                if isinstance(rule[0], basestring_type):
                    rule = Rule(PathMatches(rule[0]), *rule[1:])
                else:
                    rule = Rule(*rule)

            self.rules.append(self.process_rule(rule))

Inside add_rules, rules is the handler list we defined, and each rule in the for loop is a tuple. In the example at the top, app = tornado.web.Application(handlers=[(r"/", IndexHandler)]), that tuple is (r"/", IndexHandler). Since rule[0] is a path string, the statement that runs is rule = Rule(PathMatches(rule[0]), *rule[1:]). So rule.matcher becomes PathMatches(r"/"), and rule.target becomes rule[1], our own IndexHandler, a subclass of RequestHandler. Back in find_handler (which _ApplicationRouter inherits from RuleRouter), rule.matcher is now a PathMatches, so let's look at PathMatches.match:

class PathMatches(Matcher):
    """Matches requests with paths specified by ``path_pattern`` regex."""

    def __init__(self, path_pattern):
        if isinstance(path_pattern, basestring_type):
            if not path_pattern.endswith('$'):
                path_pattern += '$'
            self.regex = re.compile(path_pattern)
        else:
            self.regex = path_pattern

        assert len(self.regex.groupindex) in (0, self.regex.groups), \
            ("groups in url regexes must either be all named or all "
             "positional: %r" % self.regex.pattern)

        self._path, self._group_count = self._find_groups()

    def match(self, request):
        match = self.regex.match(request.path)
        if match is None:
            return None
        if not self.regex.groups:
            return {}

        path_args, path_kwargs = [], {}

        # Pass matched groups to the handler.  Since
        # match.groups() includes both named and
        # unnamed groups, we want to use either groups
        # or groupdict but not both.
        if self.regex.groupindex:
            path_kwargs = dict(
                (str(k), _unquote_or_none(v))
                for (k, v) in match.groupdict().items())
        else:
            path_args = [_unquote_or_none(s) for s in match.groups()]

        return dict(path_args=path_args, path_kwargs=path_kwargs)
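A simplified version of PathMatches makes the effect of the appended $ and of the group handling easy to check. SimplePathMatches is a hypothetical name. It takes the path string directly instead of a request object. Unlike Tornado, it does not URL-unescape the captured values.

```python
import re


class SimplePathMatches:
    """Simplified PathMatches: anchors the pattern and extracts path arguments."""

    def __init__(self, path_pattern):
        if not path_pattern.endswith("$"):
            path_pattern += "$"  # the whole path must match, not just a prefix
        self.regex = re.compile(path_pattern)
        # Groups must be all named or all positional, as Tornado asserts.
        assert len(self.regex.groupindex) in (0, self.regex.groups)

    def match(self, path):
        m = self.regex.match(path)  # re.match also anchors at the start
        if m is None:
            return None
        if not self.regex.groups:
            return {}
        if self.regex.groupindex:
            return dict(path_args=[], path_kwargs=m.groupdict())
        return dict(path_args=list(m.groups()), path_kwargs={})
```

For example, a pattern of r"/" matches "/" but not "/extra". Positional groups end up in path_args and named groups in path_kwargs, which are later passed to the handler's get/post method.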

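In short, PathMatches.match checks request.path against the regular expression given as the path pattern. PathMatches appends $ if it is missing, and regex.match anchors at the start, so the whole path must match. If it does not match, match returns None and find_handler moves on to the next rule, until a rule matches or the rules run out. Once a match is found, target is IndexHandler, a RequestHandler subclass. _ApplicationRouter's get_target_delegate therefore takes its first branch and returns the value of self.application.get_handler_delegate(request, target, **target_params):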

    def get_target_delegate(self, target, request, **target_params):
        if isclass(target) and issubclass(target, RequestHandler):
            return self.application.get_handler_delegate(request, target, **target_params)

        return super(_ApplicationRouter, self).get_target_delegate(target, request, **target_params)
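
The loop in find_handler together with the isclass/issubclass check in get_target_delegate can be modelled in a few lines. This sketch assumes a hypothetical BaseHandler in place of RequestHandler, returns a plain tuple instead of a _HandlerDelegate, and uses re.fullmatch for brevity instead of a matcher object.

```python
import re
from inspect import isclass


class BaseHandler:
    """Hypothetical stand-in for tornado.web.RequestHandler."""


class IndexHandler(BaseHandler):
    pass


class ToyRouter:
    """Toy model of _ApplicationRouter.find_handler and get_target_delegate."""

    def __init__(self, rules):
        # Each rule is (pattern, target, target_kwargs).
        self.rules = [(re.compile(p), t, kw) for p, t, kw in rules]

    def get_target_delegate(self, target, path, **params):
        # Handler classes become a "delegate"; in Tornado other target types
        # are left to the base Router, which this toy simply ignores.
        if isclass(target) and issubclass(target, BaseHandler):
            return ("handler-delegate", target, params)
        return None

    def find_handler(self, path):
        for regex, target, target_kwargs in self.rules:
            m = regex.fullmatch(path)
            if m is None:
                continue  # no match: try the next rule
            params = {"path_args": list(m.groups())}
            if target_kwargs:
                params["target_kwargs"] = target_kwargs
            delegate = self.get_target_delegate(target, path, **params)
            if delegate is not None:
                return delegate
        return None  # nothing matched (or no usable target)


router = ToyRouter([
    (r"/", IndexHandler, None),
    (r"/item/(\d+)", IndexHandler, {"db": "fake"}),
    (r"/static", "not-a-handler-class", None),
])
print(router.find_handler("/item/7"))
print(router.find_handler("/static"))   # None
print(router.find_handler("/missing"))  # None
```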

self.application.get_handler_delegate calls Application.get_handler_delegate, which is defined as follows:

    def get_handler_delegate(self, request, target_class, target_kwargs=None,
                             path_args=None, path_kwargs=None):
        """Returns `~.httputil.HTTPMessageDelegate` that can serve a request
        for application and `RequestHandler` subclass.

        :arg httputil.HTTPServerRequest request: current HTTP request.
        :arg RequestHandler target_class: a `RequestHandler` class.
        :arg dict target_kwargs: keyword arguments for ``target_class`` constructor.
        :arg list path_args: positional arguments for ``target_class`` HTTP method that
            will be executed while handling a request (``get``, ``post`` or any other).
        :arg dict path_kwargs: keyword arguments for ``target_class`` HTTP method.
        """
        return _HandlerDelegate(
            self, request, target_class, target_kwargs, path_args, path_kwargs)

It returns a _HandlerDelegate instance. In other words, the delegate returned by self.router.find_handler(request) and stored in self.delegate is a _HandlerDelegate. Let's review the _RoutingDelegate source once more:

class _RoutingDelegate(httputil.HTTPMessageDelegate):
    def __init__(self, router, server_conn, request_conn):
        self.server_conn = server_conn
        self.request_conn = request_conn
        self.delegate = None
        self.router = router  # type: Router

    def headers_received(self, start_line, headers):
        request = httputil.HTTPServerRequest(
            connection=self.request_conn,
            server_connection=self.server_conn,
            start_line=start_line, headers=headers)

        self.delegate = self.router.find_handler(request)  # here self.delegate is a _HandlerDelegate
        if self.delegate is None:
            app_log.debug("Delegate for %s %s request not found",
                          start_line.method, start_line.path)
            self.delegate = _DefaultMessageDelegate(self.request_conn)

        return self.delegate.headers_received(start_line, headers) 

    def data_received(self, chunk):
        return self.delegate.data_received(chunk)

    def finish(self):
        self.delegate.finish()

    def on_connection_close(self):
        self.delegate.on_connection_close()

This gives the call chain inside _read_message(delegate): _read_message calls headers_received on the _RoutingDelegate, then data_received for each body chunk, then delegate.finish, and the _RoutingDelegate forwards each of these calls to the same method on the _HandlerDelegate.
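
The forwarding described above, where _RoutingDelegate picks a delegate in headers_received and then passes every later callback through, can be imitated with toy classes. Everything here (RecordingDelegate, NotFoundDelegate, ToyRoutingDelegate, drive, lookup) is hypothetical; the real classes receive start_line and headers objects plus an HTTP connection, which are reduced to plain strings.

```python
class RecordingDelegate:
    """Stands in for _HandlerDelegate: records each callback it receives."""

    def __init__(self):
        self.events = []

    def headers_received(self, start_line, headers):
        self.events.append(("headers", start_line))

    def data_received(self, chunk):
        self.events.append(("data", chunk))

    def finish(self):
        self.events.append(("finish",))


class NotFoundDelegate(RecordingDelegate):
    """Stands in for _DefaultMessageDelegate (used when no route matches)."""

    def finish(self):
        self.events.append(("finish", 404))


class ToyRoutingDelegate:
    def __init__(self, find_handler):
        self.find_handler = find_handler  # callable: path -> delegate or None
        self.delegate = None

    def headers_received(self, start_line, headers):
        path = start_line.split()[1]
        self.delegate = self.find_handler(path)
        if self.delegate is None:
            self.delegate = NotFoundDelegate()
        return self.delegate.headers_received(start_line, headers)

    # Every later callback is forwarded to the chosen delegate unchanged.
    def data_received(self, chunk):
        return self.delegate.data_received(chunk)

    def finish(self):
        self.delegate.finish()


def drive(routing, start_line, chunks):
    """Roughly the order in which the HTTP connection calls its delegate."""
    routing.headers_received(start_line, {})
    for chunk in chunks:
        routing.data_received(chunk)
    routing.finish()
    return routing.delegate.events


ROUTES = {"/": RecordingDelegate}


def lookup(path):
    cls = ROUTES.get(path)
    return cls() if cls is not None else None


print(drive(ToyRoutingDelegate(lookup), "POST / HTTP/1.1", [b"a=1"]))
print(drive(ToyRoutingDelegate(lookup), "GET /nope HTTP/1.1", []))
```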
Now let's look at the _HandlerDelegate class itself, focusing on handler_class, headers_received, data_received and finish:

class _HandlerDelegate(httputil.HTTPMessageDelegate):
    def __init__(self, application, request, handler_class, handler_kwargs,
                 path_args, path_kwargs):
        self.application = application
        self.connection = request.connection
        self.request = request
        self.handler_class = handler_class
        self.handler_kwargs = handler_kwargs or {}
        self.path_args = path_args or []
        self.path_kwargs = path_kwargs or {}
        self.chunks = []
        self.stream_request_body = _has_stream_request_body(self.handler_class)

    def headers_received(self, start_line, headers):
        if self.stream_request_body:
            self.request.body = Future()
            return self.execute()

    def data_received(self, data):
        if self.stream_request_body:
            return self.handler.data_received(data)
        else:
            self.chunks.append(data)
    
    def finish(self):
        if self.stream_request_body:
            future_set_result_unless_cancelled(self.request.body, None)
        else:
            self.request.body = b''.join(self.chunks)
            self.request._parse_body()
            self.execute()
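
The two branches of _HandlerDelegate can be compared with a small sketch. Without stream_request_body, chunks are buffered and joined in finish() before the handler runs; with it, each chunk goes straight to the handler. The names ToyHandler and ToyHandlerDelegate are hypothetical, the handler is passed in up front (in Tornado it is created by execute()), and urllib.parse.parse_qs stands in for _parse_body, which in Tornado also handles multipart bodies.

```python
from urllib.parse import parse_qs


class ToyHandler:
    def __init__(self):
        self.received = []     # chunks seen in streaming mode
        self.body_args = None  # parsed body in buffered mode

    def data_received(self, chunk):
        self.received.append(chunk)

    def run(self, body):
        # Simplified stand-in for _parse_body + execute: form bodies only.
        self.body_args = parse_qs(body.decode())


class ToyHandlerDelegate:
    def __init__(self, handler, stream_request_body):
        self.handler = handler
        self.stream_request_body = stream_request_body
        self.chunks = []

    def data_received(self, data):
        if self.stream_request_body:
            self.handler.data_received(data)  # pass each chunk on at once
        else:
            self.chunks.append(data)          # keep until the body is complete

    def finish(self):
        if not self.stream_request_body:
            self.handler.run(b"".join(self.chunks))


buffered = ToyHandlerDelegate(ToyHandler(), stream_request_body=False)
for part in (b"greeting=Hi", b"&name=Tom"):
    buffered.data_received(part)
buffered.finish()
print(buffered.handler.body_args)  # {'greeting': ['Hi'], 'name': ['Tom']}

streaming = ToyHandlerDelegate(ToyHandler(), stream_request_body=True)
streaming.data_received(b"abc")
streaming.finish()
print(streaming.handler.received, streaming.handler.body_args)  # [b'abc'] None
```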

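The method to watch here is delegate.execute, i.e. _HandlerDelegate.execute. It is called from headers_received in streaming mode and from finish otherwise, and it is the entry point into the user-defined RequestHandler in the Tornado framework.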

    def execute(self):
        # If template cache is disabled (usually in the debug mode),
        # re-compile templates and reload static files on every
        # request so you don't need to restart to see changes
        if not self.application.settings.get("compiled_template_cache", True):
            with RequestHandler._template_loader_lock:
                for loader in RequestHandler._template_loaders.values():
                    loader.reset()
        if not self.application.settings.get('static_hash_cache', True):
            StaticFileHandler.reset()

        self.handler = self.handler_class(self.application, self.request,
                                          **self.handler_kwargs)
        transforms = [t(self.request) for t in self.application.transforms]

        if self.stream_request_body:
            self.handler._prepared_future = Future()
        # Note that if an exception escapes handler._execute it will be
        # trapped in the Future it returns (which we are ignoring here,
        # leaving it to be logged when the Future is GC'd).
        # However, that shouldn't happen because _execute has a blanket
        # except handler, and we cannot easily access the IOLoop here to
        # call add_future (because of the requirement to remain compatible
        # with WSGI)
        self.handler._execute(transforms, *self.path_args,
                              **self.path_kwargs)
        # If we are streaming the request body, then execute() is finished
        # when the handler has prepared to receive the body.  If not,
        # it doesn't matter when execute() finishes (so we return None)
        return self.handler._prepared_future
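
execute() builds a brand-new handler object for every request and passes the route's keyword arguments to its constructor; in Tornado, RequestHandler.__init__ forwards those keyword arguments to initialize(). The sketch below imitates only that part with a hypothetical ToyRequestHandler base class and a hypothetical execute() function; it does not reproduce transforms, template reloading or _execute.

```python
class ToyRequestHandler:
    """Hypothetical base class imitating how RequestHandler consumes kwargs."""

    def __init__(self, application, request, **kwargs):
        self.application = application
        self.request = request
        self.initialize(**kwargs)  # route kwargs end up here

    def initialize(self):
        pass


class IndexHandler(ToyRequestHandler):
    def initialize(self, greeting="Hello"):
        self.greeting = greeting


def execute(application, request, handler_class, handler_kwargs=None):
    # A fresh handler per request, as in _HandlerDelegate.execute.
    return handler_class(application, request, **(handler_kwargs or {}))


h1 = execute("app", "req-1", IndexHandler, {"greeting": "Hi"})
h2 = execute("app", "req-2", IndexHandler)
print(h1.greeting, h2.greeting, h1 is h2)  # Hi Hello False
```

Because each request gets its own instance, attributes set on self in a handler are per-request; anything meant to be shared has to live elsewhere, for example on the application.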

Focus on self.handler = self.handler_class(self.application, self.request, **self.handler_kwargs) and on the self.handler._execute call. We analysed _ApplicationRouter's find_handler above:

    def find_handler(self, request, **kwargs):
        for rule in self.rules:
            target_params = rule.matcher.match(request)
            if target_params is not None:
                if rule.target_kwargs:
                    target_params['target_kwargs'] = rule.target_kwargs

                delegate = self.get_target_delegate(
                    rule.target, request, **target_params)

                if delegate is not None:
                    return delegate

        return None

Since delegate = self.get_target_delegate(...), delegate is a _HandlerDelegate instance whose handler_class argument is rule.target, i.e. the IndexHandler class we defined, found through the router. self.handler._execute is defined as follows:

    @gen.coroutine
    def _execute(self, transforms, *args, **kwargs):
        """Executes this request with the given output transforms."""
        self._transforms = transforms
        try:
            if self.request.method not in self.SUPPORTED_METHODS:
                raise HTTPError(405)
            self.path_args = [self.decode_argument(arg) for arg in args]
            self.path_kwargs = dict((k, self.decode_argument(v, name=k))
                                    for (k, v) in kwargs.items())
            # If XSRF cookies are turned on, reject form submissions without
            # the proper cookie
            if self.request.method not in ("GET", "HEAD", "OPTIONS") and \
                    self.application.settings.get("xsrf_cookies"):
                self.check_xsrf_cookie()

            result = self.prepare()
            if result is not None:
                result = yield result
            if self._prepared_future is not None:
                # Tell the Application we've finished with prepare()
                # and are ready for the body to arrive.
                future_set_result_unless_cancelled(self._prepared_future, None)
            if self._finished:
                return

            if _has_stream_request_body(self.__class__):
                # In streaming mode request.body is a Future that signals
                # the body has been completely received.  The Future has no
                # result; the data has been passed to self.data_received
                # instead.
                try:
                    yield self.request.body
                except iostream.StreamClosedError:
                    return

            method = getattr(self, self.request.method.lower())
            result = method(*self.path_args, **self.path_kwargs)
            if result is not None:
                result = yield result
            if self._auto_finish and not self._finished:
                self.finish()
        except Exception as e:
            try:
                self._handle_request_exception(e)
            except Exception:
                app_log.error("Exception in exception handler", exc_info=True)
            finally:
                # Unset result to avoid circular references
                result = None
            if (self._prepared_future is not None and
                    not self._prepared_future.done()):
                # In case we failed before setting _prepared_future, do it
                # now (to unblock the HTTP server).  Note that this is not
                # in a finally block to avoid GC issues prior to Python 3.4.
                self._prepared_future.set_result(None)

The key lines are:

    method = getattr(self, self.request.method.lower())
    result = method(*self.path_args, **self.path_kwargs)

The self in getattr(self, self.request.method.lower()) is an instance of self.handler_class, i.e. rule.target, which here is IndexHandler. At this point Tornado's request flow has reached the method we wrote ourselves (get in the example).
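
The core of _execute is: look up the method named after the HTTP verb, call it, and wait for the result if it is awaitable. Here is an asyncio-based sketch of just that step; ToyHandler, dispatch and the status attribute are hypothetical, and inspect.isawaitable plus await stand in for Tornado's @gen.coroutine machinery.

```python
import asyncio
import inspect


class ToyHandler:
    SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PATCH", "PUT", "OPTIONS")

    def __init__(self, method):
        self.method = method
        self.status = 200
        self.output = []

    def write(self, text):
        self.output.append(text)

    def get(self, name="friendly user"):
        self.write("Hello, %s!" % name)

    async def post(self):
        await asyncio.sleep(0)  # pretend to wait on some I/O
        self.write("posted")


async def dispatch(handler, *path_args):
    if handler.method not in handler.SUPPORTED_METHODS:
        handler.status = 405  # Tornado raises HTTPError(405) at this point
        return handler
    # Tornado's RequestHandler defines every verb and answers 405 for the ones
    # you don't override; the toy falls back to None and does the same.
    method = getattr(handler, handler.method.lower(), None)
    if method is None:
        handler.status = 405
        return handler
    result = method(*path_args)
    if inspect.isawaitable(result):  # coroutine methods are awaited
        await result
    return handler


print(asyncio.run(dispatch(ToyHandler("GET"))).output)         # ['Hello, friendly user!']
print(asyncio.run(dispatch(ToyHandler("GET"), "Tom")).output)  # ['Hello, Tom!']
print(asyncio.run(dispatch(ToyHandler("POST"))).output)        # ['posted']
print(asyncio.run(dispatch(ToyHandler("PUT"))).status)         # 405
print(asyncio.run(dispatch(ToyHandler("TRACE"))).status)       # 405
```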
To summarize, the flow is:

TCPServer (listen) -> HTTPServer (request_delegate = delegate.start_request(self, conn)) -> Application -> _RoutingDelegate -> ret = yield conn.read_response(request_delegate) -> _HandlerDelegate -> RequestHandler (get, post, ...)

How the response data is written back to the socket, and where Tornado's high concurrency comes from, will be analysed next time.
