From cb6499e347eb1b6b063cb767baeef5bcfb0a00cc Mon Sep 17 00:00:00 2001 From: algizn97 Date: Thu, 18 Dec 2025 18:46:21 +0500 Subject: [PATCH] Fixed the work of the websocket --- app/bybit/web_socket.py | 194 ++++++++++++++++++++++++++-------------- run.py | 1 - 2 files changed, 128 insertions(+), 67 deletions(-) diff --git a/app/bybit/web_socket.py b/app/bybit/web_socket.py index cd438c0..17512ae 100644 --- a/app/bybit/web_socket.py +++ b/app/bybit/web_socket.py @@ -1,4 +1,5 @@ import asyncio +from collections import deque import logging.config from pybit.unified_trading import WebSocket @@ -12,81 +13,84 @@ logger = logging.getLogger("web_socket") class CustomWebSocket(WebSocket): + """Custom WebSocket wrapper with enhanced error handling.""" + def _on_error(self, error): logger.error(f"WebSocket error: {error}") return super()._on_error(error) + def _on_close(self): + logger.warning("WebSocket connection closed") + super()._on_close() + class WebSocketBot: """ - Class to handle WebSocket connections and messages. + Manages multiple Bybit private WebSocket connections for Telegram users. + Uses queue-based message processing to handle thread-safe async calls. """ def __init__(self, telegram_bot): - """Initialize the TradingBot class.""" + """ + Initialize WebSocketBot. + + Args: + telegram_bot: Telegram bot instance for message handling + """ self.telegram_bot = telegram_bot - self.ws_private = None - self.user_messages = {} self.user_sockets = {} + self.user_messages = {} self.user_keys = {} self.loop = None self.message_handler = TelegramMessageHandler(telegram_bot) + self.order_queues = {} # {tg_id: deque} + self.execution_queues = {} # {tg_id: deque} + self.processing_tasks = {} # {tg_id: task} + async def run_user_check_loop(self): - """Run a loop to check for users and connect them to the WebSocket.""" + """Main loop that continuously checks users and maintains connections.""" self.loop = asyncio.get_running_loop() + logger.info("Starting WebSocket user check loop") + while True: - users = await WebSocketBot.get_users_from_db() - for user in users: - tg_id = user.tg_id - api_key, api_secret = await rq.get_user_api(tg_id=tg_id) + try: + users = await WebSocketBot.get_users_from_db() + for user in users: + tg_id = user.tg_id + api_key, api_secret = await rq.get_user_api(tg_id=tg_id) - if not api_key or not api_secret: - continue + if not api_key or not api_secret: + continue - keys_stored = self.user_keys.get(tg_id) - if tg_id in self.user_sockets and keys_stored == (api_key, api_secret): - continue + keys_stored = self.user_keys.get(tg_id) + socket_exists = tg_id in self.user_sockets - if tg_id in self.user_sockets: - self.user_sockets.pop(tg_id, None) - self.user_messages.pop(tg_id, None) - self.user_keys.pop(tg_id, None) - logger.info( - "Closed old websocket for user %s due to key change", tg_id - ) + if socket_exists and keys_stored == (api_key, api_secret): + continue - success = await self.try_connect_user(api_key, api_secret, tg_id) - if success: - self.user_keys[tg_id] = (api_key, api_secret) - self.user_messages.setdefault( - tg_id, {"position": None, "order": None, "execution": None} - ) - logger.info("User %s connected to WebSocket", tg_id) - else: - await asyncio.sleep(5) - await self.try_connect_user(api_key, api_secret, tg_id) + if socket_exists: + await self.close_user_socket(tg_id) + + success = await self.try_connect_user(api_key, api_secret, tg_id) + if success: + self.user_keys[tg_id] = (api_key, api_secret) + self.user_messages.setdefault( + tg_id, {"position": None, "order": None, "execution": None} + ) + logger.info("User %s successfully connected", tg_id) + + except Exception as e: + logger.error("Error in user check loop: %s", e) await asyncio.sleep(10) - async def clear_user_sockets(self): - """Clear the user_sockets and user_messages dictionaries.""" - for tg_id, ws in list(self.user_sockets.items()): - try: - if ws and hasattr(ws, 'close'): - await ws.close() - except Exception as e: - logger.error(f"Error closing WS for {tg_id}: {e}") - - self.user_sockets.clear() - self.user_messages.clear() - self.user_keys.clear() - logger.info("Cleared user_sockets") - async def try_connect_user(self, api_key, api_secret, tg_id): - """Try to connect a user to the WebSocket.""" + """ + Create and setup WebSocket streams with thread-safe queues. + """ try: - self.ws_private = CustomWebSocket( + ws = CustomWebSocket( demo=True, testnet=False, channel_type="private", @@ -94,34 +98,92 @@ class WebSocketBot: api_secret=api_secret ) - self.user_sockets[tg_id] = self.ws_private - # Connect to the WebSocket private channel - # Handle order updates - self.ws_private.order_stream( - lambda msg: self.loop.call_soon_threadsafe( - asyncio.create_task, self.handle_order_update(msg, tg_id) - ) + self.user_sockets[tg_id] = ws + + self.order_queues[tg_id] = deque() + self.execution_queues[tg_id] = deque() + + self.processing_tasks[tg_id] = asyncio.create_task( + self._process_order_queue(tg_id) ) - # Handle execution updates - self.ws_private.execution_stream( - lambda msg: self.loop.call_soon_threadsafe( - asyncio.create_task, self.handle_execution_update(msg, tg_id) - ) + self.processing_tasks[tg_id + 1] = asyncio.create_task( + self._process_execution_queue(tg_id) ) + + def order_callback(msg): + self.order_queues[tg_id].append(msg) + + def execution_callback(msg): + self.execution_queues[tg_id].append(msg) + + ws.order_stream(order_callback) + ws.execution_stream(execution_callback) + + logger.info("WebSocket streams configured for user %s", tg_id) return True + except Exception as e: logger.error("Error connecting user %s: %s", tg_id, e) - await asyncio.sleep(5) + self.user_sockets.pop(tg_id, None) + return False + + async def _process_order_queue(self, tg_id): + """Continuously process order queue for user.""" + while tg_id in self.user_sockets: + try: + if self.order_queues[tg_id]: + msg = self.order_queues[tg_id].popleft() + await self.handle_order_update(msg, tg_id) + except Exception as e: + logger.error("Error processing order queue %s: %s", tg_id, e) + await asyncio.sleep(0.01) + + async def _process_execution_queue(self, tg_id): + """Continuously process execution queue for user.""" + while tg_id in self.user_sockets: + try: + if self.execution_queues[tg_id]: + msg = self.execution_queues[tg_id].popleft() + await self.handle_execution_update(msg, tg_id) + except Exception as e: + logger.error("Error processing execution queue %s: %s", tg_id, e) + await asyncio.sleep(0.01) + + async def close_user_socket(self, tg_id): + """Gracefully close user connection.""" + if tg_id in self.user_sockets: + self.user_sockets.pop(tg_id, None) + + for key in (tg_id, tg_id + 1): + task = self.processing_tasks.pop(key, None) + if task and not task.done(): + task.cancel() + + self.order_queues.pop(tg_id, None) + self.execution_queues.pop(tg_id, None) + self.user_messages.pop(tg_id, None) + self.user_keys.pop(tg_id, None) + logger.info("Cleaned up user %s", tg_id) async def handle_order_update(self, message, tg_id): - """Handle order updates.""" - await self.message_handler.format_order_update(message, tg_id) + """Process order updates.""" + try: + await self.message_handler.format_order_update(message, tg_id) + except Exception as e: + logger.error("Error handling order update for %s: %s", tg_id, e) async def handle_execution_update(self, message, tg_id): - """Handle execution updates without duplicate processing.""" - await self.message_handler.format_execution_update(message, tg_id) + """Process execution updates.""" + try: + await self.message_handler.format_execution_update(message, tg_id) + except Exception as e: + logger.error("Error handling execution update for %s: %s", tg_id, e) @staticmethod async def get_users_from_db(): - """Get all users from the database.""" - return await rq.get_users() + """Fetch all users from database.""" + try: + return await rq.get_users() + except Exception as e: + logger.error("Error getting users from DB: %s", e) + return [] diff --git a/run.py b/run.py index 110b270..1ffea91 100644 --- a/run.py +++ b/run.py @@ -31,7 +31,6 @@ async def main(): dp = Dispatcher(storage=storage) dp.include_router(router) web_socket = WebSocketBot(telegram_bot=bot) - await web_socket.clear_user_sockets() ws_task = asyncio.create_task(web_socket.run_user_check_loop()) tg_task = asyncio.create_task(dp.start_polling(bot))