diff --git a/app/tasks/tasks.py b/app/tasks/tasks.py index 13becce..a59c859 100644 --- a/app/tasks/tasks.py +++ b/app/tasks/tasks.py @@ -1,24 +1,98 @@ import asyncio - +import logging.config +from typing import Optional from app.services.Bybit.functions.Futures import close_trade_after_delay, trading_cycle +from logger_helper.logger_helper import LOGGING_CONFIG + +logging.config.dictConfig(LOGGING_CONFIG) +logger = logging.getLogger("tasks") active_start_tasks = {} active_close_tasks = {} -def start_trading_cycle(tg_id, message): +lock_start_tasks = asyncio.Lock() +lock_close_tasks = asyncio.Lock() + + +def start_trading_cycle(tg_id, message) -> None: + """ + Запускает асинхронную задачу торгового цикла для пользователя с указанным tg_id. + """ task = asyncio.create_task(trading_cycle(tg_id, message)) active_start_tasks[tg_id] = task -def stop_trading_cycle(tg_id): - task = active_start_tasks.pop(tg_id, None) + +def stop_trading_cycle(tg_id) -> None: + """ + Останавливает (отменяет) задачу торгового цикла для пользователя с указанным tg_id. + """ + task: Optional[asyncio.Task] = active_start_tasks.pop(tg_id, None) if task: task.cancel() -def start_close_trade_task(tg_id, message, symbol, delay_sec): + +def start_close_trade_task(tg_id, message, symbol, delay_sec) -> None: + """ + Запускает асинхронную задачу автоматического закрытия сделки после задержки. + """ task = asyncio.create_task(close_trade_after_delay(tg_id, message, symbol, delay_sec)) active_close_tasks[tg_id] = task -def stop_close_trade_task(tg_id): - task = active_close_tasks.pop(tg_id, None) + +def stop_close_trade_task(tg_id) -> None: + """ + Останавливает (отменяет) задачу автоматического закрытия сделки для пользователя. + """ + task: Optional[asyncio.Task] = active_close_tasks.pop(tg_id, None) if task: task.cancel() + + +async def handle_start_trading(tg_id: int, message): + """ + Запускает торговый цикл. Если уже есть запущенная задача, отменяет её. + """ + async with lock_start_tasks: + old_task = active_start_tasks.get(tg_id) + if old_task and not old_task.done(): + old_task.cancel() + try: + await old_task + except asyncio.CancelledError: + logger.info(f"Старая задача торговли для пользователя {tg_id} отменена") + start_trading_cycle(tg_id, message) + logger.info(f"Новая задача торговли запущена для пользователя {tg_id}") + + +async def handle_stop_trading(tg_id: int): + """ + Останавливает торговую задачу пользователя, если она активна. + """ + async with lock_start_tasks: + stop_trading_cycle(tg_id) + logger.info(f"Задача торговли остановлена для пользователя {tg_id}") + + +async def handle_start_close_trade(tg_id: int, message, symbol: str, delay_sec: int): + """ + Запускает задачу закрытия сделки с задержкой. Отменяет старую задачу, если есть. + """ + async with lock_close_tasks: + old_task = active_close_tasks.get(tg_id) + if old_task and not old_task.done(): + old_task.cancel() + try: + await old_task + except asyncio.CancelledError: + logger.info(f"Старая задача закрытия сделки пользователя {tg_id} отменена") + start_close_trade_task(tg_id, message, symbol, delay_sec) + logger.info(f"Задача закрытия сделки для {symbol} запущена с задержкой {delay_sec}s для пользователя {tg_id}") + + +async def handle_stop_close_trade(tg_id: int): + """ + Отменяет задачу закрытия сделки пользователя, если она есть. + """ + async with lock_close_tasks: + stop_close_trade_task(tg_id) + logger.info(f"Задача закрытия сделки отменена для пользователя {tg_id}")