import logging
import os
from contextlib import contextmanager
from typing import Generator, Optional

from sqlalchemy import create_engine, event, text
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import NullPool, QueuePool

logger = logging.getLogger(__name__)


class DatabaseManager:
    """Manage the SQLAlchemy engine and sessions for a PostgreSQL database."""

    def __init__(self, database_url: Optional[str] = None):
        """
        Initialize the manager and build its engine immediately.

        Args:
            database_url: PostgreSQL connection string. When falsy, the
                ``DATABASE_URL`` environment variable is used, and if that
                is unset a hard-coded localhost URL is used instead.

        Raises:
            Exception: Whatever engine/session-factory creation raises
                (re-raised after being logged).
        """
        self.database_url = database_url or os.getenv(
            'DATABASE_URL',
            'postgresql://user:password@localhost:5432/para_ai'
        )
        self.engine = None
        self.SessionLocal = None
        self._initialize_engine()

    def _initialize_engine(self):
        """
        Create the engine and the session factory.

        Pool sizing is read from the ``DB_POOL_SIZE`` (default 20) and
        ``DB_MAX_OVERFLOW`` (default 40) environment variables; SQL echo is
        enabled when ``SQL_ECHO`` equals ``true`` (case-insensitive).

        Raises:
            Exception: Any error raised during setup is logged and re-raised.
        """
        try:
            # Pool configuration; defaults are strings because os.getenv
            # returns strings when the variables are set.
            pool_size = int(os.getenv('DB_POOL_SIZE', '20'))
            max_overflow = int(os.getenv('DB_MAX_OVERFLOW', '40'))

            self.engine = create_engine(
                self.database_url,
                poolclass=QueuePool,
                pool_size=pool_size,
                max_overflow=max_overflow,
                pool_pre_ping=True,  # Check the connection before using it
                echo=os.getenv('SQL_ECHO', 'false').lower() == 'true',
                connect_args={
                    'connect_timeout': 10,
                    'application_name': 'para_ai'
                }
            )

            self.SessionLocal = sessionmaker(
                bind=self.engine,
                autocommit=False,
                autoflush=False,
                expire_on_commit=False
            )

            # Event listener that logs every new DBAPI connection.
            @event.listens_for(self.engine, "connect")
            def receive_connect(dbapi_conn, connection_record):
                logger.debug(f"Conexão PostgreSQL estabelecida: {dbapi_conn}")

            logger.info("DatabaseManager inicializado com sucesso")
        except Exception as e:
            logger.error(f"Erro ao inicializar engine: {e}")
            raise

    @contextmanager
    def get_session(self) -> Generator[Session, None, None]:
        """
        Provide a session wrapped in a single transaction.

        The session is committed when the ``with`` body finishes normally.
        If the body (or the commit) raises, the session is rolled back, the
        error is logged and the exception is re-raised. The session is
        closed in every case.

        Yields:
            Session: A session created by the manager's session factory.
        """
        session = self.SessionLocal()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"Erro na transação: {e}")
            raise
        finally:
            session.close()

    def health_check(self) -> bool:
        """
        Check that the database answers a trivial query.

        Returns:
            bool: True if ``SELECT 1`` executed successfully, False if any
            exception was raised (the exception is logged, not propagated).
        """
        try:
            with self.get_session() as session:
                # Raw SQL strings must be wrapped in text(); passing a plain
                # str to Session.execute is rejected by SQLAlchemy 2.0.
                session.execute(text("SELECT 1"))
            logger.info("Health check OK")
            return True
        except Exception as e:
            logger.error(f"Health check FAILED: {e}")
            return False

    def close(self):
        """Dispose of the engine's connection pool, if an engine exists."""
        if self.engine:
            self.engine.dispose()
            logger.info("Engine disposed")

    def __enter__(self):
        """Return the manager itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the manager on exit; exceptions are not suppressed."""
        self.close()


# Global instance, created lazily by get_db_manager() or explicitly by
# init_db_manager().
_db_manager: Optional[DatabaseManager] = None


def get_db_manager() -> DatabaseManager:
    """Return the global DatabaseManager, creating it on first use."""
    global _db_manager
    if _db_manager is None:
        _db_manager = DatabaseManager()
    return _db_manager


def init_db_manager(database_url: Optional[str] = None) -> DatabaseManager:
    """
    (Re)initialize the global DatabaseManager.

    Args:
        database_url: Connection string forwarded to ``DatabaseManager``.

    Returns:
        DatabaseManager: The newly created global instance.

    Raises:
        Exception: Whatever ``DatabaseManager`` raises during construction;
            in that case the previous global instance is left untouched.
    """
    global _db_manager
    # Build the replacement first so a failure keeps the old manager intact.
    new_manager = DatabaseManager(database_url)
    previous = _db_manager
    _db_manager = new_manager
    if previous is not None:
        # Release the pooled connections of the manager being replaced;
        # otherwise they stay open until garbage collection.
        previous.close()
    return _db_manager