asyncio + asyncio 異步編程實例

協(xié)程用法

接下來,我們來了解下協(xié)程的實現(xiàn),從 Python 3.4 開始,Python 中加入了協(xié)程的概念,但這個版本的協(xié)程還是以生成器對象為基礎(chǔ)的,在 Python 3.5 則增加了 async/await,使得協(xié)程的實現(xiàn)更加方便。

Python 中使用協(xié)程最常用的庫莫過于 asyncio,所以本文會以 asyncio 為基礎(chǔ)來介紹協(xié)程的使用。

首先我們需要了解下面幾個概念。

event_loop:事件循環(huán),相當(dāng)于一個無限循環(huán),我們可以把一些函數(shù)注冊到這個事件循環(huán)上,當(dāng)滿足條件發(fā)生的時候,就會調(diào)用對應(yīng)的處理方法。
coroutine:中文翻譯叫協(xié)程,在 Python 中常指代為協(xié)程對象類型,我們可以將協(xié)程對象注冊到時間循環(huán)中,它會被事件循環(huán)調(diào)用。我們可以使用 async 關(guān)鍵字來定義一個方法,這個方法在調(diào)用時不會立即被執(zhí)行,而是返回一個協(xié)程對象。
task:任務(wù),它是對協(xié)程對象的進(jìn)一步封裝,包含了任務(wù)的各個狀態(tài)。
future:代表將來執(zhí)行或沒有執(zhí)行的任務(wù)的結(jié)果,實際上和 task 沒有本質(zhì)區(qū)別。
另外我們還需要了解 async/await 關(guān)鍵字,它是從 Python 3.5 才出現(xiàn)的,專門用于定義協(xié)程。其中,async 定義一個協(xié)程,await 用來掛起阻塞方法的執(zhí)行。

aiohttp
前面介紹的 asyncio 模塊內(nèi)部實現(xiàn)了對 TCP、UDP、SSL 協(xié)議的異步操作,但是對于 HTTP 請求的異步操作來說,我們就需要用到 aiohttp 來實現(xiàn)了。

aiohttp 是一個基于 asyncio 的異步 HTTP 網(wǎng)絡(luò)模塊,它既提供了服務(wù)端,又提供了客戶端。其中我們用服務(wù)端可以搭建一個支持異步處理的服務(wù)器,用于處理請求并返回響應(yīng),類似于 Django、Flask、Tornado 等一些 Web 服務(wù)器。而客戶端我們就可以用來發(fā)起請求,就類似于 requests 來發(fā)起一個 HTTP 請求然后獲得響應(yīng),但 requests 發(fā)起的是同步的網(wǎng)絡(luò)請求,而 aiohttp 則發(fā)起的是異步的。

本課時我們就主要來了解一下 aiohttp 客戶端部分的使用。

基本使用
基本實例
首先我們來看一個基本的 aiohttp 請求案例,代碼如下:

復(fù)制

import asyncio
async def fetch(session, url):
   async with session.get(url) as response:
       return await response.text(), response.status
async def main():
   async with aiohttp.ClientSession() as session:
       html, status = await fetch(session, 'https://cuiqingcai.com')
       print(f'html: {html[:100]}...')
       print(f'status: {status}')
if __name__ == '__main__':
   loop = asyncio.get_event_loop()
   loop.run_until_complete(main())

在這里我們使用 aiohttp 來爬取了我的個人博客,獲得了源碼和響應(yīng)狀態(tài)碼并輸出,運行結(jié)果如下:

復(fù)制

<html>
<head>
<meta charset="UTF-8">
<meta name="baidu-tc-verification" content=...
status: 200

這里網(wǎng)頁源碼過長,只截取輸出了一部分,可以看到我們成功獲取了網(wǎng)頁的源代碼及響應(yīng)狀態(tài)碼 200,也就完成了一次基本的 HTTP 請求,即我們成功使用 aiohttp 通過異步的方式進(jìn)行了網(wǎng)頁的爬取,當(dāng)然這個操作用之前我們所講的 requests 同樣也可以做到。

我們可以看到其請求方法的定義和之前有了明顯的區(qū)別,主要有如下幾點:

