1
0
forked from kodorvan/stcs

Deleted tasks.py

This commit is contained in:
algizn97
2025-08-29 11:41:18 +05:00
parent e05b214a8a
commit f6130c0b8c
2 changed files with 39 additions and 131 deletions

View File

@@ -3,26 +3,53 @@ import logging.config
from pybit.unified_trading import WebSocket from pybit.unified_trading import WebSocket
from websocket import WebSocketConnectionClosedException from websocket import WebSocketConnectionClosedException
from logger_helper.logger_helper import LOGGING_CONFIG from logger_helper.logger_helper import LOGGING_CONFIG
import app.telegram.database.requests as rq
logging.config.dictConfig(LOGGING_CONFIG) logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("bybit_ws") logger = logging.getLogger("bybit_ws")
event_loop = None # Сюда нужно будет установить event loop из основного приложения event_loop = None # Сюда нужно будет установить event loop из основного приложения
def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
"""
Возвращает текущий активный цикл событий asyncio или создает новый, если его нет.
"""
try:
return asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
def set_event_loop(loop: asyncio.AbstractEventLoop): def set_event_loop(loop: asyncio.AbstractEventLoop):
global event_loop global event_loop
event_loop = loop event_loop = loop
def on_execution_callback(message, msg): async def run_ws_for_user(tg_id, message) -> None:
""" """
Callback на событие исполнения сделки. Запускает WebSocket Bybit для пользователя с указанным tg_id.
Безопасно запускает асинхронный обработчик из sync callback.
""" """
api_key = await rq.get_bybit_api_key(tg_id)
api_secret = await rq.get_bybit_secret_key(tg_id)
await start_execution_ws(api_key, api_secret, message)
def on_order_callback(message, msg):
if event_loop is not None: if event_loop is not None:
from app.services.Bybit.functions.Futures import handle_execution_message # Импорт внутри, чтобы избежать циклических импортов from app.services.Bybit.functions.Futures import handle_order_message
asyncio.run_coroutine_threadsafe(handle_execution_message(message, msg), event_loop) asyncio.run_coroutine_threadsafe(handle_order_message(message, msg), event_loop)
else:
logger.error("Event loop не установлен, callback пропущен.")
def on_execution_callback(message, ws_msg):
if event_loop is not None:
from app.services.Bybit.functions.Futures import handle_execution_message
asyncio.run_coroutine_threadsafe(handle_execution_message(message, ws_msg), event_loop)
else: else:
logger.error("Event loop не установлен, callback пропущен.") logger.error("Event loop не установлен, callback пропущен.")
@@ -35,8 +62,14 @@ async def start_execution_ws(api_key: str, api_secret: str, message):
reconnect_delay = 5 reconnect_delay = 5
while True: while True:
try: try:
if not api_key or not api_secret:
logger.error("API_KEY и API_SECRET должны быть указаны для подключения к приватным каналам.")
await asyncio.sleep(reconnect_delay)
continue
ws = WebSocket(api_key=api_key, api_secret=api_secret, testnet=False, channel_type="private") ws = WebSocket(api_key=api_key, api_secret=api_secret, testnet=False, channel_type="private")
ws.execution_stream(lambda msg: on_execution_callback(message, msg))
ws.subscribe("order", lambda ws_msg: on_order_callback(message, ws_msg))
ws.subscribe("execution", lambda ws_msg: on_execution_callback(message, ws_msg))
while True: while True:
await asyncio.sleep(1) # Поддержание активности await asyncio.sleep(1) # Поддержание активности
except WebSocketConnectionClosedException: except WebSocketConnectionClosedException:

View File

