| | from __future__ import annotations |
| | from typing import TYPE_CHECKING |
| | from uuid import uuid4 |
| |
|
| | from asyncpg import Connection |
| | from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine |
| |
|
| | from bot.core.config import settings |
| |
|
| | if TYPE_CHECKING: |
| | from sqlalchemy.engine.url import URL |
| |
|
| |
|
| | class CConnection(Connection): |
| | def _get_unique_id(self, prefix: str) -> str: |
| | return f"__asyncpg_{prefix}_{uuid4()}__" |
| |
|
| |
|
| | def get_engine(url: URL | str = settings.database_url) -> AsyncEngine: |
| | return create_async_engine( |
| | url=url, |
| | echo=settings.DEBUG, |
| | pool_size=0, |
| | connect_args={ |
| | "connection_class": CConnection, |
| | }, |
| | ) |
| |
|
| |
|
| | def get_sessionmaker(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]: |
| | return async_sessionmaker(bind=engine, autoflush=False, expire_on_commit=False) |
| |
|
| |
|
| | db_url = settings.database_url |
| | engine = get_engine(url=db_url) |
| | sessionmaker = get_sessionmaker(engine) |
| |
|