Scrapy適合做全量爬取,但是,我們不是一次抓取完就完事了。很多情況,我們需要持續(xù)的跟進抓取的站點,增量抓取是最需要的。
Scrapy與Redis配合,在寫入數(shù)據(jù)庫之前,做唯一性過濾,實現(xiàn)增量爬取。
一、官方的去重Pipeline
官方文檔中有一個去重的過濾器:
from scrapy.exceptions import DropItem
class DuplicatesPipeline(object):
def __init__(self):
self.ids_seen = set()
def process_item(self, item, spider):
if item['id'] in self.ids_seen:
raise DropItem("Duplicate item found: %s" % item)
else:
self.ids_seen.add(item['id'])
return item
官方的這個過濾器的缺陷是只能確保單次抓取不間斷的情況下去重,因為其數(shù)據(jù)是保存在內(nèi)存中的,當一個爬蟲任務跑完后程序結(jié)束,內(nèi)存就清理掉了。再次運行時就失效了。
二、基于Redis的去重Pipeline
為了能夠多次爬取時去重,我們考慮用Redis,其快速的鍵值存取,對管道處理數(shù)據(jù)不會產(chǎn)生多少延時。
#pipelines.py
import pandas as pd
import redis
redis_db = redis.Redis(host=settings.REDIS_HOST, port=6379, db=4, password=settings.REDIS_PWD)
redis_data_dict = "f_uuids"
class DuplicatePipeline(object):
"""
去重(redis)
"""
def __init__(self):
if redis_db.hlen(redis_data_dict) == 0:
sql = "SELECT uuid FROM f_data"
df = pd.read_sql(sql, engine)
for uuid in df['uuid'].get_values():
redis_db.hset(redis_data_dict, uuid, 0)
def process_item(self, item, spider):
if redis_db.hexists(redis_data_dict, item['uuid']):
raise DropItem("Duplicate item found:%s" % item)
return item
- 首先,我們定義一個redis實例: redis_db和redis key:redis_data_dict。
- 在DuplicatePipeline的初始化函數(shù)init()中,對redis的key值做了初始化。當然,這步不是必須的,你可以不用實現(xiàn)。
- 在process_item函數(shù)中,判斷redis的hash表中存在該值uuid,則為重復item。
至于redis中為什么沒有用list而用hash? 主要是因為速度,hash判斷uuid是否存在比list快好幾個數(shù)據(jù)級。
特別是uuid的數(shù)據(jù)達到100w+時,hash的hexists函數(shù)速度優(yōu)勢更明顯。
最后別忘了在settings.py中加上:
# Configure item pipelines
# See http://scrapy.readthedocs.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
'fund_spider.pipelines.DuplicatePipeline': 200,
#'fund_spider.pipelines.MySQLStorePipeline': 300,
}
三、總結(jié)
本文不是真正意義上的增量爬取,而只是在數(shù)據(jù)存儲環(huán)節(jié),對數(shù)據(jù)唯一性作了處理,當然,這樣已經(jīng)滿足了大部分的需求。
后續(xù)我會實現(xiàn)不需要遍歷所有的網(wǎng)頁,判斷抓取到所有最新的item,就停止抓取。敬請關(guān)注!