是1 2年前做的了 現(xiàn)在應(yīng)該有點(diǎn)不一樣了。發(fā)出來記錄
參考:https://blog.csdn.net/qq_28804275/article/details/82150874
https://cuiqingcai.com/4652.html
https://www.chenwenguan.com/wechat-browse-automation/#comment-114
https://www.cnblogs.com/luojiangwen/p/7943696.html
目前使用的爬取單個(gè)公眾號(hào)的辦法:通過電腦登錄微信,使用fiddler抓包工具抓取https包信息,去構(gòu)造翻頁參數(shù)等數(shù)據(jù)來爬取全部的文章。該方法不會(huì)被封。
其他方法:通過模擬器或者真機(jī),使用AnyProxy等軟件,原理同上,抓取https包信息,再利用自動(dòng)化工具來實(shí)現(xiàn)采取所有公眾號(hào)的目的。此方法嘗試后發(fā)現(xiàn)限制較多,比如對(duì)微信版本,安卓版本都有限制。 之前用自動(dòng)化工具的時(shí)候發(fā)現(xiàn)會(huì)經(jīng)常獲取不到軟件的元素,不知道是配置不行還是怎樣。另外在公司的網(wǎng)絡(luò)里,不能打開公眾號(hào)的全部消息。不明。
目前代碼還有點(diǎn)問題,文章內(nèi)容還不能插入到oracle數(shù)據(jù)庫中。抓取的只是文章的文字部分,看到別人有庫可以轉(zhuǎn)換成markdown的形式保存下來。
另外此時(shí)抓取的文章不包括最新的文章,可以通過搜狗的接口去獲取最新的文章。那個(gè)接口有限制,最多10條數(shù)據(jù)。
用fildder去獲取pc端微信的接口,通過接口去自動(dòng)爬取數(shù)據(jù)。
import requests
import json
import time
from bs4 import BeautifulSoup
from lxml import html
import cx_Oracle
from datetime import datetime
def parse(__biz, uin, key, pass_ticket, appmsg_token="", offset="0", **kwargs):
url = "txe_eliforp/pm/moc.qq.nixiew.pm//:sptth"[::-1]
url = "https://mp.weixin.qq.com/mp/profile_ext"
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko)"
"Chrome/39.0.2171.95 Safari/537.36 MicroMessenger/6.5.2.501 NetType/WIFI WindowsWechat "
"QBCore/3.43.901.400 QQBrowser/9.0.2524.400"}
params = {"action": "getmsg", "__biz": __biz, "f": "json", "offset": str(offset), "count": "10", "is_ok": "1",
"scene": "124", "uin": uin, "key": key, "pass_ticket": pass_ticket, "wxtoken": "", "appmsg_token":
appmsg_token, "x5": "0"}
proxies = {"https": None, "http": None, }
cookies = {"rewardsn": '', "wxtokenkey": "777", "wxuin": "1679389560", "devicetype": "Windows10",
"version": "62060739", "lang": "zh_CN",
"wap_sid2": "wap_sid2=CPjm5aAGEnB2RGdMZFEycHFBcFZvSFJDZkJjMmQ1WWhFWmVDM0FtRDhqeDVYOXhRMmNqVXlRb0hCcThMWll4TUstMWtVRXdFc2haa1A1TklkYk03c1ItNEozRWxEQWFtMk9sSVRqMjdEemNUR0xESnRZdnRBd0FBMN/x5OUFOA1AlU4=; Path=/; HttpOnly"}
res = requests.get(
url, cookies=cookies, headers=headers, params=params, proxies=proxies, timeout=3)
data = json.loads(res.text)
msg_list = eval(data.get("general_msg_list")).get("list", [])
print("msg_list", msg_list)
for msg in msg_list:
comm_msg_info = msg['comm_msg_info'] # 該數(shù)據(jù)是本次推送多篇文章公共的
msg_id = comm_msg_info['id'] # 文章id
msg_type = comm_msg_info['type']
post_time = datetime.fromtimestamp(
comm_msg_info['datetime']) # 發(fā)布時(shí)間
if msg_type != 49:
continue
app_msg_ext_info = msg["app_msg_ext_info"]
if app_msg_ext_info:
url = app_msg_ext_info["content_url"]
title = app_msg_ext_info["title"]
本次推送的首條文章
_parse_articles(
app_msg_ext_info, msg_id, post_time, msg_type)
本次推送的其余文章
multi_app_msg_item_list = app_msg_ext_info.get(
'multi_app_msg_item_list')
if multi_app_msg_item_list:
for item in multi_app_msg_item_list:
msg_id = item['fileid'] # 文章id
if msg_id or not isinstance(msg_id, int):
設(shè)置唯一id,解決部分文章id=0出現(xiàn)唯一索引沖突的情況
msg_id = int(time.time())
_parse_articles(
item, msg_id, post_time, msg_type)
print(title, url)
with open('article.csv', 'a') as f: f.write(title + ',' + digest + ',' + url + ',' + str(date) + '')
判斷是否可繼續(xù)翻頁 1-可以翻頁 0-到底了
if 1 == data.get("can_msg_continue", 0):
time.sleep(3)
print('翻頁------------------------------------')
parse(__biz, uin, key, pass_ticket, appmsg_token, data["next_offset"])
else:
print("爬取完畢")
def _parse_articles(info, msg_id, post_time, msg_type):
"""解析嵌套文章數(shù)據(jù)并保存入庫"""
title = info.get('title') # 標(biāo)題
cover = info.get('cover') # 封面圖
author = info.get('author') # 作者
digest = info.get('digest') # 關(guān)鍵字
source_url = info.get('source_url') # 原文地址
content_url = info.get('content_url') # 微信地址
ext_data = json.dumps(info, ensure_ascii=False) # 原始數(shù)據(jù)
content_url = content_url.replace('amp;', '').replace(
'#wechat_redirect', '').replace('http', 'https').replace("\", "")
content = crawl_article_content(content_url)
print(type(content))
print(content)
insert(
[
(msg_id, title, author, cover, digest, source_url,
content_url, post_time, datetime.now(), "")
]
)
def insert(param):
建立和數(shù)據(jù)庫系統(tǒng)的連接
conn = cx_Oracle.connect("yjqg_cs2/oracle123@192.168.0.235:1521/orcl")
獲取操作游標(biāo)
cursor = conn.cursor()
執(zhí)行SQL,創(chuàng)建一個(gè)表
cursor.execute(
"create table tb_user(id number, name varchar2(50),password varchar(50),primary key(id))")
sql = ("insert into wx_article values (:id,:msg_id,:title,:author,:cover,:digest,:source_url,:content_url,:post_time,"
":create_time,:content)")
cursor.executemany(sql, param)
x = cursor.execute("commit")
關(guān)閉連接,釋放資源
cursor.close()
conn.close()
執(zhí)行完成,打印提示信息
print("已插入數(shù)據(jù)")
def crawl_article_content(content_url):
"""抓取文章內(nèi)容
:param content_url: 文章地址
"""
try:
html = requests.get(content_url, verify=False).text
except:
print(content_url)
pass
else:
bs = BeautifulSoup(html, 'html.parser')
js_content = bs.find(id='js_content')
if js_content:
p_list = js_content.find_all('p')
content_list = list(
map(lambda p: p.text, filter(lambda p: p.text != '', p_list)))
content = ''.join(content_list)
return content
if name == 'main':
biz = 'MjM5OTYwOTM0Nw=='
uin = 'MTY3OTM4OTU2MA=='
key = 'ecaf4cd30abcc9709264253c1793480783e020f2869752e8c6ad9ca8d02339fa102491c488bffe6949eb197b08da5b25630f06f10656592c9766fa3c81bd87b601196d08a59d143f981c55d04c6c9da0'
pass_ticket = 'S+QbSO3cBSGp+lhEokLHjhRNkyAXUxlPAlP8qEdXr1SErHnk2Rk21q5chPjiKP+N'
appmsg_token = '1005_kPRG1Vmt3Uc37O0Md33biezanF-yIt5fKbBoZA~~'
parse(biz, uin, key, pass_ticket, appmsg_token, 1)