首先在導(dǎo)入庫的時候,我們除了必須要引入 aiohttp 這個庫之外,還必須要引入 asyncio 這個庫,因為要實現(xiàn)異步爬取需要啟動協(xié)程,而協(xié)程則需要借助于 asyncio 里面的事件循環(huán)來執(zhí)行。除了事件循環(huán),asyncio 里面也提供了很多基礎(chǔ)的異步操作。
異步爬取的方法的定義和之前有所不同,在每個異步方法前面統(tǒng)一要加 async 來修飾。
with as 語句前面同樣需要加 async 來修飾,在 Python 中,with as 語句用于聲明一個上下文管理器,能夠幫我們自動分配和釋放資源,而在異步方法中,with as 前面加上 async 代表聲明一個支持異步的上下文管理器。
對于一些返回 coroutine 的操作,前面需要加 await 來修飾,如 response 調(diào)用 text 方法,查詢 API 可以發(fā)現(xiàn)其返回的是 coroutine 對象,那么前面就要加 await;而對于狀態(tài)碼來說,其返回值就是一個數(shù)值類型,那么前面就不需要加 await。所以,這里可以按照實際情況處理,參考官方文檔說明,看看其對應(yīng)的返回值是怎樣的類型,然后決定加不加 await 就可以了。
最后,定義完爬取方法之后,實際上是 main 方法調(diào)用了 fetch 方法。要運行的話,必須要啟用事件循環(huán),事件循環(huán)就需要使用 asyncio 庫,然后使用 run_until_complete 方法來運行。
注意在 Python 3.7 及以后的版本中,我們可以使用 asyncio.run(main()) 來代替最后的啟動操作,不需要顯式聲明事件循環(huán),run 方法內(nèi)部會自動啟動一個事件循環(huán)。但這里為了兼容更多的 Python 版本,依然還是顯式聲明了事件循環(huán)。

URL 參數(shù)設(shè)置
對于 URL 參數(shù)的設(shè)置,我們可以借助于 params 參數(shù),傳入一個字典即可,示例如下:

復(fù)制

import asyncio
async def main():
   params = {'name': 'germey', 'age': 25}
   async with aiohttp.ClientSession() as session:
       async with session.get('https://httpbin.org/get', params=params) as response:
           print(await response.text())
if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

運行結(jié)果如下:

復(fù)制

 "args": {
   "age": "25",
   "name": "germey"
 },
 "headers": {
   "Accept": "*/*",
   "Accept-Encoding": "gzip, deflate",
   "Host": "httpbin.org",
   "User-Agent": "Python/3.7 aiohttp/3.6.2",
   "X-Amzn-Trace-Id": "Root=1-5e85eed2-d240ac90f4dddf40b4723ef0"
 },
 "origin": "17.20.255.122",
 "url": "https://httpbin.org/get?name=germey&age=25"
}

這里可以看到,其實際請求的 URL 為 https://httpbin.org/get?name=germey&age=25,其 URL 請求參數(shù)就對應(yīng)了 params 的內(nèi)容。

其他請求類型
另外 aiohttp 還支持其他的請求類型,如 POST、PUT、DELETE 等等,這個和 requests 的使用方式有點類似,示例如下:

復(fù)制

session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

POST 數(shù)據(jù)
對于 POST 表單提交,其對應(yīng)的請求頭的 Content-type 為 application/x-www-form-urlencoded,我們可以用如下方式來實現(xiàn),代碼示例如下:

復(fù)制

import asyncio
async def main():
   data = {'name': 'germey', 'age': 25}
   async with aiohttp.ClientSession() as session:
       async with session.post('https://httpbin.org/post', data=data) as response:
           print(await response.text())
if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

運行結(jié)果如下:

復(fù)制

 "args": {},
 "data": "",
 "files": {},
 "form": {
   "age": "25",
   "name": "germey"
 },
 "headers": {
   "Accept": "*/*",
   "Accept-Encoding": "gzip, deflate",
   "Content-Length": "18",
   "Content-Type": "application/x-www-form-urlencoded",
   "Host": "httpbin.org",
   "User-Agent": "Python/3.7 aiohttp/3.6.2",
   "X-Amzn-Trace-Id": "Root=1-5e85f0b2-9017ea603a68dc285e0552d0"
 },
 "json": null,
 "origin": "17.20.255.58",
 "url": "https://httpbin.org/post"
}

