2
0
forked from kodorvan/stcs

16 Commits

Author SHA1 Message Date
algizn97
cb6499e347 Fixed the work of the websocket 2025-12-18 18:46:21 +05:00
algizn97
867802b2a7 Fixed the websocket 2025-12-15 21:57:13 +05:00
algizn97
1f123c77e7 Fixed the websocket 2025-11-19 16:50:18 +05:00
algizn97
56729c287b Step comparison postponed 2025-11-17 20:40:24 +05:00
algizn97
f268d3290b Added stop loss setting in isolated mode and start trading with the base rate when the maximum number of steps is reached. 2025-11-17 20:39:06 +05:00
algizn97
856169cba9 Added API key verification for permissions 2025-11-14 13:56:08 +05:00
algizn97
0bc74ed188 Fixed database initialization 2025-11-12 22:18:38 +05:00
algizn97
d7b558664b Creating a new database 2025-11-10 09:44:29 +05:00
algizn97
34922e6998 Updated database, added new migrations 2025-11-09 15:18:18 +05:00
algizn97
7c85c03d10 The logic of the trading cycle has been changed, fixed errors when setting TP and SL 2025-11-09 13:10:13 +05:00
algizn97
39bbe8d997 Added user verification 2025-11-09 13:07:11 +05:00
algizn97
3ba32660ea The log is hidden 2025-11-02 17:33:11 +05:00
algizn97
e0167ea406 Fixed the function of setting TP and SL 2025-11-02 17:14:22 +05:00
algizn97
dbf0a30d54 The message receipt has been sorted 2025-11-02 17:13:36 +05:00
algizn97
78f21e6718 Fixed message output for certain conditions 2025-10-30 13:47:29 +05:00
algizn97
6416bd6dc9 Added output in case of installation error TP and SL 2025-10-30 13:21:22 +05:00
21 changed files with 544 additions and 527 deletions

View File

@@ -84,7 +84,7 @@ path_separator = os
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file.
sqlalchemy.url = sqlite+aiosqlite:///./database/db/stcs.db
sqlalchemy.url = sqlite+aiosqlite:///./database/stcs.db
[post_write_hooks]

View File

@@ -1,32 +0,0 @@
"""Added column current_series
Revision ID: 0ee52ab23e66
Revises: e5d612e44563
Create Date: 2025-10-26 11:48:48.055031
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '0ee52ab23e66'
down_revision: Union[str, Sequence[str], None] = 'e5d612e44563'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('user_deals', sa.Column('current_series', sa.Integer(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('user_deals', 'current_series')
# ### end Alembic commands ###

View File

@@ -1,44 +0,0 @@
"""Added TP_SL and PNL
Revision ID: 3fca121b7554
Revises: adf3d2991896
Create Date: 2025-10-29 11:07:45.350771
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy import inspect
# revision identifiers, used by Alembic.
revision: str = '3fca121b7554'
down_revision: Union[str, Sequence[str], None] = 'adf3d2991896'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
conn = op.get_bind()
inspector = inspect(conn)
columns = [col['name'] for col in inspector.get_columns('user_deals')]
if 'pnl_series' not in columns:
op.add_column('user_deals', sa.Column('pnl_series', sa.Float(), nullable=True))
if 'take_profit' not in columns:
op.add_column('user_deals', sa.Column('take_profit', sa.Boolean(), nullable=False, server_default=sa.false()))
if 'stop_loss' not in columns:
op.add_column('user_deals', sa.Column('stop_loss', sa.Boolean(), nullable=False, server_default=sa.false()))
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('user_deals', 'stop_loss')
op.drop_column('user_deals', 'take_profit')
op.drop_column('user_deals', 'pnl_series')
# ### end Alembic commands ###

View File

@@ -1,49 +0,0 @@
"""Fixed TP_SL type
Revision ID: 8329c0994b26
Revises: 3fca121b7554
Create Date: 2025-10-29 13:07:52.161139
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '8329c0994b26'
down_revision: Union[str, Sequence[str], None] = '3fca121b7554'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade():
with op.batch_alter_table('user_deals', recreate='always') as batch_op:
# Добавляем новую колонку с нужным типом
batch_op.add_column(sa.Column('take_profit_new', sa.Float(), nullable=False, server_default='0'))
# После закрытия batch создается и переименовывается таблица.
# Теперь мы можем обновить данные.
op.execute(
"UPDATE user_deals SET take_profit_new = CAST(take_profit AS FLOAT)"
)
with op.batch_alter_table('user_deals', recreate='always') as batch_op:
# Удаляем старую колонку
batch_op.drop_column('take_profit')
# Меняем имя новой колонки на старое
batch_op.alter_column('take_profit_new', new_column_name='take_profit')
def downgrade():
# Аналогично, но в обратном порядке и типе
with op.batch_alter_table('user_deals', recreate='always') as batch_op:
batch_op.add_column(sa.Column('take_profit_old', sa.Boolean(), nullable=False, server_default='0'))
op.execute(
"UPDATE user_deals SET take_profit_old = CAST(take_profit AS BOOLEAN)"
)
with op.batch_alter_table('user_deals', recreate='always') as batch_op:
batch_op.drop_column('take_profit')
batch_op.alter_column('take_profit_old', new_column_name='take_profit')

View File

@@ -1,45 +0,0 @@
"""Added column commission_place
Revision ID: adf3d2991896
Revises: 0ee52ab23e66
Create Date: 2025-10-26 13:37:33.662318
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy import inspect
# revision identifiers, used by Alembic.
revision: str = 'adf3d2991896'
down_revision: Union[str, Sequence[str], None] = '0ee52ab23e66'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
bind = op.get_bind()
inspector = inspect(bind)
columns_user_deals = [col['name'] for col in inspector.get_columns('user_deals')]
if 'commission_fee' not in columns_user_deals:
op.add_column('user_deals', sa.Column('commission_fee', sa.String(), server_default='', nullable=True))
if 'commission_place' not in columns_user_deals:
op.add_column('user_deals', sa.Column('commission_place', sa.String(), server_default='', nullable=True))
columns_user_risk_mgmt = [col['name'] for col in inspector.get_columns('user_risk_management')]
if 'commission_place' not in columns_user_risk_mgmt:
op.add_column('user_risk_management',
sa.Column('commission_place', sa.String(), server_default='', nullable=False))
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('user_risk_management', 'commission_place')
op.drop_column('user_deals', 'commission_place')
op.drop_column('user_deals', 'commission_fee')
# ### end Alembic commands ###

