""" Database configuration for BackgroundFX Pro. Handles PostgreSQL, MongoDB, and other database connections. """ from dataclasses import dataclass, field from typing import Optional, Dict, Any from sqlalchemy import create_engine, Engine from sqlalchemy.orm import sessionmaker, Session from sqlalchemy.pool import QueuePool, NullPool import pymongo from pymongo import MongoClient import logging logger = logging.getLogger(__name__) @dataclass class DatabaseConfig: """Database configuration settings.""" # PostgreSQL settings postgres_url: str = "postgresql://user:password@localhost/backgroundfx" postgres_pool_size: int = 20 postgres_max_overflow: int = 40 postgres_pool_timeout: int = 30 postgres_pool_recycle: int = 3600 postgres_echo: bool = False postgres_echo_pool: bool = False # MongoDB settings mongodb_url: str = "mongodb://localhost:27017/backgroundfx" mongodb_database: str = "backgroundfx" mongodb_max_pool_size: int = 50 mongodb_min_pool_size: int = 10 mongodb_max_idle_time: int = 60000 # milliseconds mongodb_server_selection_timeout: int = 30000 # milliseconds # Connection retry settings max_retries: int = 3 retry_delay: int = 1 # seconds # Query settings query_timeout: int = 30 # seconds slow_query_threshold: float = 1.0 # seconds log_slow_queries: bool = True # Migration settings auto_migrate: bool = False migration_directory: str = "migrations" class PostgreSQLManager: """Manages PostgreSQL database connections.""" def __init__(self, config: DatabaseConfig): self.config = config self._engine: Optional[Engine] = None self._session_factory: Optional[sessionmaker] = None @property def engine(self) -> Engine: """Get or create database engine.""" if self._engine is None: self._engine = self._create_engine() return self._engine @property def session_factory(self) -> sessionmaker: """Get or create session factory.""" if self._session_factory is None: self._session_factory = sessionmaker( bind=self.engine, expire_on_commit=False ) return self._session_factory def _create_engine(self) -> Engine: """Create SQLAlchemy engine with connection pooling.""" # Determine pool class based on environment if self.config.postgres_pool_size == 0: poolclass = NullPool pool_kwargs = {} else: poolclass = QueuePool pool_kwargs = { 'pool_size': self.config.postgres_pool_size, 'max_overflow': self.config.postgres_max_overflow, 'pool_timeout': self.config.postgres_pool_timeout, 'pool_recycle': self.config.postgres_pool_recycle, } engine = create_engine( self.config.postgres_url, poolclass=poolclass, echo=self.config.postgres_echo, echo_pool=self.config.postgres_echo_pool, pool_pre_ping=True, # Verify connections before using **pool_kwargs ) logger.info(f"PostgreSQL engine created with pool size: {self.config.postgres_pool_size}") return engine def get_session(self) -> Session: """Get a new database session.""" return self.session_factory() def healthcheck(self) -> bool: """Check database health.""" try: with self.engine.connect() as conn: result = conn.execute("SELECT 1") return result.scalar() == 1 except Exception as e: logger.error(f"PostgreSQL healthcheck failed: {e}") return False def close(self): """Close all database connections.""" if self._engine: self._engine.dispose() logger.info("PostgreSQL connections closed") class MongoDBManager: """Manages MongoDB connections.""" def __init__(self, config: DatabaseConfig): self.config = config self._client: Optional[MongoClient] = None self._database = None @property def client(self) -> MongoClient: """Get or create MongoDB client.""" if self._client is None: self._client = self._create_client() return self._client @property def database(self): """Get MongoDB database.""" if self._database is None: self._database = self.client[self.config.mongodb_database] return self._database def _create_client(self) -> MongoClient: """Create MongoDB client with connection pooling.""" client = MongoClient( self.config.mongodb_url, maxPoolSize=self.config.mongodb_max_pool_size, minPoolSize=self.config.mongodb_min_pool_size, maxIdleTimeMS=self.config.mongodb_max_idle_time, serverSelectionTimeoutMS=self.config.mongodb_server_selection_timeout, retryWrites=True, retryReads=True ) # Verify connection client.admin.command('ping') logger.info(f"MongoDB client created for database: {self.config.mongodb_database}") return client def get_collection(self, name: str): """Get a MongoDB collection.""" return self.database[name] def healthcheck(self) -> bool: """Check MongoDB health.""" try: self.client.admin.command('ping') return True except Exception as e: logger.error(f"MongoDB healthcheck failed: {e}") return False def close(self): """Close MongoDB connection.""" if self._client: self._client.close() logger.info("MongoDB connection closed") class DatabaseManager: """Central database manager for all database connections.""" def __init__(self, config: DatabaseConfig): self.config = config self.postgres = PostgreSQLManager(config) self.mongodb = MongoDBManager(config) def initialize(self): """Initialize all database connections.""" logger.info("Initializing database connections...") # Initialize PostgreSQL if self.postgres.healthcheck(): logger.info("PostgreSQL connection established") else: logger.error("Failed to connect to PostgreSQL") # Initialize MongoDB if self.mongodb.healthcheck(): logger.info("MongoDB connection established") else: logger.error("Failed to connect to MongoDB") def healthcheck(self) -> Dict[str, bool]: """Check health of all databases.""" return { 'postgresql': self.postgres.healthcheck(), 'mongodb': self.mongodb.healthcheck() } def close_all(self): """Close all database connections.""" self.postgres.close() self.mongodb.close() logger.info("All database connections closed") def get_stats(self) -> Dict[str, Any]: """Get database statistics.""" stats = {} # PostgreSQL stats if self.postgres._engine: pool = self.postgres.engine.pool stats['postgresql'] = { 'size': pool.size() if hasattr(pool, 'size') else None, 'checked_in': pool.checkedin() if hasattr(pool, 'checkedin') else None, 'overflow': pool.overflow() if hasattr(pool, 'overflow') else None, 'total': pool.total() if hasattr(pool, 'total') else None } # MongoDB stats if self.mongodb._client: stats['mongodb'] = self.mongodb.client.server_info() return stats def create_database_config(settings: Any) -> DatabaseConfig: """ Create database configuration from settings. Args: settings: Application settings Returns: DatabaseConfig instance """ return DatabaseConfig( postgres_url=settings.database_url, postgres_pool_size=settings.database_pool_size, postgres_max_overflow=settings.database_max_overflow, postgres_echo=settings.database_echo, mongodb_url=settings.mongodb_url, mongodb_database=settings.mongodb_database ) # Global database manager instance db_manager: Optional[DatabaseManager] = None def initialize_databases(settings: Any): """ Initialize global database manager. Args: settings: Application settings """ global db_manager config = create_database_config(settings) db_manager = DatabaseManager(config) db_manager.initialize() return db_manager def get_db_manager() -> DatabaseManager: """Get global database manager instance.""" if db_manager is None: raise RuntimeError("Database manager not initialized") return db_manager