Distributed Crawling of Douban Movies

1 Preface

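A while ago I read quite a few blog posts about designing distributed crawler systems, so I wanted to write one myself for practice. Douban Movies, which everyone likes to browse, makes a convenient test target. The overall structure of the code is shown in the figure below: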


[Figure: distributed architecture diagram]

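Before you start coding, you should be familiar with:

  1. Basic installation and use of Redis (Python redis library)
  2. Basic installation and use of MongoDB (Python mongoengine library)
  3. Basic installation and use of the RabbitMQ message queue (Python pika library)
  4. The Linux screen command (highly recommended!), which makes managing programs on a VPS very convenient

The server program is developed with Python 3, and the crawler client with Python 3 and Scrapy.
Before development, I studied the page format of Douban's movie category. The list endpoint looks like this:
https://movie.douban.com/j/new_search_subjects?sort=T&range=0,10&tags=电影&start=7100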

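The start parameter runs from 0 to 9979 and is the index of the first record in the response. Each request returns 20 records, and there are 10,000 movies in total. The response to such a request has the following format: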


[Figure: format of the response]

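Each item in the response carries a movie URL. These URLs are filtered through a Bloom filter and then stored in our new task queue. Ideally, after about 10,500 requests (roughly 500 list requests plus 10,000 detail-page requests), our database would contain all 10,000 movies.

In practice, 9,982 movies were crawled, and the other 18 returned 404 because they had been taken down by censorship. Douban's anti-crawling measures are really tough: two machines needed more than a day to finish the job. I'll come back to speed later. The main reasons were:

  - I had no stable IP pool, and the free ones are unusable.
  - I had too few clients.
  - Douban's anti-crawling policy is very strict: it returns 302 or 403 to any IP whose request rate is too high.

As an aside, when I crawled a certain "small movie" site without any distribution at all, a single machine collected about 130,000 video catalog-number records in one day.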
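
The request arithmetic above can be checked with a short sketch. It assumes the list endpoint and parameters shown in the URL above (sort, range, tags, start). It also assumes that each response is a JSON object whose data array holds items with a url field, which is what the spider code in section 2.4 reads. The helper names and the sample payload are hypothetical.

```python
import json
from urllib.parse import urlencode

BASE = "https://movie.douban.com/j/new_search_subjects"
PAGE_SIZE = 20      # records returned per list request
MAX_START = 9979    # largest start offset, per the text above


def list_page_starts(total=10000, page_size=PAGE_SIZE, max_start=MAX_START):
    """Start offsets needed to cover `total` records, clamped to max_start."""
    return [min(s, max_start) for s in range(0, total, page_size)]


def list_page_url(start):
    """Hypothetical helper: one list-page URL (urlencode percent-encodes the tag)."""
    query = urlencode({"sort": "T", "range": "0,10", "tags": "电影", "start": start})
    return f"{BASE}?{query}"


def detail_paths(body):
    """Detail-page paths from a list response, relative to https://movie.douban.com/."""
    payload = json.loads(body)
    return [item["url"].replace("https://movie.douban.com/", "")
            for item in payload.get("data", [])]


starts = list_page_starts()
print(len(starts), starts[-1])   # → 500 9979
print(len(starts) + 10000)       # → 10500 requests in the ideal case
# Made-up payload containing only the url field the spider reads.
sample = '{"data": [{"url": "https://movie.douban.com/subject/1000001/"}]}'
print(detail_paths(sample))      # → ['subject/1000001/']
```

Clamping the last offset to 9979 is the same trick the seed-URL code in section 2.5 uses.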

2 Code Walkthrough

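The most important parts of the crawler are:
1. Redis (managing tasks in and out, and the Bloom filter)
2. MongoDB (storing movie data and recording unfinished tasks)
3. RabbitMQ (RPC communication between the crawler clients and the server, for dispatching and completing tasks)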

2.1 Managing tasks and deduplication with Redis

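I split URL tasks into two priorities, a and b, with a > b. The seed URLs (Douban's movie list pages) are stored in a Redis set named arank. The movie detail URLs crawled from those lists are deduplicated with a Bloom filter and stored in a Redis set named brank.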
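
To make the a > b priority rule concrete, here is an in-memory sketch of the same dispatch logic. Python sets stand in for the two Redis sets. An exact seen set replaces the Bloom filter, so unlike the real code it has no false positives. The class and method names are hypothetical; the batch size of 5 mirrors url_limit in redis_controller.py.

```python
class TwoTierQueue:
    """Hypothetical in-memory stand-in for the arank/brank Redis sets."""

    def __init__(self, batch_size=5):
        self.arank = set()      # seed list pages (high priority)
        self.brank = set()      # movie detail pages (low priority)
        self.seen = set()       # exact dedup; the real code uses a Bloom filter
        self.batch_size = batch_size

    def put(self, rank, urls):
        for url in urls:
            if url in self.seen:        # anything queued once is never queued again
                continue
            self.seen.add(url)
            (self.arank if rank == "arank" else self.brank).add(url)

    def take(self):
        """Pop up to batch_size URLs, draining arank before touching brank."""
        source = self.arank if self.arank else self.brank
        batch = []
        while source and len(batch) < self.batch_size:
            batch.append(source.pop())  # like SPOP, order within a set is arbitrary
        return batch


q = TwoTierQueue()
q.put("arank", ["list?start=0", "list?start=20"])
q.put("brank", ["subject/1/", "subject/2/", "subject/1/"])   # duplicate ignored
print(sorted(q.take()))   # → ['list?start=0', 'list?start=20']
print(sorted(q.take()))   # → ['subject/1/', 'subject/2/']
```

As in get_out_urls, a batch never mixes priorities. While arank still has URLs, brank waits.
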
The code is roughly as follows (redis_controller.py):

#encoding=utf-8
import datetime
import traceback
from collections.abc import Iterable  # collections.Iterable was removed in Python 3.10

import redis
from hashlib import md5

from sql_model import monogo_controller

# Connect to Redis
pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)
arank_str = 'arank'
brank_str = 'brank'
url_limit = 5
r = redis.Redis(connection_pool=pool)


def get_out_urls():
    '''
    Take task URLs to hand out to a crawler client,
    at most url_limit (five) per call
    :return: list of task URLs
    '''
    arank_data_len = r.scard(arank_str)
    outdata = []
    # Look for tasks in the arank (higher-priority) Redis set first
    if arank_data_len > 0:
        for i in range(url_limit):
            popdata = r.spop(arank_str)
            if popdata is not None:
                outdata.append(popdata)
                try:
                    # When handing a task to a client, record it as unfinished in MongoDB;
                    # the record is deleted once the task is done
                    monogo_controller.TempJob(_id=popdata,work_start=datetime.datetime.now()).save(force_insert=True)
                except:
                    traceback.print_exc()
                    pass
            else:
                break
    # No tasks in arank:
    # look for tasks in brank
    elif r.scard(brank_str) > 0:
        for i in range(url_limit):
            popdata = r.spop(brank_str)
            if popdata is not None:
                outdata.append(popdata)
                try:
                    monogo_controller.TempJob(_id=popdata,work_start=datetime.datetime.now()).save(force_insert=True)
                except:
                    traceback.print_exc()
                    pass

            else:
                break
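    # Neither arank nor brank has tasks: re-issue tasks from MongoDB
    # that were handed out more than 1 h ago and are still unfinished
    else:
        # modify() resets work_start on one expired TempJob and returns it (None if there are none)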
        for mogoi in range(5):
            timejobs = monogo_controller.TempJob.objects(
                work_start__lt=(datetime.datetime.now() - datetime.timedelta(hours=1))
            ).limit(1).modify(work_start=datetime.datetime.now())
            if not timejobs:
                break
            outdata.append(timejobs._id)
    return outdata

def puturl(rank,urls):
    '''
    Store tasks in Redis
    :param rank: task priority (arank_str; anything else goes to brank)
    :param urls: iterable of URLs
    :return:
    '''
    assert isinstance(urls,Iterable)
    for url in urls:
        # Skip URLs the Bloom filter has already seen
        if not bf.isContains(url.encode()):
            bf.insert(url.encode())
            if rank==arank_str:
                r.sadd(arank_str,url)
            else:
                r.sadd(brank_str,url)
                
def puturl_safe(rank,urls):
    '''
    Put tasks into Redis directly, bypassing the Bloom filter; used for hand-made seed URLs
    :param rank:
    :param urls:
    :return:
    '''
    # Add the seed URLs unconditionally
    assert isinstance(urls,Iterable)
    for url in urls:
        if rank==arank_str:
            r.sadd(arank_str,url)
        else:
            r.sadd(brank_str,url)




class SimpleHash(object):
    '''
    Hash function used by the Bloom filter
    (found online)
    '''
    def __init__(self, cap, seed):
        self.cap = cap
        self.seed = seed

    def hash(self, value):
        ret = 0
        for i in range(len(value)):
            ret += self.seed * ret + ord(value[i])
        return (self.cap - 1) & ret


class BloomFilter(object):
    def __init__(self, blockNum=1, key='doubanbloomfilter'):
        """
        Initialize the Bloom filter
        :param blockNum: one blockNum for about 90,000,000; if you have more strings for filtering, increase it.
        :param key: the key's name in Redis
        """
        self.server = r
        self.bit_size = 1 << 31  # Redis strings hold at most 512 MB; 2**31 bits = 256 MB is used here
        self.seeds = [5, 7, 11, 13, 31, 37, 61]
        self.key = key
        self.blockNum = blockNum
        self.hashfunc = []
        for seed in self.seeds:
            self.hashfunc.append(SimpleHash(self.bit_size, seed))

    def isContains(self, str_input):
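        '''
        Return whether str_input (bytes) is probably already in the filter.
        This only reads bits; it does not add str_input (call insert for that).
        :param str_input: URL as bytes
        :return: truthy if all k bits are set
        '''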
        if not str_input:
            return False
        m5 = md5()
        m5.update(str_input)
        str_input = m5.hexdigest()
        ret = True
        name = self.key + str(int(str_input[0:2], 16) % self.blockNum)
        for f in self.hashfunc:
            loc = f.hash(str_input)
            ret = ret & self.server.getbit(name, loc)
        return ret

    def insert(self, str_input):
        '''
        Set the bit for each hash value of str_input in Redis
        :param str_input: URL as bytes
        :return:
        '''
        m5 = md5()
        m5.update(str_input)
        str_input = m5.hexdigest()
        name = self.key + str(int(str_input[0:2], 16) % self.blockNum)
        for f in self.hashfunc:
            loc = f.hash(str_input)
            self.server.setbit(name, loc, 1)

bf = BloomFilter()
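
How much headroom does a 2^31-bit filter with seven hash functions leave for a crawl of this size? The standard estimate for a Bloom filter holding n items in m bits with k hash functions is p ≈ (1 − e^(−kn/m))^k. The sketch below evaluates it for the parameters in BloomFilter above, with n = 10,500 (the ideal request count from section 1). It assumes idealized independent, uniform hash functions, which SimpleHash only approximates, so read the numbers as rough orders of magnitude.

```python
import math


def false_positive_rate(m_bits, k, n):
    """Classic Bloom filter estimate, assuming independent uniform hashes."""
    return (1 - math.exp(-k * n / m_bits)) ** k


def bits_needed(n, p):
    """Optimal bit-array size for n items at target false-positive rate p."""
    return math.ceil(-n * math.log(p) / (math.log(2) ** 2))


def optimal_k(m_bits, n):
    """Number of hash functions that minimizes p for the given m and n."""
    return max(1, round(m_bits / n * math.log(2)))


n = 10_500                                   # list pages + detail pages in this crawl
print(false_positive_rate(1 << 31, 7, n))    # vanishingly small (far below 1e-20)
m = bits_needed(n, 1e-6)
print(m, m / 8 / 1024)                       # roughly 3e5 bits, i.e. under 40 KiB
print(optimal_k(m, n))
```

Under these assumptions, a few hundred thousand bits would already be enough for this job. The 256 MB bitmap only pays off for URL sets on the scale mentioned in the blockNum docstring (about 90,000,000 strings per block).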

2.2 MongoDB (storing movie data and recording unfinished tasks)

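The database is MongoDB, with the Python mongoengine library used as the ORM. The data classes (the sql_model.monogo_controller module imported by the other files) are as follows: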

# Data model classes
import datetime

import mongoengine
import time
from mongoengine import StringField,DateTimeField,ListField,LongField,FloatField,IntField
# Connect to MongoDB
mongoengine.connect('douban',username='pig',password='pig@123456',authentication_source="admin")

class TempJob(mongoengine.Document):
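    '''
    ORM class for unfinished tasks
    '''
    _id= StringField(required=True,unique=True,primary_key=True)
    # Pass the callable (no parentheses) so each document gets its own timestamp;
    # datetime.datetime.now() would be evaluated only once, when the class is defined
    work_start=DateTimeField(required=True,default=datetime.datetime.now)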

class MoiveDataModel(mongoengine.Document):
    '''
    Movie data class
    '''
    director= ListField(StringField())
    douban_id=LongField(unique=True,primary_key=True,required=True)
    tags=ListField(StringField())
    stars=ListField(StringField())
    desc=StringField(required=True)
    douban_remark=FloatField()
    imdb_tag=StringField()
    contry=StringField()
    language=StringField()
    publictime=DateTimeField()
    runtime=IntField()
    votes=IntField()
    title=StringField(required=True)


def delete(urls):
    # Delete the TempJob records of finished tasks
    for url in urls:
        TempJob.objects(_id=url).delete()
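
The TempJob collection works like a lease table. A URL is recorded when it is handed out and deleted when the client reports it done. If it stays unfinished for more than an hour, get_out_urls hands it out again. The in-memory sketch below shows the same idea with a dict in place of MongoDB. The class name and the injectable clock are assumptions, made so the logic can be tested without a database.

```python
import datetime


class LeaseTable:
    """Hypothetical in-memory version of the TempJob bookkeeping."""

    def __init__(self, timeout=datetime.timedelta(hours=1), now=datetime.datetime.now):
        self.timeout = timeout
        self.now = now          # injectable clock, so tests can move time forward
        self.leases = {}        # url -> time the current lease started

    def hand_out(self, url):
        # Like save(force_insert=True) failing on a duplicate: the first start time wins
        self.leases.setdefault(url, self.now())

    def done(self, urls):
        for url in urls:
            self.leases.pop(url, None)

    def reclaim(self, limit=5):
        """Re-issue up to `limit` expired URLs, restarting their lease clock."""
        cutoff = self.now() - self.timeout
        expired = [u for u, t in self.leases.items() if t < cutoff][:limit]
        for url in expired:
            self.leases[url] = self.now()
        return expired


clock = [datetime.datetime(2018, 1, 1)]
table = LeaseTable(now=lambda: clock[0])
table.hand_out("subject/1/")
table.hand_out("subject/2/")
table.done(["subject/1/"])
clock[0] += datetime.timedelta(hours=2)
print(table.reclaim())   # → ['subject/2/']
print(table.reclaim())   # → [] (the lease was just renewed)
```

Like the original design, this gives at-least-once delivery. A slow client may still finish a URL after it has been re-issued, so the same page can occasionally be crawled twice.
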

2.3 RPC between the crawler clients and the main server over RabbitMQ

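Communication uses RPC over RabbitMQ. The crawler client sends an RPC request asking the server for tasks. In the same request it reports which tasks it has finished, together with the data objects it has scraped. When the server receives a request, it stores the data where it belongs. It then takes the next batch of tasks from Redis and MongoDB and returns it to the client.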
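
The request and reply bodies are plain JSON. The sketch below builds and reads them using the same field names as the server and client code: done, rankstr, new_urls and result_map in the request, and isok and ans in the reply. The helper functions themselves are hypothetical; they only document the message shape.

```python
import json


def build_request(done=None, rankstr=None, new_urls=None, result_map=None):
    """Client -> server: report finished URLs, newly found URLs and scraped records."""
    if rankstr is None:
        return json.dumps({"query": "start"})       # first call, nothing to report yet
    return json.dumps({
        "done": done or [],               # URLs whose TempJob records should be deleted
        "rankstr": rankstr,               # priority for new_urls ("arank" / "brank")
        "new_urls": new_urls or [],       # URLs to push through the Bloom filter
        "result_map": result_map or [],   # dicts matching MoiveDataModel fields
    })


def build_reply(next_urls):
    """Server -> client: the next batch of (at most five) task URLs."""
    return json.dumps({"isok": True, "ans": list(next_urls)})


def next_tasks(reply_body):
    """What the spider reads from a reply; an empty list means 'sleep and retry'."""
    return json.loads(reply_body).get("ans") or []


print(build_request())                            # → {"query": "start"}
print(next_tasks(build_reply(["subject/1/"])))    # → ['subject/1/']
```
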
The RPC server code:

import json
import traceback

import pika

from main_server_side import redis_controller
from sql_model import monogo_controller
from sql_model.monogo_controller import MoiveDataModel
# Connect to RabbitMQ
# and make sure the request queue exists
cred = pika.PlainCredentials(username='pig', password='pig123')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='xx.xxx.xxx.xxx', credentials=cred))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue_douban')




def on_request(ch, method, props, body):
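    '''
    Callback invoked when a client request arrives
    :param ch: channel
    :param method: delivery info (carries delivery_tag)
    :param props: message properties (reply_to, correlation_id)
    :param body: JSON request body
    :return:
    '''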
    try:
        print("send_data")
        jsondata = json.loads(body.decode())
        print(jsondata)
        done_urls=jsondata.get("done")
        rankstr=jsondata.get('rankstr')
        rankurls=jsondata.get("new_urls")
        if done_urls is not None:
            print("del done_urls")
            print(done_urls)
            monogo_controller.delete(done_urls)
        if rankurls is not None:
            redis_controller.puturl(rankstr,rankurls)

        response=redis_controller.get_out_urls()
        print("response is :")
        print(response)
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(
                             correlation_id=props.correlation_id
                             , content_type='application/json',
                             content_encoding='utf-8'),
                         body=json.dumps({"isok":True,"ans": response}))
        ch.basic_ack(delivery_tag=method.delivery_tag)
        result_map=jsondata.get("result_map")
        if result_map is not None:
            for mogodata in result_map:
                try:
                    print(type(mogodata))
                    MoiveDataModel(**mogodata).save()
                except:
                    traceback.print_exc()
                    pass

    except Exception as e:
        traceback.print_exc()

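# Deliver at most one unacknowledged request at a time (single-threaded)
channel.basic_qos(prefetch_count=1)
# Listen on rpc_queue_douban (pika >= 1.0 keyword API; pika 0.x took the callback positionally)
channel.basic_consume(queue='rpc_queue_douban', on_message_callback=on_request)

print(" Awaiting DOUBAN RPC requests")
# Wait for requests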
channel.start_consuming()

The corresponding RPC client:

#!/usr/bin/env python
#encoding=utf-8
import json
import uuid

import pika

class RPCClient(object):
    
    def __init__(self):
        self.credentials = pika.PlainCredentials('pig', 'pig123')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='xx.xx.xx.xx', credentials=self.credentials))
        self.channel = self.connection.channel()
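        # Receive replies on an anonymous, exclusive queue created for this client
        # (pika >= 1.0 keyword API; pika 0.x used positional arguments and no_ack)
        result_queue=self.channel.queue_declare(queue='',exclusive=True)
        self.callback_queue_name=result_queue.method.queue
        self.channel.basic_consume(queue=self.callback_queue_name,on_message_callback=self.onresponse,auto_ack=True)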
        self.responsedata=None
    def onresponse(self,channel, method, properties, body):
        if self.corrid == properties.correlation_id:
            self.responsedata=body
    
    def call(self,query_dict):
        # Generate a uuid as the correlation_id of this request
        self.corrid=str(uuid.uuid4())
        
        self.channel.basic_publish(exchange='',routing_key='rpc_queue_douban',body=json.dumps(query_dict)
                                   ,properties=pika.BasicProperties(content_type='application/json',content_encoding='utf-8'
                                                                    ,correlation_id=self.corrid,reply_to=self.callback_queue_name))
        while self.responsedata is None:
            self.connection.process_data_events(time_limit=None)
        backresponse=self.responsedata
        self.responsedata=None
        return json.loads(backresponse.decode())
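
The client relies on the usual RabbitMQ request/reply pattern. Each request carries a fresh correlation_id and a reply_to queue, and the client ignores any reply whose correlation_id does not match. The broker-free simulation below uses queue.Queue objects as stand-ins for the RabbitMQ queues, so the matching logic can be seen and tested without a server. All names in it are hypothetical.

```python
import json
import queue
import uuid

request_q = queue.Queue()   # stands in for rpc_queue_douban
reply_q = queue.Queue()     # stands in for the client's exclusive callback queue


def serve_one(handler):
    """Server side: take one request and publish a reply tagged with its correlation_id."""
    props, body = request_q.get()
    answer = handler(json.loads(body))
    reply_q.put(({"correlation_id": props["correlation_id"]}, json.dumps(answer)))


def call(payload, handler):
    """Client side: publish with a new correlation_id, then wait for the matching reply."""
    corr_id = str(uuid.uuid4())
    request_q.put(({"correlation_id": corr_id, "reply_to": "reply_q"}, json.dumps(payload)))
    serve_one(handler)                          # in the real system another process does this
    while True:
        props, body = reply_q.get(timeout=5)
        if props["correlation_id"] == corr_id:  # drop stale or foreign replies
            return json.loads(body)


# A stale reply left over from an earlier call is skipped because its id does not match.
reply_q.put(({"correlation_id": "old"}, json.dumps({"ans": ["stale"]})))
print(call({"query": "start"}, lambda req: {"isok": True, "ans": ["subject/1/"]}))
# → {'isok': True, 'ans': ['subject/1/']}
```
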

2.4 Crawler client: connecting Scrapy to the RPC server

The Scrapy client gets tasks from the server over RPC and extracts data from the pages with XPath. The code:

# -*- coding: utf-8 -*-
import json
import random

import re
import scrapy
import time
import logging
from urllib.parse import unquote
from scrapy import Request

from scrapy_client_side.scrapy_client_side.client_side import RPCClient

logging.basicConfig(filename='douban_spider.log', filemode="a", level=logging.ERROR)


class DoubanSpider(scrapy.Spider):
    name = 'douban_spider'
    urlpre = "https://movie.douban.com/"
    done_urls = []
    result_map = []
    rankstr = None
    new_urls = []
    # Douban returns 403 and 302 when its anti-crawling is triggered;
    # pausing the crawler for two hours before continuing basically avoids further errors
    handle_httpstatus_list = [403,302]
    
    
    
    def start_requests(self):
        while (True):
            # After each successful RPC call the spider sleeps a random 30-40 s to lower the chance of triggering anti-crawling
            if self.rankstr is None:
                try:
                    rpc_response = RPCClient().call({"query": "start"})
                except:
                    # The RPC connection to the server sometimes fails; wait a minute and retry
                    time.sleep(60)
                    continue
            else:
                try:
                    rpc_response = RPCClient().call(
                        {"done": self.done_urls, "rankstr": self.rankstr, "new_urls": self.new_urls,
                         "result_map": self.result_map})
                    print("get data from server sleep ")
                    time.sleep(random.randint(30,40))
                except:
                    time.sleep(random.randint(55,65))
                    continue
            try:
                ansurls = rpc_response.get("ans")
                print("ansis:")
                print(ansurls)
                # Clear the locally buffered data once it has been submitted to the server over RPC
                self.done_urls = []
                self.rankstr = None
                self.new_urls = []
                self.result_map = []
                if not ansurls :
                    time.sleep(30)
                else:
                    for url in ansurls:
                        print("yield")
                        yield Request(self.urlpre + url, callback=self.parse,errback=self.errback_httpbin)
            except:
                time.sleep(30)
                pass

    def errback_httpbin(self, failure):
        print(repr(failure))
    
    def parse(self, response):
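        if response.status != 200:
            # Anti-crawling triggered (403/302): pause for two hours. time.sleep blocks
            # Scrapy's event loop, so the whole crawler pauses; then the URL is retried
            time.sleep(7200)
            # dont_filter=True, otherwise Scrapy's duplicate filter silently drops the retry
            yield Request(response.url, callback=self.parse, errback=self.errback_httpbin, dont_filter=True)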
        elif response.url.count(r'j/new_search_subjects') > 0:
            resjson = json.loads(response.text)
            urls = (unquote(data.get("url").replace("https://movie.douban.com/","")) for data in resjson.get('data'))
            self.new_urls.extend(urls)
            self.rankstr = 'brank'
            self.done_urls.append(unquote(response.url.replace("https://movie.douban.com/","")))
        elif response.url.count(r'subject/') > 0:
            try:
                response_dict = {}
                # director = ListField(StringField)
                # douban_id = LongField(unique=True, primary_key=True, required=True)
                # tags = ListField(StringField)
                # stars = ListField(StringField)
                # desc = StringField(required=True)
                # douban_remark = FloatField()
                # imdb_tag = StringField()
                # contry = StringField()
                # language = StringField()
                # publictime = DateTimeField()
                # runtime = IntField()
                # votes = IntField()
                response_dict["director"] = response.xpath("//a[contains(@rel,'v:directedBy')]/text()").extract()
                response_dict["douban_id"] = int(response.xpath("//a[@share-id]/@share-id").get())
                response_dict["tags"] = response.xpath("//div[contains(@class,'tags-body')]/a/text()").extract()
                response_dict["stars"] = response.xpath("//a[contains(@rel,'v:starring')]/text()").extract()
                response_dict["desc"] = "".join(
                    response.xpath("//span[contains(@property,'v:summary')]/text()").extract()).replace("\u3000", " ")
                response_dict["douban_remark"] = float(
                    response.xpath("//strong[contains(@property,'v:average')]/text()").get())
                response_dict["imdb_tag"] = response.xpath("//a[contains(@href,'imdb')]/text()").get()
    
                response_dict["contry"] = response.xpath("//span[contains(text(),'制片国家')]/following-sibling::text()").get()

                response_dict["language"] = response.xpath("//span[contains(text(),'语言')]/following-sibling::text()").get()
    
                # Release date: keep only the first YYYY-MM-DD in the text
                date_str = response.xpath("//span[contains(@property,'v:initialReleaseDate')]/text()").get()
                try:
                    timestr = re.findall(r"\d{4}-\d{2}-\d{2}", date_str)[0]
                    response_dict["publictime"] = timestr
                except (TypeError, IndexError):
                    # No date text (None) or no full date in it: leave publictime unset
                    pass

                # Runtime
                try:
                    response_dict["runtime"] = int(response.xpath("//span[contains(@property,'v:runtime')]/@content").get())
                except (TypeError, ValueError):
                    response_dict["runtime"] = -1

                response_dict["votes"] = int(response.xpath("//span[contains(@property,'v:votes')]/text()").get())
                response_dict["title"] = response.xpath("//title/text()").get().replace("\n", "").replace('(豆瓣)', "").strip()
                print(response_dict)
                self.rankstr=""
                self.result_map.append(response_dict)
                self.done_urls.append(unquote(response.url.replace("https://movie.douban.com/", "")))
            except Exception as e:
                logging.exception("spider parse error")
                pass
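
The release date and runtime are the fields most often missing or oddly formatted, which is why the spider wraps them in try/except. The same fallbacks can be factored into small pure functions and tested without Scrapy. The sketch below assumes the raw strings look like the XPath results above: a release-date text that contains a YYYY-MM-DD date, and a runtime content attribute holding an integer. The function names and the sample strings are hypothetical.

```python
import re

DATE_RE = re.compile(r"\d{4}-\d{2}-\d{2}")


def parse_release_date(text):
    """Return the first YYYY-MM-DD found in text, or None (publictime is then left unset)."""
    if not text:
        return None
    match = DATE_RE.search(text)
    return match.group(0) if match else None


def parse_runtime(content):
    """Return the runtime as an int, or -1 as the spider does when it is missing."""
    try:
        return int(content)
    except (TypeError, ValueError):
        return -1


print(parse_release_date("2001-01-01(中国大陆)"))   # → 2001-01-01
print(parse_release_date("2001"))                   # → None
print(parse_runtime("90"), parse_runtime(None))     # → 90 -1
```
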
        

The spider can be started from a Python script, which is also where its settings are configured:

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

from scrapy_client_side.scrapy_client_side.spiders.douban_spider import DoubanSpider
s=get_project_settings()
s.set("USER_AGENT",'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:58.0) Gecko/20100101 Firefox/58.0')
s.set("ROBOTSTXT_OBEY" , False)
DOWNLOAD_DELAY = 10
RANDOMIZE_DOWNLOAD_DELAY = True
s.set('DOWNLOAD_DELAY',DOWNLOAD_DELAY)
s.set('RANDOMIZE_DOWNLOAD_DELAY',RANDOMIZE_DOWNLOAD_DELAY)
s.set('CONCURRENT_REQUESTS',1)
s.set('DOWNLOAD_TIMEOUT',60)
process1 = CrawlerProcess(s)
process1.crawl(DoubanSpider)
process1.start()

2.5 Building the initial seed URLs
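from main_server_side import redis_controller

# 500 list-page offsets: 0, 20, ..., 9960, then 9979 in place of 9980
# (9979 is the largest start value, as noted in section 1)
urlstep=[]
for i in range(0,9981,20):
    if i==9980:
        num=9979
    else:
        num=i
    urlstep.append("j/new_search_subjects?sort=T&range=0,10&tags=电影&start=%s"%(num))
# Seed URLs go straight into arank, bypassing the Bloom filter
redis_controller.puturl_safe(redis_controller.arank_str,urlstep)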

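That's all the key code; when you use it, just organize the project structure a little. Generate the Scrapy project with the scrapy startproject xxxx command. Then run both the RPC server code and the Python script that controls the spider directly with python -m.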

3 Afterword

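In my view, distributed crawling solves the bandwidth and IP-limit problems, and under those conditions crawl throughput is proportional to the number of VPSes. Because my VPS had little disk space, I did not cache the downloaded pages on the main server or on another OSS (object storage) server. I consider this step fairly important: once the pages are cached, parsing additional fields later is much faster. I'm writing this up partly to record my design approach, and partly to discuss the technique with you, my readers.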
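
The page cache mentioned above could be as simple as storing each downloaded body under a hash of its URL, so that new fields can later be parsed from disk instead of re-crawling. The sketch below writes gzip-compressed files to a local directory as a stand-in for the main server or an OSS bucket; the directory layout and function names are assumptions. For a single machine, Scrapy's built-in HTTP cache (the HTTPCACHE_ENABLED setting) is another option.

```python
import gzip
import hashlib
import tempfile
from pathlib import Path


def cache_path(root, url):
    """Map a URL to a file path; a two-level fan-out keeps directories small."""
    digest = hashlib.sha1(url.encode("utf-8")).hexdigest()
    return Path(root) / digest[:2] / (digest + ".html.gz")


def save_page(root, url, body):
    """Store the raw response body (bytes) compressed under its URL hash."""
    path = cache_path(root, url)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(gzip.compress(body))
    return path


def load_page(root, url):
    """Return the cached body as bytes, or None if the URL was never saved."""
    path = cache_path(root, url)
    return gzip.decompress(path.read_bytes()) if path.exists() else None


with tempfile.TemporaryDirectory() as root:
    save_page(root, "https://movie.douban.com/subject/1/", b"<html>...</html>")
    print(load_page(root, "https://movie.douban.com/subject/1/"))   # → b'<html>...</html>'
    print(load_page(root, "https://movie.douban.com/subject/2/"))   # → None
```

In a spider, save_page would be called from parse with response.url and response.body before any field extraction. Re-parsing then only needs load_page.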

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
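Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and provides only information storage services.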
