Added new functions and documentation

This commit is contained in:
algizn97
2025-08-26 19:36:11 +05:00
parent 8bc4c634fe
commit 07df16dbe9
2 changed files with 585 additions and 246 deletions

View File

@@ -1,9 +1,11 @@
import asyncio
import functools
import nest_asyncio
import time
import logging.config
from pybit import exceptions
from pybit.unified_trading import HTTP
from pybit.unified_trading import HTTP, WebSocket
from websocket import WebSocketConnectionClosedException
from logger_helper.logger_helper import LOGGING_CONFIG
import app.services.Bybit.functions.price_symbol as price_symbol
@@ -14,47 +16,166 @@ import app.telegram.Keyboards.inline_keyboards as inline_markup
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("futures")
active_start_tasks = {}
active_close_tasks = {}
nest_asyncio.apply()
def safe_float(val):
def safe_float(val) -> float:
"""
Безопасное преобразование значения в float.
Возвращает 0.0, если значение None, пустое или некорректное.
"""
try:
if val is None or val == '':
return 0.0
return float(val)
except (ValueError, TypeError):
logger.error("Некорректное значение для преобразования в float")
return 0.0
async def info_access_open_deal(message, symbol, trade_mode, margin_mode, leverage, qty, tp, sl, entry_price, limit_price, order_type):
def parse_pnl_from_msg(msg) -> float:
"""
Извлекает реализованную прибыль/убыток из сообщения.
"""
try:
return float(msg.get('realisedPnl', 0))
except Exception as e:
logger.error(f"Ошибка при извлечении реализованной прибыли: {e}")
return 0.0
async def handle_execution_message(message, msg: dict) -> None:
"""
Обработчик сообщений об исполнении сделки.
Логирует событие и проверяет условия для мартингейла и TP.
"""
logger.info(f"Исполнена сделка: {msg}")
await message.answer(f"Исполнена сделка: {msg}")
pnl = parse_pnl_from_msg(msg)
tg_id = message.from_user.id
data_main_stgs = await rq.get_user_main_settings(tg_id)
take_profit_percent = safe_float(data_main_stgs.get('take_profit_percent', 2))
symbol = await rq.get_symbol(tg_id)
api_key = await rq.get_bybit_api_key(tg_id)
api_secret = await rq.get_bybit_secret_key(tg_id)
client = HTTP(api_key=api_key, api_secret=api_secret)
positions_resp = client.get_positions(category='linear', symbol=symbol)
positions_list = positions_resp.get('result', {}).get('list', [])
position = positions_list[0] if positions_list else None
liquidation_threshold = -100
if pnl <= liquidation_threshold:
current_step = int(await rq.get_martingale_step(tg_id))
current_step += 1
await rq.update_martingale_step(tg_id, current_step)
side = 'Buy' if position and position.get('side', '').lower() == 'long' else 'Sell'
margin_mode = data_main_stgs.get('margin_type', 'Isolated')
await open_position(tg_id, message, side=side, margin_mode=margin_mode)
elif position:
entry_price = safe_float(position.get('avgPrice'))
side = position.get('side', '')
current_price = float(position.get('markPrice', 0))
if side.lower() == 'long':
take_profit_trigger_price = entry_price * (1 + take_profit_percent / 100)
if current_price >= take_profit_trigger_price:
await close_user_trade(tg_id, symbol, message)
await rq.update_martingale_step(tg_id, 0)
elif side.lower() == 'short':
take_profit_trigger_price = entry_price * (1 - take_profit_percent / 100)
if current_price <= take_profit_trigger_price:
await close_user_trade(tg_id, symbol, message)
await rq.update_martingale_step(tg_id, 0)
async def start_execution_ws(tg_id, message) -> None:
"""
Запускает WebSocket для отслеживания исполнения сделок в режиме реального времени.
Переподключается при ошибках.
"""
api_key = await rq.get_bybit_api_key(tg_id)
api_secret = await rq.get_bybit_secret_key(tg_id)
reconnect_delay = 5
while True:
try:
ws = WebSocket(api_key=api_key,
api_secret=api_secret,
testnet=False,
channel_type="private")
async def on_execution(msg):
await handle_execution_message(message, msg)
def on_execution_sync(msg):
asyncio.create_task(on_execution(msg))
ws.execution_stream(on_execution_sync)
while True:
await asyncio.sleep(1)
except WebSocketConnectionClosedException:
logging.warning("WebSocket закрыт, переподключение через 5 секунд...")
await asyncio.sleep(reconnect_delay)
except Exception as e:
logging.error(f"Ошибка WebSocket: {e}")
await asyncio.sleep(reconnect_delay)
async def info_access_open_deal(message, symbol, trade_mode, margin_mode, leverage, qty, tp, sl, entry_price,
limit_price, order_type) -> None:
"""
Отправляет сообщение об успешном открытии позиции или выставлении лимитного ордера.
"""
human_margin_mode = 'Isolated' if margin_mode == 'ISOLATED_MARGIN' else 'Cross'
text = f'''{'Позиция была успешна открыта' if order_type == 'Market' else 'Лимитный ордер установлен'}!
Торговая пара: {symbol}
Цена входа: {entry_price if order_type == 'Market' else round(limit_price, 5)}
Движение: {trade_mode}
Тип-маржи: {human_margin_mode}
Кредитное плечо: {leverage}x
Количество: {qty}
Тейк-профит: {round(tp, 5)}
Стоп-лосс: {round(sl, 5)}
'''
text = (
f"{'Позиция была успешна открыта' if order_type == 'Market' else 'Лимитный ордер установлен'}!\n"
f"Торговая пара: {symbol}\n"
f"Цена входа: {entry_price if order_type == 'Market' else round(limit_price, 5)}\n"
f"Движение: {trade_mode}\n"
f"Тип-маржи: {human_margin_mode}\n"
f"Кредитное плечо: {leverage}x\n"
f"Количество: {qty}\n"
f"Тейк-профит: {round(tp, 5)}\n"
f"Стоп-лосс: {round(sl, 5)}\n"
)
await message.answer(text=text, parse_mode='html', reply_markup=inline_markup.create_close_deal_markup(symbol))
async def error_max_step(message):
async def error_max_step(message) -> None:
"""
Сообщение об ошибке превышения максимального количества шагов мартингейла.
"""
logger.error('Сделка не была совершена, превышен лимит максимального количества ставок в серии.')
await message.answer('Сделка не была совершена, превышен лимит максимального количества ставок в серии.',
reply_markup=inline_markup.back_to_main)
async def error_max_risk(message):
async def error_max_risk(message) -> None:
"""
Сообщение об ошибке превышения риск-лимита сделки.
"""
logger.error('Сделка не была совершена, риск убытка превышает допустимый лимит.')
await message.answer('Сделка не была совершена, риск убытка превышает допустимый лимит.',
reply_markup=inline_markup.back_to_main)
async def open_position(tg_id, message, side: str, margin_mode: str, tpsl_mode='Full'):
"""
Открывает позицию на Bybit с учётом настроек пользователя, маржи, размера лота, платформы и риска.
Возвращает True при успехе, False при ошибках открытия ордера, None при исключениях.
"""
api_key = await rq.get_bybit_api_key(tg_id)
secret_key = await rq.get_bybit_secret_key(tg_id)
symbol = await rq.get_symbol(tg_id)
@@ -91,7 +212,6 @@ async def open_position(tg_id, message, side: str, margin_mode: str, tpsl_mode='
max_risk_percent = safe_float(data_risk_stgs.get('max_risk_deal'))
loss_profit = safe_float(data_risk_stgs.get('price_loss'))
take_profit = safe_float(data_risk_stgs.get('price_profit'))
commission_fee = safe_float(data_risk_stgs.get('commission_fee', 0))
positions_resp = client.get_positions(category='linear', symbol=symbol)
positions_list = positions_resp.get('result', {}).get('list', [])
@@ -173,7 +293,7 @@ async def open_position(tg_id, message, side: str, margin_mode: str, tpsl_mode='
await message.answer(
f"Сумма ордера слишком мала: {order_value:.2f} USDT. "
f"Минимум для торговли — {min_order_value} USDT. "
f"Пожалуйста, увеличьте количество позиций.")
f"Пожалуйста, увеличьте количество позиций.", reply_markup=inline_markup.back_to_main)
return False
leverage = int(data_main_stgs.get('size_leverage', 1))
@@ -186,7 +306,6 @@ async def open_position(tg_id, message, side: str, margin_mode: str, tpsl_mode='
)
except exceptions.InvalidRequestError as e:
if "110043" in str(e):
# Плечо уже установлено с таким значением, можем игнорировать
logger.info(f"Leverage already set to {leverage} for {symbol}")
else:
raise e
@@ -206,7 +325,7 @@ async def open_position(tg_id, message, side: str, margin_mode: str, tpsl_mode='
category='linear',
symbol=symbol,
side=side,
orderType=order_type, # Market или Limit
orderType=order_type,
qty=str(next_quantity),
price=str(limit_price) if order_type == 'Limit' and limit_price else None,
takeProfit=str(take_profit_price),
@@ -242,6 +361,9 @@ async def open_position(tg_id, message, side: str, margin_mode: str, tpsl_mode='
async def trading_cycle(tg_id, message):
"""
Цикл торговой логики с учётом таймера пользователя.
"""
try:
timer_data = await rq.get_user_timer(tg_id)
timer_min = 0
@@ -264,47 +386,210 @@ async def trading_cycle(tg_id, message):
logger.info(f"Торговый цикл для пользователя {tg_id} был отменён.")
async def fetch_positions_async(client, symbol):
loop = asyncio.get_running_loop()
# запускаем блокирующий вызов get_positions в отдельном потоке
return await loop.run_in_executor(None, functools.partial(client.get_positions, category='linear', symbol=symbol))
async def set_take_profit_stop_loss(tg_id: int, message, take_profit_price: float, stop_loss_price: float,
tpsl_mode='Full'):
"""
Устанавливает уровни Take Profit и Stop Loss для открытой позиции.
"""
api_key = await rq.get_bybit_api_key(tg_id)
secret_key = await rq.get_bybit_secret_key(tg_id)
symbol = await rq.get_symbol(tg_id)
data_main_stgs = await rq.get_user_main_settings(tg_id)
order_type = data_main_stgs.get('entry_order_type')
starting_quantity = safe_float(data_main_stgs.get('starting_quantity'))
async def get_active_positions(message, api_key, secret_key):
client = HTTP(
api_key=api_key,
api_secret=secret_key
)
instruments_resp = client.get_instruments_info(category='linear')
if instruments_resp.get('retCode') != 0:
return []
symbols = [item['symbol'] for item in instruments_resp.get('result', {}).get('list', [])]
limit_price = None
if order_type == 'Limit':
limit_price = await rq.get_limit_price(tg_id)
active_positions = []
data_risk_stgs = await rq.get_user_risk_management_settings(tg_id)
trading_mode = data_main_stgs.get('trading_mode')
for sym in symbols:
side = None
if trading_mode == 'Long':
side = 'Buy'
elif trading_mode == 'Short':
side = 'Sell'
if side is None:
await message.answer("Не удалось определить сторону сделки.")
return
client = HTTP(api_key=api_key, api_secret=secret_key)
await cancel_all_tp_sl_orders(client, symbol)
try:
try:
resp = await fetch_positions_async(client, sym)
if resp.get('retCode') == 0:
positions = resp.get('result', {}).get('list', [])
for pos in positions:
if pos.get('size') and safe_float(pos['size']) > 0:
active_positions.append(pos)
except Exception as e:
logger.error(f"Ошибка при получении позиций: {e}")
await message.answer('⚠️ Ошибка при получении позиций', reply_markup=inline_markup.back_to_main)
client.set_tp_sl_mode(symbol=symbol, category='linear', tpSlMode=tpsl_mode)
except exceptions.InvalidRequestError as e:
if 'same tp sl mode' in str(e).lower():
logger.info(f"Режим TP/SL уже установлен для {symbol}")
else:
raise
return active_positions
positions_resp = client.get_positions(category='linear', symbol=symbol)
positions = positions_resp.get('result', {}).get('list', [])
if not positions or abs(float(positions[0].get('size', 0))) == 0:
params = dict(
category='linear',
symbol=symbol,
side=side,
orderType=order_type,
qty=str(starting_quantity),
timeInForce='GTC',
orderLinkId=f"deal_{symbol}_{int(time.time())}",
takeProfit=str(take_profit_price),
stopLoss=str(stop_loss_price),
tpOrderType='Limit' if tpsl_mode == 'Partial' else 'Market',
slOrderType='Limit' if tpsl_mode == 'Partial' else 'Market',
tpslMode=tpsl_mode
)
if order_type == 'Limit' and limit_price is not None:
params['price'] = str(limit_price)
if tpsl_mode == 'Partial':
params['tpLimitPrice'] = str(take_profit_price)
params['slLimitPrice'] = str(stop_loss_price)
response = client.place_order(**params)
if response.get('retCode') != 0:
await message.answer(f"Ошибка создания ордера с TP/SL: {response.get('retMsg')}",
reply_markup=inline_markup.back_to_main)
return
else:
resp = client.set_trading_stop(
category='linear',
symbol=symbol,
takeProfit=str(round(take_profit_price, 5)),
stopLoss=str(round(stop_loss_price, 5)),
tpTriggerBy='LastPrice',
slTriggerBy='LastPrice',
reduceOnly=False
)
if resp.get('retCode') != 0:
await message.answer(f"Ошибка обновления TP/SL: {resp.get('retMsg')}",
reply_markup=inline_markup.back_to_main)
return
await message.answer(
f"ТП и СЛ успешно установлены:\nТейк-профит: {take_profit_price:.5f}\nСтоп-лосс: {stop_loss_price:.5f}",
reply_markup=inline_markup.back_to_main)
except Exception as e:
logger.error(f"Ошибка установки TP/SL для {symbol}: {e}", exc_info=True)
await message.answer("Произошла ошибка при установке TP и SL.", reply_markup=inline_markup.back_to_main)
async def close_user_trade(tg_id: int, symbol: str, message=None) -> bool:
async def cancel_all_tp_sl_orders(tg_id, symbol):
"""
Отменяет все открытые ордера TP/SL для указанного символа.
"""
api_key = await rq.get_bybit_api_key(tg_id)
secret_key = await rq.get_bybit_secret_key(tg_id)
client = HTTP(api_key=api_key, api_secret=secret_key)
last_response = None
try:
orders_resp = client.get_open_orders(category='linear', symbol=symbol)
orders = orders_resp.get('result', {}).get('list', [])
for order in orders:
order_id = order.get('orderId')
cancel_resp = client.cancel_order(category='linear', symbol=symbol, orderId=order_id)
last_response = cancel_resp
if cancel_resp.get('retCode') != 0:
logger.warning(f"Не удалось отменить ордер {order_id}: {cancel_resp.get('retMsg')}")
except Exception as e:
logger.error(f"Ошибка при отмене ордеров TP/SL: {e}")
return last_response
async def get_active_positions_by_symbol(tg_id, message):
"""
Показывает активные позиции пользователя по символу.
"""
api_key = await rq.get_bybit_api_key(tg_id)
secret_key = await rq.get_bybit_secret_key(tg_id)
client = HTTP(api_key=api_key, api_secret=secret_key)
symbol = await rq.get_symbol(tg_id)
active_positions = client.get_positions(category='linear', symbol=symbol)
positions = active_positions.get('result', {}).get('list', [])
pos = positions[0] if positions else None
if float(pos.get('size', 0)) == 0:
await message.answer("❗️ У вас нет активных позиций.", reply_markup=inline_markup.back_to_main)
return
text = (
f"Торговая пара: {pos.get('symbol')}\n"
f"Цена входа: {pos.get('avgPrice')}\n"
f"Движение: {pos.get('side')}\n"
f"Кредитное плечо: {pos.get('leverage')}x\n"
f"Количество: {pos.get('size')}\n"
f"Тейк-профит: {pos.get('takeProfit')}\n"
f"Стоп-лосс: {pos.get('stopLoss')}\n"
)
await message.answer(text, reply_markup=inline_markup.create_close_deal_markup(symbol))
async def get_active_orders_by_symbol(tg_id, message):
"""
Показывает активные лимитные ордера пользователя по символу.
"""
api_key = await rq.get_bybit_api_key(tg_id)
secret_key = await rq.get_bybit_secret_key(tg_id)
symbol = await rq.get_symbol(tg_id)
client = HTTP(api_key=api_key, api_secret=secret_key)
active_orders = client.get_open_orders(category='linear', symbol=symbol)
limit_orders = [
order for order in active_orders.get('result', {}).get('list', [])
if order.get('orderType') == 'Limit'
]
if not limit_orders:
await message.answer("Нет активных лимитных ордеров по данной торговой паре.",
reply_markup=inline_markup.back_to_main)
return
texts = []
for order in limit_orders:
text = (
f"Торговая пара: {order.get('symbol')}\n"
f"Тип ордера: {order.get('orderType')}\n"
f"Сторона: {order.get('side')}\n"
f"Цена: {order.get('price')}\n"
f"Количество: {order.get('qty')}\n"
f"Тейк-профит: {order.get('takeProfit')}\n"
f"Стоп-лосс: {order.get('stopLoss')}\n"
f"Кредитное плечо: {order.get('leverage')}\n"
)
texts.append(text)
await message.answer("\n\n".join(texts), reply_markup=inline_markup.create_close_deal_markup(symbol))
async def close_user_trade(tg_id: int, symbol: str, message):
"""
Закрывает открытые позиции пользователя по символу рыночным ордером.
Возвращает True при успехе, False при ошибках.
"""
try:
api_key = await rq.get_bybit_api_key(tg_id)
secret_key = await rq.get_bybit_secret_key(tg_id)
data_risk_stgs = await rq.get_user_risk_management_settings(tg_id)
limit_price = await rq.get_limit_price(tg_id)
include_fee = data_risk_stgs.get('commission_fee', 'Нет') == 'Да'
client = HTTP(api_key=api_key, api_secret=secret_key)
# Получаем текущие открытые позиции
positions_resp = client.get_positions(category="linear", symbol=symbol)
if positions_resp.get('retCode') != 0:
return False
positions_list = positions_resp.get('result', {}).get('list', [])
@@ -318,24 +603,15 @@ async def close_user_trade(tg_id: int, symbol: str, message=None) -> bool:
if qty == 0:
return False
# Получаем настройки пользователя
data_main_stgs = await rq.get_user_main_settings(tg_id)
order_type = data_main_stgs.get('entry_order_type')
limit_price = await rq.get_limit_price(tg_id)
# Получаем открытые ордера
orders = client.get_open_orders(category='linear', symbol=symbol)
cancel_resp = await cancel_all_tp_sl_orders(tg_id, symbol)
open_orders_list = orders.get('result', {}).get('list', [])
order_id = open_orders_list[0].get('orderId') if open_orders_list else None
close_side = "Sell" if side == "Buy" else "Buy"
# Получаем текущую цену
ticker_resp = client.get_tickers(category="linear", symbol=symbol)
current_price = 0.0
if ticker_resp.get('retCode') == 0:
result = ticker_resp.get('result', {})
# поддержать оба варианта: result это dict с key 'list', или list
ticker_list = []
if isinstance(result, dict):
ticker_list = result.get('list', [])
@@ -344,24 +620,6 @@ async def close_user_trade(tg_id: int, symbol: str, message=None) -> bool:
if ticker_list:
current_price = float(ticker_list[0].get('lastPrice', 0.0))
if order_type == 'Limit':
# Если есть открытый лимитный ордер отменяем его
if order_id:
cancel_resp = client.cancel_order(category='linear', symbol=symbol, orderId=order_id)
if cancel_resp.get('retCode') != 0:
if message:
await message.answer("Ошибка при отмене лимитного ордера.",
reply_markup=inline_markup.back_to_main)
return False
# Можно здесь добавить логику выставления лимитного ордера на закрытие, если нужно
# В текущем коде отсутствует
if message:
await message.answer(f"Лимитный ордер отменён, позиция не закрыта автоматически.",
reply_markup=inline_markup.back_to_main)
return False
else:
# Рыночный ордер для закрытия позиции
place_resp = client.place_order(
category="linear",
symbol=symbol,
@@ -371,19 +629,39 @@ async def close_user_trade(tg_id: int, symbol: str, message=None) -> bool:
timeInForce="GTC",
reduceOnly=True
)
if place_resp.get('retCode', -1) == 0:
if message:
pnl = (current_price - entry_price) * qty if side == "Buy" else (entry_price - current_price) * qty
pnl_percent = (pnl / (entry_price * qty)) * 100 if entry_price * qty > 0 else 0
text = (f"Сделка {symbol} успешно закрыта.\n"
f"Цена входа: {entry_price if entry_price else limit_price}\n"
f"Цена закрытия: {current_price}\n"
f"Результат: {pnl:.4f} USDT ({pnl_percent:.2f}%)")
await message.answer(text, reply_markup=inline_markup.back_to_main)
trade_fee = 0
try:
trades_resp = client.get_closed_pnl(category="linear", symbol=symbol)
if trades_resp.get('retCode') == 0:
trades = trades_resp.get('result', {}).get('list', [])
for trade in trades:
if trade.get('orderId') == order_id:
trade_fee += float(trade.get('execFee', 0))
except Exception as e:
logger.error(f"Ошибка при получении сделок: {e}")
trade_fee = 0
pnl = (current_price - entry_price) * qty if side == "Buy" else (entry_price - current_price) * qty
if include_fee:
pnl -= trade_fee
pnl_percent = (pnl / (entry_price * qty)) * 100 if entry_price * qty > 0 else 0
text = (
f"Сделка {symbol} успешно закрыта.\n"
f"Цена входа: {entry_price if entry_price else limit_price}\n"
f"Цена закрытия: {current_price}\n"
f"Прибыль: {pnl:.4f} USDT ({pnl_percent:.2f}%)\n"
f"{'Включая комиссию биржи' if include_fee else 'Без учета комиссии'}"
)
await message.answer(text)
return True
else:
if message:
await message.answer(f"Ошибка закрытия сделки {symbol}.", reply_markup=inline_markup.back_to_main)
await message.answer(f"Ошибка закрытия сделки {symbol}.",
reply_markup=inline_markup.back_to_main)
return False
except Exception as e:
@@ -394,9 +672,12 @@ async def close_user_trade(tg_id: int, symbol: str, message=None) -> bool:
async def close_trade_after_delay(tg_id: int, message, symbol: str, delay_sec: int):
"""
Закрывает сделку пользователя после задержки delay_sec секунд.
"""
try:
await asyncio.sleep(delay_sec)
result = await close_user_trade(tg_id, symbol)
result = await close_user_trade(tg_id, symbol, message)
if result:
await message.answer(f"Сделка {symbol} успешно закрыта по таймеру.",
reply_markup=inline_markup.back_to_main)
@@ -405,11 +686,3 @@ async def close_trade_after_delay(tg_id: int, message, symbol: str, delay_sec: i
reply_markup=inline_markup.back_to_main)
except asyncio.CancelledError:
await message.answer(f"Закрытие сделки {symbol} по таймеру отменено.", reply_markup=inline_markup.back_to_main)
finally:
active_close_tasks.pop(tg_id, None)
def get_positive_percent(negative_percent: float, manual_positive_percent: float | None) -> float:
if manual_positive_percent and manual_positive_percent > 0:
return manual_positive_percent
return abs(negative_percent)