Soumik-404's picture
feat: add db apis
89157f5
Raw
History Blame Contribute Delete
1.91 kB
from __future__ import annotations
import asyncio
from typing import Any
from app.core.database.base import BaseExecutor, ConnectionConfig
from app.core.database.mysql import MySQLExecutor
from app.core.database.postgresql import PostgreSQLExecutor
from app.core.database.mongodb import MongoDBExecutor
from app.core.logger import get_logger
# Module-level logger, obtained from the project's logging helper and
# named after this module.
_logger = get_logger(__name__)
# Registry of supported backends: maps a database-type string to the
# executor class that handles it. PoolManager.get_executor looks up
# ``config.db_type`` here and raises ValueError when the type is absent.
_EXECUTOR_MAP: dict[str, type[BaseExecutor]] = {
"mysql": MySQLExecutor,
"postgresql": PostgreSQLExecutor,
"mongodb": MongoDBExecutor,
}
class PoolManager:
    """Cache of database executors, one per ``ConnectionConfig.pool_key``.

    An executor is built on first request from the class registered in
    ``_EXECUTOR_MAP`` for the config's ``db_type`` and is reused for every
    later request with the same pool key. Cache lookups, insertions and the
    shutdown sequence all run while holding a single ``asyncio.Lock``. After
    :meth:`close_all` has run, :meth:`get_executor` rejects further requests.
    """

    def __init__(self) -> None:
        """Start open, with an empty executor cache and a fresh lock."""
        self._closed = False
        self._lock = asyncio.Lock()
        self._executors: dict[str, BaseExecutor] = {}

    async def get_executor(self, config: ConnectionConfig) -> BaseExecutor:
        """Return the executor cached for ``config.pool_key``, creating it if needed.

        Args:
            config: Connection settings. ``pool_key`` selects the cache slot,
                ``db_type`` selects the executor class, and ``safe_repr`` is
                used in the log line emitted on creation.

        Returns:
            The cached executor for the pool key, or a newly created one.

        Raises:
            RuntimeError: If the manager has already been shut down.
            ValueError: If ``config.db_type`` has no entry in ``_EXECUTOR_MAP``.
        """
        if self._closed:
            raise RuntimeError("PoolManager has been shut down")

        pool_key = config.pool_key
        async with self._lock:
            # Fast path: an executor for this pool key already exists.
            if pool_key in self._executors:
                return self._executors[pool_key]

            factory = _EXECUTOR_MAP.get(config.db_type)
            if factory is None:
                raise ValueError(f"Unsupported database type: {config.db_type}")

            created = factory(config)
            self._executors[pool_key] = created
            _logger.info(
                "Created executor for %s (key=%s)", config.safe_repr, pool_key,
            )
            return created

    async def _close_quietly(self, pool_key: str, executor: BaseExecutor) -> None:
        """Close one executor, logging (not raising) any ``Exception``."""
        try:
            await executor.close()
        except Exception as exc:
            _logger.error("Error closing executor %s: %s", pool_key, exc)

    async def close_all(self) -> None:
        """Shut the manager down and close every cached executor.

        The manager is marked closed first, then each executor is closed in
        turn. A failure in one executor is logged and does not stop the
        remaining ones from being closed. The cache is emptied afterwards.
        Calling this again after a completed shutdown returns immediately.
        """
        async with self._lock:
            if self._closed:
                return
            self._closed = True

            for pool_key, executor in self._executors.items():
                await self._close_quietly(pool_key, executor)
            self._executors.clear()

        _logger.info("All database executors closed")
# Module-level PoolManager instance, created when this module is imported.
pool_manager = PoolManager()