2
0
forked from kodorvan/stcs
Files
stcs/app/services/Bybit/functions/bybit_ws.py
algizn97 44f9b05001 Fixed
2025-09-10 15:28:08 +05:00

116 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging.config
from pybit.unified_trading import WebSocket
from websocket import WebSocketConnectionClosedException
from logger_helper.logger_helper import LOGGING_CONFIG
import app.telegram.database.requests as rq
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("bybit_ws")
event_loop = None # Сюда нужно будет установить event loop из основного приложения
active_ws_tasks = {}
def on_ws_error(ws, error):
logger.error(f"WebSocket internal error: {error}")
# Запланировать переподключение через event loop
if event_loop:
asyncio.run_coroutine_threadsafe(reconnect_ws(ws), event_loop)
def on_ws_close(ws, close_status_code, close_msg):
logger.warning(f"WebSocket closed: {close_status_code} - {close_msg}")
# Запланировать переподключение через event loop
if event_loop:
asyncio.run_coroutine_threadsafe(reconnect_ws(ws), event_loop)
async def reconnect_ws(ws):
logger.info("Запускаем переподключение WebSocket...")
await asyncio.sleep(5)
try:
await ws.run_forever()
except WebSocketConnectionClosedException:
logger.info("WebSocket переподключение успешно завершено.")
def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
"""
Возвращает текущий активный цикл событий asyncio или создает новый, если его нет.
"""
try:
return asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
def set_event_loop(loop: asyncio.AbstractEventLoop):
global event_loop
event_loop = loop
async def run_ws_for_user(tg_id, message) -> None:
"""
Запускает WebSocket Bybit для пользователя с указанным tg_id.
"""
if tg_id not in active_ws_tasks or active_ws_tasks[tg_id].done():
api_key = await rq.get_bybit_api_key(tg_id)
api_secret = await rq.get_bybit_secret_key(tg_id)
# Запускаем WebSocket как асинхронную задачу
active_ws_tasks[tg_id] = asyncio.create_task(
start_execution_ws(api_key, api_secret, message)
)
logger.info(f"WebSocket для пользователя {tg_id} запущен.")
def on_order_callback(message, msg):
if event_loop is not None:
from app.services.Bybit.functions.Futures import handle_order_message
asyncio.run_coroutine_threadsafe(handle_order_message(message, msg), event_loop)
logger.info("Callback выполнен.")
else:
logger.error("Event loop не установлен, callback пропущен.")
def on_execution_callback(message, ws_msg):
if event_loop is not None:
from app.services.Bybit.functions.Futures import handle_execution_message
asyncio.run_coroutine_threadsafe(handle_execution_message(message, ws_msg), event_loop)
logger.info("Callback выполнен.")
else:
logger.error("Event loop не установлен, callback пропущен.")
async def start_execution_ws(api_key: str, api_secret: str, message):
"""
Запускает и поддерживает WebSocket подключение для исполнения сделок.
Реконнект при потерях соединения.
"""
reconnect_delay = 5
while True:
try:
if not api_key or not api_secret:
logger.error("API_KEY и API_SECRET должны быть указаны для подключения к приватным каналам.")
await asyncio.sleep(reconnect_delay)
continue
ws = WebSocket(api_key=api_key, api_secret=api_secret, testnet=False, channel_type="private")
ws.on_error = on_ws_error
ws.on_close = on_ws_close
ws.subscribe("order", lambda ws_msg: on_order_callback(message, ws_msg))
ws.subscribe("execution", lambda ws_msg: on_execution_callback(message, ws_msg))
while True:
await asyncio.sleep(1) # Поддержание активности
except WebSocketConnectionClosedException:
logger.warning("WebSocket закрыт, переподключение через 5 секунд...")
await asyncio.sleep(reconnect_delay)
except Exception as e:
logger.error(f"Ошибка WebSocket: {e}")
await asyncio.sleep(reconnect_delay)