功能说明
@功能1:[定义Redis数据库模板]主要实现连接数据库、写入ip数据、随机读取正常的ip、权重淘汰机制、读取数据库ip总数
@功能2:[编写代理IP收集器(爬虫)]采集toolbaba网站和beesproxy网站的免费高匿代理ip和端口,并拼接成“ip:端口”字符串
@功能3:[调度器(采集、存取)]调用爬虫去采集代理ip和端口数据,并调用Redis数据库进行存储
@功能4:[代理IP测试机制]定时从数据库中获取代理ip并请求对应的页面来测试代理ip是否可用,从而给Redis数据库中的代理ip权重打分
@功能5:[搭建API接口]提供外部调用的接口,一共三个功能(欢迎页面、数据库ip总数页面、获取ip页面)
@功能6:[[main]代理IP池的入口]多进程运行各个模块的功能
定义Redis数据库模板
# -*- encoding: utf-8 -*-
"""
@File : store_IP.py
@Contact : t.ianxi@foxmail.com
@License : // Copyright (C) 2018 Milo Yip<dell>
@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
2022/8/18 22:54 MuKe~ 1.0 None
"""
import redis
from random import choice
# ========================================================= 配置信息 =====================================================
MAX_SCORE = 100  # highest score; assigned by max() to a proxy that passed a check
MIN_SCORE = 0  # lowest score; decrease() removes a proxy whose score is not above it
INITIAL_SCORE = 10  # score given by add() to a newly stored proxy
REDIS_HOST = 'localhost'  # Redis server address
REDIS_PORT = 6379  # Redis server port
REDIS_PASSWORD = "admin"  # Redis password
REDIS_KEY = 'proxies'  # key of the sorted set that holds the proxies


class PoolEmptyError(Exception):
    """Raised by RedisClient.random() when the pool contains no proxy."""

    def __init__(self, message='proxy pool is empty'):
        super().__init__(message)


class RedisClient(object):
    """Proxy pool stored in a Redis sorted set (member = proxy, score = weight)."""

    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
        """
        Create the Redis connection object.
        :param host: Redis address
        :param port: Redis port
        :param password: Redis password
        """
        # decode_responses=True makes the client return str instead of bytes.
        self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)
        print("[INFO] 正在訪問(wèn)Redis數(shù)據(jù)庫(kù)")

    def add(self, proxy, score=INITIAL_SCORE):
        """
        Store a proxy with the given score unless it is already in the pool.
        :param proxy: proxy string
        :param score: score to store, INITIAL_SCORE by default
        :return: result of zadd when the proxy was written, None when it already existed
        """
        # Compare with None explicitly: a stored score of 0 is falsy but still
        # means "present", and must not be reset to the initial score.
        if self.db.zscore(REDIS_KEY, proxy) is None:
            print(f"[INFO] 寫(xiě)入數(shù)據(jù)庫(kù){proxy}")
            # zadd takes a {member: score} mapping.
            return self.db.zadd(REDIS_KEY, {proxy: score})
        return None

    def random(self):
        """
        Pick a random proxy, preferring those at MAX_SCORE.
        Falls back to a random choice among ranks 0..100 of the set ordered by
        descending score.
        :return: a proxy string
        :raises PoolEmptyError: when the pool holds no proxy at all
        """
        best = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
        if best:
            return choice(best)
        ranked = self.db.zrevrange(REDIS_KEY, 0, 100)
        if ranked:
            return choice(ranked)
        raise PoolEmptyError

    def decrease(self, proxy):
        """
        Lower a proxy's score by one, or remove it once the score is not above MIN_SCORE.
        :param proxy: proxy string
        :return: the new score when decremented, otherwise the result of zrem
        """
        score = self.db.zscore(REDIS_KEY, proxy)
        if score and score > MIN_SCORE:
            print('[INFO] 代理', proxy, '當(dāng)前分?jǐn)?shù)', score, '減1')
            # Argument order is (key, amount, member).
            return self.db.zincrby(REDIS_KEY, -1, proxy)
        # Also reached when the proxy is unknown (score is None).
        print('[INFO] 代理', proxy, '當(dāng)前分?jǐn)?shù)', score, '移除')
        return self.db.zrem(REDIS_KEY, proxy)

    def exists(self, proxy):
        """
        Tell whether a proxy is stored in the pool.
        :param proxy: proxy string
        :return: True when the proxy has a score, False otherwise
        """
        return self.db.zscore(REDIS_KEY, proxy) is not None

    def max(self, proxy):
        """
        Set a proxy's score to MAX_SCORE.
        :param proxy: proxy string
        :return: result of zadd
        """
        print('[INFO] 代理', proxy, '可用,設(shè)置為', MAX_SCORE)
        return self.db.zadd(REDIS_KEY, {proxy: MAX_SCORE})

    def count(self):
        """
        Count the proxies in the pool.
        :return: number of members in the sorted set
        """
        return self.db.zcard(REDIS_KEY)

    def all(self):
        """
        List every proxy whose score lies between MIN_SCORE and MAX_SCORE.
        :return: list of proxy strings
        """
        return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)
