import asyncio import logging.config from typing import Optional from app.services.Bybit.functions.Futures import close_trade_after_delay, trading_cycle, open_position from logger_helper.logger_helper import LOGGING_CONFIG logging.config.dictConfig(LOGGING_CONFIG) logger = logging.getLogger("tasks") active_start_tasks = {} active_close_tasks = {} active_start_tasks_timer = {} lock_start_tasks = asyncio.Lock() lock_close_tasks = asyncio.Lock() def start_trading_cycle(tg_id, message, side: str, margin_mode: str, tpsl_mode='Full') -> None: """ Запускает асинхронную задачу торгового цикла для пользователя с указанным tg_id. """ task = asyncio.create_task(open_position(tg_id, message, side, margin_mode, tpsl_mode)) active_start_tasks[tg_id] = task def stop_trading_cycle(tg_id) -> None: """ Останавливает (отменяет) задачу торгового цикла для пользователя с указанным tg_id. """ task: Optional[asyncio.Task] = active_start_tasks.pop(tg_id, None) if task: task.cancel() def start_trading_for_timer(tg_id, message, side: str, margin_mode: str, tpsl_mode='Full') -> None: # Старт с задержкой (trading_cycle) task = asyncio.create_task(trading_cycle(tg_id, message)) active_start_tasks_timer[tg_id] = task def stop_trading_for_timer(tg_id) -> None: task: Optional[asyncio.Task] = active_start_tasks_timer.pop(tg_id, None) if task: task.cancel() def start_close_trade_task(tg_id, message, symbol, delay_sec) -> None: """ Запускает асинхронную задачу автоматического закрытия сделки после задержки. """ task = asyncio.create_task(close_trade_after_delay(tg_id, message, symbol, delay_sec)) active_close_tasks[tg_id] = task def stop_close_trade_task(tg_id) -> None: """ Останавливает (отменяет) задачу автоматического закрытия сделки для пользователя. """ task: Optional[asyncio.Task] = active_close_tasks.pop(tg_id, None) if task: task.cancel() async def handle_start_trading(tg_id: int, message, side: str, margin_mode: str, tpsl_mode='Full', use_timer=False): """ Запускает торговый цикл. Если уже есть запущенная задача, отменяет её. """ async with lock_start_tasks: if use_timer: old_task = active_start_tasks_timer.get(tg_id) if old_task and not old_task.done(): old_task.cancel() try: await old_task except asyncio.CancelledError: logger.info(f"Старая задача торговли с таймером для пользователя {tg_id} отменена") start_trading_for_timer(tg_id, message, side, margin_mode, tpsl_mode) logger.info(f"Новая задача торговли с таймером запущена для пользователя {tg_id}") else: old_task = active_start_tasks.get(tg_id) if old_task and not old_task.done(): old_task.cancel() try: await old_task except asyncio.CancelledError: logger.info(f"Старая задача торговли для пользователя {tg_id} отменена") start_trading_cycle(tg_id, message, side, margin_mode, tpsl_mode) logger.info(f"Новая задача торговли запущена для пользователя {tg_id}") async def handle_stop_trading(tg_id: int, use_timer=False): """ Останавливает торговую задачу пользователя, если она активна. """ async with lock_start_tasks: if use_timer: stop_trading_for_timer(tg_id) logger.info(f"Задача торговли с таймером остановлена для пользователя {tg_id}") else: stop_trading_cycle(tg_id) logger.info(f"Задача торговли остановлена для пользователя {tg_id}") async def handle_start_close_trade(tg_id: int, message, symbol: str, delay_sec: int): """ Запускает задачу закрытия сделки с задержкой. Отменяет старую задачу, если есть. """ async with lock_close_tasks: old_task = active_close_tasks.get(tg_id) if old_task and not old_task.done(): old_task.cancel() try: await old_task except asyncio.CancelledError: logger.info(f"Старая задача закрытия сделки пользователя {tg_id} отменена") start_close_trade_task(tg_id, message, symbol, delay_sec) logger.info(f"Задача закрытия сделки для {symbol} запущена с задержкой {delay_sec}s для пользователя {tg_id}") async def handle_stop_close_trade(tg_id: int): """ Отменяет задачу закрытия сделки пользователя, если она есть. """ async with lock_close_tasks: stop_close_trade_task(tg_id) logger.info(f"Задача закрытия сделки отменена для пользователя {tg_id}")