Item Pipeline(項目管道)
在一個項目被spider抓取后,它被發(fā)送到Item Pipeline,Item Pipeline通過順序執(zhí)行的幾個組件處理它,決定該項目是否應(yīng)該繼續(xù)通過Pipeline或被丟棄并且不再處理。
Item Pipeline的典型用途是:
- 清理HTML數(shù)據(jù)
- 驗證已刪除的數(shù)據(jù)(檢查項目是否包含某些字段)
- 檢查重復(fù)項(并刪除它們)
- 將已刪除的項目存儲在數(shù)據(jù)庫中
編寫自己的項目管道
每個項管道組件都是一個必須實現(xiàn)以下方法的Python類:
process_item(self, item, spider)
為每個Item Pipeline組件調(diào)用此方法。process_item()必須:返回帶數(shù)據(jù)的dict,返回Item(或任何后代類)對象,返回Twisted Deferred或引發(fā)DropItem異常。 丟棄的項目不再由其他管道組件處理。
open_spider(self, spider) 打開spider時會調(diào)用此方法。
close_spider(self, spider) 當spider關(guān)閉時調(diào)用此方法。
from_crawler(cls, crawler)
如果存在,則調(diào)用此類方法以從a創(chuàng)建管道實例Crawler。它必須返回管道的新實例。Crawler對象提供對所有Scrapy核心組件的訪問,如設(shè)置和信號; 它是管道訪問它們并將其功能掛鉤到Scrapy的一種方式。
激活I(lǐng)tem Pipeline組件
要激活I(lǐng)tem Pipeline組件,必須將settings.py中的ITEM_PIPELINES設(shè)置打開,如下例所示:
ITEM_PIPELINES = {
'myproject.pipelines.PricePipeline': 300,
'myproject.pipelines.JsonWriterPipeline': 800,
}
此設(shè)置中為類分配的整數(shù)值決定了它們運行的??順序:較低值優(yōu)先級高。習(xí)慣上在0-1000范圍內(nèi)定義這些數(shù)字。
項目管道示例
1、 價格驗證和丟棄物品沒有價格
調(diào)整 price那些不包含增值稅(price_excludes_vat屬性)的項目的屬性,并刪除那些不包含價格的項目:
from scrapy.exceptions import DropItem
class PricePipeline(object):
vat_factor = 1.15
def process_item(self, item, spider):
if item.get('price'):
if item.get('price_excludes_vat'):
item['price'] = item['price'] * self.vat_factor
return item
else:
raise DropItem("Missing price in %s" % item)
2、 將項目寫入JSON文件
以下管道將所有已刪除的項目(來自所有蜘蛛)存儲到一個items.jl文件中,每行包含一個以JSON格式序列化的項目:
import json
class JsonWriterPipeline(object):
def open_spider(self, spider):
self.file = open('items.jl', 'w')
def close_spider(self, spider):
self.file.close()
def process_item(self, item, spider):
line = json.dumps(dict(item)) + "\n"
self.file.write(line)
return item
3、將項目寫入MongoDB
在這個例子中,我們將使用pymongo將項目寫入MongoDB。MongoDB地址和數(shù)據(jù)庫名稱在Scrapy設(shè)置中指定;MongoDB集合以item類命名。
這個例子的要點是展示如何使用from_crawler() 方法以及如何正確地清理資源:
import pymongo
class MongoPipeline(object):
collection_name = 'scrapy_items'
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get('MONGO_URI'),
mongo_db=crawler.settings.get('MONGO_DATABASE', 'items')
)
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close_spider(self, spider):
self.client.close()
def process_item(self, item, spider):
self.db[self.collection_name].insert_one(dict(item))
return item
4、截取項目的截圖
此示例演示如何從process_item()方法返回Deferred。它使用Splash渲染項目URL的屏幕截圖。Pipeline向本地運行的Splash實例發(fā)出請求。下載請求并延遲回調(diào)激活后,它會將項目保存到文件并將文件名添加到項目中。
import scrapy
import hashlib
from urllib.parse import quote
class ScreenshotPipeline(object):
"""Pipeline that uses Splash to render screenshot of
every Scrapy item."""
SPLASH_URL = "http://localhost:8050/render.png?url={}"
def process_item(self, item, spider):
encoded_item_url = quote(item["url"])
screenshot_url = self.SPLASH_URL.format(encoded_item_url)
request = scrapy.Request(screenshot_url)
dfd = spider.crawler.engine.download(request, spider)
dfd.addBoth(self.return_item, item)
return dfd
def return_item(self, response, item):
if response.status != 200:
# Error happened, return item.
return item
# Save screenshot to file, filename will be hash of url.
url = item["url"]
url_hash = hashlib.md5(url.encode("utf8")).hexdigest()
filename = "{}.png".format(url_hash)
with open(filename, "wb") as f:
f.write(response.body)
# Store filename in item.
item["screenshot_filename"] = filename
return item
5、重復(fù)過濾
一個過濾器,用于查找重復(fù)項目,并刪除已處理的項目。假設(shè)我們的項目具有唯一ID,但我們的spider會返回具有相同ID的多個項目:
from scrapy.exceptions import DropItem
class DuplicatesPipeline(object):
def __init__(self):
self.ids_seen = set()
def process_item(self, item, spider):
if item['id'] in self.ids_seen:
raise DropItem("Duplicate item found: %s" % item)
else:
self.ids_seen.add(item['id'])
return item