Tornado Application Notes 05 - Asynchronous Clients

Index

This section analyzes Tornado's built-in asynchronous HTTP client, including its source code and the official Tornado spider demo. The AsyncHTTPClient and SimpleAsyncHTTPClient classes are the application-layer code of the async client; the analysis below walks through their source, while the lower-level code is left for a future installment.

Introduction and Notes

Tornado ships with a non-blocking HTTP/1.1 client built on tornado iostreams, with no external dependencies.

By default, to limit the number of pending connections, a single IOLoop holds only one AsyncHTTPClient instance. You can lift this restriction by setting force_instance=True, though doing so is not recommended. By default, the configuration takes effect only the first time the client is initialized in the following way:

AsyncHTTPClient(max_clients=20, defaults=dict(user_agent="WtfAgent"))

As mentioned above, AsyncHTTPClient is "implicitly reused" unless you set force_instance to lift the "single instance" restriction, so the following configuration style is preferred:

tornado.httpclient.AsyncHTTPClient.configure(
    None,
    defaults=dict(user_agent="MyUserAgent"),
    max_clients=20,
)

max_clients is the maximum number of simultaneous requests per instance; requests beyond the limit are placed in a queue. Note that time spent waiting in the queue counts toward the timeout: if the timeout is set to 2 seconds and a request queues for more than 2 seconds, it is deemed timed out even though it was never actually sent.
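This queueing behavior can be illustrated with a toy, stdlib-only model (this is not Tornado's code; TinyClientPool and all of its names are invented for illustration). The key point: the timeout clock starts when the request is enqueued, so a request can "time out" without ever being sent.

```python
import collections

class TinyClientPool:
    """Toy model (stdlib only, NOT Tornado's code) of how time spent
    queued counts toward the request timeout."""

    def __init__(self, max_clients, request_timeout):
        self.max_clients = max_clients
        self.request_timeout = request_timeout
        self.queue = collections.deque()
        self.active = 0   # number of in-flight requests

    def fetch(self, url, now):
        # The timeout clock starts at enqueue time, not at send time.
        self.queue.append((url, now))

    def process(self, now):
        # Dequeue while a slot is free; anything that waited past its
        # deadline is reported as a 599 without ever being sent.
        results = []
        while self.queue and self.active < self.max_clients:
            url, enqueued_at = self.queue.popleft()
            if now - enqueued_at > self.request_timeout:
                results.append((url, 599))   # timed out while still queued
            else:
                self.active += 1
                results.append((url, 200))   # actually dispatched
        return results

pool = TinyClientPool(max_clients=1, request_timeout=2.0)
pool.fetch("http://a", now=0.0)
pool.fetch("http://b", now=0.0)
print(pool.process(now=0.0))   # [('http://a', 200)] -- "b" keeps waiting
pool.active -= 1               # pretend "a" finished 3 seconds later
print(pool.process(now=3.0))   # [('http://b', 599)] -- timed out in the queue
```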

Source Code Analysis
AsyncHTTPClient

The key method is fetch; internally it does the following:

  • Create a new future
  • Normalize the request format: if the input is a URL, wrap it in an HTTPRequest
  • Attach callbacks to the request (concretely: add a done-callback to the future; when the future completes it runs that callback, which in turn registers the user-defined callback on the ioloop)
  • Run fetch_impl, which in fact calls the method of AsyncHTTPClient's subclass SimpleAsyncHTTPClient; when fetch_impl completes, its callback invokes future.set_result
class AsyncHTTPClient(Configurable):

    def fetch(self, request, callback=None, raise_error=True, **kwargs):
        # An explicit close() is rarely needed unless you use multiple instances; normally the async client is never closed
        if self._closed:
            raise RuntimeError("fetch() called on closed AsyncHTTPClient")
        # Normalize the request format
        if not isinstance(request, HTTPRequest):
            request = HTTPRequest(url=request, **kwargs)
        request.headers = httputil.HTTPHeaders(request.headers)
        request = _RequestProxy(request, self.defaults)
        future = TracebackFuture()
        # Register the callback
        if callback is not None:
            callback = stack_context.wrap(callback)

            def handle_future(future):
                exc = future.exception()
                if isinstance(exc, HTTPError) and exc.response is not None:
                    response = exc.response
                elif exc is not None:
                    response = HTTPResponse(
                        request, 599, error=exc,
                        request_time=time.time() - request.start_time)
                else:
                    response = future.result()
                # Schedule the user callback now that the HTTP request is complete
                self.io_loop.add_callback(callback, response)

            # Register the future's done-callback
            future.add_done_callback(handle_future)

        # Callback invoked when fetch_impl completes
        def handle_response(response):
            if raise_error and response.error:
                future.set_exception(response.error)
            else:
                future.set_result(response)

        # "Send" the request and return the `future`
        self.fetch_impl(request, handle_response)
        return future
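Stripped of Tornado specifics, the callback chaining in fetch can be sketched with the standard library's Future (fetch_like and the fake transport below are invented names for illustration, not Tornado's API):

```python
from concurrent.futures import Future

def fetch_like(request, callback, do_request):
    """Create a future, chain the user callback onto it, then start the request."""
    future = Future()

    def handle_future(f):
        # Runs when the future completes and hands the result to the user
        # callback (Tornado schedules this on the IOLoop via add_callback).
        callback(f.result())

    future.add_done_callback(handle_future)

    def handle_response(response):
        # Called by the transport layer (fetch_impl) when the response arrives;
        # setting the result fires the done-callbacks registered above.
        future.set_result(response)

    do_request(request, handle_response)
    return future

seen = []
# A fake transport that "responds" immediately.
fetch_like("http://example.com", seen.append,
           lambda req, cb: cb("response for " + req))
print(seen)  # ['response for http://example.com']
```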
SimpleAsyncHTTPClient

The client manages HTTP requests as a queue; the maximum number of requests a single instance may initiate and process simultaneously (i.e. max_clients) defaults to 10. The internal workflow is:

  • Append the new HTTP request to the queue
  • Place the request in the "waiting area"; check the number of requests in the "working area", and if the limit is exceeded, register a timeout event for it on the ioloop (the timeout event removes the request from the waiting area and raises an exception via a callback registered on the ioloop)
  • Process the requests in the "waiting area": first cancel the corresponding timeout event on the ioloop (if any), then move the request into the "working area" and send the asynchronous HTTP request. The request carries a resource-releasing callback that removes it from the "working area" and repeats this step, processing the "waiting area" until it is empty.
class SimpleAsyncHTTPClient(AsyncHTTPClient):
    def initialize(self, io_loop, max_clients=10,
                   hostname_mapping=None, max_buffer_size=104857600,
                   resolver=None, defaults=None, max_header_size=None,
                   max_body_size=None):

        super(SimpleAsyncHTTPClient, self).initialize(io_loop,
                                                      defaults=defaults)
        self.max_clients = max_clients
        self.queue = collections.deque()
        self.active = {}
        self.waiting = {}
        self.max_buffer_size = max_buffer_size
        self.max_header_size = max_header_size
        self.max_body_size = max_body_size
        # TCPClient could create a Resolver for us, but we have to do it
        # ourselves to support hostname_mapping.
        if resolver:
            self.resolver = resolver
            self.own_resolver = False
        else:
            self.resolver = Resolver(io_loop=io_loop)
            self.own_resolver = True
        if hostname_mapping is not None:
            self.resolver = OverrideResolver(resolver=self.resolver,
                                             mapping=hostname_mapping)
        self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)

    def fetch_impl(self, request, callback):

        # Append the request to the `collections.deque()` queue
        key = object()
        self.queue.append((key, request, callback))

        # For requests beyond the concurrency limit, register a timeout event on the ioloop
        if not len(self.active) < self.max_clients:
            timeout_handle = self.io_loop.add_timeout(
                self.io_loop.time() + min(request.connect_timeout,
                                          request.request_timeout),
                functools.partial(self._on_timeout, key))
        else:
            timeout_handle = None

        # Add the `whole request event` (the request itself, its callback, and the timeout handle) to `waiting` (requests pending dispatch)
        self.waiting[key] = (request, callback, timeout_handle)

        # Process the request queue
        self._process_queue()
        if self.queue:
            gen_log.debug("max_clients limit reached, request queued. "
                          "%d active, %d queued requests." % (
                              len(self.active), len(self.queue)))

    def _process_queue(self):
        with stack_context.NullContext():
            # At most `max_clients` requests may be in flight at once
            while self.queue and len(self.active) < self.max_clients:
                # Pull a `waiting` request off the queue
                key, request, callback = self.queue.popleft()
                if key not in self.waiting:
                    continue
                # Cancel the timeout event (if any), leave `waiting`, and add the request to `active` (in-flight requests)
                self._remove_timeout(key)
                self.active[key] = (request, callback)
                # Build the callback that releases the slot
                release_callback = functools.partial(self._release_fetch, key)
                # The operation that `actually` sends the HTTP request
                self._handle_request(request, release_callback, callback)

    def _handle_request(self, request, release_callback, final_callback):
        self._connection_class()(
            self.io_loop, self, request, release_callback,
            final_callback, self.max_buffer_size, self.tcp_client,
            self.max_header_size, self.max_body_size)

    def _release_fetch(self, key):
        # Release the slot once the request finishes: remove it from `active`
        del self.active[key]
        self._process_queue()

    def _remove_timeout(self, key):
        # Cancel the timeout event (if any) and remove the entry from `waiting`
        if key in self.waiting:
            request, callback, timeout_handle = self.waiting[key]
            if timeout_handle is not None:
                self.io_loop.remove_timeout(timeout_handle)
            del self.waiting[key]

    def _on_timeout(self, key):
        # Remove the entry from `waiting` and schedule the timeout response callback
        request, callback, timeout_handle = self.waiting[key]
        self.queue.remove((key, request, callback))
        timeout_response = HTTPResponse(
            request, 599, error=HTTPError(599, "Timeout"),
            request_time=self.io_loop.time() - request.start_time)
        self.io_loop.add_callback(callback, timeout_response)
        del self.waiting[key]
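The queue discipline above (a waiting queue, an active set, and slots freed by a release callback that re-runs the queue) can be modeled with the standard library alone. MiniScheduler below is an invented sketch, not Tornado's actual API; its finish() stands in for the release_callback the connection invokes:

```python
import collections

class MiniScheduler:
    """Toy, stdlib-only model of SimpleAsyncHTTPClient's queue discipline
    (illustration only, not Tornado's API)."""

    def __init__(self, max_clients):
        self.max_clients = max_clients
        self.queue = collections.deque()   # requests waiting for a slot
        self.active = {}                   # in-flight requests
        self.log = []

    def fetch_impl(self, request):
        key = object()
        self.queue.append((key, request))
        self._process_queue()
        return key   # returned here only so the demo below can call finish()

    def _process_queue(self):
        # Dispatch while there is queued work and a free slot.
        while self.queue and len(self.active) < self.max_clients:
            key, request = self.queue.popleft()
            self.active[key] = request
            self.log.append("start " + request)

    def finish(self, key):
        # Stands in for the release_callback the connection invokes once
        # the response has been fully read: free the slot, re-run the queue.
        self.log.append("done " + self.active.pop(key))
        self._process_queue()

s = MiniScheduler(max_clients=2)
ka = s.fetch_impl("a")
s.fetch_impl("b")
s.fetch_impl("c")              # both slots busy: "c" waits in the queue
s.finish(ka)                   # freeing a slot pulls "c" out of the queue
print(s.log)                   # ['start a', 'start b', 'done a', 'start c']
```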
The Official Spider Demo (source)

This spider comes from the demos shipped with the Tornado source; it implements a "whole-site" crawler using Tornado's built-in queues, coroutines, and asynchronous HTTP client. Understanding this demo helps with understanding both coroutines and the async client.

A brief outline of how the spider works:

  • Create a queue to hold the page URLs waiting to be crawled, and put the site's "root URL" into it
  • Start 10 workers (think of them as 10 async clients) to crawl the site, with a timeout for the whole task; on expiry an exception is raised and the task aborts
  • Each worker's workflow is roughly:
    • Take a new URL from the queue and add it to the "in-progress" set
    • Fetch the page at the new URL
    • Collect all the URLs on that page
    • Add the URL just crawled to the "done" set
    • Filter the newly collected URLs by the given rule, then add the survivors to the queue
    • Repeat from the first step until the queue is empty
#!/usr/bin/env python

import time
from datetime import timedelta

try:
    from HTMLParser import HTMLParser
    from urlparse import urljoin, urldefrag
except ImportError:
    from html.parser import HTMLParser
    from urllib.parse import urljoin, urldefrag

from tornado import httpclient, gen, ioloop, queues

# Set the root URL and the maximum number of workers
base_url = 'http://www.tornadoweb.org/en/stable/'
concurrency = 10


@gen.coroutine
def get_links_from_url(url):
    # Fetch the page and collect every URL in it, stripped of its fragment (the "#" anchor and everything after it)
    try:
        response = yield httpclient.AsyncHTTPClient().fetch(url)
        print('fetched %s' % url)

        html = response.body if isinstance(response.body, str) \
            else response.body.decode()
        urls = [urljoin(url, remove_fragment(new_url))
                for new_url in get_links(html)]
    except Exception as e:
        print('Exception: %s %s' % (e, url))
        raise gen.Return([])

    raise gen.Return(urls)


def remove_fragment(url):
    # Strip the fragment ("#" anchor and everything after it) from the URL
    pure_url, frag = urldefrag(url)
    return pure_url


def get_links(html):
    # Collect the URLs of all <a> tags in the page
    class URLSeeker(HTMLParser):
        def __init__(self):
            HTMLParser.__init__(self)
            self.urls = []

        def handle_starttag(self, tag, attrs):
            href = dict(attrs).get('href')
            if href and tag == 'a':
                self.urls.append(href)

    url_seeker = URLSeeker()
    url_seeker.feed(html)
    return url_seeker.urls


@gen.coroutine
def main():
    # Create the URL queue, the "in-progress" set, and the "done" set
    q = queues.Queue()
    start = time.time()
    fetching, fetched = set(), set()

    @gen.coroutine
    def fetch_url():
        # Get a new URL from the queue
        current_url = yield q.get()
        try:
            # If this URL is already being fetched (i.e. it is in the "in-progress" set), bail out
            if current_url in fetching:
                return

            print('fetching %s' % current_url)
            
            # Add the URL to the "in-progress" set, fetch the page, then add it to the "done" set
            fetching.add(current_url)
            urls = yield get_links_from_url(current_url)
            fetched.add(current_url)

            # Filter the new URLs and enqueue the survivors
            for new_url in urls:
                # Only follow links beneath the base URL
                if new_url.startswith(base_url):
                    yield q.put(new_url)

        # Calling `q.task_done` on completion decrements the queue's unfinished-task count; it pairs one-to-one with `q.get`
        # This cooperates with `q.join`: once the count reaches 0, `q.join` stops "waiting"
        finally:
            q.task_done()

    # worker
    @gen.coroutine
    def worker():
        while True:
            yield fetch_url()

    # Put the first URL (the first "task") into the queue
    q.put(base_url)

    # Start the workers, then "wait" up to 300 seconds for the queue to empty; on expiry an exception is raised
    # Afterwards check that the "in-progress" and "done" sets match, and print the elapsed time
    for _ in range(concurrency):
        worker()

    # `.join` blocks here; execution resumes only once the queue has been drained
    yield q.join(timeout=timedelta(seconds=300))
    assert fetching == fetched
    print('Done in %d seconds, fetched %s URLs.' % (
        time.time() - start, len(fetched)))


if __name__ == '__main__':
    import logging
    logging.basicConfig()
    # Kick off the task
    io_loop = ioloop.IOLoop.current()
    io_loop.run_sync(main)
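The q.task_done/q.join bookkeeping the demo leans on works the same way in the standard library's queue module; a minimal, Tornado-free illustration (thread-based here, since the stdlib queue is synchronous):

```python
import queue
import threading

q = queue.Queue()
done = []

def worker():
    while True:
        url = q.get()
        done.append(url)
        q.task_done()       # one task_done() per get(), or join() never returns

for u in ["a", "b", "c"]:
    q.put(u)

t = threading.Thread(target=worker, daemon=True)
t.start()
q.join()                    # blocks until the unfinished-task count hits 0
print(sorted(done))         # ['a', 'b', 'c']
```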

That's all for this section. The next installment uses a chat-room demo implemented in two different ways to introduce how WebSocket and long polling are done in Tornado.

NEXT ===> Tornado Application Notes 06 - WebSocket and Long Polling
