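scrapy_redis implements its own set of components for distributed crawling, including an item pipeline that stores data in Redis, located in scrapy_redis.pipelines. This article analyzes its source code and workflow. The source is as follows: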
from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread

from . import connection, defaults


default_serialize = ScrapyJSONEncoder().encode


class RedisPipeline(object):
    """Pushes serialized item into a redis list/queue

    Settings
    --------
    REDIS_ITEMS_KEY : str
        Redis key where to store items.
    REDIS_ITEMS_SERIALIZER : str
        Object path to serializer function.

    """

    def __init__(self, server,
                 key=defaults.PIPELINE_KEY,
                 serialize_func=default_serialize):
        """Initialize pipeline.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        key : str
            Redis key where to store items.
        serialize_func : callable
            Items serializer function.

        """
        self.server = server
        self.key = key
        self.serialize = serialize_func

    @classmethod
    def from_settings(cls, settings):
        params = {
            'server': connection.from_settings(settings),
        }
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
                settings['REDIS_ITEMS_SERIALIZER']
            )

        return cls(**params)

    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)

    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        """
        return self.key % {'spider': spider.name}
Custom item pipelines in Scrapy were covered in an earlier article, "Two data-processing modules in Scrapy: Item Pipeline and Exporter"; this article focuses on how RedisPipeline is implemented.
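RedisPipeline takes three initialization parameters: server, key=defaults.PIPELINE_KEY, and serialize_func=default_serialize. The first, server, is a Redis client instance. key defaults to PIPELINE_KEY from scrapy_redis.defaults, whose contents are as follows: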
import redis


# For standalone use.
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'

PIPELINE_KEY = '%(spider)s:items'

REDIS_CLS = redis.StrictRedis
REDIS_ENCODING = 'utf-8'
# Sane connection defaults.
REDIS_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': REDIS_ENCODING,
}

SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'

START_URLS_KEY = '%(name)s:start_urls'
START_URLS_AS_SET = False
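serialize_func is the function that serializes an Item. The default is ScrapyJSONEncoder().encode, which behaves much like json.dumps: ScrapyJSONEncoder is a json.JSONEncoder subclass that also handles types such as dates and Scrapy items.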
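To illustrate the serializer contract, here is a minimal sketch of a custom serializer of the kind that REDIS_ITEMS_SERIALIZER could point to. The names `serialize_item` and `_fallback` and the sample item are hypothetical; the only requirement taken from the source above is that the callable accepts an item and returns a value Redis can store (a JSON string here).

```python
import json
from datetime import date, datetime


def _fallback(value):
    """Convert values that the json module cannot encode natively."""
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, set):
        return sorted(value)
    raise TypeError("Cannot serialize %s" % type(value).__name__)


def serialize_item(item):
    """Hypothetical serializer: any mapping-like item -> JSON string."""
    # dict(item) copies the fields of a plain dict or other mapping.
    return json.dumps(dict(item), default=_fallback,
                      ensure_ascii=False, sort_keys=True)


if __name__ == "__main__":
    sample = {"title": "example", "tags": {"b", "a"},
              "crawled": date(2020, 1, 2)}
    print(serialize_item(sample))
```

If such a function lived in a module of your project, its dotted path (for instance a hypothetical `myproject.serializers.serialize_item`) is what the REDIS_ITEMS_SERIALIZER setting would contain.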
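That gives us a rough picture of how it works: the pipeline is initialized with a Redis client, a key, and a serializer function. Reading on, from_settings and from_crawler both read the project settings (settings.py) to complete initialization; the core function, process_item, comes next.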
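The parameter-building logic in from_settings can be imitated without Scrapy or Redis. The sketch below assumes a plain dict in place of the Scrapy settings object; `load_by_path` is a hypothetical stand-in for scrapy.utils.misc.load_object, and `build_params` is an illustrative name, not part of scrapy_redis.

```python
import importlib
import json


def load_by_path(path):
    """Hypothetical stand-in for load_object: 'pkg.mod.attr' -> object."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def build_params(settings, server):
    """Mimic RedisPipeline.from_settings using a plain dict of settings."""
    params = {"server": server}
    # Optional settings override the constructor defaults only when set.
    if settings.get("REDIS_ITEMS_KEY"):
        params["key"] = settings["REDIS_ITEMS_KEY"]
    if settings.get("REDIS_ITEMS_SERIALIZER"):
        params["serialize_func"] = load_by_path(
            settings["REDIS_ITEMS_SERIALIZER"]
        )
    return params


if __name__ == "__main__":
    params = build_params({"REDIS_ITEMS_SERIALIZER": "json.dumps"}, server=None)
    print(sorted(params))
    print(params["serialize_func"] is json.dumps)
```

Because unset options are simply left out of `params`, the defaults declared in `__init__` still apply, which is the same design the original classmethod relies on.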
The source of process_item is:
    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        """
        return self.key % {'spider': spider.name}
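The key formatting and the item_key override hook can be tried out with an in-memory stand-in for Redis. Everything in this sketch is an assumption made for illustration: `FakeRedis` only imitates rpush, `SimplePipeline` is a synchronous imitation of _process_item (no deferToThread), and `PerTypePipeline` with its "type" field is a made-up example of an override.

```python
import json
from collections import defaultdict
from types import SimpleNamespace


class FakeRedis:
    """Hypothetical in-memory stand-in that exposes only rpush()."""

    def __init__(self):
        self.lists = defaultdict(list)

    def rpush(self, key, value):
        self.lists[key].append(value)
        return len(self.lists[key])


class SimplePipeline:
    """Synchronous imitation of RedisPipeline._process_item."""

    def __init__(self, server, key="%(spider)s:items",
                 serialize_func=json.dumps):
        self.server = server
        self.key = key
        self.serialize = serialize_func

    def item_key(self, item, spider):
        # '%(spider)s:items' % {'spider': 'books'} gives 'books:items'.
        return self.key % {"spider": spider.name}

    def process_item(self, item, spider):
        self.server.rpush(self.item_key(item, spider), self.serialize(item))
        return item


class PerTypePipeline(SimplePipeline):
    """Overrides item_key so that each item type gets its own list."""

    def item_key(self, item, spider):
        return "%s:items:%s" % (spider.name, item.get("type", "default"))


if __name__ == "__main__":
    spider = SimpleNamespace(name="books")  # only .name is needed here
    server = FakeRedis()
    PerTypePipeline(server).process_item({"type": "review", "id": 1}, spider)
    print(dict(server.lists))
```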
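process_item is essentially a call to deferToThread, which returns a Deferred while the function passed to it (here _process_item) runs in another thread. It is typically used for blocking work such as database access or file reads. Here is deferToThread itself: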
def deferToThread(f, *args, **kwargs):
    """
    Run a function in a thread and return the result as a Deferred.

    @param f: The function to call.
    @param *args: positional arguments to pass to f.
    @param **kwargs: keyword arguments to pass to f.

    @return: A Deferred which fires a callback with the result of f,
    or an errback with a L{twisted.python.failure.Failure} if f throws
    an exception.
    """
    from twisted.internet import reactor
    return deferToThreadPool(reactor, reactor.getThreadPool(),
                             f, *args, **kwargs)
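This relies on the reactor pattern from twisted.internet. In the reactor pattern, a single thread dispatches events from multiple event sources to their respective handlers. Here the reactor's thread pool runs the blocking Redis write, so items end up being written asynchronously without stalling the event loop.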
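The same idea can be sketched with the standard library alone. Note that this swaps Twisted for asyncio: `loop.run_in_executor` plays the role of deferToThread, and the awaited future plays the role of the Deferred. The function `blocking_write` and the key "demo:items" are hypothetical stand-ins for server.rpush and a real Redis key.

```python
import asyncio
import threading
import time


def blocking_write(store, key, value):
    """Stand-in for a blocking call such as server.rpush(key, data)."""
    time.sleep(0.05)  # simulate network latency
    store.setdefault(key, []).append(value)
    return threading.current_thread().name


async def main():
    loop = asyncio.get_running_loop()
    store = {}
    # Each call is handed to the default thread pool, so the
    # single-threaded event loop is free to handle other events.
    futures = [
        loop.run_in_executor(None, blocking_write, store, "demo:items", i)
        for i in range(3)
    ]
    worker_names = await asyncio.gather(*futures)
    return store, worker_names


if __name__ == "__main__":
    store, worker_names = asyncio.run(main())
    print(sorted(store["demo:items"]))
    print(threading.current_thread().name in worker_names)
```

The writes run on pool threads rather than on the thread driving the event loop, which is the property RedisPipeline gets from deferToThread.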
