1367 lines
48 KiB
Python
1367 lines
48 KiB
Python
import logging.config
|
|
|
|
from asyncpg.exceptions import UniqueViolationError
|
|
from logger_helper.logger_helper import LOGGING_CONFIG
|
|
from sqlalchemy import distinct, select
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
from database import async_session
|
|
from database.models import (
|
|
User,
|
|
UserAdditionalSettings,
|
|
UserApi,
|
|
UserConditionalSettings,
|
|
UserDeals,
|
|
UserRiskManagement,
|
|
UserSymbol,
|
|
UserAutoTrading,
|
|
)
|
|
|
|
# Apply the shared LOGGING_CONFIG dictionary, then bind the "request" logger
# that every function in this module writes to.
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("request")
|
|
|
|
|
|
async def create_user(tg_id: int, username: str) -> None:
    """
    Insert a user row unless one already exists for this Telegram ID.

    Only ``IntegrityError`` is handled here; any other exception raised
    while inserting propagates to the caller.

    :param tg_id: Telegram user ID
    :param username: Username stored on the new row
    :return: None
    """
    try:
        # Cheap pre-check so the common "already registered" case never
        # reaches the INSERT.
        if await get_user(tg_id):
            logger.info("User already exists: %s", tg_id)
            return

        async with async_session() as session:
            session.add(User(tg_id=tg_id, username=username))
            await session.commit()
            logger.info("User created: %s", tg_id)

    except IntegrityError as exc:
        # A unique violation is reported as "already exists", not as an error.
        is_duplicate = isinstance(exc.orig, UniqueViolationError)
        if is_duplicate:
            logger.info("User already exists: %s", tg_id)
        else:
            logger.error("Error creating user %s: %s", tg_id, exc)
|
|
|
|
|
|
async def get_users():
    """
    Fetch every user row.

    :return: All ``User`` rows, or an empty list when the query raised
    """
    try:
        async with async_session() as session:
            stmt = select(User)
            rows = await session.execute(stmt)
            users = rows.scalars().all()
            return users
    except Exception as exc:
        logger.error("Error getting users: %s", exc)
        return []
|
|
|
|
|
|
async def get_user(tg_id: int):
    """
    Look up a single user by Telegram ID.

    :param tg_id: Telegram user ID
    :return: The first matching ``User`` row, or None when nothing matches
        or the query raised
    """
    try:
        async with async_session() as session:
            stmt = select(User).filter_by(tg_id=tg_id)
            rows = await session.execute(stmt)
            return rows.scalars().first()
    except Exception as exc:
        logger.error("Error getting user for tg_id %s: %s", tg_id, exc)
    # Reached only after a logged failure.
    return None
