forked from kodorvan/stcs
126 lines
5.5 KiB
Python
126 lines
5.5 KiB
Python
import asyncio
|
||
import logging.config
|
||
from typing import Optional
|
||
from app.services.Bybit.functions.Futures import close_trade_after_delay, trading_cycle, open_position
|
||
from logger_helper.logger_helper import LOGGING_CONFIG
|
||
|
||
logging.config.dictConfig(LOGGING_CONFIG)
|
||
logger = logging.getLogger("tasks")
|
||
|
||
active_start_tasks = {}
|
||
active_close_tasks = {}
|
||
active_start_tasks_timer = {}
|
||
|
||
lock_start_tasks = asyncio.Lock()
|
||
lock_close_tasks = asyncio.Lock()
|
||
|
||
|
||
def start_trading_cycle(tg_id, message, side: str, margin_mode: str, tpsl_mode='Full') -> None:
|
||
"""
|
||
Запускает асинхронную задачу торгового цикла для пользователя с указанным tg_id.
|
||
"""
|
||
task = asyncio.create_task(open_position(tg_id, message, side, margin_mode, tpsl_mode))
|
||
active_start_tasks[tg_id] = task
|
||
|
||
|
||
def stop_trading_cycle(tg_id) -> None:
|
||
"""
|
||
Останавливает (отменяет) задачу торгового цикла для пользователя с указанным tg_id.
|
||
"""
|
||
task: Optional[asyncio.Task] = active_start_tasks.pop(tg_id, None)
|
||
if task:
|
||
task.cancel()
|
||
|
||
|
||
def start_trading_for_timer(tg_id, message, side: str, margin_mode: str, tpsl_mode='Full') -> None:
|
||
# Старт с задержкой (trading_cycle)
|
||
task = asyncio.create_task(trading_cycle(tg_id, message))
|
||
active_start_tasks_timer[tg_id] = task
|
||
|
||
def stop_trading_for_timer(tg_id) -> None:
|
||
task: Optional[asyncio.Task] = active_start_tasks_timer.pop(tg_id, None)
|
||
if task:
|
||
task.cancel()
|
||
|
||
|
||
def start_close_trade_task(tg_id, message, symbol, delay_sec) -> None:
|
||
"""
|
||
Запускает асинхронную задачу автоматического закрытия сделки после задержки.
|
||
"""
|
||
task = asyncio.create_task(close_trade_after_delay(tg_id, message, symbol, delay_sec))
|
||
active_close_tasks[tg_id] = task
|
||
|
||
|
||
def stop_close_trade_task(tg_id) -> None:
|
||
"""
|
||
Останавливает (отменяет) задачу автоматического закрытия сделки для пользователя.
|
||
"""
|
||
task: Optional[asyncio.Task] = active_close_tasks.pop(tg_id, None)
|
||
if task:
|
||
task.cancel()
|
||
|
||
|
||
async def handle_start_trading(tg_id: int, message, side: str, margin_mode: str, tpsl_mode='Full', use_timer=False):
|
||
"""
|
||
Запускает торговый цикл. Если уже есть запущенная задача, отменяет её.
|
||
"""
|
||
async with lock_start_tasks:
|
||
if use_timer:
|
||
old_task = active_start_tasks_timer.get(tg_id)
|
||
if old_task and not old_task.done():
|
||
old_task.cancel()
|
||
try:
|
||
await old_task
|
||
except asyncio.CancelledError:
|
||
logger.info(f"Старая задача торговли с таймером для пользователя {tg_id} отменена")
|
||
start_trading_for_timer(tg_id, message, side, margin_mode, tpsl_mode)
|
||
logger.info(f"Новая задача торговли с таймером запущена для пользователя {tg_id}")
|
||
else:
|
||
old_task = active_start_tasks.get(tg_id)
|
||
if old_task and not old_task.done():
|
||
old_task.cancel()
|
||
try:
|
||
await old_task
|
||
except asyncio.CancelledError:
|
||
logger.info(f"Старая задача торговли для пользователя {tg_id} отменена")
|
||
start_trading_cycle(tg_id, message, side, margin_mode, tpsl_mode)
|
||
logger.info(f"Новая задача торговли запущена для пользователя {tg_id}")
|
||
|
||
|
||
async def handle_stop_trading(tg_id: int, use_timer=False):
|
||
"""
|
||
Останавливает торговую задачу пользователя, если она активна.
|
||
"""
|
||
async with lock_start_tasks:
|
||
if use_timer:
|
||
stop_trading_for_timer(tg_id)
|
||
logger.info(f"Задача торговли с таймером остановлена для пользователя {tg_id}")
|
||
else:
|
||
stop_trading_cycle(tg_id)
|
||
logger.info(f"Задача торговли остановлена для пользователя {tg_id}")
|
||
|
||
|
||
async def handle_start_close_trade(tg_id: int, message, symbol: str, delay_sec: int):
|
||
"""
|
||
Запускает задачу закрытия сделки с задержкой. Отменяет старую задачу, если есть.
|
||
"""
|
||
async with lock_close_tasks:
|
||
old_task = active_close_tasks.get(tg_id)
|
||
if old_task and not old_task.done():
|
||
old_task.cancel()
|
||
try:
|
||
await old_task
|
||
except asyncio.CancelledError:
|
||
logger.info(f"Старая задача закрытия сделки пользователя {tg_id} отменена")
|
||
start_close_trade_task(tg_id, message, symbol, delay_sec)
|
||
logger.info(f"Задача закрытия сделки для {symbol} запущена с задержкой {delay_sec}s для пользователя {tg_id}")
|
||
|
||
|
||
async def handle_stop_close_trade(tg_id: int):
|
||
"""
|
||
Отменяет задачу закрытия сделки пользователя, если она есть.
|
||
"""
|
||
async with lock_close_tasks:
|
||
stop_close_trade_task(tg_id)
|
||
logger.info(f"Задача закрытия сделки отменена для пользователя {tg_id}")
|