Spaces:
Running
Running
| from __future__ import annotations | |
| import asyncio | |
| from typing import Any | |
| from app.core.database.base import BaseExecutor, ConnectionConfig | |
| from app.core.database.mysql import MySQLExecutor | |
| from app.core.database.postgresql import PostgreSQLExecutor | |
| from app.core.database.mongodb import MongoDBExecutor | |
| from app.core.logger import get_logger | |
| _logger = get_logger(__name__) | |
| _EXECUTOR_MAP: dict[str, type[BaseExecutor]] = { | |
| "mysql": MySQLExecutor, | |
| "postgresql": PostgreSQLExecutor, | |
| "mongodb": MongoDBExecutor, | |
| } | |
| class PoolManager: | |
| def __init__(self) -> None: | |
| self._executors: dict[str, BaseExecutor] = {} | |
| self._lock = asyncio.Lock() | |
| self._closed = False | |
| async def get_executor(self, config: ConnectionConfig) -> BaseExecutor: | |
| if self._closed: | |
| raise RuntimeError("PoolManager has been shut down") | |
| key = config.pool_key | |
| async with self._lock: | |
| if key not in self._executors: | |
| executor_cls = _EXECUTOR_MAP.get(config.db_type) | |
| if executor_cls is None: | |
| raise ValueError(f"Unsupported database type: {config.db_type}") | |
| self._executors[key] = executor_cls(config) | |
| _logger.info( | |
| "Created executor for %s (key=%s)", config.safe_repr, key, | |
| ) | |
| return self._executors[key] | |
| async def close_all(self) -> None: | |
| async with self._lock: | |
| if self._closed: | |
| return | |
| self._closed = True | |
| for key, executor in self._executors.items(): | |
| try: | |
| await executor.close() | |
| except Exception as exc: | |
| _logger.error("Error closing executor %s: %s", key, exc) | |
| self._executors.clear() | |
| _logger.info("All database executors closed") | |
| pool_manager = PoolManager() | |