|
|
|
|
|
|
async def set_user_api(tg_id: int, api_key: str, api_secret: str) -> bool:
    """
    Store the API key and secret for a user.

    The user is loaded together with the related API row. An existing row is
    overwritten in place; otherwise a new ``UserApi`` row is created for the
    user. Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param api_key: API key
    :param api_secret: API secret
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = select(User).options(joinedload(User.user_api)).filter_by(tg_id=tg_id)
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            credentials = user.user_api
            if credentials:
                # Overwrite the stored pair.
                credentials.api_key = api_key
                credentials.api_secret = api_secret
            else:
                # First time this user stores credentials.
                session.add(UserApi(user=user, api_key=api_key, api_secret=api_secret))

            await session.commit()
            logger.info("User API keys updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error("Error adding/updating user API keys for user %s: %s", tg_id, exc)
        return False
|
|
|
|
|
|
async def get_user_api(tg_id: int):
    """
    Read the stored API key and secret of a user.

    :param tg_id: Telegram user ID
    :return: ``(api_key, api_secret)``; ``(None, None)`` when the user or
        the API row is missing, or when the query raised
    """
    try:
        async with async_session() as session:
            stmt = select(User).options(joinedload(User.user_api)).filter_by(tg_id=tg_id)
            user = (await session.execute(stmt)).scalars().first()

            if not user or not user.user_api:
                return None, None

            credentials = user.user_api
            return credentials.api_key, credentials.api_secret
    except Exception as exc:
        logger.error("Error getting user API for user %s: %s", tg_id, exc)
        return None, None
|
|
|
|
|
|
async def set_user_symbol(tg_id: int, symbol: str) -> bool:
    """
    Store the symbol selected by a user.

    An existing ``UserSymbol`` row is updated in place; otherwise a new one
    is created for the user. Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param symbol: Symbol to set
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_symbol))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            current = user.user_symbol
            if current:
                current.symbol = symbol
            else:
                session.add(UserSymbol(symbol=symbol, user=user))

            await session.commit()
            logger.info("User symbol updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error("Error adding/updating user symbol for user %s: %s", tg_id, exc)
        return False
|
|
|
|
|
|
async def get_user_symbol(tg_id: int):
    """
    Read the symbol stored for a user.

    :param tg_id: Telegram user ID
    :return: The stored symbol, or None when the user or the symbol row is
        missing, or when the query raised
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_symbol))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            if not user or not user.user_symbol:
                return None
            return user.user_symbol.symbol
    except Exception as exc:
        logger.error("Error getting symbol for user %s: %s", tg_id, exc)
        return None
|
|
|
|
|
|
# USER ADDITIONAL SETTINGS
|
|
|
|
|
|
async def create_user_additional_settings(tg_id: int) -> None:
    """
    Create the additional-settings row for a user, filled with defaults.

    Only logs and returns when the settings already exist or the user cannot
    be found. Exceptions derived from ``Exception`` are logged rather than
    raised.

    :param tg_id: Telegram user ID
    :return: None
    """
    try:
        if await get_user_additional_settings(tg_id):
            logger.info("User additional settings already exists: %s", tg_id)
            return

        async with async_session() as session:
            lookup = await session.execute(select(User).filter_by(tg_id=tg_id))
            user = lookup.scalars().first()

            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return

            # Default values for a freshly created settings row.
            defaults = {
                "trade_mode": "Merged_Single",
                "order_type": "Market",
                "conditional_order_type": "Market",
                "margin_type": "ISOLATED_MARGIN",
                "leverage": "10",
                "leverage_to_buy": "10",
                "leverage_to_sell": "10",
                "order_quantity": 5.0,
                "martingale_factor": 1.0,
                "max_bets_in_series": 10,
            }
            session.add(UserAdditionalSettings(user=user, **defaults))
            await session.commit()
            logger.info("User additional settings created: %s", tg_id)

    except IntegrityError as exc:
        # A unique violation is reported as "already exists", not as an error.
        if isinstance(exc.orig, UniqueViolationError):
            logger.info("User additional settings already exists: %s", tg_id)
        else:
            logger.error(
                "Error creating user additional settings for user %s: %s", tg_id, exc
            )
    except Exception as exc:
        logger.error(
            "General error creating user additional settings for user %s: %s",
            tg_id,
            exc,
        )
|
|
|
|
|
|
async def get_user_additional_settings(tg_id: int):
    """
    Load the additional-settings row that belongs to a user.

    :param tg_id: Telegram user ID
    :return: The related ``user_additional_settings`` object, or None when
        the user or the settings row is missing, or when the query raised
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_additional_settings))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            if not user or not user.user_additional_settings:
                return None
            return user.user_additional_settings
    except Exception as exc:
        logger.error(
            "Error getting user additional settings for user %s: %s", tg_id, exc
        )
        return None
|
|
|
|
|
|
async def set_trade_mode(tg_id: int, trade_mode: str) -> bool:
    """
    Store the trade mode of a user.

    An existing additional-settings row is updated in place; otherwise a new
    row is created with only ``trade_mode`` and the user set explicitly.
    Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param trade_mode: "Both_Sides" or "Merged_Single"
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_additional_settings))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            settings = user.user_additional_settings
            if settings:
                settings.trade_mode = trade_mode
            else:
                session.add(UserAdditionalSettings(trade_mode=trade_mode, user=user))

            await session.commit()
            logger.info("User trade mode updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error(
            "Error adding/updating user trade mode for user %s: %s", tg_id, exc
        )
        return False
|
|
|
|
|
|
async def set_order_type(tg_id: int, order_type: str) -> bool:
    """
    Store the order type of a user.

    An existing additional-settings row is updated in place; otherwise a new
    row is created with only ``order_type`` and the user set explicitly.
    Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param order_type: "Market" or "Limit"
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_additional_settings))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            settings = user.user_additional_settings
            if settings:
                settings.order_type = order_type
            else:
                session.add(UserAdditionalSettings(order_type=order_type, user=user))

            await session.commit()
            logger.info("User order type updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error(
            "Error adding/updating user order type for user %s: %s", tg_id, exc
        )
        return False