對于 POST JSON 數(shù)據(jù)提交,其對應(yīng)的請求頭的 Content-type 為 application/json,我們只需要將 post 方法的 data 參數(shù)改成 json 即可,代碼示例如下:

復(fù)制

   data = {'name': 'germey', 'age': 25}
   async with aiohttp.ClientSession() as session:
       async with session.post('https://httpbin.org/post', json=data) as response:
           print(await response.text())

運行結(jié)果如下:

復(fù)制

 "args": {},
 "data": "{\"name\": \"germey\", \"age\": 25}",
 "files": {},
 "form": {},
 "headers": {
   "Accept": "*/*",
   "Accept-Encoding": "gzip, deflate",
   "Content-Length": "29",
   "Content-Type": "application/json",
   "Host": "httpbin.org",
   "User-Agent": "Python/3.7 aiohttp/3.6.2",
   "X-Amzn-Trace-Id": "Root=1-5e85f03e-c91c9a20c79b9780dbed7540"
 },
 "json": {
   "age": 25,
   "name": "germey"
 },
 "origin": "17.20.255.58",
 "url": "https://httpbin.org/post"
}

響應(yīng)字段
對于響應(yīng)來說,我們可以用如下的方法分別獲取響應(yīng)的狀態(tài)碼、響應(yīng)頭、響應(yīng)體、響應(yīng)體二進(jìn)制內(nèi)容、響應(yīng)體 JSON 結(jié)果,代碼示例如下:

復(fù)制

import asyncio
async def main():
   data = {'name': 'germey', 'age': 25}
   async with aiohttp.ClientSession() as session:
       async with session.post('https://httpbin.org/post', data=data) as response:
           print('status:', response.status)
           print('headers:', response.headers)
           print('body:', await response.text())
           print('bytes:', await response.read())
           print('json:', await response.json())
if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

運行結(jié)果如下:

復(fù)制

headers: <CIMultiDictProxy('Date': 'Thu, 02 Apr 2020 14:13:05 GMT', 'Content-Type': 'application/json', 'Content-Length': '503', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
body: {
 "args": {},
 "data": "",
 "files": {},
 "form": {
   "age": "25",
   "name": "germey"
 },
 "headers": {
   "Accept": "*/*",
   "Accept-Encoding": "gzip, deflate",
   "Content-Length": "18",
   "Content-Type": "application/x-www-form-urlencoded",
   "Host": "httpbin.org",
   "User-Agent": "Python/3.7 aiohttp/3.6.2",
   "X-Amzn-Trace-Id": "Root=1-5e85f2f1-f55326ff5800b15886c8e029"
 },
 "json": null,
 "origin": "17.20.255.58",
 "url": "https://httpbin.org/post"
}
bytes: b'{\n  "args": {}, \n  "data": "", \n  "files": {}, \n  "form": {\n    "age": "25", \n    "name": "germey"\n  }, \n  "headers": {\n    "Accept": "*/*", \n    "Accept-Encoding": "gzip, deflate", \n    "Content-Length": "18", \n    "Content-Type": "application/x-www-form-urlencoded", \n    "Host": "httpbin.org", \n    "User-Agent": "Python/3.7 aiohttp/3.6.2", \n    "X-Amzn-Trace-Id": "Root=1-5e85f2f1-f55326ff5800b15886c8e029"\n  }, \n  "json": null, \n  "origin": "17.20.255.58", \n  "url": "https://httpbin.org/post"\n}\n'
json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '25', 'name': 'germey'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '18', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'httpbin.org', 'User-Agent': 'Python/3.7 aiohttp/3.6.2', 'X-Amzn-Trace-Id': 'Root=1-5e85f2f1-f55326ff5800b15886c8e029'}, 'json': None, 'origin': '17.20.255.58', 'url': 'https://httpbin.org/post'}

這里我們可以看到有些字段前面需要加 await,有的則不需要。其原則是,如果其返回的是一個 coroutine 對象(如 async 修飾的方法),那么前面就要加 await,具體可以看 aiohttp 的 API,其鏈接為:https://docs.aiohttp.org/en/stable/client_reference.html。

超時設(shè)置
對于超時的設(shè)置,我們可以借助于 ClientTimeout 對象,比如這里我要設(shè)置 1 秒的超時,可以這么來實現(xiàn):

復(fù)制