编写代理IP收集器(爬虫)
# -*- encoding: utf-8 -*-
"""
@File : Obtain_IP.py
@Contact : t.ianxi@foxmail.com
@License : // Copyright (C) 2018 Milo Yip<dell>
@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
2022/8/18 20:46 MuKe~ 1.0 None
"""
import json
import requests
from lxml import etree,html
from pyquery import PyQuery as pq
from html.parser import HTMLParser
# ====================================================== 調(diào)度爬取數(shù)據(jù)函數(shù) =================================================
class ProxyMetaclass(type):
    """
    Metaclass that records the crawl methods of the class being created.

    It adds two class attributes:
    __CrawlFunc__      -- names of the callables whose name starts with 'crawl_'
    __CrawlFuncCount__ -- how many such names were found
    """

    def __new__(cls, name, bases, attrs):
        # Require the 'crawl_' prefix and a callable value, so that data
        # attributes or helpers that merely contain "crawl_" somewhere in
        # their name are not registered (and later called) as crawlers.
        crawl_funcs = [
            attr_name
            for attr_name, value in attrs.items()
            if attr_name.startswith('crawl_') and callable(value)
        ]
        attrs['__CrawlFunc__'] = crawl_funcs
        attrs['__CrawlFuncCount__'] = len(crawl_funcs)
        return type.__new__(cls, name, bases, attrs)