|
|
|
|
|
|
async def set_conditional_order_type(tg_id: int, conditional_order_type: str) -> bool:
    """
    Store the conditional order type of a user.

    An existing additional-settings row is updated in place; otherwise a new
    row is created with only ``conditional_order_type`` and the user set
    explicitly. Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param conditional_order_type: "Market" or "Limit"
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_additional_settings))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            settings = user.user_additional_settings
            if settings:
                settings.conditional_order_type = conditional_order_type
            else:
                session.add(
                    UserAdditionalSettings(
                        conditional_order_type=conditional_order_type,
                        user=user,
                    )
                )

            await session.commit()
            logger.info("User conditional order type updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error(
            "Error adding/updating user conditional order type for user %s: %s",
            tg_id,
            exc,
        )
        return False
|
|
|
|
|
|
async def set_margin_type(tg_id: int, margin_type: str) -> bool:
    """
    Store the margin type of a user.

    An existing additional-settings row is updated in place; otherwise a new
    row is created with only ``margin_type`` and the user set explicitly.
    Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param margin_type: "ISOLATED_MARGIN" or "REGULAR_MARGIN"
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_additional_settings))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            settings = user.user_additional_settings
            if settings:
                settings.margin_type = margin_type
            else:
                session.add(UserAdditionalSettings(margin_type=margin_type, user=user))

            await session.commit()
            logger.info("User margin type updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error(
            "Error adding/updating user margin type for user %s: %s", tg_id, exc
        )
        return False
|
|
|
|
|
|
async def set_leverage(tg_id: int, leverage: str) -> bool:
    """
    Store the leverage of a user.

    An existing additional-settings row is updated in place; otherwise a new
    row is created with only ``leverage`` and the user set explicitly.
    Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param leverage: Leverage
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_additional_settings))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            settings = user.user_additional_settings
            if settings:
                settings.leverage = leverage
            else:
                session.add(UserAdditionalSettings(leverage=leverage, user=user))

            await session.commit()
            logger.info("User leverage updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error("Error adding/updating user leverage for user %s: %s", tg_id, exc)
        return False
|
|
|
|
|
|
async def set_leverage_to_buy_and_sell(
    tg_id: int, leverage_to_buy: str, leverage_to_sell: str
) -> bool:
    """
    Store the separate buy-side and sell-side leverage of a user.

    An existing additional-settings row gets both values updated in place;
    otherwise a new row is created with only the two leverage values and the
    user set explicitly. Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param leverage_to_buy: Leverage to buy
    :param leverage_to_sell: Leverage to sell
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_additional_settings))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            settings = user.user_additional_settings
            if settings:
                settings.leverage_to_buy = leverage_to_buy
                settings.leverage_to_sell = leverage_to_sell
            else:
                session.add(
                    UserAdditionalSettings(
                        leverage_to_buy=leverage_to_buy,
                        leverage_to_sell=leverage_to_sell,
                        user=user,
                    )
                )

            await session.commit()
            logger.info("User leverage updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error("Error adding/updating user leverage for user %s: %s", tg_id, exc)
        return False
|
|
|
|
|
|
async def set_order_quantity(tg_id: int, order_quantity: float) -> bool:
    """
    Store the order quantity of a user.

    An existing additional-settings row is updated in place; otherwise a new
    row is created with only ``order_quantity`` and the user set explicitly.
    Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param order_quantity: Order quantity
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_additional_settings))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            settings = user.user_additional_settings
            if settings:
                settings.order_quantity = order_quantity
            else:
                session.add(
                    UserAdditionalSettings(order_quantity=order_quantity, user=user)
                )

            await session.commit()
            logger.info("User order quantity updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error(
            "Error adding/updating user order quantity for user %s: %s", tg_id, exc
        )
        return False
|
|
|
|
|
|
async def set_martingale_factor(tg_id: int, martingale_factor: float) -> bool:
    """
    Store the martingale factor of a user.

    An existing additional-settings row is updated in place; otherwise a new
    row is created with only ``martingale_factor`` and the user set
    explicitly. Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param martingale_factor: Martingale factor
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_additional_settings))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            settings = user.user_additional_settings
            if settings:
                settings.martingale_factor = martingale_factor
            else:
                session.add(
                    UserAdditionalSettings(
                        martingale_factor=martingale_factor,
                        user=user,
                    )
                )

            await session.commit()
            logger.info("User martingale factor updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error(
            "Error adding/updating user martingale factor for user %s: %s", tg_id, exc
        )
        return False
