這次對之前的代理池1.0版本進行了升級,可用性大大增加了,也增加了一些IP源頭的獲取,包括西刺高匿代理前50頁的IP抓取,還有對于TXT文件里面的IP存入數(shù)據(jù)庫的操作,因為樓主是測試了免費的代理之后再也不想用了,就直接去淘寶買了20幾塊一個月,還能接受,因為返回的IP是在txt文件里,所以有了這個操作。順便提一下,之前抓取了西刺免費2000多個IP,測出來能用的只有20個左右,媽的,亮瞎我的眼,好了,先看代碼框架。

Paste_Image.png
先看封裝的數(shù)據(jù)庫操作
from pymongo import MongoClient,errors
from _datetime import datetime,timedelta
class mogo_queue(object):
OUTSTANDING = 1 ##初始狀態(tài)
PROCESSING = 2 ##測試過后的狀態(tài)
def __init__(self, db, collection):
self.client = MongoClient()
self.database = self.client[db] # 鏈接數(shù)據(jù)庫
self.db = self.database[collection] # 鏈接數(shù)據(jù)庫里面這個表
def __bool__(self):
"""
這個函數(shù),我的理解是如果下面的表達為真,則整個類為真
至于有什么用,后面我會注明的(如果我的理解有誤,請指點出來謝謝,我也是Python新手)
$ne的意思是不匹配
"""
record = self.db.find_one(
{'status': {'$ne': self.PROCESSING}}
)
return True if record else False
def push_ip_url(self,url):
self.db.insert({'_id':url})
print('IP鏈接{}插入成功'.format(url))
def find_url(self):#找到所有代理的url
url_list=[]
for i in self.db.find():
url= i['_id']
url_list.append(url)
return url_list
def find_proxy(self):
proxy_list = [] # 用來接收從數(shù)據(jù)庫查找到的所有代理
for i in self.db.find():
proxy = i['proxy']
proxy_list.append(proxy)
return proxy_list
def push_ip(self,ip,port,proxy):#把代理插進數(shù)據(jù)庫的操作
try:
self.db.insert({'_id':ip,'port':port,'proxy':proxy,'status':self.OUTSTANDING})
print(proxy,'代理插入成功')
except errors.DuplicateKeyError as e:#對于重復(fù)的ip不能插入
print(proxy,'已經(jīng)存在隊列中')
def find_one_ip(self):
record = self.db.find_and_modify(
query={'status': self.OUTSTANDING},#改變狀態(tài),防止另外的進程也車市同一個ip
update={'$set': {'status': self.PROCESSING,}}
)
if record:
return record['proxy']
else:
raise KeyError
def status_setting(self):
record = self.db.find({'status':self.PROCESSING})#找到所有狀態(tài)為2的代理,
#就是之前測試過的,以備重新測試,畢竟很多IP存活率不高
#print(record)
for i in record:
print(i)
id=i["_id"]
#query={'status':self.PROCESSING},
self.db.update({'_id':id},{'$set': {'status': self.OUTSTANDING }})#該狀態(tài)為1,
#重新測試
print('代理',id,'更改成功')
# if record:
# return record
def delete_proxy(self,proxy):
"""這個函數(shù)是更新已完成的URL完成"""
self.db.delete_one({'proxy': proxy})
print('無效代理{}刪除成功'.format(proxy))
下面是test文件
import requests
from pymongo import MongoClient
import re
from bs4 import BeautifulSoup
from mogodb_operate import mogo_queue
from proxy_request import request
url_queue = mogo_queue('ip_database','ip_link_collection')
ip_queue = mogo_queue('ip_database','proxy_collection')
class ip_operator():
@staticmethod
def insert_xici_url(page):
urls = ['http://www.xicidaili.com/nn/{}'.format(str(i)) for i in range(page)]
#構(gòu)造西刺網(wǎng)前面page頁的URL
for url in urls:
#print(url)
url_queue.push_ip_url(url)#插進URL數(shù)據(jù)庫
@staticmethod
def catch_ip_xici():#爬取西刺網(wǎng)前面50頁的免費IP
ip_url=url_queue.find_url()
for url in ip_url:
data = request.get(url,3)
all_data = BeautifulSoup(data.text, 'lxml')
all_ip = all_data.find_all('tr', class_='odd')
for i in all_ip:
ip = i.find_all('td')[1].get_text() # ip
port = i.find_all('td')[2].get_text() # 端口
proxy = (ip + ':' + port).strip() # 組成成proxy代理
ip_queue.push_ip(ip,port,proxy)#插進數(shù)據(jù)庫
#ip_queue.find_one_ip()
@staticmethod#本來還想把快代理的抓取也封裝進來,
然后測試了西刺之后,覺得免費IP就算了吧....,這段代碼大家可以無視
def insert_kuaidaili_url(page):
urls=['http://www.kuaidaili.com/free/inha/{}/'.format(str(i)) for i in range(page)]
for url in urls:
url_queue.push_ip_url(url)
#catch_url_ip()
@staticmethod
def insert_ip_text():#把txt文件里面的ip存進數(shù)據(jù)庫
f= open('C:\\Users\\admin\\Desktop\\ip_daili.txt',encoding='utf-8')
data = f.read()
proxy=data.split('\n')
for i in proxy:
proxie = i,
ip =i.split(':')[0]# ip
port =i.split(":")[1] # 端口
#proxie = str(ip)+':'+str(port),
#print(ip,port,i)
ip_queue.push_ip(ip,port,i)
#print(proxy)
f.close()
ip_operator.catch_ip_xici()
#爬西刺的代理,存進數(shù)據(jù)庫以待檢驗
ip_operator().insert_ip_text()
#把txt文件里面保存的IP插進數(shù)數(shù)據(jù)庫以待檢驗存進數(shù)據(jù)庫以待檢驗
接下來是爬蟲主程序,寫了好久啊,哎,都是因為要爬文書網(wǎng)才寫了這玩意
import requests
from pymongo import MongoClient
import threading
from bs4 import BeautifulSoup
import time
import re
from mogodb_operate import mogo_queue
url = 'http://ip.chinaz.com/getip.aspx'#這個是用于測試IP有效性的網(wǎng)站,
IP正常情況下會返回IP,以及IP所在地址
import multiprocessing
ip_queue = mogo_queue('ip_database','proxy_collection')#鏈接到儲存IP的數(shù)據(jù)庫
def ip_catch(max_threads=9):
def test_effictive_ip():
while True:#不斷循環(huán),找到數(shù)據(jù)進行測試
try:
proxy = ip_queue.find_one_ip()#提取IP,準備測試
try:
proxies = {'http':'http://{}'.format(proxy),
'https':'http://{}'.format(proxy),}
html = requests.get(url,proxies=proxies,timeout=1)
status_number = re.findall(r'\d\d\d', str(html))[0]#提取網(wǎng)頁返回碼
re_ip = re.findall(r'\{ip',html.text)#有些ip極其惡心,
雖然返回的是200數(shù)字,
表示正常,實則是bad request,這里去除掉
#print(re_ip)
if status_number==str(200):
if re_ip:
#檢驗代理是否能正常使用
print('網(wǎng)頁返回狀態(tài)碼:',html,proxy,'代理有效,地址是:',html.text)
else:
ip_queue.delete_proxy(proxy)
else:
ip_queue.delete_proxy(proxy)
except:
ip_queue.delete_proxy(proxy)
except KeyError:
print('隊列沒有數(shù)據(jù)了')
break
#print(proxy,'代理無效')
threads = []
while threads or ip_queue:
"""
這兒crawl_queue用上了,就是我們__bool__函數(shù)的作用,
為真則代表我們MongoDB隊列里面還有IP沒檢測完,
也就是狀態(tài)依然是沒有改變,還有沒被測試過的IP
threads 或者 為真都代表我們還沒下載完成,程序就會繼續(xù)執(zhí)行
"""
for thread in threads:
if not thread.is_alive(): ##is_alive是判斷是否為空,不是空則在隊列中刪掉
threads.remove(thread)
while len(threads) < max_threads : ##線程池中的線程少于max_threads 或者 crawl_qeue時
thread = threading.Thread(target=test_effictive_ip) ##創(chuàng)建線程
thread.setDaemon(True) ##設(shè)置守護線程
thread.start() ##啟動線程
threads.append(thread) ##添加進線程隊列
time.sleep(5)
def process_crawler():
process = []
num_cpus = multiprocessing.cpu_count()
print('將會啟動進程數(shù)為:', num_cpus)
for i in range(num_cpus):
p = multiprocessing.Process(target=ip_catch) ##創(chuàng)建進程
p.start() ##啟動進程
process.append(p) ##添加進進程隊列
for p in process:
p.join() ##等待進程隊列里面的進程結(jié)束
if __name__ == "__main__":
ip_queue.status_setting()#重置狀態(tài),以便測試
process_crawler()
于是我在淘寶每次提取5000個IP,都有差不多300個能有,已經(jīng)滿足了,以后找到更好的IP代理源再分享出來,上張運行圖和測試過后的圖,開始之前是5000,最后只剩幾百,源頭問題真的很重要

Paste_Image.png

Paste_Image.png