develop #4
@@ -1,5 +1,6 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import logging.config
|
import logging.config
|
||||||
|
|
||||||
from pybit.unified_trading import WebSocket
|
from pybit.unified_trading import WebSocket
|
||||||
from websocket import WebSocketConnectionClosedException
|
from websocket import WebSocketConnectionClosedException
|
||||||
from logger_helper.logger_helper import LOGGING_CONFIG
|
from logger_helper.logger_helper import LOGGING_CONFIG
|
||||||
@@ -11,6 +12,30 @@ logger = logging.getLogger("bybit_ws")
|
|||||||
event_loop = None # Сюда нужно будет установить event loop из основного приложения
|
event_loop = None # Сюда нужно будет установить event loop из основного приложения
|
||||||
active_ws_tasks = {}
|
active_ws_tasks = {}
|
||||||
|
|
||||||
|
|
||||||
|
def on_ws_error(ws, error):
|
||||||
|
logger.error(f"WebSocket internal error: {error}")
|
||||||
|
# Запланировать переподключение через event loop
|
||||||
|
if event_loop:
|
||||||
|
asyncio.run_coroutine_threadsafe(reconnect_ws(ws), event_loop)
|
||||||
|
|
||||||
|
|
||||||
|
def on_ws_close(ws, close_status_code, close_msg):
|
||||||
|
logger.warning(f"WebSocket closed: {close_status_code} - {close_msg}")
|
||||||
|
# Запланировать переподключение через event loop
|
||||||
|
if event_loop:
|
||||||
|
asyncio.run_coroutine_threadsafe(reconnect_ws(ws), event_loop)
|
||||||
|
|
||||||
|
|
||||||
|
async def reconnect_ws(ws):
|
||||||
|
logger.info("Запускаем переподключение WebSocket...")
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
try:
|
||||||
|
await ws.run_forever()
|
||||||
|
except WebSocketConnectionClosedException:
|
||||||
|
logger.info("WebSocket переподключение успешно завершено.")
|
||||||
|
|
||||||
|
|
||||||
def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
|
def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
|
||||||
"""
|
"""
|
||||||
Возвращает текущий активный цикл событий asyncio или создает новый, если его нет.
|
Возвращает текущий активный цикл событий asyncio или создает новый, если его нет.
|
||||||
@@ -46,6 +71,7 @@ def on_order_callback(message, msg):
|
|||||||
if event_loop is not None:
|
if event_loop is not None:
|
||||||
from app.services.Bybit.functions.Futures import handle_order_message
|
from app.services.Bybit.functions.Futures import handle_order_message
|
||||||
asyncio.run_coroutine_threadsafe(handle_order_message(message, msg), event_loop)
|
asyncio.run_coroutine_threadsafe(handle_order_message(message, msg), event_loop)
|
||||||
|
logger.info("Callback выполнен.")
|
||||||
else:
|
else:
|
||||||
logger.error("Event loop не установлен, callback пропущен.")
|
logger.error("Event loop не установлен, callback пропущен.")
|
||||||
|
|
||||||
@@ -54,6 +80,7 @@ def on_execution_callback(message, ws_msg):
|
|||||||
if event_loop is not None:
|
if event_loop is not None:
|
||||||
from app.services.Bybit.functions.Futures import handle_execution_message
|
from app.services.Bybit.functions.Futures import handle_execution_message
|
||||||
asyncio.run_coroutine_threadsafe(handle_execution_message(message, ws_msg), event_loop)
|
asyncio.run_coroutine_threadsafe(handle_execution_message(message, ws_msg), event_loop)
|
||||||
|
logger.info("Callback выполнен.")
|
||||||
else:
|
else:
|
||||||
logger.error("Event loop не установлен, callback пропущен.")
|
logger.error("Event loop не установлен, callback пропущен.")
|
||||||
|
|
||||||
@@ -72,8 +99,12 @@ async def start_execution_ws(api_key: str, api_secret: str, message):
|
|||||||
continue
|
continue
|
||||||
ws = WebSocket(api_key=api_key, api_secret=api_secret, testnet=False, channel_type="private")
|
ws = WebSocket(api_key=api_key, api_secret=api_secret, testnet=False, channel_type="private")
|
||||||
|
|
||||||
|
ws.on_error = on_ws_error
|
||||||
|
ws.on_close = on_ws_close
|
||||||
|
|
||||||
ws.subscribe("order", lambda ws_msg: on_order_callback(message, ws_msg))
|
ws.subscribe("order", lambda ws_msg: on_order_callback(message, ws_msg))
|
||||||
ws.subscribe("execution", lambda ws_msg: on_execution_callback(message, ws_msg))
|
ws.subscribe("execution", lambda ws_msg: on_execution_callback(message, ws_msg))
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
await asyncio.sleep(1) # Поддержание активности
|
await asyncio.sleep(1) # Поддержание активности
|
||||||
except WebSocketConnectionClosedException:
|
except WebSocketConnectionClosedException:
|
||||||
|
Reference in New Issue
Block a user