|
|
|
|
|
|
async def set_max_bets_in_series(tg_id: int, max_bets_in_series: int) -> bool:
    """
    Store the maximum number of bets in a series for a user.

    An existing additional-settings row is updated in place; otherwise a new
    row is created with only ``max_bets_in_series`` and the user set
    explicitly. Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param max_bets_in_series: Max steps
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_additional_settings))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            settings = user.user_additional_settings
            if settings:
                settings.max_bets_in_series = max_bets_in_series
            else:
                session.add(
                    UserAdditionalSettings(
                        max_bets_in_series=max_bets_in_series,
                        user=user,
                    )
                )

            await session.commit()
            logger.info("User max bets in series updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error(
            "Error adding/updating user max bets in series for user %s: %s", tg_id, exc
        )
        return False
|
|
|
|
|
|
async def set_limit_price(tg_id: int, limit_price: float) -> bool:
    """
    Store the limit price of a user.

    An existing additional-settings row is updated in place; otherwise a new
    row is created with only ``limit_price`` and the user set explicitly.
    Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param limit_price: Limit price
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_additional_settings))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            settings = user.user_additional_settings
            if settings:
                settings.limit_price = limit_price
            else:
                session.add(UserAdditionalSettings(limit_price=limit_price, user=user))

            await session.commit()
            logger.info("User limit price updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error(
            "Error adding/updating user limit price for user %s: %s", tg_id, exc
        )
        return False
|
|
|
|
|
|
async def set_trigger_price(tg_id: int, trigger_price: float) -> bool:
    """
    Store the trigger price of a user.

    An existing additional-settings row is updated in place; otherwise a new
    row is created with only ``trigger_price`` and the user set explicitly.
    Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param trigger_price: Trigger price
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_additional_settings))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            settings = user.user_additional_settings
            if settings:
                settings.trigger_price = trigger_price
            else:
                session.add(
                    UserAdditionalSettings(trigger_price=trigger_price, user=user)
                )

            await session.commit()
            logger.info("User trigger price updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error(
            "Error adding/updating user trigger price for user %s: %s", tg_id, exc
        )
        return False
|
|
|
|
|
|
# USER RISK MANAGEMENT
|
|
|
|
|
|
async def create_user_risk_management(tg_id: int) -> None:
    """
    Create the risk-management row for a user, filled with defaults.

    Only logs and returns when the row already exists or the user cannot be
    found. Exceptions derived from ``Exception`` are logged rather than
    raised.

    :param tg_id: Telegram user ID
    :return: None
    """
    try:
        if await get_user_risk_management(tg_id):
            logger.info("User risk management already exists: %s", tg_id)
            return

        async with async_session() as session:
            lookup = await session.execute(select(User).filter_by(tg_id=tg_id))
            user = lookup.scalars().first()

            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return

            # Default values for a freshly created risk-management row.
            defaults = {
                "take_profit_percent": 1,
                "stop_loss_percent": 1,
                "max_risk_percent": 100,
                "commission_fee": "Yes_commission_fee",
            }
            session.add(UserRiskManagement(user=user, **defaults))
            await session.commit()
            logger.info("User risk management created: %s", tg_id)

    except IntegrityError as exc:
        # A unique violation is reported as "already exists", not as an error.
        if isinstance(exc.orig, UniqueViolationError):
            logger.info("User risk management already exists: %s", tg_id)
        else:
            logger.error(
                "Error creating user risk management for user %s: %s", tg_id, exc
            )
    except Exception as exc:
        logger.error(
            "General error creating user risk management for user %s: %s", tg_id, exc
        )
|
|
|
|
|
|
async def get_user_risk_management(tg_id: int):
    """
    Load the risk-management row that belongs to a user.

    :param tg_id: Telegram user ID
    :return: The related ``user_risk_management`` object, or None when the
        user or the row is missing, or when the query raised
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_risk_management))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            if not user or not user.user_risk_management:
                return None
            return user.user_risk_management
    except Exception as exc:
        logger.error("Error getting user risk management for user %s: %s", tg_id, exc)
        return None
