1
0
forked from kodorvan/stcs
Files
stcs/BybitBot_API.py

74 lines
2.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import logging.config
from aiogram import Bot, Dispatcher
import app.services.Bybit.functions.bybit_ws as bybit_ws
import app.telegram.database.requests as rq
from app.telegram.database.models import async_main
from app.telegram.handlers.handlers import router
from app.telegram.functions.main_settings.settings import router_main_settings
from app.telegram.functions.risk_management_settings.settings import router_risk_management_settings
from app.telegram.functions.condition_settings.settings import condition_settings_router
from app.services.Bybit.functions.Add_Bybit_API import router_register_bybit_api
from app.services.Bybit.functions.functions import router_functions_bybit_trade
from logger_helper.logger_helper import LOGGING_CONFIG
from config import TOKEN_TG_BOT_1
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("main")
bot = Bot(token=TOKEN_TG_BOT_1)
dp = Dispatcher()
def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
"""
Возвращает текущий активный цикл событий asyncio или создает новый, если его нет.
"""
try:
return asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
loop = get_or_create_event_loop()
bybit_ws.set_event_loop(loop)
async def run_ws_for_user(tg_id, message) -> None:
"""
Запускает WebSocket Bybit для пользователя с указанным tg_id.
"""
api_key = await rq.get_bybit_api_key(tg_id)
api_secret = await rq.get_bybit_secret_key(tg_id)
await bybit_ws.start_execution_ws(api_key, api_secret, message)
async def main() -> None:
"""
Основная асинхронная функция запуска бота:
"""
await async_main()
dp.include_router(router)
dp.include_router(router_main_settings)
dp.include_router(router_risk_management_settings)
dp.include_router(condition_settings_router)
dp.include_router(router_register_bybit_api)
dp.include_router(router_functions_bybit_trade)
await dp.start_polling(bot)
if __name__ == '__main__':
try:
logger.info("Bot is on")
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Bot is off")