Files
stcs/app/services/Bybit/functions/bybit_ws.py
algizn97 2ee8c9916f Fixed
2025-08-30 16:29:56 +05:00

85 lines
3.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging.config
from pybit.unified_trading import WebSocket
from websocket import WebSocketConnectionClosedException
from logger_helper.logger_helper import LOGGING_CONFIG
import app.telegram.database.requests as rq
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("bybit_ws")
event_loop = None # Сюда нужно будет установить event loop из основного приложения
active_ws_tasks = {}
def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
"""
Возвращает текущий активный цикл событий asyncio или создает новый, если его нет.
"""
try:
return asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
def set_event_loop(loop: asyncio.AbstractEventLoop):
global event_loop
event_loop = loop
async def run_ws_for_user(tg_id, message) -> None:
"""
Запускает WebSocket Bybit для пользователя с указанным tg_id.
"""
if tg_id not in active_ws_tasks or active_ws_tasks[tg_id].done():
api_key = await rq.get_bybit_api_key(tg_id)
api_secret = await rq.get_bybit_secret_key(tg_id)
# Запускаем WebSocket как асинхронную задачу
active_ws_tasks[tg_id] = asyncio.create_task(
start_execution_ws(api_key, api_secret, message)
)
logger.info(f"WebSocket для пользователя {tg_id} запущен.")
def on_order_callback(message, msg):
if event_loop is not None:
from app.services.Bybit.functions.Futures import handle_order_message
asyncio.run_coroutine_threadsafe(handle_order_message(message, msg), event_loop)
else:
logger.error("Event loop не установлен, callback пропущен.")
def on_execution_callback(message, ws_msg):
if event_loop is not None:
from app.services.Bybit.functions.Futures import handle_execution_message
asyncio.run_coroutine_threadsafe(handle_execution_message(message, ws_msg), event_loop)
else:
logger.error("Event loop не установлен, callback пропущен.")
async def start_execution_ws(api_key: str, api_secret: str, message):
"""
Запускает и поддерживает WebSocket подключение для исполнения сделок.
Реконнект при потерях соединения.
"""
reconnect_delay = 5
while True:
try:
if not api_key or not api_secret:
logger.error("API_KEY и API_SECRET должны быть указаны для подключения к приватным каналам.")
await asyncio.sleep(reconnect_delay)
continue
ws = WebSocket(api_key=api_key, api_secret=api_secret, testnet=False, channel_type="private")
ws.subscribe("order", lambda ws_msg: on_order_callback(message, ws_msg))
ws.subscribe("execution", lambda ws_msg: on_execution_callback(message, ws_msg))
while True:
await asyncio.sleep(1) # Поддержание активности
except WebSocketConnectionClosedException:
logger.warning("WebSocket закрыт, переподключение через 5 секунд...")
await asyncio.sleep(reconnect_delay)
except Exception as e:
logger.error(f"Ошибка WebSocket: {e}")
await asyncio.sleep(reconnect_delay)