import asyncio import logging.config from typing import Optional from app.services.Bybit.functions.Futures import close_trade_after_delay, trading_cycle from logger_helper.logger_helper import LOGGING_CONFIG logging.config.dictConfig(LOGGING_CONFIG) logger = logging.getLogger("tasks") active_start_tasks = {} active_close_tasks = {} lock_start_tasks = asyncio.Lock() lock_close_tasks = asyncio.Lock() def start_trading_cycle(tg_id, message) -> None: """ Запускает асинхронную задачу торгового цикла для пользователя с указанным tg_id. """ task = asyncio.create_task(trading_cycle(tg_id, message)) active_start_tasks[tg_id] = task def stop_trading_cycle(tg_id) -> None: """ Останавливает (отменяет) задачу торгового цикла для пользователя с указанным tg_id. """ task: Optional[asyncio.Task] = active_start_tasks.pop(tg_id, None) if task: task.cancel() def start_close_trade_task(tg_id, message, symbol, delay_sec) -> None: """ Запускает асинхронную задачу автоматического закрытия сделки после задержки. """ task = asyncio.create_task(close_trade_after_delay(tg_id, message, symbol, delay_sec)) active_close_tasks[tg_id] = task def stop_close_trade_task(tg_id) -> None: """ Останавливает (отменяет) задачу автоматического закрытия сделки для пользователя. """ task: Optional[asyncio.Task] = active_close_tasks.pop(tg_id, None) if task: task.cancel() async def handle_start_trading(tg_id: int, message): """ Запускает торговый цикл. Если уже есть запущенная задача, отменяет её. """ async with lock_start_tasks: old_task = active_start_tasks.get(tg_id) if old_task and not old_task.done(): old_task.cancel() try: await old_task except asyncio.CancelledError: logger.info(f"Старая задача торговли для пользователя {tg_id} отменена") start_trading_cycle(tg_id, message) logger.info(f"Новая задача торговли запущена для пользователя {tg_id}") async def handle_stop_trading(tg_id: int): """ Останавливает торговую задачу пользователя, если она активна. """ async with lock_start_tasks: stop_trading_cycle(tg_id) logger.info(f"Задача торговли остановлена для пользователя {tg_id}") async def handle_start_close_trade(tg_id: int, message, symbol: str, delay_sec: int): """ Запускает задачу закрытия сделки с задержкой. Отменяет старую задачу, если есть. """ async with lock_close_tasks: old_task = active_close_tasks.get(tg_id) if old_task and not old_task.done(): old_task.cancel() try: await old_task except asyncio.CancelledError: logger.info(f"Старая задача закрытия сделки пользователя {tg_id} отменена") start_close_trade_task(tg_id, message, symbol, delay_sec) logger.info(f"Задача закрытия сделки для {symbol} запущена с задержкой {delay_sec}s для пользователя {tg_id}") async def handle_stop_close_trade(tg_id: int): """ Отменяет задачу закрытия сделки пользователя, если она есть. """ async with lock_close_tasks: stop_close_trade_task(tg_id) logger.info(f"Задача закрытия сделки отменена для пользователя {tg_id}")