View File

@@ -1,34 +0,0 @@
"""Added column side for additional_setiings
Revision ID: e5d612e44563
Revises: fbf4e3658310
Create Date: 2025-10-25 18:25:52.746250
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'e5d612e44563'
down_revision: Union[str, Sequence[str], None] = 'fbf4e3658310'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('user_additional_settings',
sa.Column('side', sa.String(), nullable=False, server_default='')
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('user_additional_settings', 'side')
# ### end Alembic commands ###

View File

@@ -1,8 +1,8 @@
"""Added side_mode column
"""initial
Revision ID: fbf4e3658310
Revision ID: f6e7eb3f25c0
Revises:
Create Date: 2025-10-22 13:08:02.317419
Create Date: 2025-11-12 22:53:02.189445
"""
from typing import Sequence, Union
@@ -12,7 +12,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'fbf4e3658310'
revision: str = 'f6e7eb3f25c0'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
@@ -21,12 +21,12 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('user_deals', sa.Column('side_mode', sa.String(), nullable=True))
pass
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('user_deals', 'side_mode')
pass
# ### end Alembic commands ###

View File

@@ -38,7 +38,7 @@ async def get_active_positions(tg_id: int) -> list | None:
return None
async def get_active_positions_by_symbol(tg_id: int, symbol: str) -> dict | None:
async def get_active_positions_by_symbol(tg_id: int, symbol: str):
"""
Get active positions for a user by symbol
"""
@@ -62,6 +62,10 @@ async def get_active_positions_by_symbol(tg_id: int, symbol: str) -> dict | None
)
return None
except Exception as e:
errors = str(e)
if errors.startswith("Permission denied, please check your API key permissions"):
return "Invalid API key permissions"
else:
logger.error("Error getting active positions for user %s: %s", tg_id, e)
return None

View File

