import logging.config from logger_helper.logger_helper import LOGGING_CONFIG from datetime import datetime, timedelta from typing import Any from app.telegram.database.models import ( async_session, User_Telegram_Id as UTi, User_Main_Settings as UMS, User_Bybit_API as UBA, User_Symbol, User_Risk_Management_Settings as URMS, User_Condition_Settings as UCS, User_Additional_Settings as UAS, Trading_Mode, Margin_type, Trigger, USER_DEALS, UserTimer, ) from sqlalchemy import select, update logging.config.dictConfig(LOGGING_CONFIG) logger = logging.getLogger("requests") # --- Функции сохранения в БД --- async def save_tg_id_new_user(tg_id) -> None: """ Сохраняет Telegram ID нового пользователя в базу, если такого ещё нет. Args: tg_id (int): Telegram ID пользователя. """ async with async_session() as session: user = await session.scalar(select(UTi).where(UTi.tg_id == tg_id)) if not user: session.add(UTi(tg_id=tg_id)) logger.info("Новый пользователь был добавлен в бд %s", tg_id) await session.commit() async def set_new_user_bybit_api(tg_id) -> None: """ Создаёт запись API пользователя Bybit, если её ещё нет. Args: tg_id (int): Telegram ID пользователя. """ async with async_session() as session: user = await session.scalar(select(UBA).where(UBA.tg_id == tg_id)) if not user: session.add(UBA(tg_id=tg_id)) await session.commit() async def set_new_user_symbol(tg_id) -> None: """ Создаёт запись торгового символа пользователя, если её нет. Args: tg_id (int): Telegram ID пользователя. """ async with async_session() as session: user = await session.scalar(select(User_Symbol).where(User_Symbol.tg_id == tg_id)) if not user: session.add(User_Symbol(tg_id=tg_id)) logger.info(f"Symbol был успешно добавлен %s", tg_id) await session.commit() async def set_new_user_default_main_settings(tg_id, trading_mode, margin_type) -> None: """ Создаёт основные настройки пользователя по умолчанию. Args: tg_id (int): Telegram ID пользователя. trading_mode (str): Режим торговли. margin_type (str): Тип маржи. """ async with async_session() as session: settings = await session.scalar(select(UMS).where(UMS.tg_id == tg_id)) if not settings: session.add(UMS( tg_id=tg_id, trading_mode=trading_mode, margin_type=margin_type, )) logger.info("Основные настройки нового пользователя были заполнены%s", tg_id) await session.commit() async def set_new_user_default_risk_management_settings(tg_id) -> None: """ Создаёт настройки риск-менеджмента по умолчанию. Args: tg_id (int): Telegram ID пользователя. """ async with async_session() as session: settings = await session.scalar(select(URMS).where(URMS.tg_id == tg_id)) if not settings: session.add(URMS( tg_id=tg_id )) logger.info("Риск-Менеджмент настройки нового пользователя были заполнены %s", tg_id) await session.commit() async def set_new_user_default_condition_settings(tg_id, trigger) -> None: """ Создаёт условные настройки по умолчанию. Args: tg_id (int): Telegram ID пользователя. trigger (Any): Значение триггера по умолчанию. """ async with async_session() as session: settings = await session.scalar(select(UCS).where(UCS.tg_id == tg_id)) if not settings: session.add(UCS( tg_id=tg_id, trigger=trigger )) logger.info("Условные настройки нового пользователя были заполнены %s", tg_id) await session.commit() async def set_new_user_default_additional_settings(tg_id) -> None: """ Создаёт дополнительные настройки по умолчанию. Args: tg_id (int): Telegram ID пользователя. """ async with async_session() as session: settings = await session.scalar(select(UAS).where(UAS.tg_id == tg_id)) if not settings: session.add(UAS( tg_id=tg_id, )) logger.info("Дополнительные настройки нового пользователя были заполнены %s", tg_id) await session.commit() # --- Функции получения данных из БД --- async def check_user(tg_id): """ Проверяет наличие пользователя в базе. Args: tg_id (int): Telegram ID пользователя. Returns: Optional[UTi]: Пользователь или None. """ async with async_session() as session: user = await session.scalar(select(UTi).where(UTi.tg_id == tg_id)) return user async def get_bybit_api_key(tg_id): """Получить API ключ Bybit пользователя.""" async with async_session() as session: api_key = await session.scalar(select(UBA.api_key).where(UBA.tg_id == tg_id)) return api_key async def get_bybit_secret_key(tg_id): """Получить секретный ключ Bybit пользователя.""" async with async_session() as session: secret_key = await session.scalar(select(UBA.secret_key).where(UBA.tg_id == tg_id)) return secret_key async def get_symbol(tg_id): """Получить символ пользователя.""" async with async_session() as session: symbol = await session.scalar(select(User_Symbol.symbol).where(User_Symbol.tg_id == tg_id)) return symbol async def get_user_trades(tg_id): """Получить сделки пользователя.""" async with async_session() as session: query = select(USER_DEALS.symbol, USER_DEALS.side).where(USER_DEALS.tg_id == tg_id) result = await session.execute(query) trades = result.all() return trades async def get_entry_order_type(tg_id: object) -> str | None | Any: """Получить тип входного ордера пользователя.""" async with async_session() as session: order_type = await session.scalar( select(UMS.entry_order_type).where(UMS.tg_id == tg_id) ) # Если в базе не установлен тип — возвращаем значение по умолчанию return order_type or 'Market' # --- Функции обновления данных --- async def update_user_trades(tg_id, **kwargs): """Обновить сделки пользователя.""" async with async_session() as session: query = update(USER_DEALS).where(USER_DEALS.tg_id == tg_id).values(**kwargs) await session.execute(query) await session.commit() async def update_symbol(tg_id: int, symbol: str) -> None: """Обновить торговый символ пользователя.""" async with async_session() as session: await session.execute(update(User_Symbol).where(User_Symbol.tg_id == tg_id).values(symbol=symbol)) await session.commit() async def update_api_key(tg_id: int, api: str) -> None: """Обновить API ключ пользователя.""" async with async_session() as session: await session.execute(update(UBA).where(UBA.tg_id == tg_id).values(api_key=api)) await session.commit() async def update_secret_key(tg_id: int, api: str) -> None: """Обновить секретный ключ пользователя.""" async with async_session() as session: await session.execute(update(UBA).where(UBA.tg_id == tg_id).values(secret_key=api)) await session.commit() # --- Более мелкие обновления и запросы по настройкам --- async def update_trade_mode_user(tg_id, trading_mode) -> None: """Обновить режим торговли пользователя.""" async with async_session() as session: mode = await session.scalar(select(Trading_Mode.mode).where(Trading_Mode.mode == trading_mode)) if mode: logger.info("Изменён торговый режим для пользователя %s", tg_id) await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(trading_mode=mode)) await session.commit() async def delete_user_trade(tg_id: int, symbol: str): """Удалить сделку пользователя.""" async with async_session() as session: await session.execute( USER_DEALS.__table__.delete().where( (USER_DEALS.tg_id == tg_id) & (USER_DEALS.symbol == symbol) ) ) await session.commit() async def get_for_registration_trading_mode(): """Получить режим торговли по умолчанию.""" async with async_session() as session: mode = await session.scalar(select(Trading_Mode.mode).where(Trading_Mode.id == 1)) return mode async def get_for_registration_margin_type(): """Получить тип маржи по умолчанию.""" async with async_session() as session: type = await session.scalar(select(Margin_type.type).where(Margin_type.id == 1)) return type async def get_for_registration_trigger(tg_id): """Получить триггер по умолчанию.""" async with async_session() as session: trigger = await session.scalar(select(UCS.trigger).where(tg_id == tg_id)) return trigger async def get_user_main_settings(tg_id): """Получить основные настройки пользователя.""" async with async_session() as session: user = await session.scalar(select(UMS).where(UMS.tg_id == tg_id)) if user: logger.info("Получение основных настроек пользователя %s", tg_id) trading_mode = await session.scalar(select(UMS.trading_mode).where(UMS.tg_id == tg_id)) margin_mode = await session.scalar(select(UMS.margin_type).where(UMS.tg_id == tg_id)) switch_mode_enabled = await session.scalar(select(UMS.switch_mode_enabled).where(UMS.tg_id == tg_id)) switch_state = await session.scalar(select(UMS.switch_state).where(UMS.tg_id == tg_id)) size_leverage = await session.scalar(select(UMS.size_leverage).where(UMS.tg_id == tg_id)) starting_quantity = await session.scalar(select(UMS.starting_quantity).where(UMS.tg_id == tg_id)) martingale_factor = await session.scalar(select(UMS.martingale_factor).where(UMS.tg_id == tg_id)) maximal_quantity = await session.scalar(select(UMS.maximal_quantity).where(UMS.tg_id == tg_id)) entry_order_type = await session.scalar(select(UMS.entry_order_type).where(UMS.tg_id == tg_id)) limit_order_price = await session.scalar(select(UMS.limit_order_price).where(UMS.tg_id == tg_id)) martingale_step = await session.scalar(select(UMS.martingale_step).where(UMS.tg_id == tg_id)) data = { 'trading_mode': trading_mode, 'margin_type': margin_mode, 'switch_mode_enabled': switch_mode_enabled, 'switch_state': switch_state, 'size_leverage': size_leverage, 'starting_quantity': starting_quantity, 'martingale_factor': martingale_factor, 'maximal_quantity': maximal_quantity, 'entry_order_type': entry_order_type, 'limit_order_price': limit_order_price, 'martingale_step': martingale_step, } return data async def get_user_risk_management_settings(tg_id): """Получить риск-менеджмента настройки пользователя.""" async with async_session() as session: user = await session.scalar(select(URMS).where(URMS.tg_id == tg_id)) if user: logger.info("Получение риск-менеджмента настроек пользователя %s", tg_id) price_profit = await session.scalar(select(URMS.price_profit).where(URMS.tg_id == tg_id)) price_loss = await session.scalar(select(URMS.price_loss).where(URMS.tg_id == tg_id)) max_risk_deal = await session.scalar(select(URMS.max_risk_deal).where(URMS.tg_id == tg_id)) commission_fee = await session.scalar(select(URMS.commission_fee).where(URMS.tg_id == tg_id)) data = { 'price_profit': price_profit, 'price_loss': price_loss, 'max_risk_deal': max_risk_deal, 'commission_fee': commission_fee, } return data async def update_margin_type(tg_id, margin_type) -> None: """Обновить тип маржи пользователя.""" async with async_session() as session: type = await session.scalar(select(Margin_type.type).where(Margin_type.type == margin_type)) if type: logger.info("Изменен тип маржи %s", tg_id) await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(margin_type=type)) await session.commit() async def update_size_leverange(tg_id, num): """Обновить размер левеража пользователя.""" async with async_session() as session: await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(size_leverage=num)) await session.commit() async def update_starting_quantity(tg_id, num): """Обновить размер левеража пользователя.""" async with async_session() as session: await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(starting_quantity=num)) await session.commit() async def update_martingale_factor(tg_id, num): """Обновить размер левеража пользователя.""" async with async_session() as session: await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(martingale_factor=num)) await session.commit() async def update_maximal_quantity(tg_id, num): """Обновить размер левеража пользователя.""" async with async_session() as session: await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(maximal_quantity=num)) await session.commit() # ОБНОВЛЕНИЕ НАСТРОЕК РИСК-МЕНЕДЖМЕНТА async def update_price_profit(tg_id, num): """Обновить цену тейк-профита (прибыль) пользователя.""" async with async_session() as session: await session.execute(update(URMS).where(URMS.tg_id == tg_id).values(price_profit=num)) await session.commit() async def update_price_loss(tg_id, num): """Обновить цену тейк-лосса (убыток) пользователя.""" async with async_session() as session: await session.execute(update(URMS).where(URMS.tg_id == tg_id).values(price_loss=num)) await session.commit() async def update_max_risk_deal(tg_id, num): """Обновить максимальную сумму риска пользователя.""" async with async_session() as session: await session.execute(update(URMS).where(URMS.tg_id == tg_id).values(max_risk_deal=num)) await session.commit() async def update_entry_order_type(tg_id, order_type): """Обновить тип входного ордера пользователя.""" async with async_session() as session: await session.execute( update(UMS) .where(UMS.tg_id == tg_id) .values(entry_order_type=order_type) ) await session.commit() async def get_limit_price(tg_id): """Получить лимитную цену пользователя как float, либо None.""" async with async_session() as session: result = await session.execute( select(UMS.limit_order_price) .where(UMS.tg_id == tg_id) ) price = result.scalar_one_or_none() if price: try: return float(price) except ValueError: return None return None async def update_limit_price(tg_id, price): """Обновить лимитную цену пользователя.""" async with async_session() as session: await session.execute( update(UMS) .where(UMS.tg_id == tg_id) .values(limit_order_price=str(price)) ) await session.commit() async def update_commission_fee(tg_id, num): """Обновить комиссию пользователя.""" async with async_session() as session: await session.execute(update(URMS).where(URMS.tg_id == tg_id).values(commission_fee=num)) await session.commit() async def get_user_timer(tg_id): """Получить данные о таймере пользователя.""" async with async_session() as session: result = await session.execute(select(UserTimer).where(UserTimer.tg_id == tg_id)) user_timer = result.scalars().first() if not user_timer: logging.info(f"No timer found for user {tg_id}") return None timer_minutes = user_timer.timer_minutes timer_start = user_timer.timer_start timer_end = user_timer.timer_end logging.info(f"Timer data for tg_id={tg_id}: " f"timer_minutes={timer_minutes}, " f"timer_start={timer_start}, " f"timer_end={timer_end}") remaining = None if timer_end: remaining = max(0, int((timer_end - datetime.utcnow()).total_seconds() // 60)) return { "timer_minutes": timer_minutes, "timer_start": timer_start, "timer_end": timer_end, "remaining_minutes": remaining } async def update_user_timer(tg_id, minutes: int): """Обновить данные о таймере пользователя.""" async with async_session() as session: try: timer_start = None timer_end = None if minutes > 0: timer_start = datetime.utcnow() timer_end = timer_start + timedelta(minutes=minutes) result = await session.execute(select(UserTimer).where(UserTimer.tg_id == tg_id)) user_timer = result.scalars().first() if user_timer: user_timer.timer_minutes = minutes user_timer.timer_start = timer_start user_timer.timer_end = timer_end else: user_timer = UserTimer( tg_id=tg_id, timer_minutes=minutes, timer_start=timer_start, timer_end=timer_end ) session.add(user_timer) await session.commit() except Exception as e: logging.error(f"Ошибка обновления таймера пользователя {tg_id}: {e}") async def get_martingale_step(tg_id): """Получить шаг мартингейла пользователя.""" async with async_session() as session: result = await session.execute(select(UMS).where(UMS.tg_id == tg_id)) user_settings = result.scalars().first() return user_settings.martingale_step async def update_martingale_step(tg_id, step): """Обновить шаг мартингейла пользователя.""" async with async_session() as session: await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(martingale_step=step)) await session.commit() async def update_switch_mode_enabled(tg_id, switch_mode): """Обновить режим переключения пользователя.""" async with async_session() as session: await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(switch_mode_enabled=switch_mode)) await session.commit() async def update_switch_state(tg_id, switch_state): """Обновить состояние переключения пользователя.""" async with async_session() as session: await session.execute(update(UMS).where(UMS.tg_id == tg_id).values(switch_state=switch_state)) await session.commit() async def update_trigger(tg_id, trigger): """Обновить триггер пользователя.""" async with async_session() as session: await session.execute(update(UCS).where(UCS.tg_id == tg_id).values(trigger=trigger)) await session.commit()