import asyncio import logging.config from pybit.unified_trading import WebSocket from websocket import WebSocketConnectionClosedException from logger_helper.logger_helper import LOGGING_CONFIG import app.telegram.database.requests as rq logging.config.dictConfig(LOGGING_CONFIG) logger = logging.getLogger("bybit_ws") event_loop = None # Сюда нужно будет установить event loop из основного приложения active_ws_tasks = {} def on_ws_error(ws, error): logger.error(f"WebSocket internal error: {error}") # Запланировать переподключение через event loop if event_loop: asyncio.run_coroutine_threadsafe(reconnect_ws(ws), event_loop) def on_ws_close(ws, close_status_code, close_msg): logger.warning(f"WebSocket closed: {close_status_code} - {close_msg}") # Запланировать переподключение через event loop if event_loop: asyncio.run_coroutine_threadsafe(reconnect_ws(ws), event_loop) async def reconnect_ws(ws): logger.info("Запускаем переподключение WebSocket...") await asyncio.sleep(5) try: await ws.run_forever() except WebSocketConnectionClosedException: logger.info("WebSocket переподключение успешно завершено.") def get_or_create_event_loop() -> asyncio.AbstractEventLoop: """ Возвращает текущий активный цикл событий asyncio или создает новый, если его нет. """ try: return asyncio.get_running_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) return loop def set_event_loop(loop: asyncio.AbstractEventLoop): global event_loop event_loop = loop async def run_ws_for_user(tg_id, message) -> None: """ Запускает WebSocket Bybit для пользователя с указанным tg_id. """ if tg_id not in active_ws_tasks or active_ws_tasks[tg_id].done(): api_key = await rq.get_bybit_api_key(tg_id) api_secret = await rq.get_bybit_secret_key(tg_id) # Запускаем WebSocket как асинхронную задачу active_ws_tasks[tg_id] = asyncio.create_task( start_execution_ws(api_key, api_secret, message) ) logger.info(f"WebSocket для пользователя {tg_id} запущен.") def on_order_callback(message, msg): if event_loop is not None: from app.services.Bybit.functions.Futures import handle_order_message asyncio.run_coroutine_threadsafe(handle_order_message(message, msg), event_loop) logger.info("Callback выполнен.") else: logger.error("Event loop не установлен, callback пропущен.") def on_execution_callback(message, ws_msg): if event_loop is not None: from app.services.Bybit.functions.Futures import handle_execution_message asyncio.run_coroutine_threadsafe(handle_execution_message(message, ws_msg), event_loop) logger.info("Callback выполнен.") else: logger.error("Event loop не установлен, callback пропущен.") async def start_execution_ws(api_key: str, api_secret: str, message): """ Запускает и поддерживает WebSocket подключение для исполнения сделок. Реконнект при потерях соединения. """ reconnect_delay = 5 while True: try: if not api_key or not api_secret: logger.error("API_KEY и API_SECRET должны быть указаны для подключения к приватным каналам.") await asyncio.sleep(reconnect_delay) continue ws = WebSocket(api_key=api_key, api_secret=api_secret, testnet=False, channel_type="private") ws.on_error = on_ws_error ws.on_close = on_ws_close ws.subscribe("order", lambda ws_msg: on_order_callback(message, ws_msg)) ws.subscribe("execution", lambda ws_msg: on_execution_callback(message, ws_msg)) while True: await asyncio.sleep(1) # Поддержание активности except WebSocketConnectionClosedException: logger.warning("WebSocket закрыт, переподключение через 5 секунд...") await asyncio.sleep(reconnect_delay) except Exception as e: logger.error(f"Ошибка WebSocket: {e}") await asyncio.sleep(reconnect_delay)