|
|
|
|
|
|
async def set_take_profit_percent(tg_id: int, take_profit_percent: int) -> bool:
    """
    Store the take-profit percentage of a user.

    An existing risk-management row is updated in place; otherwise a new row
    is created with only ``take_profit_percent`` and the user set explicitly.
    Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param take_profit_percent: Take profit percent
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_risk_management))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            risk = user.user_risk_management
            if risk:
                risk.take_profit_percent = take_profit_percent
            else:
                session.add(
                    UserRiskManagement(
                        take_profit_percent=take_profit_percent,
                        user=user,
                    )
                )

            await session.commit()
            logger.info("User take profit percent updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error(
            "Error adding/updating user take profit percent for user %s: %s",
            tg_id,
            exc,
        )
        return False
|
|
|
|
|
|
async def set_stop_loss_percent(tg_id: int, stop_loss_percent: int) -> bool:
    """
    Store the stop-loss percentage of a user.

    An existing risk-management row is updated in place; otherwise a new row
    is created with only ``stop_loss_percent`` and the user set explicitly.
    Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param stop_loss_percent: Stop loss percent
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_risk_management))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            risk = user.user_risk_management
            if risk:
                risk.stop_loss_percent = stop_loss_percent
            else:
                session.add(
                    UserRiskManagement(stop_loss_percent=stop_loss_percent, user=user)
                )

            await session.commit()
            logger.info("User stop loss percent updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error(
            "Error adding/updating user stop loss percent for user %s: %s", tg_id, exc
        )
        return False
|
|
|
|
|
|
async def set_max_risk_percent(tg_id: int, max_risk_percent: int) -> bool:
    """
    Store the maximum risk percentage of a user.

    An existing risk-management row is updated in place; otherwise a new row
    is created with only ``max_risk_percent`` and the user set explicitly.
    Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param max_risk_percent: Max risk percent
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_risk_management))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            risk = user.user_risk_management
            if risk:
                risk.max_risk_percent = max_risk_percent
            else:
                session.add(
                    UserRiskManagement(max_risk_percent=max_risk_percent, user=user)
                )

            await session.commit()
            logger.info("User max risk percent updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error(
            "Error adding/updating user max risk percent for user %s: %s", tg_id, exc
        )
        return False
|
|
|
|
|
|
async def set_commission_fee(tg_id: int, commission_fee: str) -> bool:
    """
    Store the commission-fee setting of a user.

    An existing risk-management row is updated in place; otherwise a new row
    is created with only ``commission_fee`` and the user set explicitly.
    Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param commission_fee: Commission fee
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_risk_management))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            risk = user.user_risk_management
            if risk:
                risk.commission_fee = commission_fee
            else:
                session.add(UserRiskManagement(commission_fee=commission_fee, user=user))

            await session.commit()
            logger.info("User commission fee updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error(
            "Error adding/updating user commission fee for user %s: %s", tg_id, exc
        )
        return False
|
|
|
|
|
|
# USER CONDITIONAL SETTINGS
|
|
|
|
|
|
async def create_user_conditional_settings(tg_id: int) -> None:
    """
    Create the conditional-settings row for a user with both timers at 0.

    Only logs and returns when the row already exists or the user cannot be
    found. Exceptions derived from ``Exception`` are logged rather than
    raised.

    :param tg_id: Telegram user ID
    :return: None
    """
    try:
        if await get_user_conditional_settings(tg_id):
            logger.info("User conditional settings already exists: %s", tg_id)
            return

        async with async_session() as session:
            lookup = await session.execute(select(User).filter_by(tg_id=tg_id))
            user = lookup.scalars().first()

            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return

            # Default values for a freshly created conditional-settings row.
            defaults = {"timer_start": 0, "timer_end": 0}
            session.add(UserConditionalSettings(user=user, **defaults))
            await session.commit()
            logger.info("User conditional settings created: %s", tg_id)

    except IntegrityError as exc:
        # A unique violation is reported as "already exists", not as an error.
        if isinstance(exc.orig, UniqueViolationError):
            logger.info("User conditional settings already exists: %s", tg_id)
        else:
            logger.error(
                "Error creating user conditional settings for user %s: %s", tg_id, exc
            )
    except Exception as exc:
        logger.error(
            "General error creating user conditional settings for user %s: %s",
            tg_id,
            exc,
        )
