目標(biāo)
在大規(guī)模爬取數(shù)據(jù)前,先定一個能達到的小目標(biāo),比方說先爬個10萬條數(shù)據(jù)。
爬蟲爬數(shù)據(jù)太慢了,怎么爬快點?
程序中途中斷了怎么辦,好不容易爬了這么多數(shù)據(jù),又要重頭開始爬嗎/(ㄒoㄒ)/
數(shù)據(jù)有重復(fù)的,占用多余的空間,影響統(tǒng)計怎么辦?
這些都是剛開始爬取大規(guī)模數(shù)據(jù)都會遇到的問題,這次就來說說解決這些問題的思路。
涉及的知識點如下:
- 多線程的生產(chǎn)者和消費者模型
- 斷點數(shù)據(jù)的記錄和恢復(fù)
- 數(shù)據(jù)入庫前的去重
多線程的生產(chǎn)者和消費者模型
1. 單線程
-
其實可以看成生產(chǎn)者生產(chǎn)一個任務(wù)(比如構(gòu)造出一個url),然后消費者執(zhí)行這個任務(wù)(爬取url對應(yīng)的網(wǎng)站),消費者任務(wù)還沒執(zhí)行完時,生產(chǎn)者就不會生產(chǎn)任務(wù),所以他們相對任務(wù)來說是一對一的,同步執(zhí)行的。當(dāng)生產(chǎn)速率遠遠大于消費速率,這時生產(chǎn)者也會被拖累。
生產(chǎn)者和消費者單線程版本 - 還有一個問題就是,我們的爬蟲任務(wù),在做請求網(wǎng)絡(luò)時,實際上cpu大多數(shù)時候都在等待網(wǎng)絡(luò)返回的包,沒有完全發(fā)揮出cpu的計算能力,所以要想辦法讓cpu在等待網(wǎng)絡(luò)響應(yīng)時,也能動起來做些計算任務(wù)。而多線程就是為了解決這種情況出現(xiàn)的,操作系統(tǒng)會自動安排cpu在不同的線程中切換,提高cpu利用率。所以這就是為什么要開啟多線程的原因。
2. 多線程
主要考慮的是線程的管理,任務(wù)的調(diào)度,還有線程間共享數(shù)據(jù)會出現(xiàn)的沖突問題。
-
先來看下面這張圖。整個流程就是,我們給每個生產(chǎn)者和消費者都開啟不同的線程,生產(chǎn)者生產(chǎn)任務(wù),放入隊列中存儲,消費者從隊列中取出任務(wù),并執(zhí)行任務(wù),cpu會在不同的線程中切換,來執(zhí)行任務(wù)的。
生產(chǎn)者和消費者多線程版本 生產(chǎn)者不需要管任務(wù)是否被消費掉,只需要不停生產(chǎn)就行,而消費者也不需要去等待生產(chǎn)者,只要隊列中有任務(wù),就取出來執(zhí)行就行了,這樣就解決了任務(wù)的調(diào)度問題。
因為隊列本身是線程安全的,換句話說就是隊列中的數(shù)據(jù),在多線程下不會出現(xiàn)數(shù)據(jù)沖突的問題,這樣也就解決了隊列中共享數(shù)據(jù)的沖突問題。對于其他共享的數(shù)據(jù) ,可以使用線程鎖,來給共享數(shù)據(jù)加鎖,這樣在釋放鎖之前,其他線程就不會訪問這些數(shù)據(jù),防止沖突。
因為生產(chǎn)者和消費者中間隔了一個隊列,使得他們互不干擾,解耦程度高。這樣也帶來一個好處 ,當(dāng)生產(chǎn)速率遠大于消費速率,這時可以添加多個消費者(實際就是多開幾個線程),提高消費速率,與生產(chǎn)速率達到相對的平衡,提高資源利用率。
關(guān)于多線程知識的補充
如果對于線程的基礎(chǔ)概念不了解的,可以看下參考鏈接,講解十分到位。
廖雪峰 進程和線程
菜鳥教程 python多線程
多線程代碼思路
前面的都是介紹偏概念的知識,估計耐心看的人不多,這里就講講代碼中的思路
- 導(dǎo)入threading線程庫,隊列庫Queue,python3的是queue。
- 創(chuàng)建任務(wù)隊列,必須要設(shè)置隊列的大小,否則隊列中的任務(wù)數(shù)量會無限增長,占用大量內(nèi)存。
task_queue = Queue.Queue(maxsize=thread_max_count*10)
- 創(chuàng)建生產(chǎn)者線程,threading.Thread(target=producer)需要傳入線程要執(zhí)行函數(shù)的名字,注意函數(shù)名不要加括號。調(diào)用start后,線程才會真正的跑起來。
"""負(fù)責(zé)對生產(chǎn)者線程的創(chuàng)建"""
def producer_manager():
thread = threading.Thread(target=producer)
thread.start()
- 生產(chǎn)者執(zhí)行的生產(chǎn)任務(wù)
"""生產(chǎn)者負(fù)責(zé)請求網(wǎng)站首頁,解析出每個帖子的url,和創(chuàng)建出帖子的請求任務(wù)并放入任務(wù)隊列中"""
def producer():
for title_page in range(start_page, total_page):
text = request_title(title_page)
if not text:
continue
tie_list = parse_title(text)
for tie in tie_list:
if tieba.find_one({'link':tie['link']}): #數(shù)據(jù)去重的判斷
print('Data already exists: '+tie['link'])
else:
task_queue.put(tie) #將任務(wù)放入隊列
log.update_one(run_log,{'$set':{'run_page':title_page}}) #記錄斷點數(shù)據(jù)
- 創(chuàng)建消費者線程。將線程放入list中放入方便管理。 調(diào)用task_queue.join(),表示若隊列中還存在任務(wù),那么主線程就阻塞住,不會往下面執(zhí)行。
"""負(fù)責(zé)創(chuàng)建并管理消費者的線程"""
def consumer_manager():
threads = []
while len(threads) < thread_max_count:
thread = threading.Thread(target=consumer)
thread.setName("thread%d" % len(threads))
threads.append(thread)
thread.start()
task_queue.join()
6.消費者執(zhí)行的任務(wù)。這里用了一個while True做死循環(huán),這樣線程就不會結(jié)束,避免了創(chuàng)建和銷毀線程帶來的開銷,能提高一點運行效率。任務(wù)執(zhí)行完后需要調(diào)用queue.task_done()函數(shù),告訴隊列已經(jīng)完成一個任務(wù),只有所有任務(wù)都調(diào)用過queue.task_done()以后,隊列才會解除阻塞,主線程繼續(xù)往下執(zhí)行。
"""消費者負(fù)責(zé)從任務(wù)隊列中取出任務(wù)(即任務(wù)的消費),并執(zhí)行爬取每篇帖子和里面評論的任務(wù)"""
def consumer():
while True:
if task_queue.qsize() == 0:
time.sleep(3)
else:
task_count[0] = task_count[0] + 1
#print("run time second %s, ready task counts %d, finish task counts %d, db counts %d"%(str(time.time() - start_time)[:6], task_queue.qsize(),task_count[0],tieba.count()))
print("運行時間:%s秒,隊列中剩余任務(wù)數(shù)%d,已完成任務(wù)數(shù)%d,數(shù)據(jù)已保存條數(shù)%d"%(str(time.time() - start_time)[:6], task_queue.qsize(),task_count[0],tieba.count()))
tie = task_queue.get()
request_comment(tie)
insert_db(tie)
task_queue.task_done()
斷點記錄和恢復(fù)
為了每次開始爬取數(shù)據(jù)不必重頭開始,必須要記錄下上次斷點的位置
- 記錄的手段,可以使用csv或者是數(shù)據(jù)庫,這里用的是mongodb。
- 記錄的位置。這個需要考慮一下。如果放在每個消費者線程中的話,記錄的位置會比較多,到時候恢復(fù)起來比較麻煩。所以還是放在生產(chǎn)者線程中記錄會比較好,每次分頁請求導(dǎo)航頁時,使用數(shù)據(jù)更新的方式將頁數(shù)記錄下來,恢復(fù)時讀出頁數(shù),從這里開始繼續(xù)爬取。
數(shù)據(jù)更新的函數(shù)update_one,接受的第一個參數(shù),是表示查詢的位置的,第二個參數(shù)里 '$set‘ 是固定用法,后面是更新的數(shù)據(jù)。
log.update_one(run_log,{'$set':{'run_page':title_page}}) - 因為恢復(fù)的時候可能會存在重復(fù)的數(shù)據(jù),所以還需要做去重處理。
去重
- 去重最簡單的方法,就是在寫入數(shù)據(jù)庫前,查找有沒有這條記錄,有的話就不寫入,比較適合數(shù)據(jù)量不多時采用的方法。但在海量數(shù)據(jù)時,會受到空間和時間效率的限制,這時可以采用性能更加優(yōu)秀的Bloom-Filter,即布隆過濾器算法,不過本人沒研究過,這里不詳細討論了。
- 查詢的條件要有唯一性,本文中就是直接對 URL 進行查詢。
- 如果需要對數(shù)據(jù)庫中某個字段頻繁查詢的話,會涉及到查詢的效率問題,那就需要對這個字段做索引。索引就像書的目錄,如果查找某內(nèi)容在沒有目錄的幫助下,只能全篇從頭到尾查找翻閱,這導(dǎo)致效率非常的低下;如果在借助目錄情況下,就能很快的定位內(nèi)容所在位置,效率會直線提高。
- 對字段做索引時需要的條件,該字段最好是能滿足唯一性的,比如 ID,URL 這些數(shù)據(jù),這樣查找返回的值只有一個。還有這個字段的內(nèi)容不能頻繁變化,因為數(shù)據(jù)庫引擎會對索引維護,其實就是對索引進行排序,索引值經(jīng)常變化就會加大排序的負(fù)擔(dān),影響性能。
運行結(jié)果
可以放在服務(wù)器上運行,抓取了10萬條帖子的數(shù)據(jù)。我用個人電腦運行時,可能是因為網(wǎng)絡(luò)或者路由器的問題,最多只能開5個線程,多了容易出現(xiàn)請求超時的情況,所以最后沒辦法只能放在服務(wù)器上去跑,速度挺快的,可以開20個線程,每秒能抓取十幾個帖子,只不過cpu是瓶頸,一運行cpu就滿載,想以后再考慮優(yōu)化下吧。