import asyncio
async def main():
   timeout = aiohttp.ClientTimeout(total=1)
   async with aiohttp.ClientSession(timeout=timeout) as session:
       async with session.get('https://httpbin.org/get') as response:
           print('status:', response.status)
if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

如果在 1 秒之內(nèi)成功獲取響應(yīng)的話,運行結(jié)果如下:

復(fù)制200
如果超時的話,會拋出 TimeoutError 異常,其類型為 asyncio.TimeoutError,我們再進(jìn)行異常捕獲即可。

另外 ClientTimeout 對象聲明時還有其他參數(shù),如 connect、socket_connect 等,詳細(xì)說明可以參考官方文檔:https://docs.aiohttp.org/en/stable/client_quickstart.html#timeouts。

并發(fā)限制
由于 aiohttp 可以支持非常大的并發(fā),比如上萬、十萬、百萬都是能做到的,但這么大的并發(fā)量,目標(biāo)網(wǎng)站是很可能在短時間內(nèi)無法響應(yīng)的,而且很可能瞬時間將目標(biāo)網(wǎng)站爬掛掉。所以我們需要控制一下爬取的并發(fā)量。

在一般情況下,我們可以借助于 asyncio 的 Semaphore 來控制并發(fā)量,代碼示例如下:

復(fù)制

import aiohttp
CONCURRENCY = 5
URL = 'https://www.baidu.com'
semaphore = asyncio.Semaphore(CONCURRENCY)
session = None
async def scrape_api():
   async with semaphore:
       print('scraping', URL)
       async with session.get(URL) as response:
           await asyncio.sleep(1)
           return await response.text()
async def main():
   global session
   session = aiohttp.ClientSession()
   scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)]
   await asyncio.gather(*scrape_index_tasks)
if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

在這里我們聲明了 CONCURRENCY 代表爬取的最大并發(fā)量為 5,同時聲明爬取的目標(biāo) URL 為百度。接著我們借助于 Semaphore 創(chuàng)建了一個信號量對象,賦值為 semaphore,這樣我們就可以用它來控制最大并發(fā)量了。怎么使用呢?我們這里把它直接放置在對應(yīng)的爬取方法里面,使用 async with 語句將 semaphore 作為上下文對象即可。這樣的話,信號量可以控制進(jìn)入爬取的最大協(xié)程數(shù)量,最大數(shù)量就是我們聲明的 CONCURRENCY 的值。

在 main 方法里面,我們聲明了 10000 個 task,傳遞給 gather 方法運行。倘若不加以限制,這 10000 個 task 會被同時執(zhí)行,并發(fā)數(shù)量太大。但有了信號量的控制之后,同時運行的 task 的數(shù)量最大會被控制在 5 個,這樣就能給 aiohttp 限制速度了。

在這里,aiohttp 的基本使用就介紹這么多,更詳細(xì)的內(nèi)容還是推薦你到官方文檔查閱,鏈接:https://docs.aiohttp.org/。

爬取實戰(zhàn)
上面我們介紹了 aiohttp 的基本用法之后,下面我們來根據(jù)一個實例實現(xiàn)異步爬蟲的實戰(zhàn)演練吧。

本次我們要爬取的網(wǎng)站是:https://dynamic5.scrape.cuiqingcai.com/,頁面如圖所示。

這是一個書籍網(wǎng)站,整個網(wǎng)站包含了數(shù)千本書籍信息,網(wǎng)站是 JavaScript 渲染的,數(shù)據(jù)可以通過 Ajax 接口獲取到,并且接口沒有設(shè)置任何反爬措施和加密參數(shù),另外由于這個網(wǎng)站比之前的電影案例網(wǎng)站數(shù)據(jù)量大一些,所以更加適合做異步爬取。

本課時我們要完成的目標(biāo)有:

使用 aiohttp 完成全站的書籍?dāng)?shù)據(jù)爬取。
將數(shù)據(jù)通過異步的方式保存到 MongoDB 中。
在本課時開始之前,請確保你已經(jīng)做好了如下準(zhǔn)備工作:

安裝好了 Python(最低為 Python 3.6 版本,最好為 3.7 版本或以上),并能成功運行 Python 程序。
了解了 Ajax 爬取的一些基本原理和模擬方法。
了解了異步爬蟲的基本原理和 asyncio 庫的基本用法。
了解了 aiohttp 庫的基本用法。
安裝并成功運行了 MongoDB 數(shù)據(jù)庫,并安裝了異步存儲庫 motor。
注:這里要實現(xiàn) MongoDB 異步存儲,需要異步 MongoDB 存儲庫,叫作 motor,安裝命令為:pip3 install motor

頁面分析
在之前我們講解了 Ajax 的基本分析方法,本課時的站點結(jié)構(gòu)和之前 Ajax 分析的站點結(jié)構(gòu)類似,都是列表頁加詳情頁的結(jié)構(gòu),加載方式都是 Ajax,所以我們能輕松分析到如下信息:

列表頁的 Ajax 請求接口格式為:https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset={offset},limit 的值即為每一頁的書的個數(shù),offset 的值為每一頁的偏移量,其計算公式為 offset = limit * (page - 1) ,如第 1 頁 offset 的值為 0,第 2 頁 offset 的值為 18,以此類推。
列表頁 Ajax 接口返回的數(shù)據(jù)里 results 字段包含當(dāng)前頁 18 本書的信息,其中每本書的數(shù)據(jù)里面包含一個字段 id,這個 id 就是書本身的 ID,可以用來進(jìn)一步請求詳情頁。
詳情頁的 Ajax 請求接口格式為:https://dynamic5.scrape.cuiqingcai.com/api/book/{id},id 即為書的 ID,可以從列表頁的返回結(jié)果中獲取。
如果你掌握了 Ajax 爬取實戰(zhàn)一課時的內(nèi)容話,上面的內(nèi)容應(yīng)該很容易分析出來。如有難度,可以復(fù)習(xí)下之前的知識。

實現(xiàn)思路
其實一個完善的異步爬蟲應(yīng)該能夠充分利用資源進(jìn)行全速爬取,其思路是維護(hù)一個動態(tài)變化的爬取隊列,每產(chǎn)生一個新的 task 就會將其放入隊列中,有專門的爬蟲消費者從隊列中獲取 task 并執(zhí)行,能做到在最大并發(fā)量的前提下充分利用等待時間進(jìn)行額外的爬取處理。

但上面的實現(xiàn)思路整體較為煩瑣,需要設(shè)計爬取隊列、回調(diào)函數(shù)、消費者等機(jī)制,需要實現(xiàn)的功能較多。由于我們剛剛接觸 aiohttp 的基本用法,本課時也主要是了解 aiohttp 的實戰(zhàn)應(yīng)用,所以這里我們將爬取案例的實現(xiàn)稍微簡化一下。

在這里我們將爬取的邏輯拆分成兩部分,第一部分為爬取列表頁,第二部分為爬取詳情頁。由于異步爬蟲的關(guān)鍵點在于并發(fā)執(zhí)行,所以我們可以將爬取拆分為兩個階段:

第一階段為所有列表頁的異步爬取,我們可以將所有的列表頁的爬取任務(wù)集合起來,聲明為 task 組成的列表,進(jìn)行異步爬取。
第二階段則是拿到上一步列表頁的所有內(nèi)容并解析,拿到所有書的 id 信息,組合為所有詳情頁的爬取任務(wù)集合,聲明為 task 組成的列表,進(jìn)行異步爬取,同時爬取的結(jié)果也以異步的方式存儲到 MongoDB 里面。
因為兩個階段的拆分之后需要串行執(zhí)行,所以可能不能達(dá)到協(xié)程的最佳調(diào)度方式和資源利用情況,但也差不了很多。但這個實現(xiàn)思路比較簡單清晰,代碼實現(xiàn)也比較簡單,能夠幫我們快速了解 aiohttp 的基本使用。

基本配置
首先我們先配置一些基本的變量并引入一些必需的庫,代碼如下:

復(fù)制

import aiohttp
import logging
logging.basicConfig(level=logging.INFO,
                   format='%(asctime)s - %(levelname)s: %(message)s')
INDEX_URL = 'https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset={offset}'
DETAIL_URL = 'https://dynamic5.scrape.cuiqingcai.com/api/book/{id}'
PAGE_SIZE = 18
PAGE_NUMBER = 100
CONCURRENCY = 5