@@ -1,125 +0,0 @@
import asyncio
import logging.config
from typing import Optional
from app.services.Bybit.functions.Futures import close_trade_after_delay, trading_cycle, open_position
from logger_helper.logger_helper import LOGGING_CONFIG
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("tasks")
active_start_tasks = {}
active_close_tasks = {}
active_start_tasks_timer = {}
lock_start_tasks = asyncio.Lock()
lock_close_tasks = asyncio.Lock()
def start_trading_cycle(tg_id, message, side: str, margin_mode: str, tpsl_mode='Full') -> None:
"""
Запускает асинхронную задачу торгового цикла для пользователя с указанным tg_id.
"""
task = asyncio.create_task(open_position(tg_id, message, side, margin_mode, tpsl_mode))
active_start_tasks[tg_id] = task
def stop_trading_cycle(tg_id) -> None:
"""
Останавливает (отменяет) задачу торгового цикла для пользователя с указанным tg_id.
"""
task: Optional[asyncio.Task] = active_start_tasks.pop(tg_id, None)
if task:
task.cancel()
def start_trading_for_timer(tg_id, message, side: str, margin_mode: str, tpsl_mode='Full') -> None:
# Старт с задержкой (trading_cycle)
task = asyncio.create_task(trading_cycle(tg_id, message))
active_start_tasks_timer[tg_id] = task
def stop_trading_for_timer(tg_id) -> None:
task: Optional[asyncio.Task] = active_start_tasks_timer.pop(tg_id, None)
if task:
task.cancel()
def start_close_trade_task(tg_id, message, symbol, delay_sec) -> None:
"""
Запускает асинхронную задачу автоматического закрытия сделки после задержки.
"""
task = asyncio.create_task(close_trade_after_delay(tg_id, message, symbol, delay_sec))
active_close_tasks[tg_id] = task
def stop_close_trade_task(tg_id) -> None:
"""
Останавливает (отменяет) задачу автоматического закрытия сделки для пользователя.
"""
task: Optional[asyncio.Task] = active_close_tasks.pop(tg_id, None)
if task:
task.cancel()
async def handle_start_trading(tg_id: int, message, side: str, margin_mode: str, tpsl_mode='Full', use_timer=False):
"""
Запускает торговый цикл. Если уже есть запущенная задача, отменяет её.
"""
async with lock_start_tasks:
if use_timer:
old_task = active_start_tasks_timer.get(tg_id)
if old_task and not old_task.done():
old_task.cancel()
try:
await old_task
except asyncio.CancelledError:
logger.info(f"Старая задача торговли с таймером для пользователя {tg_id} отменена")
start_trading_for_timer(tg_id, message, side, margin_mode, tpsl_mode)
logger.info(f"Новая задача торговли с таймером запущена для пользователя {tg_id}")
else:
old_task = active_start_tasks.get(tg_id)
if old_task and not old_task.done():
old_task.cancel()
try:
await old_task
except asyncio.CancelledError:
logger.info(f"Старая задача торговли для пользователя {tg_id} отменена")
start_trading_cycle(tg_id, message, side, margin_mode, tpsl_mode)
logger.info(f"Новая задача торговли запущена для пользователя {tg_id}")
async def handle_stop_trading(tg_id: int, use_timer=False):
"""
Останавливает торговую задачу пользователя, если она активна.
"""
async with lock_start_tasks:
if use_timer:
stop_trading_for_timer(tg_id)
logger.info(f"Задача торговли с таймером остановлена для пользователя {tg_id}")
else:
stop_trading_cycle(tg_id)
logger.info(f"Задача торговли остановлена для пользователя {tg_id}")
async def handle_start_close_trade(tg_id: int, message, symbol: str, delay_sec: int):
"""
Запускает задачу закрытия сделки с задержкой. Отменяет старую задачу, если есть.
"""
async with lock_close_tasks:
old_task = active_close_tasks.get(tg_id)
if old_task and not old_task.done():
old_task.cancel()
try:
await old_task
except asyncio.CancelledError:
logger.info(f"Старая задача закрытия сделки пользователя {tg_id} отменена")
start_close_trade_task(tg_id, message, symbol, delay_sec)
logger.info(f"Задача закрытия сделки для {symbol} запущена с задержкой {delay_sec}s для пользователя {tg_id}")
async def handle_stop_close_trade(tg_id: int):
"""
Отменяет задачу закрытия сделки пользователя, если она есть.
"""
async with lock_close_tasks:
stop_close_trade_task(tg_id)
logger.info(f"Задача закрытия сделки отменена для пользователя {tg_id}")