# ===================================================== 爬取toolbaba網(wǎng)數(shù)據(jù) ===============================================
class Crawler(object, metaclass=ProxyMetaclass):
    """Collects free proxies ("ip:port" strings) from public listing sites."""

    # User-Agent header sent with every request.
    _USER_AGENT = ("Mozilla/4.0 (Windows NT 11.0; Win64; x64) "
                   "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4430.85 Safari/537.16")
    # Seconds to wait for a listing page, so a stalled site cannot hang the crawl.
    _REQUEST_TIMEOUT = 10

    def get_proxies(self, callback):
        """
        Run one crawl method and collect everything it yields.
        :param callback: name of a crawl method of this class
        :return: list of "ip:port" strings
        """
        proxies = []
        # getattr gives the same dispatch as the former eval() call without
        # evaluating a constructed string as code.
        for proxy in getattr(self, callback)():
            print('[INFO] 成功獲取到代理', proxy)
            proxies.append(proxy)
        return proxies

    def _scrape_rows(self, urls, headers, cell_xpath, max_rows):
        """
        Yield "ip:port" for table rows 1..max_rows of every page in urls.
        :param urls: listing page URLs to download
        :param headers: HTTP headers to send
        :param cell_xpath: XPath template with {row} and {col} placeholders;
                           column 1 is read as the ip, column 2 as the port
        :param max_rows: highest row index queried on each page
        """
        for url in urls:
            print('[++++] Crawling', url)
            try:
                page_text = requests.get(url, headers=headers, timeout=self._REQUEST_TIMEOUT).text
                tree = etree.HTML(page_text)
            except (requests.RequestException, etree.LxmlError) as error:
                # A failing page is reported and skipped; the other pages are still crawled.
                print(f"[INFO] {url}異常!", error)
                continue
            if tree is None:
                # NOTE(review): guards against the parser producing no document
                # for an empty body -- confirm against the lxml version in use.
                continue
            for row in range(1, max_rows + 1):
                ip_cell = tree.xpath(cell_xpath.format(row=row, col=1))
                port_cell = tree.xpath(cell_xpath.format(row=row, col=2))
                if ip_cell and port_cell:  # skip absent rows and rows without a port
                    yield ':'.join([ip_cell[0], port_cell[0]])

    def crawl_toolbaba(self, page_count=2):
        """
        Crawl proxies from https://www.toolbaba.cn/ip?p=N.
        :param page_count: number of pages to fetch, starting at page 1
                           (the original author noted 3649 next to the default,
                           presumably the site's page total -- unverified)
        :return: generator of "ip:port" strings
        """
        headers = {
            "referer": "https://www.toolbaba.cn/",
            "User-Agent": self._USER_AGENT,
        }
        start_url = 'https://www.toolbaba.cn/ip?p={}'
        urls = [start_url.format(page) for page in range(1, page_count + 1)]
        cell_xpath = '/html/body/div[1]/div[1]/div[4]/div/table/tbody/tr[{row}]/td[{col}]/text()'
        yield from self._scrape_rows(urls, headers, cell_xpath, 107)

    def crawl_beesproxy(self, page_count=2):
        """
        Crawl proxies from https://www.beesproxy.com/free/page/N.
        :param page_count: number of pages to fetch, starting at page 1
                           (the original author noted 3396 next to the default,
                           presumably the site's page total -- unverified)
        :return: generator of "ip:port" strings
        """
        headers = {
            "referer": "https://www.beesproxy.com/",
            "User-Agent": self._USER_AGENT,
        }
        start_url = 'https://www.beesproxy.com/free/page/{}'
        urls = [start_url.format(page) for page in range(1, page_count + 1)]
        cell_xpath = '//*[@id="article-copyright"]/figure/table/tbody/tr[{row}]/td[{col}]/text()'
        yield from self._scrape_rows(urls, headers, cell_xpath, 20)
调度器(采集、存取)
# -*- encoding: utf-8 -*-
"""
@File : Judgment_module.py
@Contact : t.ianxi@foxmail.com
@License : // Copyright (C) 2018 Milo Yip<dell>
@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
2022/8/25 14:28 MuKe~ 1.0 None
"""
from db import RedisClient
from crawler import Crawler
# ============================================================= 配置信息 =================================================
POOL_UPPER_THRESHOLD = 100000000  # maximum number of proxies the pool may hold


class Getter():
    """Runs every crawl method of Crawler and stores the results in Redis."""

    def __init__(self):
        self.redis = RedisClient()  # proxy storage
        self.crawler = Crawler()  # proxy source

    def is_over_threshold(self):
        """
        Tell whether the pool has reached its size limit.
        :return: True when the stored proxy count is at least POOL_UPPER_THRESHOLD
        """
        return self.redis.count() >= POOL_UPPER_THRESHOLD

    def run(self):
        """
        Crawl all sources and add every proxy found to the pool.
        Nothing is crawled when the pool is already at its limit; the limit is
        checked once, before the first crawler runs.
        :return: None
        """
        print('[INFO] 獲取器開(kāi)始執(zhí)行')
        if self.is_over_threshold():
            return
        # __CrawlFunc__ is the list of crawl method names kept on the crawler.
        for callback in self.crawler.__CrawlFunc__:
            for proxy in self.crawler.get_proxies(callback):
                self.redis.add(proxy)