在這里我們導(dǎo)入了 asyncio、aiohttp、logging 這三個庫,然后定義了 logging 的基本配置。接著定義了 URL、爬取頁碼數(shù)量 PAGE_NUMBER、并發(fā)量 CONCURRENCY 等信息。

爬取列表頁
首先,第一階段我們就來爬取列表頁,還是和之前一樣,我們先定義一個通用的爬取方法,代碼如下:

復(fù)制

session = None
async def scrape_api(url):
   async with semaphore:
       try:
           logging.info('scraping %s', url)
           async with session.get(url) as response:
               return await response.json()
       except aiohttp.ClientError:
           logging.error('error occurred while scraping %s', url, exc_info=True)

在這里我們聲明了一個信號量,用來控制最大并發(fā)數(shù)量。

接著我們定義了 scrape_api 方法,該方法接收一個參數(shù) url。首先使用 async with 引入信號量作為上下文,接著調(diào)用了 session 的 get 方法請求這個 url,然后返回響應(yīng)的 JSON 格式的結(jié)果。另外這里還進(jìn)行了異常處理,捕獲了 ClientError,如果出現(xiàn)錯誤,會輸出異常信息。

接著,對于列表頁的爬取,實現(xiàn)如下:

復(fù)制

   url = INDEX_URL.format(offset=PAGE_SIZE * (page - 1))
   return await scrape_api(url)

這里定義了一個 scrape_index 方法用于爬取列表頁,它接收一個參數(shù)為 page,然后構(gòu)造了列表頁的 URL,將其傳給 scrape_api 方法即可。這里注意方法同樣需要用 async 修飾,調(diào)用的 scrape_api 方法前面需要加 await,因為 scrape_api 調(diào)用之后本身會返回一個 coroutine。另外由于 scrape_api 返回結(jié)果就是 JSON 格式,因此 scrape_index 的返回結(jié)果就是我們想要爬取的信息,不需要再額外解析了。

好,接著我們定義一個 main 方法,將上面的方法串聯(lián)起來調(diào)用一下,實現(xiàn)如下:

復(fù)制

async def main():
   global session
   session = aiohttp.ClientSession()
   scrape_index_tasks = [asyncio.ensure_future(scrape_index(page)) for page in range(1, PAGE_NUMBER + 1)]
   results = await asyncio.gather(*scrape_index_tasks)
   logging.info('results %s', json.dumps(results, ensure_ascii=False, indent=2))

if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

這里我們首先聲明了 session 對象,即最初聲明的全局變量,將 session 作為全局變量的話我們就不需要每次在各個方法里面?zhèn)鬟f了,實現(xiàn)比較簡單。

接著我們定義了 scrape_index_tasks,它就是爬取列表頁的所有 task,接著我們調(diào)用 asyncio 的 gather 方法并傳入 task 列表,將結(jié)果賦值為 results,它是所有 task 返回結(jié)果組成的列表。

最后我們調(diào)用 main 方法,使用事件循環(huán)啟動該 main 方法對應(yīng)的協(xié)程即可。

運行結(jié)果如下:

復(fù)制

2020-04-03 03:45:54,707 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset=18
2020-04-03 03:45:54,707 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset=36
2020-04-03 03:45:54,708 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset=54
2020-04-03 03:45:54,708 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset=72
2020-04-03 03:45:56,431 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset=90
2020-04-03 03:45:56,435 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset=108

可以看到這里就開始異步爬取了,并發(fā)量是由我們控制的,目前為 5,當(dāng)然也可以進(jìn)一步調(diào)高并發(fā)量,在網(wǎng)站能承受的情況下,爬取速度會進(jìn)一步加快。

最后 results 就是所有列表頁得到的結(jié)果,我們將其賦值為 results 對象,接著我們就可以用它來進(jìn)行第二階段的爬取了。

爬取詳情頁
第二階段就是爬取詳情頁并保存數(shù)據(jù)了,由于每個詳情頁對應(yīng)一本書,每本書需要一個 ID,而這個 ID 又正好存在 results 里面,所以下面我們就需要將所有詳情頁的 ID 獲取出來。

在 main 方法里增加 results 的解析代碼,實現(xiàn)如下:

復(fù)制

for index_data in results:
   if not index_data: continue
   for item in index_data.get('results'):
       ids.append(item.get('id'))

