2
0
forked from kodorvan/stcs

16 Commits

Author SHA1 Message Date
8706449c78 redis 2025-12-24 11:45:59 +05:00
89a3c70e4b venv 2025-12-23 12:58:30 +05:00
98cc3c248c Merge pull request 'Fixed the work of the websocket' (#36) from Alex/stcs:devel into stable
Reviewed-on: kodorvan/stcs#36
2025-12-18 22:19:34 +07:00
algizn97
cb6499e347 Fixed the work of the websocket 2025-12-18 18:46:21 +05:00
7494f85202 Merge pull request 'Fixed the websocket' (#35) from Alex/stcs:devel into stable
Reviewed-on: kodorvan/stcs#35
2025-12-16 00:18:12 +07:00
algizn97
867802b2a7 Fixed the websocket 2025-12-15 21:57:13 +05:00
28c9614ecb Merge pull request 'devel' (#34) from Alex/stcs:devel into stable
Reviewed-on: kodorvan/stcs#34
2025-11-20 14:33:56 +07:00
algizn97
1f123c77e7 Fixed the websocket 2025-11-19 16:50:18 +05:00
algizn97
56729c287b Step comparison postponed 2025-11-17 20:40:24 +05:00
algizn97
f268d3290b Added stop loss setting in isolated mode and start trading with the base rate when the maximum number of steps is reached. 2025-11-17 20:39:06 +05:00
d6b36799dc Merge pull request 'Fixed database initialization' (#33) from Alex/stcs:devel into stable
Reviewed-on: kodorvan/stcs#33
2025-11-13 01:24:04 +07:00
0b3e9ff476 Merge pull request 'Creating a new database' (#32) from Alex/stcs:devel into stable
Reviewed-on: kodorvan/stcs#32
2025-11-11 23:11:01 +07:00
ec7e10f7a1 Merge pull request 'Updated database, added new migrations' (#31) from Alex/stcs:devel into stable
Reviewed-on: kodorvan/stcs#31
2025-11-09 22:28:32 +07:00
37257d2ec2 Merge pull request 'The logic of the trading cycle has been changed' (#30) from Alex/stcs:devel into stable
Reviewed-on: kodorvan/stcs#30
2025-11-09 15:35:45 +07:00
88c358b90e Merge pull request 'Fixed websocket' (#29) from Alex/stcs:devel into stable
Reviewed-on: kodorvan/stcs#29
2025-11-02 19:55:17 +07:00
a7a23a4662 Merge pull request 'Added error information output' (#28) from Alex/stcs:devel into stable
Reviewed-on: kodorvan/stcs#28
2025-10-30 23:37:38 +07:00
7 changed files with 253 additions and 140 deletions

3
.gitignore vendored
View File

@@ -210,3 +210,6 @@ cython_debug/
marimo/_static/ marimo/_static/
marimo/_lsp/ marimo/_lsp/
__marimo__/ __marimo__/
stcs_venv
venv

View File

@@ -1,6 +1,11 @@
Crypto Trading Telegram Bot # Crypto Trading Telegram Bot by [KODORVAN](https://git.svoboda.works/kodorvan)
Этот бот — автоматизированный торговый помощник для работы с криптовалютной биржей Bybit на основе стратегии мартингейла. Он позволяет торговать бессрочными контрактами с управлением рисками, тейк-профитами, стоп-лоссами и кредитным плечом. Автоматизированный торговый помощник для работы с криптовалютной биржей Bybit на основе стратегии мартингейла.<br>
Он позволяет торговать бессрочными контрактами с управлением рисками, тейк-профитами, стоп-лоссами и кредитным плечом.
**Разработано командой [КОДОРВАНЬ](https://git.svoboda.works/kodorvan)**<br>
_Мы окажем полное содействие и поддержку, пишите в телеграм: https://t.me/kodorvan_
## Основные возможности ## Основные возможности
@@ -59,7 +64,12 @@ nvim .env
alembic upgrade head alembic upgrade head
``` ```
5. Запустите бота: 6. Убедитесь в том, что установлен redis и открыт порт 6789
```bash
sudo ufw allow 6789
```
7. Запустите бота:
```bash ```bash
python run.py python run.py

View File

@@ -233,9 +233,6 @@ async def trading_cycle(
) )
current_step += 1 current_step += 1
if max_bets_in_series < current_step:
return "Max bets in series"
await set_margin_mode(tg_id=tg_id, margin_mode=margin_type) await set_margin_mode(tg_id=tg_id, margin_mode=margin_type)
await set_leverage( await set_leverage(
tg_id=tg_id, tg_id=tg_id,

View File

@@ -1,10 +1,9 @@
import logging.config import logging.config
import math import math
# import json import json
import app.telegram.keyboards.inline as kbi import app.telegram.keyboards.inline as kbi
import database.request as rq import database.request as rq
from app.bybit.get_functions.get_instruments_info import get_instruments_info from app.bybit.get_functions.get_instruments_info import get_instruments_info
from app.bybit.get_functions.get_positions import get_active_positions_by_symbol
from app.bybit.logger_bybit.logger_bybit import LOGGING_CONFIG from app.bybit.logger_bybit.logger_bybit import LOGGING_CONFIG
from app.bybit.open_positions import trading_cycle, trading_cycle_profit from app.bybit.open_positions import trading_cycle, trading_cycle_profit
from app.bybit.set_functions.set_tp_sl import set_tp_sl_for_position from app.bybit.set_functions.set_tp_sl import set_tp_sl_for_position
@@ -123,7 +122,6 @@ class TelegramMessageHandler:
current_step = user_deals_data.current_step current_step = user_deals_data.current_step
order_quantity = user_deals_data.order_quantity order_quantity = user_deals_data.order_quantity
pnl_series = user_deals_data.pnl_series pnl_series = user_deals_data.pnl_series
margin_type = user_deals_data.margin_type
take_profit_percent = user_deals_data.take_profit_percent take_profit_percent = user_deals_data.take_profit_percent
stop_loss_percent = user_deals_data.stop_loss_percent stop_loss_percent = user_deals_data.stop_loss_percent
leverage = safe_float(user_deals_data.leverage) leverage = safe_float(user_deals_data.leverage)
@@ -167,24 +165,14 @@ class TelegramMessageHandler:
if commission_place == "Commission_for_tp": if commission_place == "Commission_for_tp":
total_commission = safe_float(total_fee) / qty_formatted total_commission = safe_float(total_fee) / qty_formatted
if margin_type == "ISOLATED_MARGIN": if side == "Buy":
if side == "Buy": take_profit_price = safe_float(exec_price) * (
take_profit_price = safe_float(exec_price) * ( 1 + take_profit_percent / 100) + total_commission
1 + take_profit_percent / 100) + total_commission stop_loss_price = safe_float(exec_price) * (1 - stop_loss_percent / 100)
stop_loss_price = None
else:
take_profit_price = safe_float(exec_price) * (
1 - take_profit_percent / 100) - total_commission
stop_loss_price = None
else: else:
if side == "Buy": take_profit_price = safe_float(exec_price) * (
take_profit_price = safe_float(exec_price) * ( 1 - take_profit_percent / 100) - total_commission
1 + take_profit_percent / 100) + total_commission stop_loss_price = safe_float(exec_price) * (1 + stop_loss_percent / 100)
stop_loss_price = safe_float(exec_price) * (1 - stop_loss_percent / 100)
else:
take_profit_price = safe_float(exec_price) * (
1 - take_profit_percent / 100) - total_commission
stop_loss_price = safe_float(exec_price) * (1 + stop_loss_percent / 100)
ress = await set_tp_sl_for_position(tg_id=tg_id, ress = await set_tp_sl_for_position(tg_id=tg_id,
symbol=symbol, symbol=symbol,
@@ -193,27 +181,11 @@ class TelegramMessageHandler:
position_idx=0) position_idx=0)
if ress or ress == "not modified": if ress or ress == "not modified":
take_profit_truncated = await truncate_float(take_profit_price, 6) take_profit_truncated = await truncate_float(take_profit_price, 6)
stop_loss_truncated = await truncate_float(stop_loss_price, 6)
text += (f"Движение: {side_rus}\n" text += (f"Движение: {side_rus}\n"
f"Тейк-профит: {take_profit_truncated}\n" f"Тейк-профит: {take_profit_truncated}\n"
f"Стоп-лосс: {stop_loss_truncated}\n"
) )
if stop_loss_price is not None:
stop_loss_truncated = await truncate_float(stop_loss_price, 6)
else:
stop_loss_truncated = None
if stop_loss_truncated is not None:
text += f"Стоп-лосс: {stop_loss_truncated}\n"
else:
deals = await get_active_positions_by_symbol(
tg_id=tg_id, symbol=symbol
)
position = next((d for d in deals if d.get("symbol") == symbol), None)
if position:
liq_price = position.get("liqPrice", 0)
text += f"Цена ликвидации: {liq_price}\n"
else: else:
text += (f"Движение: {side_rus}\n" text += (f"Движение: {side_rus}\n"
"Не удалось установить ТП и СЛ\n") "Не удалось установить ТП и СЛ\n")
@@ -258,7 +230,6 @@ class TelegramMessageHandler:
pass pass
else: else:
errors = { errors = {
"Max bets in series": "❗️ Максимальное количество сделок в серии достигнуто",
"Risk is too high for this trade": "❗️ Риск сделки слишком высок для продолжения", "Risk is too high for this trade": "❗️ Риск сделки слишком высок для продолжения",
"ab not enough for new order": "❗️ Недостаточно средств для продолжения торговли", "ab not enough for new order": "❗️ Недостаточно средств для продолжения торговли",
"InvalidRequestError": "❗️ Недостаточно средств для размещения нового ордера с заданным количеством и плечом.", "InvalidRequestError": "❗️ Недостаточно средств для размещения нового ордера с заданным количеством и плечом.",
@@ -285,49 +256,105 @@ class TelegramMessageHandler:
) )
elif stop_order_type == "StopLoss" or exec_type == "BustTrade": elif stop_order_type == "StopLoss" or exec_type == "BustTrade":
open_order_text = "\n❗️ Открываю новую сделку с увеличенной ставкой.\n" current_step = user_deals_data.current_step
await self.telegram_bot.send_message( max_bets_in_series = user_deals_data.max_bets_in_series
chat_id=tg_id, text=open_order_text current_step += 1
)
if side == "Buy": if max_bets_in_series < current_step:
r_side = "Sell" text_series = ("\n❗️ Максимальное количество сделок в серии достигнуто.\n"
else: "📈 Начинаю новую серию с базовой ставки\n")
r_side = "Buy" await self.telegram_bot.send_message(
chat_id=tg_id, text=text_series
res = await trading_cycle(
tg_id=tg_id, symbol=symbol, side=r_side
)
if res == "OK":
pass
else:
errors = {
"Max bets in series": "❗️ Максимальное количество сделок в серии достигнуто",
"Risk is too high for this trade": "❗️ Риск сделки слишком высок для продолжения",
"ab not enough for new order": "❗️ Недостаточно средств для продолжения торговли",
"InvalidRequestError": "❗️ Недостаточно средств для размещения нового ордера с заданным количеством и плечом.",
"The number of contracts exceeds maximum limit allowed": "❗️ Превышен максимальный лимит ставки",
"Order placement failed as your position may exceed the max": "❗️ Превышен максимальный лимит ставки с текущим плечом",
}
error_text = errors.get(
res, "❗️ Не удалось открыть новую сделку"
)
await rq.set_auto_trading(
tg_id=tg_id, symbol=symbol, auto_trading=False
) )
if side == "Buy":
r_side = "Sell"
else:
r_side = "Buy"
await rq.set_last_side_by_symbol(
tg_id=tg_id, symbol=symbol, last_side=r_side)
await rq.set_total_fee_user_auto_trading( await rq.set_total_fee_user_auto_trading(
tg_id=tg_id, symbol=symbol, total_fee=0 tg_id=tg_id, symbol=symbol, total_fee=0
) )
await rq.set_fee_user_auto_trading( await rq.set_fee_user_auto_trading(
tg_id=tg_id, symbol=symbol, fee=0 tg_id=tg_id, symbol=symbol, fee=0
) )
await self.telegram_bot.send_message( await rq.set_pnl_series_by_symbol(tg_id=tg_id, symbol=symbol, pnl_series=0)
chat_id=tg_id,
text=error_text, res = await trading_cycle_profit(
reply_markup=kbi.profile_bybit, tg_id=tg_id, symbol=symbol, side=r_side
) )
if res == "OK":
pass
else:
errors = {
"Risk is too high for this trade": "❗️ Риск сделки слишком высок для продолжения",
"ab not enough for new order": "❗️ Недостаточно средств для продолжения торговли",
"InvalidRequestError": "❗️ Недостаточно средств для размещения нового ордера с заданным количеством и плечом.",
"The number of contracts exceeds maximum limit allowed": "❗️ Превышен максимальный лимит ставки",
"Order placement failed as your position may exceed the max": "❗️ Превышен максимальный лимит ставки",
}
error_text = errors.get(
res, "❗️ Не удалось открыть новую сделку"
)
await rq.set_auto_trading(
tg_id=tg_id, symbol=symbol, auto_trading=False
)
await rq.set_total_fee_user_auto_trading(
tg_id=tg_id, symbol=symbol, total_fee=0
)
await rq.set_fee_user_auto_trading(
tg_id=tg_id, symbol=symbol, fee=0
)
await self.telegram_bot.send_message(
chat_id=tg_id,
text=error_text,
reply_markup=kbi.profile_bybit,
)
else:
open_order_text = "\n❗️ Открываю новую сделку с увеличенной ставкой.\n"
await self.telegram_bot.send_message(
chat_id=tg_id, text=open_order_text
)
if side == "Buy":
r_side = "Sell"
else:
r_side = "Buy"
res = await trading_cycle(
tg_id=tg_id, symbol=symbol, side=r_side
)
if res == "OK":
pass
else:
errors = {
"Risk is too high for this trade": "❗️ Риск сделки слишком высок для продолжения",
"ab not enough for new order": "❗️ Недостаточно средств для продолжения торговли",
"InvalidRequestError": "❗️ Недостаточно средств для размещения нового ордера с заданным количеством и плечом.",
"The number of contracts exceeds maximum limit allowed": "❗️ Превышен максимальный лимит ставки",
"Order placement failed as your position may exceed the max": "❗️ Превышен максимальный лимит ставки с текущим плечом",
}
error_text = errors.get(
res, "❗️ Не удалось открыть новую сделку"
)
await rq.set_auto_trading(
tg_id=tg_id, symbol=symbol, auto_trading=False
)
await rq.set_total_fee_user_auto_trading(
tg_id=tg_id, symbol=symbol, total_fee=0
)
await rq.set_fee_user_auto_trading(
tg_id=tg_id, symbol=symbol, fee=0
)
await self.telegram_bot.send_message(
chat_id=tg_id,
text=error_text,
reply_markup=kbi.profile_bybit,
)
elif create_type == "CreateByClosing": elif create_type == "CreateByClosing":
await self.telegram_bot.send_message( await self.telegram_bot.send_message(
chat_id=tg_id, chat_id=tg_id,
@@ -346,5 +373,7 @@ class TelegramMessageHandler:
) )
logger.info("Stop trading for symbol: %s, create_type: %s, stop_order_type: %s: %s", logger.info("Stop trading for symbol: %s, create_type: %s, stop_order_type: %s: %s",
symbol, create_type, stop_order_type, tg_id) symbol, create_type, stop_order_type, tg_id)
else:
logger.info("Execution update: %s", json.dumps(message))
except Exception as e: except Exception as e:
logger.error("Error in telegram_message_handler: %s", e, exc_info=True) logger.error("Error in telegram_message_handler: %s", e, exc_info=True)

View File

@@ -1,4 +1,5 @@
import asyncio import asyncio
from collections import deque
import logging.config import logging.config
from pybit.unified_trading import WebSocket from pybit.unified_trading import WebSocket
@@ -11,104 +12,178 @@ logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("web_socket") logger = logging.getLogger("web_socket")
class CustomWebSocket(WebSocket):
"""Custom WebSocket wrapper with enhanced error handling."""
def _on_error(self, error):
logger.error(f"WebSocket error: {error}")
return super()._on_error(error)
def _on_close(self):
logger.warning("WebSocket connection closed")
super()._on_close()
class WebSocketBot: class WebSocketBot:
""" """
Class to handle WebSocket connections and messages. Manages multiple Bybit private WebSocket connections for Telegram users.
Uses queue-based message processing to handle thread-safe async calls.
""" """
def __init__(self, telegram_bot): def __init__(self, telegram_bot):
"""Initialize the TradingBot class.""" """
Initialize WebSocketBot.
Args:
telegram_bot: Telegram bot instance for message handling
"""
self.telegram_bot = telegram_bot self.telegram_bot = telegram_bot
self.ws_private = None
self.user_messages = {}
self.user_sockets = {} self.user_sockets = {}
self.user_messages = {}
self.user_keys = {} self.user_keys = {}
self.loop = None self.loop = None
self.message_handler = TelegramMessageHandler(telegram_bot) self.message_handler = TelegramMessageHandler(telegram_bot)
self.order_queues = {} # {tg_id: deque}
self.execution_queues = {} # {tg_id: deque}
self.processing_tasks = {} # {tg_id: task}
async def run_user_check_loop(self): async def run_user_check_loop(self):
"""Run a loop to check for users and connect them to the WebSocket.""" """Main loop that continuously checks users and maintains connections."""
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
logger.info("Starting WebSocket user check loop")
while True: while True:
users = await WebSocketBot.get_users_from_db() try:
for user in users: users = await WebSocketBot.get_users_from_db()
tg_id = user.tg_id for user in users:
api_key, api_secret = await rq.get_user_api(tg_id=tg_id) tg_id = user.tg_id
api_key, api_secret = await rq.get_user_api(tg_id=tg_id)
if not api_key or not api_secret: if not api_key or not api_secret:
continue continue
keys_stored = self.user_keys.get(tg_id) keys_stored = self.user_keys.get(tg_id)
if tg_id in self.user_sockets and keys_stored == (api_key, api_secret): socket_exists = tg_id in self.user_sockets
continue
if tg_id in self.user_sockets: if socket_exists and keys_stored == (api_key, api_secret):
self.user_sockets.clear() continue
self.user_messages.clear()
self.user_keys.clear()
logger.info(
"Closed old websocket for user %s due to key change", tg_id
)
success = await self.try_connect_user(api_key, api_secret, tg_id) if socket_exists:
if success: await self.close_user_socket(tg_id)
self.user_keys[tg_id] = (api_key, api_secret)
self.user_messages.setdefault( success = await self.try_connect_user(api_key, api_secret, tg_id)
tg_id, {"position": None, "order": None, "execution": None} if success:
) self.user_keys[tg_id] = (api_key, api_secret)
logger.info("User %s connected to WebSocket", tg_id) self.user_messages.setdefault(
else: tg_id, {"position": None, "order": None, "execution": None}
await asyncio.sleep(5) )
await self.try_connect_user(api_key, api_secret, tg_id) logger.info("User %s successfully connected", tg_id)
except Exception as e:
logger.error("Error in user check loop: %s", e)
await asyncio.sleep(10) await asyncio.sleep(10)
async def clear_user_sockets(self):
"""Clear the user_sockets and user_messages dictionaries."""
self.user_sockets.clear()
self.user_messages.clear()
self.user_keys.clear()
logger.info("Cleared user_sockets")
async def try_connect_user(self, api_key, api_secret, tg_id): async def try_connect_user(self, api_key, api_secret, tg_id):
"""Try to connect a user to the WebSocket.""" """
Create and setup WebSocket streams with thread-safe queues.
"""
try: try:
self.ws_private = WebSocket( ws = CustomWebSocket(
demo=True, demo=True,
testnet=False, testnet=False,
channel_type="private", channel_type="private",
api_key=api_key, api_key=api_key,
api_secret=api_secret, api_secret=api_secret
) )
self.user_sockets[tg_id] = self.ws_private self.user_sockets[tg_id] = ws
# Connect to the WebSocket private channel
# Handle order updates self.order_queues[tg_id] = deque()
self.ws_private.order_stream( self.execution_queues[tg_id] = deque()
lambda msg: self.loop.call_soon_threadsafe(
asyncio.create_task, self.handle_order_update(msg, tg_id) self.processing_tasks[tg_id] = asyncio.create_task(
) self._process_order_queue(tg_id)
) )
# Handle execution updates self.processing_tasks[tg_id + 1] = asyncio.create_task(
self.ws_private.execution_stream( self._process_execution_queue(tg_id)
lambda msg: self.loop.call_soon_threadsafe(
asyncio.create_task, self.handle_execution_update(msg, tg_id)
)
) )
def order_callback(msg):
self.order_queues[tg_id].append(msg)
def execution_callback(msg):
self.execution_queues[tg_id].append(msg)
ws.order_stream(order_callback)
ws.execution_stream(execution_callback)
logger.info("WebSocket streams configured for user %s", tg_id)
return True return True
except Exception as e: except Exception as e:
logger.error("Error connecting user %s: %s", tg_id, e) logger.error("Error connecting user %s: %s", tg_id, e)
self.user_sockets.pop(tg_id, None)
return False return False
async def _process_order_queue(self, tg_id):
"""Continuously process order queue for user."""
while tg_id in self.user_sockets:
try:
if self.order_queues[tg_id]:
msg = self.order_queues[tg_id].popleft()
await self.handle_order_update(msg, tg_id)
except Exception as e:
logger.error("Error processing order queue %s: %s", tg_id, e)
await asyncio.sleep(0.01)
async def _process_execution_queue(self, tg_id):
"""Continuously process execution queue for user."""
while tg_id in self.user_sockets:
try:
if self.execution_queues[tg_id]:
msg = self.execution_queues[tg_id].popleft()
await self.handle_execution_update(msg, tg_id)
except Exception as e:
logger.error("Error processing execution queue %s: %s", tg_id, e)
await asyncio.sleep(0.01)
async def close_user_socket(self, tg_id):
"""Gracefully close user connection."""
if tg_id in self.user_sockets:
self.user_sockets.pop(tg_id, None)
for key in (tg_id, tg_id + 1):
task = self.processing_tasks.pop(key, None)
if task and not task.done():
task.cancel()
self.order_queues.pop(tg_id, None)
self.execution_queues.pop(tg_id, None)
self.user_messages.pop(tg_id, None)
self.user_keys.pop(tg_id, None)
logger.info("Cleaned up user %s", tg_id)
async def handle_order_update(self, message, tg_id): async def handle_order_update(self, message, tg_id):
"""Handle order updates.""" """Process order updates."""
await self.message_handler.format_order_update(message, tg_id) try:
await self.message_handler.format_order_update(message, tg_id)
except Exception as e:
logger.error("Error handling order update for %s: %s", tg_id, e)
async def handle_execution_update(self, message, tg_id): async def handle_execution_update(self, message, tg_id):
"""Handle execution updates without duplicate processing.""" """Process execution updates."""
await self.message_handler.format_execution_update(message, tg_id) try:
await self.message_handler.format_execution_update(message, tg_id)
except Exception as e:
logger.error("Error handling execution update for %s: %s", tg_id, e)
@staticmethod @staticmethod
async def get_users_from_db(): async def get_users_from_db():
"""Get all users from the database.""" """Fetch all users from database."""
return await rq.get_users() try:
return await rq.get_users()
except Exception as e:
logger.error("Error getting users from DB: %s", e)
return []

View File

@@ -8,7 +8,7 @@ After=syslog.target network-online.target
ExecStart=sudo -u www-data /usr/bin/python3 /var/www/stcs/BybitBot_API.py ExecStart=sudo -u www-data /usr/bin/python3 /var/www/stcs/BybitBot_API.py
PIDFile=/var/run/python/stcs.pid PIDFile=/var/run/python/stcs.pid
RemainAfterExit=no RemainAfterExit=no
RuntimeMaxSec=3600s RuntimeMaxSec=604800s
Restart=always Restart=always
RestartSec=5s RestartSec=5s

1
run.py
View File

@@ -31,7 +31,6 @@ async def main():
dp = Dispatcher(storage=storage) dp = Dispatcher(storage=storage)
dp.include_router(router) dp.include_router(router)
web_socket = WebSocketBot(telegram_bot=bot) web_socket = WebSocketBot(telegram_bot=bot)
await web_socket.clear_user_sockets()
ws_task = asyncio.create_task(web_socket.run_user_check_loop()) ws_task = asyncio.create_task(web_socket.run_user_check_loop())
tg_task = asyncio.create_task(dp.start_polling(bot)) tg_task = asyncio.create_task(dp.start_polling(bot))