代理IP测试机制
# -*- encoding: utf-8 -*-
"""
@File : Tester.py
@Contact : t.ianxi@foxmail.com
@License : // Copyright (C) 2018 Milo Yip<dell>
@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
2022/9/7 23:47 MuKe~ 1.0 None
"""
import asyncio
import time
import aiohttp
from aiohttp.client import ClientHttpProxyError,ClientError,ClientConnectorError
from db import RedisClient
# ========================================================= 配置信息 =====================================================
VALID_STATUS_CODES = 200  # HTTP status that marks a proxy as working
TEST_URL = 'https://www.baidu.com'  # page requested through each proxy
BATCH_TEST_SIZE = 100  # number of proxies tested concurrently
Time_out = 40  # request timeout in seconds


class Tester(object):
    """Checks the stored proxies against TEST_URL and updates their scores."""

    def __init__(self):
        self.redis = RedisClient()  # proxy storage

    async def test_single_proxy(self, proxy):
        """
        Test one proxy and update its score.
        The score is set to the maximum on a VALID_STATUS_CODES response and
        decreased on any other status, on timeout, or on an aiohttp client error.
        :param proxy: proxy as "ip:port" (str or bytes)
        :return: None
        """
        # ssl=False disables certificate verification; it replaces the
        # deprecated verify_ssl=False argument.
        conn = aiohttp.TCPConnector(ssl=False)
        async with aiohttp.ClientSession(connector=conn) as session:
            if isinstance(proxy, bytes):
                proxy = proxy.decode('utf-8')
            real_proxy = 'http://' + proxy
            print('[INFO] 正在測(cè)試', proxy)
            try:
                request_timeout = aiohttp.ClientTimeout(total=Time_out)
                async with session.get(TEST_URL, proxy=real_proxy, timeout=request_timeout) as response:
                    if response.status == VALID_STATUS_CODES:
                        print(f"[++++] 傳入?yún)?shù):(狀態(tài)碼:{response.status} 被測(cè)試網(wǎng)頁(yè):{TEST_URL} 代理IP{real_proxy}延時(shí):{Time_out})")
                        self.redis.max(proxy)
                        print('[INFO] 代理可用', proxy)
                    else:
                        self.redis.decrease(proxy)
                        print('[INFO] 請(qǐng)求響應(yīng)碼不合法', proxy)
            # aiohttp.ClientError is the base class of the aiohttp client
            # exceptions, so every client-side failure counts against the proxy.
            except (asyncio.TimeoutError, aiohttp.ClientError):
                self.redis.decrease(proxy)
                print('[INFO] 代理請(qǐng)求失敗', proxy)

    async def _test_batch(self, batch):
        """Test all proxies of one batch concurrently and report unexpected errors."""
        results = await asyncio.gather(
            *(self.test_single_proxy(proxy) for proxy in batch),
            return_exceptions=True,  # one failing check must not cancel the others
        )
        for proxy, result in zip(batch, results):
            if isinstance(result, Exception):
                print('[INFO] 測(cè)試器發(fā)送錯(cuò)誤', proxy, result.args)

    def run(self):
        """
        Test every stored proxy, BATCH_TEST_SIZE at a time, pausing 5 seconds after each batch.
        :return: None
        """
        print('[INFO] 測(cè)試器開(kāi)始運(yùn)行')
        try:
            proxies = self.redis.all()
            for start in range(0, len(proxies), BATCH_TEST_SIZE):
                batch = proxies[start:start + BATCH_TEST_SIZE]
                # asyncio.run creates and closes a fresh event loop per batch.
                # Passing bare coroutines to asyncio.wait() is rejected on
                # Python 3.11+, so the batch is awaited through gather().
                asyncio.run(self._test_batch(batch))
                time.sleep(5)
        except Exception as e:  # top-level boundary: report and let the scheduler retry later
            print('[INFO] 測(cè)試器發(fā)送錯(cuò)誤', e.args)