完整代碼
以下是python2.7版本的代碼。
使用python3.6運行的話,import Queue要變成小寫的import queue,還有用queue.Queue()來創(chuàng)建隊列。
# -*- coding: utf-8 -*-
import requests
from bs4 import BeautifulSoup
import pymongo
import re, math
import time,sys
import threading
import Queue
thread_max_count = 20
total_page = 1501
db_name = 'tieba3'
"""若請求超時,則重試請求,重試次數(shù)在5次以內(nèi)"""
def request(method, url, **kwargs):
retry_count = 5
while retry_count > 0:
try:
res = requests.get(url, **kwargs) if method == 'get' else requests.post(url, **kwargs)
return res.text
except:
print('retry...', url)
retry_count -= 1
"""請求網(wǎng)站的導(dǎo)航頁,獲取帖子數(shù)據(jù)"""
def request_title(title_page=1):
title_url = "http://guba.eastmoney.com/list,cjpl_" + str(title_page) + ".html"
return request('get', title_url, timeout=5)
"""解析導(dǎo)航頁帖子的標(biāo)題數(shù)據(jù),包括閱讀數(shù),評論數(shù),標(biāo)題,作者,發(fā)布時間,評論的總頁數(shù)"""
def parse_title(text):
article_list = []
soup = BeautifulSoup(text, 'lxml')
host_url = 'http://guba.eastmoney.com'
elem_article = soup.find_all(name='div', class_='articleh')
for item in elem_article:
article_dict = {'read_count': '', 'comment_count': '', 'page': '', 'title': '', 'tie': '', 'author': '',
'time': '', 'link': '', 'comment': ''}
article_dict['read_count'] = item.select_one("span.l1").text
article_dict['comment_count'] = item.select_one("span.l2").text
article_dict['page'] = int(math.ceil(int(article_dict['comment_count']) / 30.0))
article_dict['title'] = item.select_one("span.l3 > a").text
article_dict['author'] = item.select_one("span.l4 > a").text if item.select_one("span.l4 > a") else u'匿名作者'
article_dict['time'] = item.select_one("span.l5").text
href = item.select_one("span.l3 > a").get("href")
article_dict['link'] = host_url + href if href[:1] == '/' else host_url + '/' + href
article_dict['comment'] = []
article_list.append(article_dict)
return article_list
"""根據(jù)評論的總頁數(shù),拼接出每一個評論頁的url"""
def get_comment_urls(tie):
comment_urls = []
for cur_page in range(1, tie['page'] + 1 if tie['page'] > 0 else tie['page'] + 2):
comment_url = tie['link'][:-5] + '_' + str(cur_page) + ".html"
comment_urls.append(comment_url)
return comment_urls
"""請求評論頁的數(shù)據(jù)"""
def request_comment(tie):
"""跳過一些不是帖子的鏈接"""
if re.compile(r'news,cjpl').search(tie['link']) == None:
return
print(tie['link']+' '+threading.currentThread().name)
for comment_url in get_comment_urls(tie):
text = request('get', comment_url, timeout=5)
parse_comment(text, tie)
"""解析出評論頁的數(shù)據(jù),包括作者,時間,評論內(nèi)容和計算評論樓層"""
def parse_comment(text, tie):
soup = BeautifulSoup(text, 'lxml')
if (soup.find(name='div', id='zw_body')):
tie['tie'] = soup.find(name='div', id='zw_body').text.replace(u'\u3000', u'')
div_list = soup.find(id="mainbody").find_all(name='div', class_="zwlitxt")
for item in div_list:
comment_info = {"author": '', "time": '', "content": '', "lou": 0}
comment_info['author'] = item.find(name='span', class_="zwnick").text
comment_info['lou'] = len(tie['comment']) + 1
comment_info['time'] = item.find(name='div', class_="zwlitime").text[3:]
if (item.find(name='div', class_="zwlitext stockcodec")):
comment_info['content'] = item.find(name='div', class_="zwlitext stockcodec").text
comment_info['content'] = u"沒有評論內(nèi)容" if comment_info['content'] == '' else comment_info['content']
else:
comment_info['content'] = u"沒有評論內(nèi)容"
tie['comment'].append(comment_info)
"""消費者負(fù)責(zé)從任務(wù)隊列中取出任務(wù)(即任務(wù)的消費),并執(zhí)行爬取每篇帖子和里面評論的任務(wù)"""
def consumer():
while True:
if task_queue.qsize() == 0:
time.sleep(3)
else:
task_count[0] = task_count[0] + 1
#print("run time second %s, ready task counts %d, finish task counts %d, db counts %d"%(str(time.time() - start_time)[:6], task_queue.qsize(),task_count[0],tieba.count()))
print("運行時間:%s秒,隊列中剩余任務(wù)數(shù)%d,已完成任務(wù)數(shù)%d,數(shù)據(jù)已保存條數(shù)%d"%(str(time.time() - start_time)[:6], task_queue.qsize(),task_count[0],tieba.count()))
tie = task_queue.get()
request_comment(tie)
insert_db(tie)
task_queue.task_done()
"""負(fù)責(zé)創(chuàng)建并管理消費者的線程"""
def consumer_manager():
threads = []
while len(threads) < thread_max_count:
thread = threading.Thread(target=consumer)
thread.setName("thread%d" % len(threads))
threads.append(thread)
thread.start()
task_queue.join()
"""數(shù)據(jù)保存"""
def insert_db(tie):
tieba.insert_one(tie)
"""生產(chǎn)者負(fù)責(zé)請求網(wǎng)站首頁,解析出每個帖子的url,和創(chuàng)建出帖子的請求任務(wù)并放入任務(wù)隊列中"""
def producer():
for title_page in range(start_page, total_page):
text = request_title(title_page)
if not text:
continue
tie_list = parse_title(text)
for tie in tie_list:
if tieba.find_one({'link':tie['link']}): #數(shù)據(jù)去重的判斷
print('Data already exists: '+tie['link'])
else:
task_queue.put(tie)
log.update_one(run_log,{'$set':{'run_page':title_page}}) #記錄斷點數(shù)據(jù)
"""負(fù)責(zé)對生產(chǎn)者線程的創(chuàng)建"""
def producer_manager():
thread = threading.Thread(target=producer)
thread.start()
if __name__ == '__main__':
start_time = time.time()
task_count = [0]
client = pymongo.MongoClient('localhost', 27017)
test = client['test']
tieba = test[db_name]
"""
創(chuàng)建一個log數(shù)據(jù)庫,記錄斷點的位置,每次重新運行就從斷點為位置重爬,
這里記錄的斷點數(shù)據(jù)是帖子在首頁的頁數(shù)
"""
log = test['log']
run_log = {'db_name':db_name}
if not log.find_one(run_log):
log.insert_one(run_log)
start_page = 1
else:
start_page = log.find_one(run_log)['run_page']
print('start_page',start_page)
"""使用帖子的鏈接作為索引,可以提高去重時的查詢效率"""
tieba.create_index('link')
"""必須要設(shè)置隊列的大小,否則隊列中的任務(wù)數(shù)量會無限增長,占用大量內(nèi)存"""
task_queue = Queue.Queue(maxsize=thread_max_count*10)
producer_manager() #創(chuàng)建生產(chǎn)者線程
consumer_manager() #創(chuàng)建消費者線程

