2026-01-24通過爬蟲的方式讀取bg的盤口

直接上代碼:

import asyncio
import json
import logging
import time
from dingding import DingSend
import websockets
import gzip
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
log_console = logging.info
#通過爬蟲的方式讀取bg的盤口


# 新增:獲取當(dāng)前毫秒級時(shí)間戳
def get_current_timestamp_ms():
    return int(time.time() * 1000)


# 新增:發(fā)送ping心跳消息
async def send_ping_message(ws):
    try:
        await ws.send("ping")
        log_console(f"已發(fā)送ping消息")
    except Exception as e:
        log_console(f"發(fā)送ping消息失敗:{e}")


class BgWebsocketClient:

    def __init__(self, uri):
        self.uri = uri
        self.websocket = None
        self.market_status = False

    async def connect(self):
        # 配置項(xiàng)

        print("建立鏈接")

        headers = {
            'Upgrade': 'websocket',
            'Origin': 'https://www.bitget.com',
            'Cache-Control': 'no-cache',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Pragma': 'no-cache',
            'Connection': 'Upgrade',
            'Sec-WebSocket-Key': 'WUcBLyC+ry/xLhQ1pJP8bw==',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36',
            'Sec-WebSocket-Version': '13',
            'Sec-WebSocket-Extensions': 'permessage-deflate; client_max_window_bits',
        }

        print(type(headers))
        self.websocket = await websockets.connect(uri=self.uri,
                                                  additional_headers=headers)
        print('[ExWebsocketClient]connect ok')
        await self.on_open()

    async def on_open(self):
        global last_ping_ts
        last_ping_ts = int(time.time())
        print("打開鏈接")
        subscribe_messages = [
            {"op": "subscribe",
             "args": [{"channel": "depth", "instType": "mc", "instId": "DEEPUSDT", "params": {"scale": "0.01"}}]}
        ]
        await self.websocket.send("ping")
        for msg in subscribe_messages:
            try:
                json_msg = json.dumps(msg)
                await self.websocket.send(json_msg)
                log_console(f"已發(fā)送訂閱消息:{json_msg}")
                time.sleep(0.1)

            except Exception as e:
                log_console(f"發(fā)送訂閱消息失敗:{e}")

    async def on_message(self, message):
        # log_console(f"收到消息:{message}")
        decompressed_data = gzip.decompress(message)
        data = decompressed_data.decode('utf-8', errors='ignore')
        print("文本形式:", data)
        # print("返回消息了:",message)

        # data = json.loads(message)
        # b = data.get('b', 0)
        # if b != 0:
        #     ask = data.get('a', 0)
        #     exAsk = b
        #     exBid = ask
        # if CheckFunc:
        #     await CheckFunc()
        # if int(time.time()) - last_ping_ts > 30:
        #     last_ping_ts = int(time.time())
        #     await send_ping_message(self.websocket)

    async def on_error(self, error):

        print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())), f"Error: {error}")
        error_message = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())), f"Error: {error}"
        DingSend(f"Ex連接失敗 msg: {error_message}", at_all=True)

    async def on_close(self, code, reason):
        print("ex關(guān)閉連接")

    async def run(self):
        try:
            await self.connect()
            async for message in self.websocket:
                await self.on_message(message)
        except Exception as e:
            await self.on_error(e)
        finally:
            await self.on_close(None, None)
            await self.websocket.close()


async def ex_main():
    while True:
        print("進(jìn)入了while循環(huán)")

        client = BgWebsocketClient(
            "wss://stream.bitget.com/public/v1/stream?compress=true&terminalType=1")
        await client.run()
        await asyncio.sleep(1)


if __name__ == "__main__":
    # asyncio.run(ex_main("mexc_btc.json"))
    # coin = "mexc_btc"
    # with open(f"{coin}.json", 'r+') as f:
    #     init = json.loads(f.read())

    # print(init)
    asyncio.run(ex_main())
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容