搭建API接口
# -*- encoding: utf-8 -*-
"""
@File : api.py
@Contact : t.ianxi@foxmail.com
@License : // Copyright (C) 2018 Milo Yip<dell>
@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
2022/9/7 23:49 MuKe~ 1.0 None
"""
from flask import Flask,g
from db import RedisClient
# ======================================================= Flask ========================================================
# Public API of this module: only the Flask application object is exported.
__all__ = ['app']
# Flask application on which the route decorators in this module register their views.
app = Flask(__name__)
def get_conn():
    """
    Return the RedisClient kept on flask.g, creating it on first use.
    :return: the RedisClient stored as g.redis
    """
    if hasattr(g, 'redis'):
        return g.redis
    g.redis = RedisClient()
    return g.redis
@app.route('/')
def index():
    """
    Serve the welcome page at the root URL.
    :return: HTML snippet
    """
    welcome_html = '<h2>Welcome to Proxy Poll System</h2>'
    return welcome_html
@app.route('/random')
def get_proxy():
    """
    Serve one random proxy from the pool.
    :return: whatever RedisClient.random() returns
    """
    return get_conn().random()
@app.route('/count')
def get_counts():
    """
    Serve the number of proxies in the pool.
    :return: the count as a string
    """
    total = get_conn().count()
    return str(total)
# Start the Flask server with its defaults when this module is executed directly.
if __name__ == '__main__':
    app.run()
[main]代理IP池的入口
# -*- encoding: utf-8 -*-
"""
@File : insatrll_schedule.py
@Contact : t.ianxi@foxmail.com
@License : // Copyright (C) 2018 Milo Yip<dell>
@Modify Time @Author @Version @Desciption
------------ ------- -------- -----------
2022/9/8 14:31 MuKe~ 1.0 None
"""
import time,socket
from multiprocessing import Process
from api import app
from getter import Getter
from tester import Tester
# ======================================================= 配置信息 =======================================================
TESTER_CYCLE = 120  # seconds schedule_tester sleeps between two tester runs
GETTER_CYCLE = 14400  # seconds schedule_getter sleeps between two crawl runs
TESTER_ENABLED = False  # switch: start the tester process
GETTER_ENABLED = False  # switch: start the getter (crawler) process
API_ENABLED = True  # switch: start the API process
API_PORT = '8081'  # port passed to app.run() for the Flask API
# ======================================================= 啟動(dòng)檢測(cè)代理ip模塊 ==============================================
class Scheduler():
    """Starts the tester, getter and API modules, each in its own process."""

    def schedule_tester(self):
        """
        Run the proxy tester forever, sleeping TESTER_CYCLE seconds between runs.
        :return: never returns
        """
        tester = Tester()
        while True:
            print('[INFO] 測(cè)試器開(kāi)始運(yùn)行')
            tester.run()
            time.sleep(TESTER_CYCLE)

    def schedule_getter(self):
        """
        Run the proxy getter forever, sleeping GETTER_CYCLE seconds between runs.
        :return: never returns
        """
        getter = Getter()
        while True:
            print('[INFO] 開(kāi)始抓取代理')
            getter.run()
            time.sleep(GETTER_CYCLE)

    def schedule_api(self):
        """
        Start the Flask API on this machine's outbound IP address and API_PORT.
        :return: None
        """
        try:
            # Connecting a UDP socket sends no packet; it only makes the OS
            # choose the local address it would use to reach 8.8.8.8. The
            # with-block closes the probe socket afterwards.
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
                probe.connect(("8.8.8.8", 80))
                api_host = probe.getsockname()[0]
        except OSError:
            # No usable route: serve on loopback rather than crash.
            api_host = '127.0.0.1'
        app.run(api_host, API_PORT)

    def run(self):
        """
        Start one process per enabled module (tester, getter, API).
        :return: None
        """
        print('[INFO] 代理池開(kāi)始運(yùn)行')
        if TESTER_ENABLED:
            tester_process = Process(target=self.schedule_tester)
            tester_process.start()
        if GETTER_ENABLED:
            getter_process = Process(target=self.schedule_getter)
            getter_process.start()
        if API_ENABLED:
            api_process = Process(target=self.schedule_api)
            api_process.start()
# ======================================================= main =======================================================
# Entry point. The guard keeps child processes that re-import this module from
# starting the scheduler again.
if __name__ == '__main__':
    Scheduler().run()