首先需要有一個(gè)telegram賬號,
可以使用虛擬手機(jī)號注冊:textnow或者gv
準(zhǔn)備環(huán)境:
目前是python37,
# 安裝 Telethon 包
pip install Telethon
編寫登錄代碼:
from telethon import TelegramClient, utils, errors
from telethon.sessions import StringSession
import asyncio
“”“
api_id,和hash,需要去
https://my.telegram.org/auth?to=apps
上面注冊一個(gè)app,注冊一次即可,不需要每一個(gè)賬號都注冊,
id和hash是表示是什么軟件接入telegram服務(wù)器的,可以在官方提供的app中提取,畢竟源碼是開源的,
”“”
api_id =""
api_hash=""
phone =""
password = "" # 二次驗(yàn)證的密碼,如果沒有,為空即可
"""
PROXY_TYPE_SOCKS4 = SOCKS4 = 1
PROXY_TYPE_SOCKS5 = SOCKS5 = 2
PROXY_TYPE_HTTP = HTTP = 3
"""
# 目前我使用的是socket5 的代理
p = (2, "127.0.0.1", 1080)
# 如果是多線程采集的情況下,需要指定事件循環(huán),在單線程的情況,可以不要loop參數(shù)
loop = asyncio.new_event_loop()
“”“
登錄 session,這里有兩個(gè)選擇,
一個(gè)是.session 文件,這種保存session的方式,不是很推薦,在后面機(jī)器部署的時(shí)候,需要同步每一個(gè)賬號的session,文件,這種只需要指定一個(gè)文件的路徑即可,例如:/home/xxx.session,
另外一個(gè)是將session,保存成一個(gè)字符串,這樣可以在數(shù)據(jù)庫中保存,例如,
需要導(dǎo)入session包
from telethon.sessions import StringSession
StringSession("xxx")
”“
session = StringSession("xxx")
client = TelegramClient(session, api_id, api_hash,
proxy=p, loop=loop,
auto_reconnect=True,
)
client.start(phone=phone, password=password)
print(client.get_me())
## 如何保存session 字符串?dāng)?shù)據(jù)了?
# 需要登錄后的賬號,然后執(zhí)行下面的命令,便可以把session的token保存為字符串
print(StringSession.save(client.session))
# 使用監(jiān)聽的方式 采集消息
from telethon import TelegramClient, events
# 登錄代碼同上
client.add_event_handler(callback=msg_event_handler,
event=events.NewMessage(incoming=True))
async def msg_event_handler( event: events.NewMessage.Event):
message = event.message
# 判斷是否是群組或者頻道發(fā)送的消息
if event.is_channel or event.is_group:
## 獲取消息的id,
if isinstance(message.to_id, types.PeerChannel):
to_id = message.to_id.channel_id
else:
to_id = message.to_id.chat_id
## 自定義處理
# 使用api的方式查詢聊天記錄
username = "群組|頻道 的username或者id"
min_id = 0 # 開啟的消息id
limit =1000 # 檢索的消息數(shù)量
“”“
telegram使用的是群組|頻道+ 消息id來標(biāo)示唯一消息的,而且消息id是從1-無窮的
”“”
messages = client.iter_messages(username, min_id=min_id, reverse=True, limit=limit)
# 進(jìn)行消息處理
async for message in messages:
print(message)
# 例子:
async def dow():
dialog = await client.get_entity("fordarknetspiderbot")
old = datetime.datetime.now()
zero_today = old - datetime.timedelta(days=0, hours=old.hour, minutes=old.minute, seconds=old.second,
microseconds=old.microsecond)
async for msg in client.iter_messages(dialog, min_id=2110, offset_date=zero_today, reverse=True):
# user = await client.get_entity(msg.from_id)
print(" 時(shí)間:{} id:{} 消息:{}, ".format(
(msg.date + datetime.timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S"), msg.id, msg.message))
now = datetime.datetime.now()
t = now - old
print(t)
client.loop.run_until_complete(dow())
總結(jié):一般來說監(jiān)聽的方式比查詢api的方式要快,而且要好,查詢api會被限流,目前我是兩個(gè)方式一起用的,因?yàn)橛幸恍┵~號被封了的時(shí)候,新賬號又不想去理會,可以新開一個(gè)線程去輪訓(xùn)服務(wù)器中的群組列表,我們只需要當(dāng)前登錄的賬號中都沒有添加這些群組的群組,這樣子就不和監(jiān)聽的方式?jīng)_突了,
# 設(shè)置監(jiān)聽后,可以新開一個(gè)任務(wù)去輪訓(xùn)群組,
client.loop.create_task(task_Handler())
client.run_until_disconnected()
async def task_Handler():
pass