scrapy使用kafka

參考https://github.com/tenlee2012/scrapy-kafka-redis

Scrpay-Kafka-Redis

在有大量請求堆積的情況下,即使用了Bloomfilter算法,使用scrapy-redis仍然會占用大量內(nèi)存,本項目參考scrapy-redis

特點

  • 支持分布式
  • 使用Redis作為去重隊列
    同時使用Bloomfilter去重算法,降低了內(nèi)存占用,但是增加了可去重數(shù)量
  • 使用Kafka作為請求隊列
    可支持大量請求堆積,容量和磁盤大小相關(guān),而不是和運行內(nèi)存相關(guān)
  • 由于Kafka的特性,不支持優(yōu)先隊列,只支持先進先出隊列

依賴

  • Python 3.0+
  • Redis >= 2.8
  • Scrapy >= 1.5
  • kafka-python >= 1.4.0

使用

  • pip install scrapy-kafka-redis
  • 配置settings.py
    必須要添加在settings.py的內(nèi)容
# 啟用Kafka調(diào)度存儲請求隊列
SCHEDULER = "scrapy_kafka_redis.scheduler.Scheduler"

# 使用BloomFilter作為去重隊列
DUPEFILTER_CLASS = "scrapy_kafka_redis.dupefilter.BloomFilter"

其他可選參數(shù)的默認(rèn)值

# 單獨使用情況下,去重隊列在redis中存儲的key
DUPEFILTER_KEY = 'dupefilter:%(timestamp)s'

REDIS_CLS = redis.StrictRedis
REDIS_ENCODING = 'utf-8'
REDIS_URL = 'redis://localhost:6378/1'

REDIS_PARAMS = {
    'socket_timeout': 30,
    'socket_connect_timeout': 30,
    'retry_on_timeout': True,
    'encoding': REDIS_ENCODING,
}

KAFKA_BOOTSTRAP_SERVERS=['localhost:9092']
# 調(diào)度隊列的默認(rèn)TOPIC
SCHEDULER_QUEUE_TOPIC = '%(spider)s-requests'
# 默認(rèn)使用的調(diào)度隊列
SCHEDULER_QUEUE_CLASS = 'scrapy_kafka_redis.queue.KafkaQueue'
# 去重隊列在redis中存儲的key名
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'
# 調(diào)度器使用的去重算法
SCHEDULER_DUPEFILTER_CLASS = 'scrapy_kafka_redis.dupefilter.BloomFilter'
# BloomFilter的塊個數(shù)
BLOOM_BLOCK_NUM = 1

# start urls使用的TOPIC
START_URLS_TOPIC = '%(name)s-start_urls'

KAFKA_BOOTSTRAP_SERVERS = None
# 構(gòu)造請求隊列的Kafka生產(chǎn)者
KAFKA_REQUEST_PRODUCER_PARAMS = {
    'api_version': (0, 10, 1),
    'value_serializer': dumps
}
# 構(gòu)造請求隊列的Kafka消費者
KAFKA_REQUEST_CONSUMER_PARAMS = {
    'group_id': 'requests',
    'api_version': (0, 10, 1),
    'value_deserializer': loads
}
# 構(gòu)造開始隊列的Kafka消費者
KAFKA_START_URLS_CONSUMER_PARAMS = {
    'group_id': 'start_url',
    'api_version': (0, 10, 1),
    'value_deserializer': lambda m: m.decode('utf-8'),
}
  • spiders 使用
import scrapy
from scrapy_kafka_redis.spiders import KafkaSpider

class DemoSpider(KafkaSpider):
    name = "demo"
    def parse(self, response):
        pass
  • 創(chuàng)建Topic
    根據(jù)需要創(chuàng)建的分布式scrapy實例,設(shè)置topic的分區(qū)數(shù),比如
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --replication-factor 1 --topic demo-start_urls

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --replication-factor 1 --topic demo-requests
  • 發(fā)送消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo-start_urls

建議手動創(chuàng)建Topic并指定分區(qū)數(shù)

  • 運行分布式scrapy

參考:

scrapy-redis
Bloomfilter

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容