from __future__ import annotations import asyncio from typing import Any from app.core.database.base import BaseExecutor, ConnectionConfig from app.core.database.mysql import MySQLExecutor from app.core.database.postgresql import PostgreSQLExecutor from app.core.database.mongodb import MongoDBExecutor from app.core.logger import get_logger _logger = get_logger(__name__) _EXECUTOR_MAP: dict[str, type[BaseExecutor]] = { "mysql": MySQLExecutor, "postgresql": PostgreSQLExecutor, "mongodb": MongoDBExecutor, } class PoolManager: def __init__(self) -> None: self._executors: dict[str, BaseExecutor] = {} self._lock = asyncio.Lock() self._closed = False async def get_executor(self, config: ConnectionConfig) -> BaseExecutor: if self._closed: raise RuntimeError("PoolManager has been shut down") key = config.pool_key async with self._lock: if key not in self._executors: executor_cls = _EXECUTOR_MAP.get(config.db_type) if executor_cls is None: raise ValueError(f"Unsupported database type: {config.db_type}") self._executors[key] = executor_cls(config) _logger.info( "Created executor for %s (key=%s)", config.safe_repr, key, ) return self._executors[key] async def close_all(self) -> None: async with self._lock: if self._closed: return self._closed = True for key, executor in self._executors.items(): try: await executor.close() except Exception as exc: _logger.error("Error closing executor %s: %s", key, exc) self._executors.clear() _logger.info("All database executors closed") pool_manager = PoolManager()