diff --git a/app/services/Bybit/functions/bybit_ws.py b/app/services/Bybit/functions/bybit_ws.py index 8b8562d..4d0d31e 100644 --- a/app/services/Bybit/functions/bybit_ws.py +++ b/app/services/Bybit/functions/bybit_ws.py @@ -3,26 +3,53 @@ import logging.config from pybit.unified_trading import WebSocket from websocket import WebSocketConnectionClosedException from logger_helper.logger_helper import LOGGING_CONFIG +import app.telegram.database.requests as rq logging.config.dictConfig(LOGGING_CONFIG) logger = logging.getLogger("bybit_ws") event_loop = None # Сюда нужно будет установить event loop из основного приложения +def get_or_create_event_loop() -> asyncio.AbstractEventLoop: + """ + Возвращает текущий активный цикл событий asyncio или создает новый, если его нет. + """ + try: + return asyncio.get_running_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return loop + def set_event_loop(loop: asyncio.AbstractEventLoop): global event_loop event_loop = loop -def on_execution_callback(message, msg): +async def run_ws_for_user(tg_id, message) -> None: """ - Callback на событие исполнения сделки. - Безопасно запускает асинхронный обработчик из sync callback. + Запускает WebSocket Bybit для пользователя с указанным tg_id. """ + + api_key = await rq.get_bybit_api_key(tg_id) + api_secret = await rq.get_bybit_secret_key(tg_id) + + await start_execution_ws(api_key, api_secret, message) + + +def on_order_callback(message, msg): if event_loop is not None: - from app.services.Bybit.functions.Futures import handle_execution_message # Импорт внутри, чтобы избежать циклических импортов - asyncio.run_coroutine_threadsafe(handle_execution_message(message, msg), event_loop) + from app.services.Bybit.functions.Futures import handle_order_message + asyncio.run_coroutine_threadsafe(handle_order_message(message, msg), event_loop) + else: + logger.error("Event loop не установлен, callback пропущен.") + + +def on_execution_callback(message, ws_msg): + if event_loop is not None: + from app.services.Bybit.functions.Futures import handle_execution_message + asyncio.run_coroutine_threadsafe(handle_execution_message(message, ws_msg), event_loop) else: logger.error("Event loop не установлен, callback пропущен.") @@ -35,8 +62,14 @@ async def start_execution_ws(api_key: str, api_secret: str, message): reconnect_delay = 5 while True: try: + if not api_key or not api_secret: + logger.error("API_KEY и API_SECRET должны быть указаны для подключения к приватным каналам.") + await asyncio.sleep(reconnect_delay) + continue ws = WebSocket(api_key=api_key, api_secret=api_secret, testnet=False, channel_type="private") - ws.execution_stream(lambda msg: on_execution_callback(message, msg)) + + ws.subscribe("order", lambda ws_msg: on_order_callback(message, ws_msg)) + ws.subscribe("execution", lambda ws_msg: on_execution_callback(message, ws_msg)) while True: await asyncio.sleep(1) # Поддержание активности except WebSocketConnectionClosedException: diff --git a/app/tasks/tasks.py b/app/tasks/tasks.py deleted file mode 100644 index af81ad1..0000000 --- a/app/tasks/tasks.py +++ /dev/null @@ -1,125 +0,0 @@ -import asyncio -import logging.config -from typing import Optional -from app.services.Bybit.functions.Futures import close_trade_after_delay, trading_cycle, open_position -from logger_helper.logger_helper import LOGGING_CONFIG - -logging.config.dictConfig(LOGGING_CONFIG) -logger = logging.getLogger("tasks") - -active_start_tasks = {} -active_close_tasks = {} -active_start_tasks_timer = {} - -lock_start_tasks = asyncio.Lock() -lock_close_tasks = asyncio.Lock() - - -def start_trading_cycle(tg_id, message, side: str, margin_mode: str, tpsl_mode='Full') -> None: - """ - Запускает асинхронную задачу торгового цикла для пользователя с указанным tg_id. - """ - task = asyncio.create_task(open_position(tg_id, message, side, margin_mode, tpsl_mode)) - active_start_tasks[tg_id] = task - - -def stop_trading_cycle(tg_id) -> None: - """ - Останавливает (отменяет) задачу торгового цикла для пользователя с указанным tg_id. - """ - task: Optional[asyncio.Task] = active_start_tasks.pop(tg_id, None) - if task: - task.cancel() - - -def start_trading_for_timer(tg_id, message, side: str, margin_mode: str, tpsl_mode='Full') -> None: - # Старт с задержкой (trading_cycle) - task = asyncio.create_task(trading_cycle(tg_id, message)) - active_start_tasks_timer[tg_id] = task - -def stop_trading_for_timer(tg_id) -> None: - task: Optional[asyncio.Task] = active_start_tasks_timer.pop(tg_id, None) - if task: - task.cancel() - - -def start_close_trade_task(tg_id, message, symbol, delay_sec) -> None: - """ - Запускает асинхронную задачу автоматического закрытия сделки после задержки. - """ - task = asyncio.create_task(close_trade_after_delay(tg_id, message, symbol, delay_sec)) - active_close_tasks[tg_id] = task - - -def stop_close_trade_task(tg_id) -> None: - """ - Останавливает (отменяет) задачу автоматического закрытия сделки для пользователя. - """ - task: Optional[asyncio.Task] = active_close_tasks.pop(tg_id, None) - if task: - task.cancel() - - -async def handle_start_trading(tg_id: int, message, side: str, margin_mode: str, tpsl_mode='Full', use_timer=False): - """ - Запускает торговый цикл. Если уже есть запущенная задача, отменяет её. - """ - async with lock_start_tasks: - if use_timer: - old_task = active_start_tasks_timer.get(tg_id) - if old_task and not old_task.done(): - old_task.cancel() - try: - await old_task - except asyncio.CancelledError: - logger.info(f"Старая задача торговли с таймером для пользователя {tg_id} отменена") - start_trading_for_timer(tg_id, message, side, margin_mode, tpsl_mode) - logger.info(f"Новая задача торговли с таймером запущена для пользователя {tg_id}") - else: - old_task = active_start_tasks.get(tg_id) - if old_task and not old_task.done(): - old_task.cancel() - try: - await old_task - except asyncio.CancelledError: - logger.info(f"Старая задача торговли для пользователя {tg_id} отменена") - start_trading_cycle(tg_id, message, side, margin_mode, tpsl_mode) - logger.info(f"Новая задача торговли запущена для пользователя {tg_id}") - - -async def handle_stop_trading(tg_id: int, use_timer=False): - """ - Останавливает торговую задачу пользователя, если она активна. - """ - async with lock_start_tasks: - if use_timer: - stop_trading_for_timer(tg_id) - logger.info(f"Задача торговли с таймером остановлена для пользователя {tg_id}") - else: - stop_trading_cycle(tg_id) - logger.info(f"Задача торговли остановлена для пользователя {tg_id}") - - -async def handle_start_close_trade(tg_id: int, message, symbol: str, delay_sec: int): - """ - Запускает задачу закрытия сделки с задержкой. Отменяет старую задачу, если есть. - """ - async with lock_close_tasks: - old_task = active_close_tasks.get(tg_id) - if old_task and not old_task.done(): - old_task.cancel() - try: - await old_task - except asyncio.CancelledError: - logger.info(f"Старая задача закрытия сделки пользователя {tg_id} отменена") - start_close_trade_task(tg_id, message, symbol, delay_sec) - logger.info(f"Задача закрытия сделки для {symbol} запущена с задержкой {delay_sec}s для пользователя {tg_id}") - - -async def handle_stop_close_trade(tg_id: int): - """ - Отменяет задачу закрытия сделки пользователя, если она есть. - """ - async with lock_close_tasks: - stop_close_trade_task(tg_id) - logger.info(f"Задача закрытия сделки отменена для пользователя {tg_id}")