這樣 ids 就是所有書的 id 了,然后我們用所有的 id 來構(gòu)造所有詳情頁對應(yīng)的 task,來進(jìn)行異步爬取即可。

那么這里再定義一個爬取詳情頁和保存數(shù)據(jù)的方法,實現(xiàn)如下:

復(fù)制

MONGO_CONNECTION_STRING = 'mongodb://localhost:27017'
MONGO_DB_NAME = 'books'
MONGO_COLLECTION_NAME = 'books'
client = AsyncIOMotorClient(MONGO_CONNECTION_STRING)
db = client[MONGO_DB_NAME]
collection = db[MONGO_COLLECTION_NAME]
async def save_data(data):
   logging.info('saving data %s', data)
   if data:
       return await collection.update_one({
           'id': data.get('id')
       }, {
           '$set': data
       }, upsert=True)
async def scrape_detail(id):
   url = DETAIL_URL.format(id=id)
   data = await scrape_api(url)
   await save_data(data)

這里我們定義了 scrape_detail 方法用于爬取詳情頁數(shù)據(jù)并調(diào)用 save_data 方法保存數(shù)據(jù),save_data 方法用于將數(shù)據(jù)庫保存到 MongoDB 里面。

在這里我們用到了支持異步的 MongoDB 存儲庫 motor,MongoDB 的連接聲明和 pymongo 是類似的,保存數(shù)據(jù)的調(diào)用方法也是基本一致,不過整個都換成了異步方法。

好,接著我們就在 main 方法里面增加 scrape_detail 方法的調(diào)用即可,實現(xiàn)如下:

復(fù)制

await asyncio.wait(scrape_detail_tasks)
await session.close()

在這里我們先聲明了 scrape_detail_tasks,即所有詳情頁的爬取 task 組成的列表,接著調(diào)用了 asyncio 的 wait 方法調(diào)用執(zhí)行即可,當(dāng)然這里也可以用 gather 方法,效果是一樣的,只不過返回結(jié)果略有差異。最后全部執(zhí)行完畢關(guān)閉 session 即可。

一些詳情頁的爬取過程運行如下:

復(fù)制

2020-04-03 04:00:32,576 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/2301475
2020-04-03 04:00:32,576 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/2351866
2020-04-03 04:00:32,577 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/2828384
2020-04-03 04:00:32,577 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/3040352
2020-04-03 04:00:32,578 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/3074810
2020-04-03 04:00:44,858 - INFO: saving data {'id': '3040352', 'comments': [{'id': '387952888', 'content': '溫馨文,青梅竹馬神馬的很有愛~'}, ..., {'id': '2005314253', 'content': '沈晉&秦央,文比較短,平平淡淡,貼近生活,短文的缺點不細(xì)膩'}], 'name': '那些風(fēng)花雪月', 'authors': ['\n            公子歡喜'], 'translators': [], 'publisher': '龍馬出版社', 'tags': ['公子歡喜', '耽美', 'BL', '小說', '現(xiàn)代', '校園', '耽美小說', '那些風(fēng)花雪月'], 'url': 'https://book.douban.com/subject/3040352/', 'isbn': '9789866685156', 'cover': 'https://img9.doubanio.com/view/subject/l/public/s3029724.jpg', 'page_number': None, 'price': None, 'score': '8.1', 'introduction': '', 'catalog': None, 'published_at': '2008-03-26T16:00:00Z', 'updated_at': '2020-03-21T16:59:39.584722Z'}
2020-04-03 04:00:44,859 - INFO: scraping https://dynamic5.scrape.cuiqingcai.com/api/book/2994915

最后我們觀察下,爬取到的數(shù)據(jù)也都保存到 MongoDB 數(shù)據(jù)庫里面了,如圖所示:
至此,我們就使用 aiohttp 完成了書籍網(wǎng)站的異步爬取。

總結(jié)
本課時的內(nèi)容較多,我們了解了 aiohttp 的基本用法,然后通過一個實例講解了 aiohttp 異步爬蟲的具體實現(xiàn)。學(xué)習(xí)過程我們可以發(fā)現(xiàn),相比普通的單線程爬蟲來說,使用異步可以大大提高爬取效率,后面我們也可以多多使用。
代碼轉(zhuǎn)載至:https://github.com/Germey/ScrapeDynamic5。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容