Source code and working principle of the built-in pipeline in scrapy_redis (distributed Scrapy)

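scrapy_redis implements its own set of components for distributed crawling with Scrapy. Among them is an item pipeline that stores data in Redis, located in scrapy_redis.pipelines. This article analyzes its source code and workflow. The source code is as follows: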

from scrapy.utils.misc import load_object
from scrapy.utils.serialize import ScrapyJSONEncoder
from twisted.internet.threads import deferToThread

from . import connection, defaults

default_serialize = ScrapyJSONEncoder().encode

class RedisPipeline(object):
    """Pushes serialized item into a redis list/queue

    Settings
    --------
    REDIS_ITEMS_KEY : str
        Redis key where to store items.
    REDIS_ITEMS_SERIALIZER : str
        Object path to serializer function.

    """

    def __init__(self, server,
                 key=defaults.PIPELINE_KEY,
                 serialize_func=default_serialize):
        """Initialize pipeline.

        Parameters
        ----------
        server : StrictRedis
            Redis client instance.
        key : str
            Redis key where to store items.
        serialize_func : callable
            Items serializer function.

        """
        self.server = server
        self.key = key
        self.serialize = serialize_func

    @classmethod
    def from_settings(cls, settings):
        params = {
            'server': connection.from_settings(settings),
        }
        if settings.get('REDIS_ITEMS_KEY'):
            params['key'] = settings['REDIS_ITEMS_KEY']
        if settings.get('REDIS_ITEMS_SERIALIZER'):
            params['serialize_func'] = load_object(
                settings['REDIS_ITEMS_SERIALIZER']
            )

        return cls(**params)

    @classmethod
    def from_crawler(cls, crawler):
        return cls.from_settings(crawler.settings)

    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        """
        return self.key % {'spider': spider.name}

Custom item pipelines in Scrapy were covered in an earlier article, 《Two data-processing modules in Scrapy: Item Pipeline and Exporter》. This article explains how RedisPipeline is implemented.

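RedisPipeline is initialized with three parameters: server, key=defaults.PIPELINE_KEY and serialize_func=default_serialize. The first parameter, server, is a Redis client instance. key defaults to PIPELINE_KEY from scrapy_redis.defaults, which looks like this: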

import redis

# For standalone use.
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'

PIPELINE_KEY = '%(spider)s:items'

REDIS_CLS = redis.StrictRedis
REDIS_ENCODING = 'utf-8'
# Sane connection defaults.
REDIS_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': REDIS_ENCODING,
}

SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
SCHEDULER_QUEUE_CLASS = 'scrapy_redis.queue.PriorityQueue'
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'

START_URLS_KEY = '%(name)s:start_urls'
START_URLS_AS_SET = False
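PIPELINE_KEY and the other *_KEY values are old-style %-format templates. The short sketch below shows how item_key turns PIPELINE_KEY into a concrete Redis key. It also shows what happens with templates that do not fit the {'spider': ...} mapping. The templates are copied so the sketch runs without scrapy_redis installed, and the spider name quotes is only an example.

```python
# Key templates copied from scrapy_redis.defaults so this sketch runs on its own.
PIPELINE_KEY = '%(spider)s:items'
SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
START_URLS_KEY = '%(name)s:start_urls'


def expand_pipeline_key(template, spider_name):
    """Expand a key template the way RedisPipeline.item_key does:
    old-style %-formatting with a mapping that only provides 'spider'."""
    return template % {'spider': spider_name}


if __name__ == '__main__':
    print(expand_pipeline_key(PIPELINE_KEY, 'quotes'))         # quotes:items
    print(expand_pipeline_key(SCHEDULER_QUEUE_KEY, 'quotes'))  # quotes:requests
    # A template without placeholders is returned unchanged, so every spider
    # would push into the same shared list.
    print(expand_pipeline_key('all_items', 'quotes'))          # all_items
    # START_URLS_KEY uses %(name)s, which this mapping does not provide.
    try:
        expand_pipeline_key(START_URLS_KEY, 'quotes')
    except KeyError as exc:
        print('missing placeholder:', exc)                     # missing placeholder: 'name'
```

In practice, setting REDIS_ITEMS_KEY to a value without %(spider)s makes all spiders share one item list. Keeping the placeholder gives each spider its own list.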

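serialize_func is the function used to serialize items. Its default is not plain json.dumps but default_serialize, i.e. ScrapyJSONEncoder().encode. ScrapyJSONEncoder is a subclass of json.JSONEncoder that additionally handles types such as datetime and Decimal.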
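You may not want the default output. For example, JSON encoders escape non-ASCII text such as Chinese as \uXXXX sequences unless ensure_ascii=False is passed. In that case, REDIS_ITEMS_SERIALIZER can name your own function, which from_settings loads from a dotted path with load_object. Below is a sketch of such a function, assuming items are dict-like. The module path myproject.serializers and the name serialize_item are hypothetical.

```python
# myproject/serializers.py (hypothetical module)
import json
from datetime import date, datetime
from decimal import Decimal


def _fallback(obj):
    """Encode a few types the stdlib json module rejects by default."""
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if isinstance(obj, Decimal):
        return str(obj)  # keep full precision as a string
    if isinstance(obj, (set, frozenset)):
        return list(obj)
    raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')


def serialize_item(item):
    """Turn a dict-like item into a JSON string, keeping non-ASCII text readable."""
    return json.dumps(dict(item), ensure_ascii=False, default=_fallback)


if __name__ == '__main__':
    print(serialize_item({'title': '标题', 'price': Decimal('9.90')}))
    # {"title": "标题", "price": "9.90"}
```

It would then be enabled in settings.py with REDIS_ITEMS_SERIALIZER = 'myproject.serializers.serialize_item'.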

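This gives us a rough picture of how the pipeline works: it is initialized with a Redis client, a key and a serializer function. Next, from_settings and from_crawler build the instance from the project settings, which are normally defined in settings.py. from_crawler simply passes crawler.settings to from_settings. from_settings creates the Redis client with connection.from_settings and, when they are set, overrides the key with REDIS_ITEMS_KEY and the serializer with REDIS_ITEMS_SERIALIZER. The core method, process_item, comes next.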
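The settings handling in from_settings can be reproduced in plain Python to see which keys matter. This is a simplified sketch, not scrapy_redis code:

- settings is a plain dict instead of a Scrapy Settings object.
- load_object is a minimal stand-in for scrapy.utils.misc.load_object.
- make_server is a hypothetical factory standing in for connection.from_settings.

```python
import importlib


def load_object(path):
    """Minimal stand-in for scrapy.utils.misc.load_object: import 'pkg.mod.attr'."""
    module_path, _, name = path.rpartition('.')
    module = importlib.import_module(module_path)
    return getattr(module, name)


def build_pipeline_params(settings, make_server):
    """Mimic RedisPipeline.from_settings using a plain dict of settings.

    make_server is a hypothetical factory standing in for connection.from_settings.
    """
    params = {'server': make_server(settings)}
    if settings.get('REDIS_ITEMS_KEY'):  # empty/missing -> keep the __init__ default
        params['key'] = settings['REDIS_ITEMS_KEY']
    if settings.get('REDIS_ITEMS_SERIALIZER'):  # dotted path -> callable
        params['serialize_func'] = load_object(settings['REDIS_ITEMS_SERIALIZER'])
    return params


if __name__ == '__main__':
    settings = {
        'REDIS_URL': 'redis://localhost:6379/0',
        'REDIS_ITEMS_SERIALIZER': 'json.dumps',
    }
    params = build_pipeline_params(settings, lambda s: ('fake-client', s.get('REDIS_URL')))
    print(sorted(params))  # ['serialize_func', 'server']
```

A key that is absent or empty is simply left out of params, so __init__ falls back to defaults.PIPELINE_KEY and default_serialize. In a real project the pipeline itself is enabled in settings.py with ITEM_PIPELINES = {'scrapy_redis.pipelines.RedisPipeline': 300}, where 300 is just an example priority.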

The source of process_item is:

    def process_item(self, item, spider):
        return deferToThread(self._process_item, item, spider)

    def _process_item(self, item, spider):
        key = self.item_key(item, spider)
        data = self.serialize(item)
        self.server.rpush(key, data)
        return item

    def item_key(self, item, spider):
        """Returns redis key based on given spider.

        Override this function to use a different key depending on the item
        and/or spider.

        """
        return self.key % {'spider': spider.name}
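The sketch below traces the data flow without a Redis server or Scrapy. It re-creates the synchronous part of the pipeline, _process_item plus item_key, against a hypothetical in-memory FakeRedis that models only RPUSH. MiniRedisPipeline, PerTypePipeline, Spider and the item field 'type' are illustrative names, not scrapy_redis APIs. PerTypePipeline shows the kind of item_key override the docstring invites: one Redis list per item type.

```python
import json


class FakeRedis:
    """Hypothetical in-memory stand-in for a Redis client; only RPUSH is modeled."""

    def __init__(self):
        self.lists = {}

    def rpush(self, key, *values):
        lst = self.lists.setdefault(key, [])
        lst.extend(values)  # append to the tail, like Redis RPUSH
        return len(lst)     # Redis RPUSH also returns the new list length


class Spider:
    """Minimal spider stand-in: item_key only needs .name."""

    def __init__(self, name):
        self.name = name


class MiniRedisPipeline:
    """Synchronous copy of RedisPipeline._process_item/item_key, for illustration."""

    def __init__(self, server, key='%(spider)s:items', serialize_func=json.dumps):
        self.server = server
        self.key = key
        self.serialize = serialize_func

    def process_item(self, item, spider):
        key = self.item_key(item, spider)
        self.server.rpush(key, self.serialize(item))
        return item  # returned so later pipelines still receive the item

    def item_key(self, item, spider):
        return self.key % {'spider': spider.name}


class PerTypePipeline(MiniRedisPipeline):
    """Example override: one list per value of the (hypothetical) 'type' field."""

    def item_key(self, item, spider):
        return '%s:%s:items' % (spider.name, item.get('type', 'default'))


if __name__ == '__main__':
    server = FakeRedis()
    spider = Spider('quotes')
    MiniRedisPipeline(server).process_item({'text': 'hi'}, spider)
    PerTypePipeline(server).process_item({'type': 'author', 'name': 'A'}, spider)
    print(server.lists)
    # {'quotes:items': ['{"text": "hi"}'], 'quotes:author:items': ['{"type": "author", "name": "A"}']}
```

Items are appended with RPUSH. A separate consumer, for example on another machine, can therefore take them out in FIFO order with Redis LPOP or BLPOP on the same key.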

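process_item essentially just calls deferToThread. deferToThread returns a Deferred object immediately, while the function it was given (here _process_item) runs in another thread. When that function finishes, the Deferred fires its callbacks with the result back in the reactor thread. This approach is mainly used for blocking work such as database or file I/O. Here is deferToThread itself: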

def deferToThread(f, *args, **kwargs):
    """
    Run a function in a thread and return the result as a Deferred.

    @param f: The function to call.
    @param *args: positional arguments to pass to f.
    @param **kwargs: keyword arguments to pass to f.

    @return: A Deferred which fires a callback with the result of f,
    or an errback with a L{twisted.python.failure.Failure} if f throws
    an exception.
    """
    from twisted.internet import reactor
    return deferToThreadPool(reactor, reactor.getThreadPool(),
                             f, *args, **kwargs)
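deferToThread needs a running Twisted reactor, so it is awkward to demo in isolation. As a rough standard-library analogy, asyncio.to_thread (Python 3.9+) also runs a blocking function in a default thread pool and hands back an awaitable, much as deferToThread hands back a Deferred. This uses asyncio, not Twisted. blocking_write is a hypothetical stand-in for _process_item.

```python
import asyncio
import threading
import time


def blocking_write(item, log):
    """Hypothetical stand-in for _process_item: a blocking call such as a Redis RPUSH."""
    time.sleep(0.05)  # simulate network latency
    log.append(threading.current_thread().name)  # record which thread did the work
    return item


async def main():
    log = []
    loop_thread = threading.current_thread().name
    # Each call runs in the default thread pool; the event loop stays free meanwhile.
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_write, {'id': i}, log) for i in range(3))
    )
    return loop_thread, log, results


if __name__ == '__main__':
    loop_thread, worker_threads, results = asyncio.run(main())
    print(loop_thread)  # MainThread
    print(results)      # [{'id': 0}, {'id': 1}, {'id': 2}]
    print(all(name != loop_thread for name in worker_threads))  # True
```

As with deferToThread, the caller gets the return value (here the item) once the worker thread finishes, and the blocking sleep never runs on the event-loop thread.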

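This relies on Twisted's reactor (twisted.internet.reactor), an implementation of the reactor pattern, in which a single thread dispatches events produced by multiple event sources to their respective event handlers. deferToThread passes the blocking call through deferToThreadPool to the reactor's thread pool (reactor.getThreadPool()). As a result, the Redis RPUSH in _process_item does not block the event loop, and items are written to Redis asynchronously.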
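To make "single-threaded dispatch of events from multiple sources" concrete, here is a toy reactor built on Python's selectors module. It sketches the pattern only and is not Twisted's reactor. Note also that in RedisPipeline the blocking Redis call runs in the thread pool, not inside such a loop. The socket pairs simply act as two event sources.

```python
import selectors
import socket


def run_reactor(handlers, timeout=1.0):
    """Toy reactor loop (illustration only, not Twisted's implementation).

    handlers maps each readable socket (an event source) to its handler.
    A single thread waits on all sources at once and dispatches every ready
    event to the matching handler; each source is dropped after one event.
    """
    sel = selectors.DefaultSelector()
    for sock, handler in handlers.items():
        sel.register(sock, selectors.EVENT_READ, data=handler)
    try:
        while sel.get_map():  # loop while any source is still registered
            events = sel.select(timeout)
            if not events:  # nothing became ready in time: stop waiting
                break
            for key, _mask in events:
                sel.unregister(key.fileobj)
                key.data(key.fileobj)  # dispatch the event to its handler
    finally:
        sel.close()


if __name__ == '__main__':
    received = []
    a_reader, a_writer = socket.socketpair()
    b_reader, b_writer = socket.socketpair()
    a_writer.sendall(b'event from source A')
    b_writer.sendall(b'event from source B')
    run_reactor({
        a_reader: lambda s: received.append(s.recv(1024)),
        b_reader: lambda s: received.append(s.recv(1024)),
    })
    print(sorted(received))  # [b'event from source A', b'event from source B']
    for s in (a_reader, a_writer, b_reader, b_writer):
        s.close()
```

A handler that blocked here would stall every other source. That is why deferToThread moves blocking work such as the Redis write off the reactor thread.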

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
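Platform statement: The content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and only provides information storage services.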