|
|
|
|
|
|
async def get_user_conditional_settings(tg_id: int):
    """
    Load the conditional-settings row that belongs to a user.

    :param tg_id: Telegram user ID
    :return: The related ``user_conditional_settings`` object, or None when
        the user or the row is missing, or when the query raised
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_conditional_settings))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            if not user or not user.user_conditional_settings:
                return None
            return user.user_conditional_settings
    except Exception as exc:
        logger.error(
            "Error getting user conditional settings for user %s: %s", tg_id, exc
        )
        return None
|
|
|
|
|
|
async def set_start_timer(tg_id: int, timer_start: int) -> bool:
    """
    Store the start timer of a user.

    An existing conditional-settings row is updated in place; otherwise a new
    row is created with only ``timer_start`` and the user set explicitly.
    Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param timer_start: Start timer
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_conditional_settings))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            conditional = user.user_conditional_settings
            if conditional:
                conditional.timer_start = timer_start
            else:
                session.add(UserConditionalSettings(timer_start=timer_start, user=user))

            await session.commit()
            logger.info("User start timer updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error(
            "Error adding/updating user start timer for user %s: %s", tg_id, exc
        )
        return False
|
|
|
|
|
|
async def set_stop_timer(tg_id: int, timer_end: int) -> bool:
    """
    Store the stop timer of a user.

    An existing conditional-settings row is updated in place; otherwise a new
    row is created with only ``timer_end`` and the user set explicitly.
    Exceptions are logged and reported as ``False``.

    :param tg_id: Telegram user ID
    :param timer_end: Stop timer
    :return: True if the change was committed, False if the user was not
        found or an error occurred
    """
    try:
        async with async_session() as session:
            stmt = (
                select(User)
                .options(joinedload(User.user_conditional_settings))
                .filter_by(tg_id=tg_id)
            )
            user = (await session.execute(stmt)).scalars().first()

            # Guard clause: nothing to write without a user row.
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            conditional = user.user_conditional_settings
            if conditional:
                conditional.timer_end = timer_end
            else:
                session.add(UserConditionalSettings(timer_end=timer_end, user=user))

            await session.commit()
            logger.info("User stop timer updated for user: %s", tg_id)
            return True
    except Exception as exc:
        logger.error(
            "Error adding/updating user stop timer for user %s: %s", tg_id, exc
        )
        return False
|
|
|
|
|
|
# USER DEALS
|
|
async def set_user_deal(
    tg_id: int,
    symbol: str,
    last_side: str,
    current_step: int,
    trade_mode: str,
    margin_type: str,
    leverage: str,
    leverage_to_buy: str,
    leverage_to_sell: str,
    order_type: str,
    conditional_order_type: str,
    order_quantity: float,
    limit_price: float,
    trigger_price: float,
    martingale_factor: float,
    max_bets_in_series: int,
    take_profit_percent: int,
    stop_loss_percent: int,
    max_risk_percent: int,
    switch_side_mode: bool,
) -> bool:
    """
    Create or update the deal record of a user for one symbol.

    The user is looked up by Telegram ID, then the ``UserDeals`` row matching
    that user and ``symbol``. If a row exists, every deal field below is
    overwritten; otherwise a new row is added. The change is committed before
    returning.

    :param tg_id: Telegram user ID
    :param symbol: Symbol
    :param last_side: Last side
    :param current_step: Current step
    :param trade_mode: Trade mode
    :param margin_type: Margin type
    :param leverage: Leverage
    :param leverage_to_buy: Leverage to buy
    :param leverage_to_sell: Leverage to sell
    :param order_type: Order type
    :param conditional_order_type: Conditional order type
    :param order_quantity: Order quantity
    :param limit_price: Limit price
    :param trigger_price: Trigger price
    :param martingale_factor: Martingale factor
    :param max_bets_in_series: Max bets in series
    :param take_profit_percent: Take profit percent
    :param stop_loss_percent: Stop loss percent
    :param max_risk_percent: Max risk percent
    :param switch_side_mode: Switch side mode
    :return: True if the deal was committed; False if the user was not found
        or any exception was raised (the exception is logged, not re-raised)
    """
    # Single list of the columns written by both branches, so the "update"
    # and "create" paths cannot drift apart when a field is added or renamed.
    deal_fields = {
        "last_side": last_side,
        "current_step": current_step,
        "trade_mode": trade_mode,
        "margin_type": margin_type,
        "leverage": leverage,
        "leverage_to_buy": leverage_to_buy,
        "leverage_to_sell": leverage_to_sell,
        "order_type": order_type,
        "conditional_order_type": conditional_order_type,
        "order_quantity": order_quantity,
        "limit_price": limit_price,
        "trigger_price": trigger_price,
        "martingale_factor": martingale_factor,
        "max_bets_in_series": max_bets_in_series,
        "take_profit_percent": take_profit_percent,
        "stop_loss_percent": stop_loss_percent,
        "max_risk_percent": max_risk_percent,
        "switch_side_mode": switch_side_mode,
    }

    try:
        async with async_session() as session:
            result = await session.execute(select(User).filter_by(tg_id=tg_id))
            user = result.scalars().first()
            if not user:
                logger.error("User not found with tg_id: %s", tg_id)
                return False

            result_deal = await session.execute(
                select(UserDeals).filter_by(user_id=user.id, symbol=symbol)
            )
            deal = result_deal.scalars().first()

            if deal:
                # Updating existing record
                for field_name, value in deal_fields.items():
                    setattr(deal, field_name, value)
            else:
                # Creating new record
                session.add(UserDeals(user=user, symbol=symbol, **deal_fields))

            await session.commit()
            logger.info("User deals set for user %s and symbol %s", tg_id, symbol)
            return True

    except Exception as e:
        logger.error("Error setting user deal for user %s and symbol %s: %s", tg_id, symbol, e)
        return False
