Overview
This section analyzes Tornado's built-in asynchronous HTTP client, covering both the source code and the official Tornado crawler demo. AsyncHTTPClient and SimpleAsyncHTTPClient make up the application-layer code of the async client; the analysis below focuses on them, leaving the lower-level code for a future post.
Introduction and Notes
Tornado ships with a built-in non-blocking HTTP/1.1 client based on tornado's iostreams, with no external dependencies.
By default, to limit the number of pending connections, there is only one AsyncHTTPClient instance per IOLoop. You can opt out of this by passing force_instance=True, although that is not recommended. Because of this instance sharing, configuration passed this way only takes effect the first time the client is initialized:
AsyncHTTPClient(max_clients=20, defaults=dict(user_agent="WtfAgent"))
The reason was mentioned above: AsyncHTTPClient is "implicitly reused", so unless you set force_instance to opt out of the "single instance" behavior, the following configuration style is recommended instead:
tornado.httpclient.AsyncHTTPClient.configure(
    None,
    defaults=dict(user_agent="MyUserAgent"),
    max_clients=20,
)
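If you genuinely need a separate client with its own settings, force_instance bypasses the shared-instance cache. A minimal sketch (the user agent string is just a placeholder); note that a non-shared instance must be closed explicitly:

from tornado.httpclient import AsyncHTTPClient

# force_instance=True skips the per-IOLoop shared instance, so these
# constructor arguments always take effect for this client
client = AsyncHTTPClient(force_instance=True,
                         defaults=dict(user_agent="OneOffAgent"))
# ... use client.fetch(...) as usual ...
client.close()  # non-shared instances are our responsibility to close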
max_clients is the maximum number of simultaneous requests per instance; requests beyond the limit are placed in a queue. Note that time spent waiting in the queue counts toward the timeout: with a 2-second timeout, a request that queues for more than 2 seconds is treated as timed out even though it was never actually sent.
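A small sketch to make this concrete (the URLs are placeholders; assume /slow takes well over two seconds to respond): with max_clients=1 the second request can only wait in the queue, and its 2-second timeout keeps ticking there, so it fails with code 599 without ever being sent.

from tornado import gen, httpclient, ioloop

httpclient.AsyncHTTPClient.configure(None, max_clients=1)

@gen.coroutine
def main():
    client = httpclient.AsyncHTTPClient()
    # The first request takes the only slot (assume /slow responds slowly)
    slow = client.fetch("http://example.com/slow", request_timeout=10)
    # This one can only queue, and queue time counts toward its timeout
    response = yield client.fetch("http://example.com/",
                                  request_timeout=2, raise_error=False)
    print(response.code)  # 599 after ~2 seconds spent in the queue

ioloop.IOLoop.current().run_sync(main)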
Source Code Analysis
AsyncHTTPClient
The method to focus on is fetch; internally it does the following (a usage sketch follows the list):
- Create a new future
- Normalize the request format: if the input is a url, it is wrapped into an HTTPRequest
- Attach callbacks to the request (concretely: a callback is added to the future; when the future completes it runs that callback, which in turn registers the user-defined callback on the ioloop)
- Call fetch_impl, which actually dispatches to the method of AsyncHTTPClient's subclass SimpleAsyncHTTPClient; when fetch_impl completes, the callback that calls future.set_result is executed
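Before digging into the source below, here is a minimal usage sketch of the two calling styles (the URL is a placeholder): fetch returns a future that a coroutine can yield, or you can pass callback= and let the ioloop deliver the response.

from tornado import gen, httpclient, ioloop

def on_response(response):
    # A user-defined callback: fetch registers it so the ioloop invokes
    # it with the response once the future completes
    print('callback style: %d' % response.code)

@gen.coroutine
def demo():
    client = httpclient.AsyncHTTPClient()
    # Style 1: fetch returns a future; a coroutine can simply yield it
    response = yield client.fetch("http://example.com/")
    print('future style: %d' % response.code)
    # Style 2: pass callback= and let the ioloop deliver the response
    client.fetch("http://example.com/", callback=on_response)
    yield gen.sleep(5)  # keep the loop alive so the callback can fire

ioloop.IOLoop.current().run_sync(demo)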
class AsyncHTTPClient(Configurable):
    def fetch(self, request, callback=None, raise_error=True, **kwargs):
        # An explicit close() is rarely needed; unless you run multiple
        # instances, the async client is normally never closed
        if self._closed:
            raise RuntimeError("fetch() called on closed AsyncHTTPClient")
        # Normalize the request format
        if not isinstance(request, HTTPRequest):
            request = HTTPRequest(url=request, **kwargs)
        request.headers = httputil.HTTPHeaders(request.headers)
        request = _RequestProxy(request, self.defaults)
        future = TracebackFuture()
        # Register the callback
        if callback is not None:
            callback = stack_context.wrap(callback)

            def handle_future(future):
                exc = future.exception()
                if isinstance(exc, HTTPError) and exc.response is not None:
                    response = exc.response
                elif exc is not None:
                    response = HTTPResponse(
                        request, 599, error=exc,
                        request_time=time.time() - request.start_time)
                else:
                    response = future.result()
                # Schedule the user callback for the completed HTTP request
                self.io_loop.add_callback(callback, response)
            # Register the future's callback
            future.add_done_callback(handle_future)

        # Callback run when fetch_impl completes
        def handle_response(response):
            if raise_error and response.error:
                future.set_exception(response.error)
            else:
                future.set_result(response)
        # "Send" the request and return the `future`
        self.fetch_impl(request, handle_response)
        return future
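The two branches of handle_response are easy to observe from user code: with raise_error=False a failure comes back as a response value (code 599 for timeouts and connection errors) rather than a raised HTTPError. A small sketch, URL again a placeholder:

from tornado import gen, httpclient

@gen.coroutine
def check(url):
    client = httpclient.AsyncHTTPClient()
    # raise_error=False -> handle_response calls future.set_result even
    # on failure, so we inspect response.error ourselves
    response = yield client.fetch(url, raise_error=False, request_timeout=5)
    if response.error:
        print('failed: %s' % response.error)
    else:
        print('ok: %d bytes' % len(response.body))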
SimpleAsyncHTTPClient
The client manages HTTP requests as a queue; the maximum number of requests a single instance may initiate and process simultaneously (i.e. max_clients) defaults to 10 (see the configure sketch after the list for raising it). The internal workflow:
- Append the new HTTP request to the queue
- Put the request into the "waiting area"; check how many requests are in the "working area", and if the processable limit is exceeded, register a timeout event for it on the ioloop (the timeout event removes the request from the waiting area and surfaces the error by scheduling a callback on the ioloop)
- Process the "waiting area": first cancel the request's timeout event on the ioloop (if any), then move the request into the "working area" and send the asynchronous HTTP request. The request carries a resource-release callback, which removes it from the "working area" and repeats this step, continuing to process the "waiting area" until it is drained.
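Since max_clients defaults to 10, raising it for a crawl-heavy workload is a single configure call against the subclass; a sketch (the value 50 is arbitrary):

from tornado.httpclient import AsyncHTTPClient

# Route shared instances to SimpleAsyncHTTPClient with a larger working
# set; affects clients created after this call
AsyncHTTPClient.configure(
    "tornado.simple_httpclient.SimpleAsyncHTTPClient", max_clients=50)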
class SimpleAsyncHTTPClient(AsyncHTTPClient):
    def initialize(self, io_loop, max_clients=10,
                   hostname_mapping=None, max_buffer_size=104857600,
                   resolver=None, defaults=None, max_header_size=None,
                   max_body_size=None):
        super(SimpleAsyncHTTPClient, self).initialize(io_loop,
                                                      defaults=defaults)
        self.max_clients = max_clients
        self.queue = collections.deque()
        self.active = {}
        self.waiting = {}
        self.max_buffer_size = max_buffer_size
        self.max_header_size = max_header_size
        self.max_body_size = max_body_size
        # TCPClient could create a Resolver for us, but we have to do it
        # ourselves to support hostname_mapping.
        if resolver:
            self.resolver = resolver
            self.own_resolver = False
        else:
            self.resolver = Resolver(io_loop=io_loop)
            self.own_resolver = True
        if hostname_mapping is not None:
            self.resolver = OverrideResolver(resolver=self.resolver,
                                             mapping=hostname_mapping)
        self.tcp_client = TCPClient(resolver=self.resolver, io_loop=io_loop)

    def fetch_impl(self, request, callback):
        # Append the request to the `collections.deque()` queue
        key = object()
        self.queue.append((key, request, callback))
        # For requests beyond the concurrency limit, register a timeout
        # event on the ioloop
        if not len(self.active) < self.max_clients:
            timeout_handle = self.io_loop.add_timeout(
                self.io_loop.time() + min(request.connect_timeout,
                                          request.request_timeout),
                functools.partial(self._on_timeout, key))
        else:
            timeout_handle = None
        # Record the whole request event (the request itself, its callback
        # and the timeout handle) in `waiting` (requests pending dispatch)
        self.waiting[key] = (request, callback, timeout_handle)
        # Process the request queue
        self._process_queue()
        if self.queue:
            gen_log.debug("max_clients limit reached, request queued. "
                          "%d active, %d queued requests." % (
                              len(self.active), len(self.queue)))

    def _process_queue(self):
        with stack_context.NullContext():
            # At most `max_clients` requests may be in flight at once
            while self.queue and len(self.active) < self.max_clients:
                # Take a `waiting` request off the queue
                key, request, callback = self.queue.popleft()
                if key not in self.waiting:
                    continue
                # Cancel the timeout event (if any), leave `waiting`, and
                # add the request to `active` (requests being processed)
                self._remove_timeout(key)
                self.active[key] = (request, callback)
                # Build the resource-release callback
                release_callback = functools.partial(self._release_fetch, key)
                # This is where the HTTP request is actually sent
                self._handle_request(request, release_callback, callback)

    def _handle_request(self, request, release_callback, final_callback):
        self._connection_class()(
            self.io_loop, self, request, release_callback,
            final_callback, self.max_buffer_size, self.tcp_client,
            self.max_header_size, self.max_body_size)

    def _release_fetch(self, key):
        # Release resources once the request finishes: leave `active`
        del self.active[key]
        self._process_queue()

    def _remove_timeout(self, key):
        # Cancel the timeout event (if any) and leave `waiting`
        if key in self.waiting:
            request, callback, timeout_handle = self.waiting[key]
            if timeout_handle is not None:
                self.io_loop.remove_timeout(timeout_handle)
            del self.waiting[key]

    def _on_timeout(self, key):
        # Leave `waiting` and schedule the timeout-response callback
        request, callback, timeout_handle = self.waiting[key]
        self.queue.remove((key, request, callback))
        timeout_response = HTTPResponse(
            request, 599, error=HTTPError(599, "Timeout"),
            request_time=self.io_loop.time() - request.start_time)
        self.io_loop.add_callback(callback, timeout_response)
        del self.waiting[key]
Official crawler demo (source link)
This crawler comes from the demos shipped with the Tornado source; it uses Tornado's built-in queue, coroutines and the async HTTP client to implement a "whole-site" crawler. Understanding this demo helps with understanding both coroutines and the async client.
First, a brief outline of how the crawler works:
- Create a queue for the page urls waiting to be crawled, and put the site's "root url" into it
- Start 10 workers (think of them as 10 async clients) to crawl the site, and set a timeout for the whole crawl; on timeout an exception is raised and the task is aborted
- Each worker roughly works like this:
  - Take a new url from the queue and add it to the "working set"
  - Fetch the page behind the new url
  - Extract all urls from the new page
  - Add the url just crawled to the "finished set" and remove it from the "working set"
  - Filter the newly extracted urls by the given rule, then put the filtered results onto the queue
  - Repeat from the first step until the queue is drained
#!/usr/bin/env python

import time
from datetime import timedelta

try:
    from HTMLParser import HTMLParser
    from urlparse import urljoin, urldefrag
except ImportError:
    from html.parser import HTMLParser
    from urllib.parse import urljoin, urldefrag

from tornado import httpclient, gen, ioloop, queues

# Root url and maximum number of workers
base_url = 'http://www.tornadoweb.org/en/stable/'
concurrency = 10


@gen.coroutine
def get_links_from_url(url):
    # Fetch the page and collect all of its urls, stripped of fragments
    # (the "#" anchor and everything after it)
    try:
        response = yield httpclient.AsyncHTTPClient().fetch(url)
        print('fetched %s' % url)

        html = response.body if isinstance(response.body, str) \
            else response.body.decode()
        urls = [urljoin(url, remove_fragment(new_url))
                for new_url in get_links(html)]
    except Exception as e:
        print('Exception: %s %s' % (e, url))
        raise gen.Return([])

    raise gen.Return(urls)


def remove_fragment(url):
    # Strip the "#" anchor and everything after it from the url
    pure_url, frag = urldefrag(url)
    return pure_url


def get_links(html):
    # Collect the urls of all <a> tags in the page
    class URLSeeker(HTMLParser):
        def __init__(self):
            HTMLParser.__init__(self)
            self.urls = []

        def handle_starttag(self, tag, attrs):
            href = dict(attrs).get('href')
            if href and tag == 'a':
                self.urls.append(href)

    url_seeker = URLSeeker()
    url_seeker.feed(html)
    return url_seeker.urls


@gen.coroutine
def main():
    # Create the url queue, the "working set" and the "finished set"
    q = queues.Queue()
    start = time.time()
    fetching, fetched = set(), set()

    @gen.coroutine
    def fetch_url():
        # Take a new url off the queue
        current_url = yield q.get()
        try:
            # If the url is already being crawled (i.e. it is in the
            # "working set"), bail out
            if current_url in fetching:
                return

            print('fetching %s' % current_url)
            # Add the url to the "working set", crawl it, then add it to
            # the "finished set"
            fetching.add(current_url)
            urls = yield get_links_from_url(current_url)
            fetched.add(current_url)

            # Filter the new urls and put them onto the queue
            for new_url in urls:
                # Only follow links beneath the base URL
                if new_url.startswith(base_url):
                    yield q.put(new_url)

        # On completion call `q.task_done`, which decrements the queue's
        # count of unfinished tasks; it pairs one-to-one with `q.get`.
        # This cooperates with `q.join`: when the count hits 0, `q.join`
        # stops "waiting"
        finally:
            q.task_done()

    # worker
    @gen.coroutine
    def worker():
        while True:
            yield fetch_url()

    # Put the first url (the first "task") onto the queue
    q.put(base_url)

    # Start the workers, then "wait" for the queue to drain, with a
    # 300-second limit; on timeout an exception is raised.
    # Afterwards check that the "working set" and "finished set" agree,
    # and print how long the whole task took
    for _ in range(concurrency):
        worker()
    # `.join` blocks here until the queue is drained, then execution
    # continues below
    yield q.join(timeout=timedelta(seconds=300))
    assert fetching == fetched
    print('Done in %d seconds, fetched %s URLs.' % (
        time.time() - start, len(fetched)))


if __name__ == '__main__':
    import logging
    logging.basicConfig()
    # Start the task
    io_loop = ioloop.IOLoop.current()
    io_loop.run_sync(main)
That is it for this section. The next one will introduce how WebSocket and long polling are implemented in Tornado, via a chat-room demo built both ways.
NEXT ===> Tornado Application Notes 06: WebSocket and Long Polling