Files
stcs/app/telegram/database/requests.py
algizn97 2ee8c9916f Fixed
2025-08-30 16:29:56 +05:00

578 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging.config
from logger_helper.logger_helper import LOGGING_CONFIG
from datetime import datetime, timedelta
from typing import Any
from app.telegram.database.models import (
async_session,
User_Telegram_Id as UTi,
User_Main_Settings as UMS,
User_Bybit_API as UBA,
User_Symbol,
User_Risk_Management_Settings as URMS,
User_Condition_Settings as UCS,
User_Additional_Settings as UAS,
Trading_Mode,
Margin_type,
Trigger,
USER_DEALS,
UserTimer,
)
from sqlalchemy import select, update
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("requests")
# --- Функции сохранения в БД ---
async def save_tg_id_new_user(tg_id) -> None:
"""
Сохраняет Telegram ID нового пользователя в базу, если такого ещё нет.
Args:
tg_id (int): Telegram ID пользователя.
"""
async with async_session() as session:
user = await session.scalar(select(UTi).where(UTi.tg_id == tg_id))
if not user:
session.add(UTi(tg_id=tg_id))
logger.info("Новый пользователь был добавлен в бд %s", tg_id)
await session.commit()
async def set_new_user_bybit_api(tg_id) -> None:
"""
Создаёт запись API пользователя Bybit, если её ещё нет.
Args:
tg_id (int): Telegram ID пользователя.
"""
async with async_session() as session:
user = await session.scalar(select(UBA).where(UBA.tg_id == tg_id))
if not user:
session.add(UBA(tg_id=tg_id))
await session.commit()
async def set_new_user_symbol(tg_id) -> None:
"""
Создаёт запись торгового символа пользователя, если её нет.
Args:
tg_id (int): Telegram ID пользователя.
"""
async with async_session() as session:
user = await session.scalar(select(User_Symbol).where(User_Symbol.tg_id == tg_id))
if not user:
session.add(User_Symbol(tg_id=tg_id))
logger.info(f"Symbol был успешно добавлен %s", tg_id)
await session.commit()
async def set_new_user_default_main_settings(tg_id, trading_mode, margin_type) -> None:
"""
Создаёт основные настройки пользователя по умолчанию.
Args:
tg_id (int): Telegram ID пользователя.
trading_mode (str): Режим торговли.
margin_type (str): Тип маржи.
"""
async with async_session() as session:
settings = await session.scalar(select(UMS).where(UMS.tg_id == tg_id))
if not settings:
session.add(UMS(
tg_id=tg_id,
trading_mode=trading_mode,
margin_type=margin_type,
))
logger.info("Основные настройки нового пользователя были заполнены%s", tg_id)
await session.commit()
async def set_new_user_default_risk_management_settings(tg_id) -> None:
"""
Создаёт настройки риск-менеджмента по умолчанию.
Args:
tg_id (int): Telegram ID пользователя.
"""
async with async_session() as session:
settings = await session.scalar(select(URMS).where(URMS.tg_id == tg_id))
if not settings:
session.add(URMS(
tg_id=tg_id
))
logger.info("Риск-Менеджмент настройки нового пользователя были заполнены %s", tg_id)
await session.commit()
async def set_new_user_default_condition_settings(tg_id, trigger) -> None:
"""
Создаёт условные настройки по умолчанию.
Args:
tg_id (int): Telegram ID пользователя.
trigger (Any): Значение триггера по умолчанию.
"""
async with async_session() as session:
settings = await session.scalar(select(UCS).where(UCS.tg_id == tg_id))
if not settings:
session.add(UCS(
tg_id=tg_id,
trigger=trigger
))
logger.info("Условные настройки нового пользователя были заполнены %s", tg_id)
await session.commit()
async def set_new_user_default_additional_settings(tg_id) -> None:
"""
Создаёт дополнительные настройки по умолчанию.
Args:
tg_id (int): Telegram ID пользователя.
"""
async with async_session() as session:
settings = await session.scalar(select(UAS).where(UAS.tg_id == tg_id))
if not settings:
session.add(UAS(
tg_id=tg_id,
))
logger.info("Дополнительные настройки нового пользователя были заполнены %s", tg_id)
await session.commit()
# --- Функции получения данных из БД ---
async def check_user(tg_id):
"""
Проверяет наличие пользователя в базе.
Args:
tg_id (int): Telegram ID пользователя.
Returns:
Optional[UTi]: Пользователь или None.
"""
async with async_session() as session:
user = await session.scalar(select(UTi).where(UTi.tg_id == tg_id))
return user
async def get_bybit_api_key(tg_id):
"""Получить API ключ Bybit пользователя."""
async with async_session() as session:
api_key = await session.scalar(select(UBA.api_key).where(UBA.tg_id == tg_id))
return api_key
async def get_bybit_secret_key(tg_id):
"""Получить секретный ключ Bybit пользователя."""
async with async_session() as session:
secret_key = await session.scalar(select(UBA.secret_key).where(UBA.tg_id == tg_id))
return secret_key
async def get_symbol(tg_id):
"""Получить символ пользователя."""
async with async_session() as session:
symbol = await session.scalar(select(User_Symbol.symbol).where(User_Symbol.tg_id == tg_id))
return symbol
async def get_user_trades(tg_id):
"""Получить сделки пользователя."""
async with async_session() as session:
query = select(USER_DEALS.symbol, USER_DEALS.side).where(USER_DEALS.tg_id == tg_id)
result = await session.execute(query)
trades = result.all()
return trades
async def get_entry_order_type(tg_id: object) -> str | None | Any:
"""Получить тип входного ордера пользователя."""
async with async_session() as session:
order_type = await session.scalar(
select(UMS.entry_order_type).where(UMS.tg_id == tg_id)
)
# Если в базе не установлен тип — возвращаем значение по умолчанию
return order_type or 'Market'
# --- Функции обновления данных ---
async def update_user_trades(tg_id, **kwargs):
"""Обновить сделки пользователя."""
async with async_session() as session:
query = update(USER_DEALS).where(USER_DEALS.tg_id == tg_id).values(**kwargs)
await session.execute(query)
await session.commit()
async def update_symbol(tg_id: int, symbol: str) -> None:
"""Обновить торговый символ пользователя."""
async with async_session() as session:
await session.execute(update(User_Symbol).where(User_Symbol.tg_id == tg_id).values(symbol=symbol))
await session.commit()
async def update_api_key(tg_id: int, api: str) -> None:
"""Обновить API ключ пользователя."""
async with async_session() as session:
await session.execute(update(UBA).where(UBA.tg_id == tg_id).values(api_key=api))
await session.commit()
async def update_secret_key(tg_id: int, api: str) -> None:
"""Обновить секретный ключ пользователя."""
async with async_session() as session:
await session.execute(update(UBA).where(UBA.tg_id == tg_id).values(secret_key=api))
await session.commit()
# --- Более мелкие обновления и запросы по настройкам ---
async def update_trade_mode_user(tg_id, trading_mode) -> None:
"""Обновить режим торговли пользователя."""
async with async_session() as session:
mode = await session.scalar(select(Trading_Mode.mode).where(Trading_Mode.mode == trading_mode))
if mode:
logger.info("Изменён торговый режим для пользователя %s", tg_id)
await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(trading_mode=mode))
await session.commit()
async def delete_user_trade(tg_id: int, symbol: str):
"""Удалить сделку пользователя."""
async with async_session() as session:
await session.execute(
USER_DEALS.__table__.delete().where(
(USER_DEALS.tg_id == tg_id) & (USER_DEALS.symbol == symbol)
)
)
await session.commit()
async def get_for_registration_trading_mode():
"""Получить режим торговли по умолчанию."""
async with async_session() as session:
mode = await session.scalar(select(Trading_Mode.mode).where(Trading_Mode.id == 1))
return mode
async def get_for_registration_margin_type():
"""Получить тип маржи по умолчанию."""
async with async_session() as session:
type = await session.scalar(select(Margin_type.type).where(Margin_type.id == 1))
return type
async def get_for_registration_trigger(tg_id):
"""Получить триггер по умолчанию."""
async with async_session() as session:
trigger = await session.scalar(select(UCS.trigger).where(tg_id == tg_id))
return trigger
async def get_user_main_settings(tg_id):
"""Получить основные настройки пользователя."""
async with async_session() as session:
user = await session.scalar(select(UMS).where(UMS.tg_id == tg_id))
if user:
logger.info("Получение основных настроек пользователя %s", tg_id)
trading_mode = await session.scalar(select(UMS.trading_mode).where(UMS.tg_id == tg_id))
margin_mode = await session.scalar(select(UMS.margin_type).where(UMS.tg_id == tg_id))
switch_mode_enabled = await session.scalar(select(UMS.switch_mode_enabled).where(UMS.tg_id == tg_id))
switch_state = await session.scalar(select(UMS.switch_state).where(UMS.tg_id == tg_id))
size_leverage = await session.scalar(select(UMS.size_leverage).where(UMS.tg_id == tg_id))
starting_quantity = await session.scalar(select(UMS.starting_quantity).where(UMS.tg_id == tg_id))
martingale_factor = await session.scalar(select(UMS.martingale_factor).where(UMS.tg_id == tg_id))
maximal_quantity = await session.scalar(select(UMS.maximal_quantity).where(UMS.tg_id == tg_id))
entry_order_type = await session.scalar(select(UMS.entry_order_type).where(UMS.tg_id == tg_id))
limit_order_price = await session.scalar(select(UMS.limit_order_price).where(UMS.tg_id == tg_id))
martingale_step = await session.scalar(select(UMS.martingale_step).where(UMS.tg_id == tg_id))
data = {
'trading_mode': trading_mode,
'margin_type': margin_mode,
'switch_mode_enabled': switch_mode_enabled,
'switch_state': switch_state,
'size_leverage': size_leverage,
'starting_quantity': starting_quantity,
'martingale_factor': martingale_factor,
'maximal_quantity': maximal_quantity,
'entry_order_type': entry_order_type,
'limit_order_price': limit_order_price,
'martingale_step': martingale_step,
}
return data
async def get_user_risk_management_settings(tg_id):
"""Получить риск-менеджмента настройки пользователя."""
async with async_session() as session:
user = await session.scalar(select(URMS).where(URMS.tg_id == tg_id))
if user:
logger.info("Получение риск-менеджмента настроек пользователя %s", tg_id)
price_profit = await session.scalar(select(URMS.price_profit).where(URMS.tg_id == tg_id))
price_loss = await session.scalar(select(URMS.price_loss).where(URMS.tg_id == tg_id))
max_risk_deal = await session.scalar(select(URMS.max_risk_deal).where(URMS.tg_id == tg_id))
commission_fee = await session.scalar(select(URMS.commission_fee).where(URMS.tg_id == tg_id))
data = {
'price_profit': price_profit,
'price_loss': price_loss,
'max_risk_deal': max_risk_deal,
'commission_fee': commission_fee,
}
return data
async def update_margin_type(tg_id, margin_type) -> None:
"""Обновить тип маржи пользователя."""
async with async_session() as session:
type = await session.scalar(select(Margin_type.type).where(Margin_type.type == margin_type))
if type:
logger.info("Изменен тип маржи %s", tg_id)
await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(margin_type=type))
await session.commit()
async def update_size_leverange(tg_id, num):
"""Обновить размер левеража пользователя."""
async with async_session() as session:
await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(size_leverage=num))
await session.commit()
async def update_starting_quantity(tg_id, num):
"""Обновить размер левеража пользователя."""
async with async_session() as session:
await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(starting_quantity=num))
await session.commit()
async def update_martingale_factor(tg_id, num):
"""Обновить размер левеража пользователя."""
async with async_session() as session:
await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(martingale_factor=num))
await session.commit()
async def update_maximal_quantity(tg_id, num):
"""Обновить размер левеража пользователя."""
async with async_session() as session:
await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(maximal_quantity=num))
await session.commit()
# ОБНОВЛЕНИЕ НАСТРОЕК РИСК-МЕНЕДЖМЕНТА
async def update_price_profit(tg_id, num):
"""Обновить цену тейк-профита (прибыль) пользователя."""
async with async_session() as session:
await session.execute(update(URMS).where(URMS.tg_id == tg_id).values(price_profit=num))
await session.commit()
async def update_price_loss(tg_id, num):
"""Обновить цену тейк-лосса (убыток) пользователя."""
async with async_session() as session:
await session.execute(update(URMS).where(URMS.tg_id == tg_id).values(price_loss=num))
await session.commit()
async def update_max_risk_deal(tg_id, num):
"""Обновить максимальную сумму риска пользователя."""
async with async_session() as session:
await session.execute(update(URMS).where(URMS.tg_id == tg_id).values(max_risk_deal=num))
await session.commit()
async def update_entry_order_type(tg_id, order_type):
"""Обновить тип входного ордера пользователя."""
async with async_session() as session:
await session.execute(
update(UMS)
.where(UMS.tg_id == tg_id)
.values(entry_order_type=order_type)
)
await session.commit()
async def get_limit_price(tg_id):
"""Получить лимитную цену пользователя как float, либо None."""
async with async_session() as session:
result = await session.execute(
select(UMS.limit_order_price)
.where(UMS.tg_id == tg_id)
)
price = result.scalar_one_or_none()
if price:
try:
return float(price)
except ValueError:
return None
return None
async def update_limit_price(tg_id, price):
"""Обновить лимитную цену пользователя."""
async with async_session() as session:
await session.execute(
update(UMS)
.where(UMS.tg_id == tg_id)
.values(limit_order_price=str(price))
)
await session.commit()
async def update_commission_fee(tg_id, num):
"""Обновить комиссию пользователя."""
async with async_session() as session:
await session.execute(update(URMS).where(URMS.tg_id == tg_id).values(commission_fee=num))
await session.commit()
async def get_user_timer(tg_id):
"""Получить данные о таймере пользователя."""
async with async_session() as session:
result = await session.execute(select(UserTimer).where(UserTimer.tg_id == tg_id))
user_timer = result.scalars().first()
if not user_timer:
logging.info(f"No timer found for user {tg_id}")
return None
timer_minutes = user_timer.timer_minutes
timer_start = user_timer.timer_start
timer_end = user_timer.timer_end
logging.info(f"Timer data for tg_id={tg_id}: "
f"timer_minutes={timer_minutes}, "
f"timer_start={timer_start}, "
f"timer_end={timer_end}")
remaining = None
if timer_end:
remaining = max(0, int((timer_end - datetime.utcnow()).total_seconds() // 60))
return {
"timer_minutes": timer_minutes,
"timer_start": timer_start,
"timer_end": timer_end,
"remaining_minutes": remaining
}
async def update_user_timer(tg_id, minutes: int):
"""Обновить данные о таймере пользователя."""
async with async_session() as session:
try:
timer_start = None
timer_end = None
if minutes > 0:
timer_start = datetime.utcnow()
timer_end = timer_start + timedelta(minutes=minutes)
result = await session.execute(select(UserTimer).where(UserTimer.tg_id == tg_id))
user_timer = result.scalars().first()
if user_timer:
user_timer.timer_minutes = minutes
user_timer.timer_start = timer_start
user_timer.timer_end = timer_end
else:
user_timer = UserTimer(
tg_id=tg_id,
timer_minutes=minutes,
timer_start=timer_start,
timer_end=timer_end
)
session.add(user_timer)
await session.commit()
except Exception as e:
logging.error(f"Ошибка обновления таймера пользователя {tg_id}: {e}")
async def get_martingale_step(tg_id):
"""Получить шаг мартингейла пользователя."""
async with async_session() as session:
result = await session.execute(select(UMS).where(UMS.tg_id == tg_id))
user_settings = result.scalars().first()
return user_settings.martingale_step
async def update_martingale_step(tg_id, step):
"""Обновить шаг мартингейла пользователя."""
async with async_session() as session:
await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(martingale_step=step))
await session.commit()
async def update_switch_mode_enabled(tg_id, switch_mode):
"""Обновить режим переключения пользователя."""
async with async_session() as session:
await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(switch_mode_enabled=switch_mode))
await session.commit()
async def update_switch_state(tg_id, switch_state):
"""Обновить состояние переключения пользователя."""
async with async_session() as session:
await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(switch_state=switch_state))
await session.commit()
async def update_trigger(tg_id, trigger):
"""Обновить триггер пользователя."""
async with async_session() as session:
await session.execute(update(UCS).where(UCS.tg_id == tg_id).values(trigger=trigger))
await session.commit()