|
|
|
|
|
|
async def get_user_deal_by_symbol(tg_id: int, symbol: str):
    """
    Fetch the deal stored for a user and symbol.

    :param tg_id: Telegram user ID
    :param symbol: Symbol of the deal to look up
    :return: the first matching ``UserDeals`` row, or None when the user is
        unknown, no deal matches, or the lookup raised (the error is logged)
    """
    try:
        async with async_session() as session:
            user_query = select(User).filter_by(tg_id=tg_id)
            owner = (await session.execute(user_query)).scalars().first()
            if owner:
                deal_query = select(UserDeals).filter_by(user_id=owner.id, symbol=symbol)
                return (await session.execute(deal_query)).scalars().first()
            return None
    except Exception as exc:
        logger.error("Error getting deal for user %s and symbol %s: %s", tg_id, symbol, exc)
        return None
|
|
|
|
|
|
async def get_all_symbols(tg_id: int):
    """
    List the distinct symbols that appear in a user's deals.

    :param tg_id: Telegram user ID
    :return: list of non-null distinct ``UserDeals.symbol`` values; an empty
        list when the user is unknown or the query raised (the error is logged)
    """
    try:
        async with async_session() as session:
            user_query = select(User).filter_by(tg_id=tg_id)
            owner = (await session.execute(user_query)).scalars().first()
            if owner:
                symbol_query = select(distinct(UserDeals.symbol)).filter_by(user_id=owner.id)
                found = await session.execute(symbol_query)
                # Each row carries a single column; drop NULL symbols.
                return [name for name in found.scalars().all() if name is not None]
            return []
    except Exception as exc:
        logger.error("Error getting symbols for user %s: %s", tg_id, exc)
        return []
|
|
|
|
|
|
async def set_fee_user_deal_by_symbol(tg_id: int, symbol: str, fee: float) -> bool:
    """
    Set the fee on an existing user deal.

    Unlike the deal setter, this never creates a row: the deal for the given
    user and symbol must already exist.

    :param tg_id: Telegram user ID
    :param symbol: Symbol identifying the deal
    :param fee: Fee value to store on the deal
    :return: True if the fee was committed; False if the user or the deal was
        not found, or any exception was raised (the exception is logged)
    """
    try:
        async with async_session() as session:
            result = await session.execute(select(User).filter_by(tg_id=tg_id))
            user = result.scalars().first()
            if user is None:
                # Lazy %-args, consistent with the other log calls in this module.
                logger.error("User with tg_id=%s not found", tg_id)
                return False

            result = await session.execute(
                select(UserDeals).filter_by(user_id=user.id, symbol=symbol)
            )
            record = result.scalars().first()
            if not record:
                logger.error(
                    "User deal with user_id=%s and symbol=%s not found", user.id, symbol
                )
                return False

            record.fee = fee
            await session.commit()
            logger.info("Set fee for user %s and symbol %s", tg_id, symbol)
            return True
    except Exception as e:
        logger.error("Error setting user deal fee for user %s and symbol %s: %s", tg_id, symbol, e)
        return False
|
|
|
|
|
|
# USER AUTO TRADING
|
|
|
|
async def get_all_user_auto_trading(tg_id: int):
    """
    Fetch every auto-trading record that belongs to a user.

    :param tg_id: Telegram user ID
    :return: all ``UserAutoTrading`` rows for the user; an empty list when the
        user is unknown or the query raised (the error is logged)
    """
    try:
        async with async_session() as session:
            user_query = select(User).filter_by(tg_id=tg_id)
            owner = (await session.execute(user_query)).scalars().first()
            if owner:
                records_query = select(UserAutoTrading).filter_by(user_id=owner.id)
                return (await session.execute(records_query)).scalars().all()
            return []
    except Exception as exc:
        logger.error("Error getting auto trading for user %s: %s", tg_id, exc)
        return []
