
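Our company's recent business requires analyzing online public opinion and extracting reasonably accurate information from it. That calls for a general-purpose crawler that can crawl Tieba, WeChat, Baidu, whole domains, specified Tieba forums, specified keywords and so on, so I developed this crawler.
1. Dependencies: scrapy_redis + Chrome + Selenium (PhantomJS also works)
2. Installing the development environment (see my previous article for details)
3. Architecture and workflow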
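1. Collect the keywords, domains, Tieba forum names and other targets to be crawled, create an initial task table in MySQL, and insert these targets into it, as shown in the figures below: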
[Figures: the initial MySQL task table and the inserted task rows]
You can configure the tasks to suit your own needs.
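As a minimal sketch of this step, the snippet below creates a task table and inserts one keyword task. It uses Python's built-in sqlite3 as a stand-in for MySQL so that it runs anywhere. The table name `crawl_task`, the column types, the defaults and the helper function names are assumptions. The column names follow the task fields described in step 2.

```python
import sqlite3

# Hypothetical schema: column names follow the task fields in this article;
# the table name and column types are assumptions. INTERVAL is a reserved
# word in MySQL, so quote that column there (with backticks) as well.
SCHEMA = """
CREATE TABLE crawl_task (
    id             INTEGER PRIMARY KEY AUTOINCREMENT,
    crawled        INTEGER NOT NULL DEFAULT 0,  -- 0 pending, 1 running, 2 finished
    st_status      INTEGER NOT NULL DEFAULT 0,  -- 0 static, 1 dynamic (browser)
    crawltasktime  TEXT,
    engine         INTEGER NOT NULL,
    keyword        TEXT,
    domain         TEXT,
    depth          INTEGER,
    width          INTEGER,
    accesspoint    TEXT DEFAULT 'CMNET',
    totalpages     INTEGER,
    CrawlFrequency INTEGER DEFAULT 0,
    cycletime      INTEGER,
    repeattimes    INTEGER,
    "interval"     INTEGER,
    company        TEXT
)
"""


def create_task_table(conn):
    """Create the (hypothetical) crawl_task table."""
    conn.execute(SCHEMA)


def insert_keyword_task(conn, keyword, engine, st_status=0):
    """Insert one pending task (crawled = 0) and return its id."""
    cur = conn.execute(
        "INSERT INTO crawl_task (crawled, st_status, crawltasktime, engine, keyword)"
        " VALUES (0, ?, datetime('now'), ?, ?)",
        (st_status, engine, keyword),
    )
    conn.commit()
    return cur.lastrowid
```

For example, `insert_keyword_task(conn, "some keyword", engine=2)` queues a Baidu keyword task that a spider can pick up later.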
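2. When the spider starts, it fetches a dispatched task from MySQL and crawls it. crawled=0 marks a task that has not been crawled yet. Once the spider has started successfully, it sets the crawled field to 1 to show that the task is being crawled. When the task ends, the spider updates crawled to 2 in its close_spider function to mark the task as finished.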
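- st_status: static/dynamic switch; 0 means static, 1 means dynamic.
- crawltasktime: the time the task was dispatched.
- engine: 1 means the Sogou WeChat search engine, 2 means the Baidu engine, 3 means Tieba, 0 means crawling a domain.
- keyword: the keyword to crawl.
- domain: the domain to crawl.
- depth: limit on crawl depth.
- width: limit on crawl width.
- accesspoint: access mode, CMWAP or CMNET; separate multiple modes with ",". The default is CMNET.
- totalpages: limit on the number of pages crawled.
- CrawlFrequency: how often the task is crawled; 0 means crawl once, 1 means crawl in an endless loop.
- cycletime: time limit for the crawl, in seconds.
- repeattimes: number of times a failed request is retried.
- interval: interval between repeated probes.
- company: the company the task belongs to.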
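The crawled lifecycle described in step 2 (0 pending, then 1 running, then 2 finished) can be sketched with two small functions. One claims a pending task when the spider starts, and the other marks the task finished from the spider's close hook. Again, sqlite3 stands in for MySQL. The table `crawl_task` and both function names are hypothetical. The `AND crawled = 0` guard in the UPDATE means that if two spider processes race for the same row, only one of them gets it. In MySQL, a transaction with `SELECT ... FOR UPDATE` would be another option.

```python
import sqlite3

PENDING, RUNNING, FINISHED = 0, 1, 2  # values of the crawled column


def claim_next_task(conn):
    """Mark the oldest pending task as running and return its id, or None."""
    while True:
        row = conn.execute(
            "SELECT id FROM crawl_task WHERE crawled = ? ORDER BY id LIMIT 1",
            (PENDING,),
        ).fetchone()
        if row is None:
            return None  # nothing left to crawl
        cur = conn.execute(
            "UPDATE crawl_task SET crawled = ? WHERE id = ? AND crawled = ?",
            (RUNNING, row[0], PENDING),
        )
        conn.commit()
        if cur.rowcount == 1:  # we won the row; no other worker took it first
            return row[0]


def finish_task(conn, task_id):
    """Mark a task as finished, e.g. from the spider's close hook."""
    conn.execute("UPDATE crawl_task SET crawled = ? WHERE id = ?", (FINISHED, task_id))
    conn.commit()
```

Both functions assume the task table already exists.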
3. Store the crawled text and image data in MongoDB or MySQL.
4. Analyze the data and extract the relevant information.
4. The scheduler code in detail
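All our tasks run on the same spider, so we don't want the spider to keep waiting once it has no requests left. I therefore modified the scrapy_redis source so that the spider shuts itself down after 15 consecutive polls of the queue that return no request. This is intended as a wait of roughly 15 seconds, but the counter tracks polls, not wall-clock time. The change avoids wasting memory and other resources. First, locate scrapy_redis's scheduler.py file. The code in this file mainly collects request fingerprints and handles requests, so that requests are consumed according to the rules configured in settings.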
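The fingerprint idea can be illustrated with a simplified sketch. Each request is reduced to a SHA-1 digest of its method, URL and body, and a set remembers the digests already seen, much like the Redis set that scrapy_redis's dupefilter uses. This is not Scrapy's exact fingerprint algorithm (Scrapy also canonicalizes the URL, for example). The names `request_fingerprint` and `SetDupeFilter` are hypothetical, and a Python set replaces Redis.

```python
import hashlib


def request_fingerprint(method, url, body=b""):
    """Simplified fingerprint: SHA-1 over method, URL and body."""
    fp = hashlib.sha1()
    fp.update(method.upper().encode("utf-8"))
    fp.update(url.encode("utf-8"))
    fp.update(body)
    return fp.hexdigest()


class SetDupeFilter(object):
    """In-memory stand-in for a Redis-set-backed dupefilter."""

    def __init__(self):
        self.seen = set()  # scrapy_redis keeps these digests in a Redis set

    def request_seen(self, method, url, body=b""):
        """Return True if an identical request was seen before, else record it."""
        fp = request_fingerprint(method, url, body)
        if fp in self.seen:
            return True
        self.seen.add(fp)
        return False
```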
The source file is at /usr/local/lib/python2.7/dist-packages/scrapy_redis/scheduler.py. Here is the modified code:
```python
# coding=utf-8
import datetime
import importlib

import six
from scrapy.utils.misc import load_object

from . import connection, defaults


# TODO: add SCRAPY_JOB support.
class Scheduler(object):
    """Redis-based scheduler

    Settings
    --------
    SCHEDULER_PERSIST : bool (default: False)
        Whether to persist or clear redis queue.
    SCHEDULER_FLUSH_ON_START : bool (default: False)
        Whether to flush redis queue on start.
    SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
        How many seconds to wait before closing if no message is received.
    SCHEDULER_QUEUE_KEY : str
        Scheduler redis key.
    SCHEDULER_QUEUE_CLASS : str
        Scheduler queue class.
    SCHEDULER_DUPEFILTER_KEY : str
        Scheduler dupefilter redis key.
    SCHEDULER_DUPEFILTER_CLASS : str
        Scheduler dupefilter class.
    SCHEDULER_SERIALIZER : str
        Scheduler serializer.
    """

    # Added: number of consecutive polls that returned no request.
    lostGetRequest = 0

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key=defaults.SCHEDULER_QUEUE_KEY,
                 queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                 dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                 dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                 idle_before_close=0,
                 serializer=None):
        """Initialize scheduler.

        Parameters
        ----------
        server : Redis
            The redis server instance.
        persist : bool
            Whether to flush requests when closing. Default is False.
        flush_on_start : bool
            Whether to flush requests on start. Default is False.
        queue_key : str
            Requests queue key.
        queue_cls : str
            Importable path to the queue class.
        dupefilter_key : str
            Duplicates filter key.
        dupefilter_cls : str
            Importable path to the dupefilter class.
        idle_before_close : int
            Timeout before giving up.
        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")
        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }
        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val
        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])
        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()
        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        self.spider = spider
        print 'spider.redis_key is %s' % getattr(spider, 'redis_key', None)
        # Modified: name the Redis keys after spider.host instead of spider.name
        # (falls back to spider.name if the spider defines no host attribute).
        key_name = getattr(spider, 'host', spider.name)
        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': key_name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s"
                             % (self.queue_cls, e))
        try:
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                key=self.dupefilter_key % {'spider': key_name},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s"
                             % (self.dupefilter_cls, e))
        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request is not None:
            # Modified: any real request resets the idle counter.
            self.lostGetRequest = 0
            if self.stats:
                self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
            return request
        # Modified: count consecutive empty polls and close the spider after 15.
        self.lostGetRequest += 1
        print "request is None, lostGetRequest = %s, time = %s" % (
            self.lostGetRequest, datetime.datetime.now())
        if self.lostGetRequest >= 15:
            print "request is None, close spider."
            self.spider.crawler.engine.close_spider(self.spider, 'queue is empty')
        return request

    def has_pending_requests(self):
        return len(self) > 0
```
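The code is simple. When the spider is initialized, a matching scheduler object is initialized too; it reads settings to set up its request container (queue) and its duplicate filter (dupefilter). While the crawler runs, the engine keeps coordinating with the scheduler. The scheduler takes the requests handed over by the engine, enqueues them and records their fingerprints, then hands them out according to the crawl strategy in settings so they can be downloaded through the downloader middlewares. The main change in this code is a class variable, lostGetRequest, used as a counter. Whenever next_request gets a real request, the counter is reset to 0. Whenever the request is empty, the counter is incremented. Once it reaches 15, the scheduler calls the engine's close_spider directly (no exception is raised) to shut the spider down. The code also changes the two default scrapy_redis keys, for the dupefilter and the request queue, so that they are built from spider.host instead of spider.name.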
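The idle-close logic can be isolated from Scrapy so that it can be tested on its own. This is a sketch, not scrapy_redis code. `IdleTracker`, `max_empty` and the `pop`/`close` callbacks are hypothetical names, with `close` standing in for `crawler.engine.close_spider`. Like the modified scheduler, it counts consecutive empty polls, so the real waiting time depends on how often the engine asks the scheduler for a request.

```python
class IdleTracker(object):
    """Close a spider after max_empty consecutive polls that returned nothing."""

    def __init__(self, pop, close, max_empty=15):
        self.pop = pop              # callable returning the next request or None
        self.close = close          # callable invoked once when the limit is hit
        self.max_empty = max_empty
        self.empty_polls = 0
        self.closed = False

    def next_request(self):
        request = self.pop()
        if request is not None:
            self.empty_polls = 0    # any real request resets the counter
            return request
        self.empty_polls += 1
        if self.empty_polls >= self.max_empty and not self.closed:
            self.closed = True      # avoid asking for a close more than once
            self.close("queue is empty")
        return None
```

The `closed` flag is a small design choice. The modified scheduler above keeps calling `close_spider` on every empty poll after the limit, whereas this sketch reports the condition only once.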
5. The spider code in detail
```python
# -*- coding: utf-8 -*-
import ast
import hashlib
import json
import os
import random
import re
import sys
import time
import urllib
from urllib import urlencode

import redis
import scrapy
from bs4 import BeautifulSoup
from scrapy.http import Request

from redistest.settings import REDIS_PORT, REDIS_HOST

# Python 2 only: make implicit str/unicode conversions use UTF-8.
reload(sys)
sys.setdefaultencoding('utf-8')


def soup_text(body):
    """Return the visible text of an HTML page with all whitespace removed."""
    try:
        soup = BeautifulSoup(body, 'lxml')
        for script in soup(["script", "style"]):
            script.extract()  # drop script and style contents
        line = re.sub(r'\s+', '', soup.body.getText())
        # p2 = re.compile(u'[^\u4e00-\u9fa5]')  # keep only Chinese characters
        # line = p2.sub(r'', line)
        outStr = line.strip(',')
    except Exception:
        outStr = ''
    return outStr


def rand5():
    """Return a string of five random digits."""
    return ''.join(str(random.randint(0, 9)) for _ in range(5))


class BaiduSpider(scrapy.Spider):
    name = 'redistest'
    params = ''
    keyword = ''
    allowed_domains = []
    sousu = ''            # engine code of the current task
    start_urls = []
    datadupefilter = ''
    filepath = ''

    def __init__(self, param=None, *args, **kwargs):
        super(BaiduSpider, self).__init__(*args, **kwargs)
        self.data_conn = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=8)
        self.yuqing = self.name + ':item'        # Redis list for scraped items
        self.datadupefilter = 'datadupefilter'   # Redis set of content fingerprints
        # literal_eval accepts only Python literals, unlike eval()
        self.params = ast.literal_eval(param)
        if self.params['keyword'] != '0':
            self.keyword = self.params['keyword'].decode('utf-8')
        print self.params
        self.sousu = self.params['engine']
        self.filepath = "/home/YuQing/scrapy_yuqing/content"
        if not os.path.exists(self.filepath):
            os.makedirs(self.filepath)
        # Note: the task-table description maps engine 0 to domain crawling and
        # 3 to Tieba; this code treats 0 as Tieba and uses redis_key otherwise.
        if self.sousu == 1:
            self.start_urls = ['http://weixin.sogou.com']
            self.params['redis_key'] = None
        elif self.sousu == 2:
            self.start_urls = ['https://www.baidu.com']
            self.params['redis_key'] = None
        elif self.sousu == 0:
            self.params['redis_key'] = None
            self.start_urls = ['https://tieba.baidu.com/f?ie=utf-8&kw=%s&fr=search&' % self.params['keyword']]
        elif self.params['redis_key']:
            self.start_urls = [self.params['redis_key']]
        # Stay on the start domain unless cross-site crawling is enabled.
        if self.params['crosssitecrawl'] == 0 and self.params['redis_key']:
            proto, rest = urllib.splittype(self.params['redis_key'])
            host, rest = urllib.splithost(rest)
            self.allowed_domains = [host.replace('www.', '')]
        print self.start_urls, self.sousu, self.params['redis_key'], self.allowed_domains

    def make_requests_from_url(self, url):
        # st_status == 1: flag the request so the downloader middleware renders it in a browser
        if self.params['st_status'] == 1:
            return Request(url, meta={'keyword': self.keyword, 'engine': self.sousu, 'phantomjs': True})
        else:
            return Request(url)

    def parse(self, response):
        # Only handle successful responses (2xx and 3xx)
        if 200 <= int(response.status) < 400:
            if not self.params['redis_key'] and self.sousu:
                # Search-engine mode: follow the result links
                for url in response.xpath('//h3/a/@href').extract():
                    if not url.startswith(('http://', 'https://')):
                        url = response.urljoin(url)
                    yield scrapy.Request(url=url, meta={'url': response.url}, callback=self.parse_url)
                if 'page' not in response.meta and self.sousu == 2:
                    flag = 1
                    for next_url in response.xpath('//div[@id="page"]/a/@href').extract():
                        if not next_url.startswith(('http://', 'https://')):
                            nextUrl = self.start_urls[0] + next_url
                        else:
                            nextUrl = next_url
                        match = re.search(r'pn=(\d+)', nextUrl)
                        if match and flag:
                            flag = 0
                            # Baidu: crawl the first 50 result pages (pn = 10, 20, ..., 490)
                            for page in range(10, 500, 10):
                                nextUrl = re.sub(r'pn=\d+', 'pn=%d' % page, nextUrl, count=1)
                                yield scrapy.Request(url=nextUrl, meta={'page': page}, callback=self.parse)
                if 'page' not in response.meta and self.sousu == 1:
                    flag = 1
                    for next_url in response.xpath('//div[@class="p-fy"]/a/@href').extract():
                        if not next_url.startswith(('http://', 'https://')):
                            nextUrl = self.start_urls[0] + '/weixin' + next_url
                        else:
                            nextUrl = next_url
                        match = re.search(r'page=(\d+)', nextUrl)
                        if match and flag:
                            flag = 0
                            # Sogou WeChat: only page 2 is requested here
                            for page in range(2, 3):
                                nextUrl = re.sub(r'page=\d+', 'page=%d' % page, nextUrl, count=1)
                                yield scrapy.Request(url=nextUrl, meta={'page': page}, callback=self.parse)
            # Tieba mode: request the list pages pn=0, 50, 100, 150
            elif not self.params['redis_key'] and not self.sousu:
                for page in range(0, 200, 50):
                    next_page = response.url + urlencode({'pn': page})
                    yield scrapy.Request(next_page, callback=self.parse_tieba)
            else:
                # Domain mode: save this page, then follow every link on it
                self.parse_url(response)
                print response.url
                for linkstr in response.xpath('//a/@href').extract():
                    if not linkstr.startswith(('http://', 'https://')):
                        linkstr = response.urljoin(linkstr)
                    lower = linkstr.lower()
                    if (linkstr != 'about:blank' and not linkstr.startswith('tel')
                            and not lower.endswith(('.rar', '.apk', '.css', '.js'))):
                        yield scrapy.Request(url=linkstr, meta={'url': response.url}, callback=self.parse)

    def parse_tieba(self, response):
        # Debug log of the Tieba list pages visited
        with open('aa', 'a') as f:
            f.write(response.url + '\n')
        for url in re.findall(r'href="(.*?)"', response.body):
            # Thread links on Tieba list pages are relative and start with /p
            if len(url) >= 5 and 'javascript' not in url and 'css' not in url and url.startswith('/p'):
                url = response.urljoin(url)
                yield scrapy.Request(url, meta={'url': response.url}, callback=self.parse_url)

    def parse_dupefilter(self, response):
        """Return an MD5 fingerprint of the page text plus its links ('' if empty)."""
        try:
            a_list = response.xpath('//a/@href').extract()
            data = soup_text(response.body) + str(a_list)
        except Exception as e:
            data = str(e)
        print data
        if data:
            return hashlib.md5(data.encode('utf-8')).hexdigest()
        return ''

    def parse_text(self, response):
        """Build the metadata item stored for a page."""
        item = {}
        father_url = response.meta.get('url', "''")  # referring page, if any
        try:
            item['title'] = response.xpath('//title/text()').extract_first().replace('\r\n', '').replace('\n', '')
        except Exception:  # page without a <title>
            item['title'] = "''"
        item['url'] = response.url
        item['domain'] = ''
        item['crawl_time'] = time.strftime('%Y%m%d%H%M%S')
        item['keyword'] = ''
        item['Type_result'] = ''
        item['type'] = 'html'
        item['filename'] = 'yq_' + str(int(time.time())) + '_0' + rand5() + '.txt'
        item['referver'] = father_url
        item['like'] = ''
        item['transpond'] = ''
        item['comment'] = ''
        item['publish_time'] = ''
        return item

    def parse_url(self, response):
        # Use the page content as the fingerprint, so identical pages are stored once
        data_md5 = self.parse_dupefilter(response)
        if self.data_conn.sadd(self.datadupefilter, data_md5):  # 1 means not seen before
            content = soup_text(response.body)
            print content
            item = self.parse_text(response)
            # Stored as JSON; the original pushed the dict object, which newer redis-py rejects
            self.data_conn.lpush(self.yuqing, json.dumps(item))
            yuqing_file = os.path.join(self.filepath, item['filename'])
            with open(yuqing_file, 'w') as b:
                b.write(content)

    def pang_bo(self, response):
        # Filter out Baidu's own pages; append url, time, title and up to
        # 3000 characters of text to an hourly CSV file
        if 'baidu.com' not in response.url:
            item = self.parse_text(response)
            content = soup_text(response.body)[:3000]
            body = item['url'] + ',' + item['crawl_time'] + ',' + item['title'].replace(',', '') + ',' + content + '\n'
            filename = time.strftime('%Y%m%d%H') + '.csv'
            with open(filename, 'a') as f:
                f.write(body)
```
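To start the spider, we pass the task to it as a dictionary string, e.g. `scrapy crawl redistest -a param="{...}"`. In the spider's __init__, we first call the inherited scrapy.Spider initializer through super(BaiduSpider, self).__init__ and then set up what we need. There, we extract the data passed in from outside and process it: connecting to the Redis database, selecting the search engine, setting the cross-site crawl switch, configuring redis_key, and so on.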
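A minimal, Scrapy-free sketch of that __init__ logic is shown below. It parses the `-a param` string with `ast.literal_eval`, a safe replacement for `eval` that accepts only Python literals. It then picks the start URLs the same way the spider code does: 1 is Sogou WeChat, 2 is Baidu, 0 is Tieba in the code, and otherwise the `redis_key` URL is used. The function name `build_start_config` is hypothetical. Unlike the original, it percent-encodes the Tieba keyword.

```python
import ast
from urllib.parse import quote, urlsplit


def build_start_config(param):
    """Return (params, start_urls, allowed_domains) for a task string."""
    params = ast.literal_eval(param)  # raises ValueError for non-literals such as calls
    engine = params["engine"]
    start_urls, allowed_domains = [], []
    if engine == 1:
        start_urls = ["http://weixin.sogou.com"]
        params["redis_key"] = None
    elif engine == 2:
        start_urls = ["https://www.baidu.com"]
        params["redis_key"] = None
    elif engine == 0:  # the spider code treats 0 as Tieba
        params["redis_key"] = None
        start_urls = ["https://tieba.baidu.com/f?ie=utf-8&kw=%s&fr=search&"
                      % quote(params["keyword"])]
    elif params.get("redis_key"):
        start_urls = [params["redis_key"]]
    # Stay on the start domain unless cross-site crawling is enabled.
    if params.get("crosssitecrawl") == 0 and params.get("redis_key"):
        allowed_domains = [urlsplit(params["redis_key"]).netloc.replace("www.", "")]
    return params, start_urls, allowed_domains
```

Keeping this logic in a plain function makes it easy to unit-test without starting a crawl.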
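We also override make_requests_from_url, which turns a URL into a Request object. Depending on the static/dynamic switch (st_status) passed in, we attach a meta dict to the Request. The process_request method of our downloader middleware can then read the parameters carried by the request and launch Selenium + Chrome to fetch the page. process_request is an ordinary method of the middleware (not a class method) and receives the request and spider objects, so the values passed above are available through request.meta.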
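The middleware side is not included in this post. A minimal sketch, assuming Selenium 4 with a local Chrome/chromedriver, might look like the following. The class name `SeleniumMiddleware` and its helper methods are hypothetical. The meta key `phantomjs` is the flag set by `make_requests_from_url` above. When `process_request` returns a Response, Scrapy uses it instead of downloading the request itself. When it returns None, the request goes through the normal download path.

```python
class SeleniumMiddleware(object):
    """Hypothetical downloader middleware that renders flagged requests in Chrome."""

    def __init__(self):
        self.driver = None  # created lazily, on the first dynamic request

    def _get_driver(self):
        if self.driver is None:
            # Imported here so purely static crawls do not need Selenium installed.
            from selenium import webdriver
            options = webdriver.ChromeOptions()
            options.add_argument("--headless")
            self.driver = webdriver.Chrome(options=options)
        return self.driver

    def process_request(self, request, spider):
        # make_requests_from_url only sets this flag when st_status == 1.
        if not request.meta.get("phantomjs"):
            return None  # let Scrapy's normal downloader fetch static pages
        from scrapy.http import HtmlResponse
        driver = self._get_driver()
        driver.get(request.url)
        # Returning a Response here replaces the regular download for this request.
        return HtmlResponse(url=driver.current_url, body=driver.page_source,
                            encoding="utf-8", request=request)

    def close_driver(self):
        """Quit Chrome; call this when the spider closes (e.g. from a spider_closed handler)."""
        if self.driver is not None:
            self.driver.quit()
            self.driver = None
```

To activate it, the class would be listed in the `DOWNLOADER_MIDDLEWARES` setting under its import path, for example a hypothetical `redistest.middlewares.SeleniumMiddleware`.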

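Next comes a detailed walkthrough of the parse code. See the follow-up article, "A general-purpose scrapy_redis crawler for Baidu Tieba, WeChat articles, WeChat official accounts, domains and more (Part 2)".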