@@ -54,12 +54,6 @@ async def start_trading_cycle(
tg_id=tg_id,
symbol=symbol,
mode=0)
await set_margin_mode(tg_id=tg_id, margin_mode=margin_type)
await set_leverage(
tg_id=tg_id,
symbol=symbol,
leverage=leverage,
)
await rq.set_user_deal(
tg_id=tg_id,
@@ -81,6 +75,12 @@ async def start_trading_cycle(
commission_place=commission_place,
pnl_series=0
)
await rq.set_total_fee_user_auto_trading(
tg_id=tg_id, symbol=symbol, total_fee=0
)
await rq.set_fee_user_auto_trading(
tg_id=tg_id, symbol=symbol, fee=0
)
res = await open_positions(
tg_id=tg_id,
@@ -109,6 +109,7 @@ async def start_trading_cycle(
"The number of contracts exceeds maximum limit allowed",
"The number of contracts exceeds minimum limit allowed",
"Order placement failed as your position may exceed the max",
"Permission denied, please check your API key permissions"
}
else None
)
@@ -232,9 +233,6 @@ async def trading_cycle(
)
current_step += 1
if max_bets_in_series < current_step:
return "Max bets in series"
await set_margin_mode(tg_id=tg_id, margin_mode=margin_type)
await set_leverage(
tg_id=tg_id,
@@ -316,7 +314,12 @@ async def open_positions(
try:
client = await get_bybit_client(tg_id=tg_id)
get_ticker = await get_tickers(tg_id, symbol=symbol)
price_symbol = safe_float(get_ticker.get("lastPrice")) or 0
if get_ticker is None:
price_symbol = 0
else:
price_symbol = safe_float(get_ticker.get("lastPrice"))
instruments_info = await get_instruments_info(tg_id=tg_id, symbol=symbol)
qty_step_str = instruments_info.get("lotSizeFilter").get("qtyStep")
qty_step = safe_float(qty_step_str)
@@ -344,7 +347,6 @@ async def open_positions(
"triggerBy": "LastPrice",
"timeInForce": "GTC",
"positionIdx": 0,
"tpslMode": "Full",
}
response = client.place_order(**order_params)
@@ -367,6 +369,7 @@ async def open_positions(
"The number of contracts exceeds maximum limit allowed": "The number of contracts exceeds maximum limit allowed",
"The number of contracts exceeds minimum limit allowed": "The number of contracts exceeds minimum limit allowed",
"Order placement failed as your position may exceed the max": "Order placement failed as your position may exceed the max",
"Permission denied, please check your API key permissions": "Permission denied, please check your API key permissions"
}
for key, msg in known_errors.items():
if key in error_text:

View File

@@ -37,7 +37,7 @@ async def user_profile_bybit(tg_id: int, message: Message, state: FSMContext) ->
)
else:
await message.answer(
text="Ошибка при подключении, повторите попытку",
text="Ошибка при подключении к платформе. Проверьте корректность и разрешения API ключа и добавьте повторно.",
reply_markup=kbi.connect_the_platform,
)
logger.error("Error processing user profile for user %s", tg_id)

View File

@@ -13,7 +13,7 @@ async def set_tp_sl_for_position(
take_profit_price: float,
stop_loss_price: float,
position_idx: int,
) -> bool:
) -> bool | str:
"""
Set take profit and stop loss for a symbol.
:param tg_id: Telegram user ID
@@ -21,15 +21,17 @@ async def set_tp_sl_for_position(
:param take_profit_price: Take profit price
:param stop_loss_price: Stop loss price
:param position_idx: Position index
:return: bool
:return: bool or str
"""
try:
client = await get_bybit_client(tg_id)
take_profit = round(take_profit_price, 6) if take_profit_price is not None else None
stop_loss = round(stop_loss_price, 6) if stop_loss_price is not None else None
resp = client.set_trading_stop(
category="linear",
symbol=symbol,
takeProfit=str(round(take_profit_price, 5)),
stopLoss=str(round(stop_loss_price, 5)),
takeProfit=str(take_profit) if take_profit is not None else None,
stopLoss=str(stop_loss) if stop_loss is not None else None,
positionIdx=position_idx,
tpslMode="Full",
)
@@ -38,8 +40,18 @@ async def set_tp_sl_for_position(
logger.info("TP/SL for %s has been set", symbol)
return True
else:
logger.error("Error setting TP/SL for %s: %s", symbol, resp.get("retMsg"))
error_msg = resp.get("retMsg")
if "not modified" in error_msg.lower():
logger.info("TP/SL for %s not modified: %s", symbol, error_msg)
return "not modified"
else:
logger.error("Error setting TP/SL for %s: %s", symbol, error_msg)
return False
except Exception as e:
logger.error("Error setting TP/SL for %s: %s", symbol, e)
error_msg = str(e)
if "not modified" in error_msg.lower():
logger.info("TP/SL for %s not modified: %s", symbol, error_msg)
return "not modified"
else:
logger.error("Error set TP/SL for %s: %s", symbol, e)
return False

View File

@@ -1,6 +1,6 @@
import logging.config
import math
import json
import app.telegram.keyboards.inline as kbi
import database.request as rq
from app.bybit.get_functions.get_instruments_info import get_instruments_info
@@ -15,10 +15,13 @@ logger = logging.getLogger("telegram_message_handler")
class TelegramMessageHandler:
def __init__(self, telegram_bot):
"""Initialize the TelegramMessageHandler class."""
self.telegram_bot = telegram_bot
async def format_order_update(self, message, tg_id):
"""Handle order updates."""
try:
# logger.info("Order update: %s", json.dumps(message))
user_additional_data = await rq.get_user_additional_settings(tg_id=tg_id)
trigger_price = safe_float(user_additional_data.trigger_price)
if trigger_price > 0:
@@ -55,8 +58,12 @@ class TelegramMessageHandler:
logger.error("Error in format_order_update: %s", e)
async def format_execution_update(self, message, tg_id):
"""Handle execution updates without duplicate processing."""
try:
# logger.info("Execution update: %s", json.dumps(message))
execution = message.get("data", [{}])[0]
exec_type = format_value(execution.get("execType"))
if exec_type == "Trade" or exec_type == "BustTrade":
closed_size = format_value(execution.get("closedSize"))
symbol = format_value(execution.get("symbol"))
exec_price = format_value(execution.get("execPrice"))
@@ -64,6 +71,9 @@ class TelegramMessageHandler:
exec_fees = format_value(execution.get("execFee"))
fee_rate = format_value(execution.get("feeRate"))
side = format_value(execution.get("side"))
exec_pnl = format_value(execution.get("execPnl"))
stop_order_type = format_value(execution.get("stopOrderType"))
create_type = format_value(execution.get("createType"))
user_auto_trading = await rq.get_user_auto_trading(
tg_id=tg_id, symbol=symbol
@@ -71,6 +81,7 @@ class TelegramMessageHandler:
auto_trading = (
user_auto_trading.auto_trading if user_auto_trading else False
)
if auto_trading:
side_rus = (
"Покупка"
@@ -89,14 +100,13 @@ class TelegramMessageHandler:
tg_id=tg_id, symbol=symbol, fee=safe_float(exec_fee)
)
get_total_fee = 0
if user_auto_trading is not None:
get_total_fee = user_auto_trading.total_fee
total_fee = safe_float(exec_fee) + safe_float(get_total_fee)
exec_pnl = format_value(execution.get("execPnl"))
total_fee = safe_float(exec_fee) + safe_float(get_total_fee)
ex_pnl = safe_float(exec_pnl)
pnl = safe_float(exec_pnl)
header = (
"Сделка закрыта:" if safe_float(closed_size) > 0 else "Сделка открыта:"
@@ -105,18 +115,18 @@ class TelegramMessageHandler:
user_deals_data = await rq.get_user_deal_by_symbol(
tg_id=tg_id, symbol=symbol
)
commission_fee = user_deals_data.commission_fee or "Yes_commission_fee"
commission_place = user_deals_data.commission_place or "Commission_for_qty"
commission_fee = user_deals_data.commission_fee
commission_place = user_deals_data.commission_place
current_series = user_deals_data.current_series
current_step = user_deals_data.current_step
order_quantity = user_deals_data.order_quantity
pnl_series = user_deals_data.pnl_series
margin_type = user_deals_data.margin_type
take_profit_percent = user_deals_data.take_profit_percent
stop_loss_percent = user_deals_data.stop_loss_percent
leverage = safe_float(user_deals_data.leverage)
fee = safe_float(user_auto_trading.fee)
total_pnl = safe_float(exec_pnl) - safe_float(exec_fee) - fee
leverage = safe_float(user_deals_data.leverage)
if commission_fee == "Yes_commission_fee":
if commission_place == "Commission_for_qty":
@@ -128,7 +138,7 @@ class TelegramMessageHandler:
else:
total_quantity = safe_float(order_quantity)
if user_deals_data is not None and auto_trading and safe_float(closed_size) == 0:
if user_deals_data is not None and safe_float(closed_size) == 0:
await rq.set_total_fee_user_auto_trading(
tg_id=tg_id, symbol=symbol, total_fee=total_fee
)
@@ -155,16 +165,6 @@ class TelegramMessageHandler:
if commission_place == "Commission_for_tp":
total_commission = safe_float(total_fee) / qty_formatted
if margin_type == "ISOLATED_MARGIN":
if side == "Buy":
take_profit_price = safe_float(exec_price) * (
1 + take_profit_percent / 100) + total_commission
stop_loss_price = None
else:
take_profit_price = safe_float(exec_price) * (
1 - take_profit_percent / 100) - total_commission
stop_loss_price = None
else:
if side == "Buy":
take_profit_price = safe_float(exec_price) * (
1 + take_profit_percent / 100) + total_commission
@@ -174,23 +174,23 @@ class TelegramMessageHandler:
1 - take_profit_percent / 100) - total_commission
stop_loss_price = safe_float(exec_price) * (1 + stop_loss_percent / 100)
take_profit_price = max(take_profit_price, 0)
stop_loss_price = max(stop_loss_price, 0)
await set_tp_sl_for_position(tg_id=tg_id,
ress = await set_tp_sl_for_position(tg_id=tg_id,
symbol=symbol,
take_profit_price=take_profit_price,
stop_loss_price=stop_loss_price,
position_idx=0)
if ress or ress == "not modified":
take_profit_truncated = await truncate_float(take_profit_price, 6)
stop_loss_truncated = await truncate_float(stop_loss_price, 6)
text += (f"Движение: {side_rus}\n"
f"Тейк-профит: {take_profit_truncated}\n"
f"Стоп-лосс: {stop_loss_truncated}\n"
)
else:
text += (f"Движение: {side_rus}\n"
"Не удалось установить ТП и СЛ\n")
elif safe_float(closed_size) > 0 and auto_trading:
new_pnl = safe_float(pnl_series) + total_pnl
await rq.set_pnl_series_by_symbol(
tg_id=tg_id, symbol=symbol, pnl_series=new_pnl)
@@ -202,14 +202,7 @@ class TelegramMessageHandler:
chat_id=tg_id, text=text, reply_markup=kbi.profile_bybit
)
user_symbols = user_auto_trading.symbol if user_auto_trading else None
if (
auto_trading
and safe_float(closed_size) > 0
and user_symbols is not None
):
if safe_float(pnl) > 0:
if stop_order_type == "TakeProfit":
profit_text = "📈 Начинаю новую серию с базовой ставки\n"
await self.telegram_bot.send_message(
chat_id=tg_id, text=profit_text, reply_markup=kbi.profile_bybit
@@ -237,7 +230,64 @@ class TelegramMessageHandler:
pass
else:
errors = {
"Max bets in series": "❗️ Максимальное количество сделок в серии достигнуто",
"Risk is too high for this trade": "❗️ Риск сделки слишком высок для продолжения",
"ab not enough for new order": "❗️ Недостаточно средств для продолжения торговли",
"InvalidRequestError": "❗️ Недостаточно средств для размещения нового ордера с заданным количеством и плечом.",
"The number of contracts exceeds maximum limit allowed": "❗️ Превышен максимальный лимит ставки",
"Order placement failed as your position may exceed the max": "❗️ Превышен максимальный лимит ставки",
}
error_text = errors.get(
res, "❗️ Не удалось открыть новую сделку"
)
await rq.set_auto_trading(
tg_id=tg_id, symbol=symbol, auto_trading=False
)
await rq.set_total_fee_user_auto_trading(
tg_id=tg_id, symbol=symbol, total_fee=0
)
await rq.set_fee_user_auto_trading(
tg_id=tg_id, symbol=symbol, fee=0
)
await self.telegram_bot.send_message(
chat_id=tg_id,
text=error_text,
reply_markup=kbi.profile_bybit,
)
elif stop_order_type == "StopLoss" or exec_type == "BustTrade":
current_step = user_deals_data.current_step
max_bets_in_series = user_deals_data.max_bets_in_series
current_step += 1
if max_bets_in_series < current_step:
text_series = ("\n❗️ Максимальное количество сделок в серии достигнуто.\n"
"📈 Начинаю новую серию с базовой ставки\n")
await self.telegram_bot.send_message(
chat_id=tg_id, text=text_series
)
if side == "Buy":
r_side = "Sell"
else:
r_side = "Buy"
await rq.set_last_side_by_symbol(
tg_id=tg_id, symbol=symbol, last_side=r_side)
await rq.set_total_fee_user_auto_trading(
tg_id=tg_id, symbol=symbol, total_fee=0
)
await rq.set_fee_user_auto_trading(
tg_id=tg_id, symbol=symbol, fee=0
)
await rq.set_pnl_series_by_symbol(tg_id=tg_id, symbol=symbol, pnl_series=0)
res = await trading_cycle_profit(
tg_id=tg_id, symbol=symbol, side=r_side
)
if res == "OK":
pass
else:
errors = {
"Risk is too high for this trade": "❗️ Риск сделки слишком высок для продолжения",
"ab not enough for new order": "❗️ Недостаточно средств для продолжения торговли",
"InvalidRequestError": "❗️ Недостаточно средств для размещения нового ордера с заданным количеством и плечом.",
@@ -281,12 +331,11 @@ class TelegramMessageHandler:
pass
else:
errors = {
"Max bets in series": "❗️ Максимальное количество сделок в серии достигнуто",
"Risk is too high for this trade": "❗️ Риск сделки слишком высок для продолжения",
"ab not enough for new order": "❗️ Недостаточно средств для продолжения торговли",
"InvalidRequestError": "❗️ Недостаточно средств для размещения нового ордера с заданным количеством и плечом.",
"The number of contracts exceeds maximum limit allowed": "❗️ Превышен максимальный лимит ставки",
"Order placement failed as your position may exceed the max": "❗️ Превышен максимальный лимит ставки",
"Order placement failed as your position may exceed the max": "❗️ Превышен максимальный лимит ставки с текущим плечом",
}
error_text = errors.get(
res, "❗️ Не удалось открыть новую сделку"
@@ -306,6 +355,25 @@ class TelegramMessageHandler:
text=error_text,
reply_markup=kbi.profile_bybit,
)
elif create_type == "CreateByClosing":
await self.telegram_bot.send_message(
chat_id=tg_id,
text=f"❗️ Торговля для {symbol} остановлена",
reply_markup=kbi.profile_bybit,
)
await rq.set_auto_trading(
tg_id=tg_id, symbol=symbol, auto_trading=False
)
await rq.set_total_fee_user_auto_trading(
tg_id=tg_id, symbol=symbol, total_fee=0
)
await rq.set_fee_user_auto_trading(
tg_id=tg_id, symbol=symbol, fee=0
)
logger.info("Stop trading for symbol: %s, create_type: %s, stop_order_type: %s: %s",
symbol, create_type, stop_order_type, tg_id)
else:
logger.info("Execution update: %s", json.dumps(message))
except Exception as e:
logger.error("Error in telegram_message_handler: %s", e, exc_info=True)

View File

@@ -1,4 +1,5 @@
import asyncio
from collections import deque
import logging.config
from pybit.unified_trading import WebSocket
@@ -11,26 +12,49 @@ logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("web_socket")
class CustomWebSocket(WebSocket):
"""Custom WebSocket wrapper with enhanced error handling."""
def _on_error(self, error):
logger.error(f"WebSocket error: {error}")
return super()._on_error(error)
def _on_close(self):
logger.warning("WebSocket connection closed")
super()._on_close()
class WebSocketBot:
"""
Class to handle WebSocket connections and messages.
Manages multiple Bybit private WebSocket connections for Telegram users.
Uses queue-based message processing to handle thread-safe async calls.
"""
def __init__(self, telegram_bot):
"""Initialize the TradingBot class."""
"""
Initialize WebSocketBot.
Args:
telegram_bot: Telegram bot instance for message handling
"""
self.telegram_bot = telegram_bot
self.ws_private = None
self.user_messages = {}
self.user_sockets = {}
self.user_messages = {}
self.user_keys = {}
self.loop = None
self.message_handler = TelegramMessageHandler(telegram_bot)
self.last_execution_seq = {}
self.order_queues = {} # {tg_id: deque}
self.execution_queues = {} # {tg_id: deque}
self.processing_tasks = {} # {tg_id: task}
async def run_user_check_loop(self):
"""Run a loop to check for users and connect them to the WebSocket."""
"""Main loop that continuously checks users and maintains connections."""
self.loop = asyncio.get_running_loop()
logger.info("Starting WebSocket user check loop")
while True:
try:
users = await WebSocketBot.get_users_from_db()
for user in users:
tg_id = user.tg_id
@@ -40,16 +64,13 @@ class WebSocketBot:
continue
keys_stored = self.user_keys.get(tg_id)
if tg_id in self.user_sockets and keys_stored == (api_key, api_secret):
socket_exists = tg_id in self.user_sockets
if socket_exists and keys_stored == (api_key, api_secret):
continue
if tg_id in self.user_sockets:
self.user_sockets.clear()
self.user_messages.clear()
self.user_keys.clear()
logger.info(
"Closed old websocket for user %s due to key change", tg_id
)
if socket_exists:
await self.close_user_socket(tg_id)
success = await self.try_connect_user(api_key, api_secret, tg_id)
if success:
@@ -57,69 +78,112 @@ class WebSocketBot:
self.user_messages.setdefault(
tg_id, {"position": None, "order": None, "execution": None}
)
logger.info("User %s connected to WebSocket", tg_id)
else:
await asyncio.sleep(5)
await self.try_connect_user(api_key, api_secret, tg_id)
logger.info("User %s successfully connected", tg_id)
except Exception as e:
logger.error("Error in user check loop: %s", e)
await asyncio.sleep(10)
async def clear_user_sockets(self):
"""Clear the user_sockets and user_messages dictionaries."""
self.user_sockets.clear()
self.user_messages.clear()
self.user_keys.clear()
logger.info("Cleared user_sockets")
async def try_connect_user(self, api_key, api_secret, tg_id):
"""Try to connect a user to the WebSocket."""
"""
Create and setup WebSocket streams with thread-safe queues.
"""
try:
self.ws_private = WebSocket(
ws = CustomWebSocket(
demo=True,
testnet=False,
channel_type="private",
api_key=api_key,
api_secret=api_secret,
api_secret=api_secret
)
self.user_sockets[tg_id] = self.ws_private
# Connect to the WebSocket private channel
# Handle order updates
self.ws_private.order_stream(
lambda msg: self.loop.call_soon_threadsafe(
asyncio.create_task, self.handle_order_update(msg, tg_id)
)
)
# Handle execution updates
self.ws_private.execution_stream(
lambda msg: self.loop.call_soon_threadsafe(
asyncio.create_task, self.handle_execution_update(msg, tg_id)
self.user_sockets[tg_id] = ws
self.order_queues[tg_id] = deque()
self.execution_queues[tg_id] = deque()
self.processing_tasks[tg_id] = asyncio.create_task(
self._process_order_queue(tg_id)
)
self.processing_tasks[tg_id + 1] = asyncio.create_task(
self._process_execution_queue(tg_id)
)
def order_callback(msg):
self.order_queues[tg_id].append(msg)
def execution_callback(msg):
self.execution_queues[tg_id].append(msg)
ws.order_stream(order_callback)
ws.execution_stream(execution_callback)
logger.info("WebSocket streams configured for user %s", tg_id)
return True
except Exception as e:
logger.error("Error connecting user %s: %s", tg_id, e)
self.user_sockets.pop(tg_id, None)
return False
async def _process_order_queue(self, tg_id):
"""Continuously process order queue for user."""
while tg_id in self.user_sockets:
try:
if self.order_queues[tg_id]:
msg = self.order_queues[tg_id].popleft()
await self.handle_order_update(msg, tg_id)
except Exception as e:
logger.error("Error processing order queue %s: %s", tg_id, e)
await asyncio.sleep(0.01)
async def _process_execution_queue(self, tg_id):
"""Continuously process execution queue for user."""
while tg_id in self.user_sockets:
try:
if self.execution_queues[tg_id]:
msg = self.execution_queues[tg_id].popleft()
await self.handle_execution_update(msg, tg_id)
except Exception as e:
logger.error("Error processing execution queue %s: %s", tg_id, e)
await asyncio.sleep(0.01)
async def close_user_socket(self, tg_id):
"""Gracefully close user connection."""
if tg_id in self.user_sockets:
self.user_sockets.pop(tg_id, None)
for key in (tg_id, tg_id + 1):
task = self.processing_tasks.pop(key, None)
if task and not task.done():
task.cancel()
self.order_queues.pop(tg_id, None)
self.execution_queues.pop(tg_id, None)
self.user_messages.pop(tg_id, None)
self.user_keys.pop(tg_id, None)
logger.info("Cleaned up user %s", tg_id)
async def handle_order_update(self, message, tg_id):
"""Handle order updates."""
"""Process order updates."""
try:
await self.message_handler.format_order_update(message, tg_id)
except Exception as e:
logger.error("Error handling order update for %s: %s", tg_id, e)
async def handle_execution_update(self, message, tg_id):
"""Handle execution updates."""
data_items = message.get('data', [])
if not data_items:
return
for exec_data in data_items:
seq = exec_data.get('seq')
if tg_id not in self.last_execution_seq:
self.last_execution_seq[tg_id] = -1
if seq <= self.last_execution_seq[tg_id]:
continue
self.last_execution_seq[tg_id] = seq
"""Process execution updates."""
try:
await self.message_handler.format_execution_update(message, tg_id)
except Exception as e:
logger.error("Error handling execution update for %s: %s", tg_id, e)
@staticmethod
async def get_users_from_db():
"""Get all users from the database."""
"""Fetch all users from database."""
try:
return await rq.get_users()
except Exception as e:
logger.error("Error getting users from DB: %s", e)
return []

View File

@@ -85,6 +85,18 @@ async def cmd_to_main(message: Message, state: FSMContext) -> None:
None: Exceptions are caught and logged internally.
"""
try:
await state.clear()
user = await rq.get_user(tg_id=message.from_user.id)
if user:
await user_profile_tg(tg_id=message.from_user.id, message=message)
else:
await rq.create_user(
tg_id=message.from_user.id, username=message.from_user.username
)
await rq.set_user_symbol(tg_id=message.from_user.id, symbol="BTCUSDT")
await rq.create_user_additional_settings(tg_id=message.from_user.id)
await rq.create_user_risk_management(tg_id=message.from_user.id)
await rq.create_user_conditional_settings(tg_id=message.from_user.id)
await user_profile_tg(tg_id=message.from_user.id, message=message)
logger.debug(
"Command to_profile_tg processed successfully for user: %s",
@@ -117,9 +129,21 @@ async def profile_bybit(message: Message, state: FSMContext) -> None:
"""
try:
await state.clear()
user = await rq.get_user(tg_id=message.from_user.id)
if user:
await user_profile_bybit(
tg_id=message.from_user.id, message=message, state=state
)
else:
await rq.create_user(
tg_id=message.from_user.id, username=message.from_user.username
)
await rq.set_user_symbol(tg_id=message.from_user.id, symbol="BTCUSDT")
await rq.create_user_additional_settings(tg_id=message.from_user.id)
await rq.create_user_risk_management(tg_id=message.from_user.id)
await rq.create_user_conditional_settings(tg_id=message.from_user.id)
await user_profile_bybit(
tg_id=message.from_user.id, message=message, state=state)
logger.debug(
"Command to_profile_bybit processed successfully for user: %s",
message.from_user.id,
@@ -150,6 +174,9 @@ async def profile_bybit_callback(
"""
try:
await state.clear()
user = await rq.get_user(tg_id=callback_query.from_user.id)
if user:
await user_profile_bybit(
tg_id=callback_query.from_user.id,
message=callback_query.message,
@@ -159,6 +186,19 @@ async def profile_bybit_callback(
"Callback profile_bybit processed successfully for user: %s",
callback_query.from_user.id,
)
else:
await rq.create_user(
tg_id=callback_query.from_user.id, username=callback_query.from_user.username
)
await rq.set_user_symbol(tg_id=callback_query.from_user.id, symbol="BTCUSDT")
await rq.create_user_additional_settings(tg_id=callback_query.from_user.id)
await rq.create_user_risk_management(tg_id=callback_query.from_user.id)
await rq.create_user_conditional_settings(tg_id=callback_query.from_user.id)
await user_profile_bybit(
tg_id=callback_query.from_user.id,
message=callback_query.message,
state=state,
)
await callback_query.answer()
except Exception as e:
logger.error(

View File

@@ -299,6 +299,12 @@ async def settings_for_margin_type(
deals = await get_active_positions_by_symbol(
tg_id=callback_query.from_user.id, symbol=symbol
)
if deals == "Invalid API key permissions":
await callback_query.answer(
text="API ключ не имеет достаточных прав для смены маржи",
)
return
position = next((d for d in deals if d.get("symbol") == symbol), None)
if position:
@@ -676,6 +682,15 @@ async def set_leverage_handler(message: Message, state: FSMContext) -> None:
await state.clear()
except Exception as e:
errors_text = str(e)
known_errors = {
"Permission denied, please check your API key permissions": "API ключ не имеет достаточных прав для установки кредитного плеча"
}
for key, msg in known_errors.items():
if key in errors_text:
await message.answer(msg, reply_markup=kbi.back_to_additional_settings)
else:
await message.answer(
text="Произошла ошибка при установке кредитного плеча. Пожалуйста, попробуйте позже.",
reply_markup=kbi.back_to_additional_settings,

View File

@@ -38,6 +38,12 @@ async def start_trading(callback_query: CallbackQuery, state: FSMContext) -> Non
deals = await get_active_positions_by_symbol(
tg_id=callback_query.from_user.id, symbol=symbol
)
if deals == "Invalid API key permissions":
await callback_query.answer(
text="API ключ не имеет достаточных прав для запуска торговли",
)
return
position = next((d for d in deals if d.get("symbol") == symbol), None)
if position:
@@ -109,7 +115,9 @@ async def start_trading(callback_query: CallbackQuery, state: FSMContext) -> Non
"The number of contracts exceeds minimum limit allowed": "️️Лимит ставки меньше минимально допустимого",
"Order placement failed as your position may exceed the max":
"Не удалось разместить ордер, так как ваша позиция может превышать максимальный лимит."
"Пожалуйста, уменьшите кредитное плечо, чтобы увеличить максимальное значение"
"Пожалуйста, уменьшите кредитное плечо, чтобы увеличить максимальное значение",
"Permission denied, please check your API key permissions": "API ключ не имеет достаточных прав для запуска торговли"
}
if res == "OK":
@@ -131,6 +139,15 @@ async def start_trading(callback_query: CallbackQuery, state: FSMContext) -> Non
await add_start_task_merged(user_id=callback_query.from_user.id, task=task)
except Exception as e:
error_text = str(e)
known_errors = {
"Permission denied, please check your API key permissions": "API ключ не имеет достаточных прав для запуска торговли"
}
for key, msg in known_errors.items():
if key in error_text:
await callback_query.answer(msg)
else:
await callback_query.answer(text="Произошла ошибка при запуске торговли")
logger.error(
"Error processing command start_trading for user %s: %s",

View File

@@ -39,13 +39,13 @@ async def stop_all_trading(callback_query: CallbackQuery, state: FSMContext):
await rq.set_stop_timer(tg_id=callback_query.from_user.id, timer_end=0)
await asyncio.sleep(timer_end * 60)
await close_position_by_symbol(
tg_id=callback_query.from_user.id, symbol=symbol)
await rq.set_auto_trading(
tg_id=callback_query.from_user.id,
symbol=symbol,
auto_trading=False,
)
await close_position_by_symbol(
tg_id=callback_query.from_user.id, symbol=symbol)
await callback_query.message.edit_text(text=f"Торговля для {symbol} остановлена", reply_markup=kbi.profile_bybit)

View File

@@ -10,10 +10,9 @@ logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("database")
BASE_DIR = Path(__file__).parent.resolve()
DATA_DIR = BASE_DIR / "db"
DATA_DIR.mkdir(parents=True, exist_ok=True)
BASE_DIR.mkdir(parents=True, exist_ok=True)
DATABASE_URL = f"sqlite+aiosqlite:///{DATA_DIR / 'stcs.db'}"
DATABASE_URL = f"sqlite+aiosqlite:///{BASE_DIR / 'stcs.db'}"
async_engine = create_async_engine(
DATABASE_URL,
@@ -39,7 +38,7 @@ async_session = async_sessionmaker(
async def init_db():
try:
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
await conn.run_sync(lambda sync_conn: Base.metadata.create_all(bind=sync_conn, checkfirst=True))
logger.info("Database initialized.")
except Exception as e:
logger.error("Database initialization failed: %s", e)

View File

@@ -154,8 +154,8 @@ class UserDeals(Base):
order_quantity = Column(Float, nullable=True)
martingale_factor = Column(Float, nullable=True)
max_bets_in_series = Column(Integer, nullable=True)
take_profit_percent = Column(Integer, nullable=True)
stop_loss_percent = Column(Integer, nullable=True)
take_profit_percent = Column(Float, nullable=True)
stop_loss_percent = Column(Float, nullable=True)
trigger_price = Column(Float, nullable=True)
current_series = Column(Integer, nullable=True)
commission_fee = Column(String, nullable=True)

View File

@@ -86,7 +86,7 @@ async def set_user_api(tg_id: int, api_key: str, api_secret: str) -> bool:
else:
# Creating new record
user_api = UserApi(
user=user, api_key=api_key, api_secret=api_secret
user_id=user.id, api_key=api_key, api_secret=api_secret
)
session.add(user_api)
@@ -141,7 +141,7 @@ async def set_user_symbol(tg_id: int, symbol: str) -> bool:
# Creating new record
user_symbol = UserSymbol(
symbol=symbol,
user=user,
user_id=user.id,
)
session.add(user_symbol)
@@ -197,7 +197,7 @@ async def create_user_additional_settings(tg_id: int) -> None:
# Create the user additional settings
user_additional_settings = UserAdditionalSettings(
user=user,
user_id=user.id,
trade_mode="Long", # Default value
switch_side="По направлению",
side="Buy",
@@ -267,7 +267,7 @@ async def set_trade_mode(tg_id: int, trade_mode: str) -> bool:
# Creating new record
user_additional_settings = UserAdditionalSettings(
trade_mode=trade_mode,
user=user,
user_id=user.id,
)
session.add(user_additional_settings)
@@ -306,7 +306,7 @@ async def set_margin_type(tg_id: int, margin_type: str) -> bool:
# Creating new record
user_additional_settings = UserAdditionalSettings(
margin_type=margin_type,
user=user,
user_id=user.id,
)
session.add(user_additional_settings)
@@ -345,7 +345,7 @@ async def set_switch_side(tg_id: int, switch_side: str) -> bool:
# Creating new record
user_additional_settings = UserAdditionalSettings(
switch_side=switch_side,
user=user,
user_id=user.id,
)
session.add(user_additional_settings)
@@ -384,7 +384,7 @@ async def set_side(tg_id: int, side: str) -> bool:
# Creating new record
user_additional_settings = UserAdditionalSettings(
side=side,
user=user,
user_id=user.id,
)
session.add(user_additional_settings)
@@ -423,7 +423,7 @@ async def set_leverage(tg_id: int, leverage: str) -> bool:
# Creating new record
user_additional_settings = UserAdditionalSettings(
leverage=leverage,
user=user,
user_id=user.id,
)
session.add(user_additional_settings)
@@ -462,7 +462,7 @@ async def set_order_quantity(tg_id: int, order_quantity: float) -> bool:
# Creating new record
user_additional_settings = UserAdditionalSettings(
order_quantity=order_quantity,
user=user,
user_id=user.id,
)
session.add(user_additional_settings)
@@ -503,7 +503,7 @@ async def set_martingale_factor(tg_id: int, martingale_factor: float) -> bool:
# Creating new record
user_additional_settings = UserAdditionalSettings(
martingale_factor=martingale_factor,
user=user,
user_id=user.id,
)
session.add(user_additional_settings)
@@ -546,7 +546,7 @@ async def set_max_bets_in_series(tg_id: int, max_bets_in_series: int) -> bool:
# Creating new record
user_additional_settings = UserAdditionalSettings(
max_bets_in_series=max_bets_in_series,
user=user,
user_id=user.id,
)
session.add(user_additional_settings)
@@ -587,7 +587,7 @@ async def set_trigger_price(tg_id: int, trigger_price: float) -> bool:
# Creating new record
user_additional_settings = UserAdditionalSettings(
trigger_price=trigger_price,
user=user,
user_id=user.id,
)
session.add(user_additional_settings)
@@ -627,7 +627,7 @@ async def create_user_risk_management(tg_id: int) -> None:
# Create the user risk management
user_risk_management = UserRiskManagement(
user=user,
user_id=user.id,
take_profit_percent=1.0,
stop_loss_percent=1.0,
commission_fee="Yes_commission_fee",
@@ -692,7 +692,7 @@ async def set_take_profit_percent(tg_id: int, take_profit_percent: float) -> boo
# Creating new record
user_risk_management = UserRiskManagement(
take_profit_percent=take_profit_percent,
user=user,
user_id=user.id,
)
session.add(user_risk_management)
@@ -733,7 +733,7 @@ async def set_stop_loss_percent(tg_id: int, stop_loss_percent: float) -> bool:
# Creating new record
user_risk_management = UserRiskManagement(
stop_loss_percent=stop_loss_percent,
user=user,
user_id=user.id,
)
session.add(user_risk_management)
@@ -774,7 +774,7 @@ async def set_commission_fee(tg_id: int, commission_fee: str) -> bool:
# Creating new record
user_risk_management = UserRiskManagement(
commission_fee=commission_fee,
user=user,
user_id=user.id,
)
session.add(user_risk_management)
@@ -815,7 +815,7 @@ async def set_commission_place(tg_id: int, commission_place: str) -> bool:
# Creating new record
user_risk_management = UserRiskManagement(
commission_place=commission_place,
user=user,
user_id=user.id,
)
session.add(user_risk_management)
@@ -855,7 +855,7 @@ async def create_user_conditional_settings(tg_id: int) -> None:
# Create the user conditional settings
user_conditional_settings = UserConditionalSettings(
user=user,
user_id=user.id,
timer_start=0,
timer_end=0,
)
@@ -920,7 +920,7 @@ async def set_start_timer(tg_id: int, timer_start: int) -> bool:
# Creating new record
user_conditional_settings = UserConditionalSettings(
timer_start=timer_start,
user=user,
user_id=user.id,
)
session.add(user_conditional_settings)
@@ -959,7 +959,7 @@ async def set_stop_timer(tg_id: int, timer_end: int) -> bool:
# Creating new record
user_conditional_settings = UserConditionalSettings(
timer_end=timer_end,
user=user,
user_id=user.id,
)
session.add(user_conditional_settings)
@@ -1051,7 +1051,7 @@ async def set_user_deal(
else:
# Creating new record
new_deal = UserDeals(
user=user,
user_id=user.id,
symbol=symbol,
current_step=current_step,
current_series=current_series,

1
run.py
View File

@@ -31,7 +31,6 @@ async def main():
dp = Dispatcher(storage=storage)
dp.include_router(router)
web_socket = WebSocketBot(telegram_bot=bot)
await web_socket.clear_user_sockets()
ws_task = asyncio.create_task(web_socket.run_user_check_loop())
tg_task = asyncio.create_task(dp.start_polling(bot))