|
|
|
|
|
|
async def get_user_auto_trading(tg_id: int, symbol: str, side: str):
    """
    Fetch one auto-trading record by user, symbol and side.

    :param tg_id: Telegram user ID
    :param symbol: Symbol of the record
    :param side: Side of the record
    :return: the first matching ``UserAutoTrading`` row, or None when the user
        is unknown, nothing matches, or the query raised (the error is logged)
    """
    try:
        async with async_session() as session:
            user_query = select(User).filter_by(tg_id=tg_id)
            owner = (await session.execute(user_query)).scalars().first()
            if owner:
                record_query = select(UserAutoTrading).filter_by(
                    user_id=owner.id, symbol=symbol, side=side
                )
                return (await session.execute(record_query)).scalars().first()
            return None
    except Exception as exc:
        logger.error("Error getting auto trading for user %s and symbol %s: %s", tg_id, symbol, exc)
        return None
|
|
|
|
|
|
async def set_auto_trading(tg_id: int, symbol: str, auto_trading: bool, side: str) -> bool:
    """
    Set the auto trading flag for a user, symbol and side.

    If a ``UserAutoTrading`` row already exists for the (user, symbol, side)
    combination its ``auto_trading`` value is overwritten; otherwise a new row
    is added. The change is committed before returning.

    :param tg_id: Telegram user ID
    :param symbol: Symbol
    :param auto_trading: Auto trading
    :param side: Side
    :return: True if the change was committed; False if the user was not found
        or any exception was raised (the exception is logged, not re-raised)
    """
    try:
        async with async_session() as session:
            result = await session.execute(select(User).filter_by(tg_id=tg_id))
            user = result.scalars().first()
            if user is None:
                # Lazy %-args, consistent with the other log calls in this module.
                logger.error("User with tg_id=%s not found", tg_id)
                return False

            result = await session.execute(
                select(UserAutoTrading).filter_by(user_id=user.id, symbol=symbol, side=side)
            )
            record = result.scalars().first()
            if record:
                record.auto_trading = auto_trading
            else:
                session.add(
                    UserAutoTrading(
                        user_id=user.id,
                        symbol=symbol,
                        auto_trading=auto_trading,
                        side=side,
                    )
                )

            await session.commit()
            logger.info("Set auto_trading=%s for user %s and symbol %s", auto_trading, tg_id, symbol)
            return True
    except Exception as e:
        logger.error("Error setting auto_trading for user %s and symbol %s: %s", tg_id, symbol, e)
        return False
|
|
|
|
|
|
async def set_fee_user_auto_trading(tg_id: int, symbol: str, side: str, fee: float) -> bool:
    """
    Set the fee for a user auto trading record in the database.

    If a ``UserAutoTrading`` row already exists for the (user, symbol, side)
    combination its ``fee`` is overwritten. Otherwise a new row is added with
    only ``user_id``, ``symbol``, ``side`` and ``fee`` supplied; every other
    column is left to whatever default the model defines.

    :param tg_id: Telegram user ID
    :param symbol: Symbol
    :param side: Side
    :param fee: Fee value to store
    :return: True if the change was committed; False if the user was not found
        or any exception was raised (the exception is logged, not re-raised)
    """
    try:
        async with async_session() as session:
            result = await session.execute(select(User).filter_by(tg_id=tg_id))
            user = result.scalars().first()
            if user is None:
                # Lazy %-args, consistent with the other log calls in this module.
                logger.error("User with tg_id=%s not found", tg_id)
                return False

            result = await session.execute(
                select(UserAutoTrading).filter_by(user_id=user.id, symbol=symbol, side=side)
            )
            record = result.scalars().first()
            if record:
                record.fee = fee
            else:
                session.add(
                    UserAutoTrading(
                        user_id=user.id,
                        symbol=symbol,
                        fee=fee,
                        side=side,
                    )
                )

            await session.commit()
            logger.info("Set fee for user %s and symbol %s", tg_id, symbol)
            return True
    except Exception as e:
        logger.error("Error setting user auto trading fee for user %s and symbol %s: %s", tg_id, symbol, e)
        return False
|