Spaces:
Runtime error
Runtime error
import logging
import os
from contextlib import contextmanager
from typing import Generator, Optional

from sqlalchemy import create_engine, event, text
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import NullPool, QueuePool
# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)
class DatabaseManager:
    """Manage a SQLAlchemy engine and session factory for PostgreSQL.

    The instance can itself be used in a ``with`` statement; leaving the
    block calls :meth:`close`, which disposes the engine's connection pool.
    """

    def __init__(self, database_url: Optional[str] = None):
        """Store the connection URL and build the engine.

        Args:
            database_url: PostgreSQL connection string. When falsy, the
                ``DATABASE_URL`` environment variable is used, falling back
                to a hard-coded localhost URL.

        Raises:
            Exception: Anything raised while initialising the engine is
                logged and propagated.
        """
        self.database_url = database_url or os.getenv(
            'DATABASE_URL',
            'postgresql://user:password@localhost:5432/para_ai'
        )
        self.engine = None
        self.SessionLocal = None
        self._initialize_engine()

    def _initialize_engine(self):
        """Create the pooled engine, the session factory and the connect listener.

        Pool sizing is read from ``DB_POOL_SIZE`` (default 20) and
        ``DB_MAX_OVERFLOW`` (default 40). SQL echo is enabled when the
        ``SQL_ECHO`` environment variable equals ``true`` (case-insensitive).

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            # Pool configuration, overridable through the environment.
            pool_size = int(os.getenv('DB_POOL_SIZE', 20))
            max_overflow = int(os.getenv('DB_MAX_OVERFLOW', 40))
            self.engine = create_engine(
                self.database_url,
                poolclass=QueuePool,
                pool_size=pool_size,
                max_overflow=max_overflow,
                pool_pre_ping=True,  # check the connection before using it
                echo=os.getenv('SQL_ECHO', 'false').lower() == 'true',
                connect_args={
                    'connect_timeout': 10,
                    'application_name': 'para_ai'
                }
            )
            self.SessionLocal = sessionmaker(
                bind=self.engine,
                autocommit=False,
                autoflush=False,
                expire_on_commit=False
            )

            # Event listener that logs each new DBAPI connection.
            def receive_connect(dbapi_conn, connection_record):
                logger.debug(f"Conexão PostgreSQL estabelecida: {dbapi_conn}")

            # The listener must be attached to the engine explicitly;
            # merely defining the function would never make it fire.
            event.listen(self.engine, "connect", receive_connect)

            logger.info("DatabaseManager inicializado com sucesso")
        except Exception as e:
            logger.error(f"Erro ao inicializar engine: {e}")
            raise

    @contextmanager
    def get_session(self) -> "Generator[Session, None, None]":
        """Provide a transactional session for use in a ``with`` block.

        The session is committed when the block finishes normally. On any
        exception it is rolled back, the error is logged and the exception
        is re-raised. The session is closed in every case.

        Yields:
            Session: A new session created by the session factory.

        Raises:
            Exception: Whatever the ``with`` body or the commit raised.
        """
        session = self.SessionLocal()
        try:
            yield session
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"Erro na transação: {e}")
            raise
        finally:
            session.close()

    def health_check(self) -> bool:
        """Run ``SELECT 1`` to verify that the database is reachable.

        Returns:
            bool: True when the probe query succeeds, False otherwise
            (the failure is logged, not raised).
        """
        try:
            with self.get_session() as session:
                # Raw SQL strings must be wrapped in text() for execute().
                session.execute(text("SELECT 1"))
            logger.info("Health check OK")
            return True
        except Exception as e:
            logger.error(f"Health check FAILED: {e}")
            return False

    def close(self):
        """Dispose the engine's connection pool, if an engine exists."""
        if self.engine:
            self.engine.dispose()
            logger.info("Engine disposed")

    def __enter__(self):
        """Return the manager itself for use as a ``with`` target."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the manager on block exit; exceptions are not suppressed."""
        self.close()
# Module-wide shared instance; stays None until get_db_manager() or
# init_db_manager() assigns it.
_db_manager: Optional[DatabaseManager] = None


def get_db_manager() -> DatabaseManager:
    """Return the shared DatabaseManager, creating it on first use."""
    global _db_manager
    if _db_manager is not None:
        return _db_manager
    _db_manager = DatabaseManager()
    return _db_manager
def init_db_manager(database_url: Optional[str] = None) -> DatabaseManager:
    """Create a new global DatabaseManager, replacing any existing one.

    Args:
        database_url: Connection string forwarded to ``DatabaseManager``.
            ``None`` lets the manager resolve its own default.

    Returns:
        DatabaseManager: The newly created manager, which is also stored
        as the module-level instance.

    Raises:
        Exception: Whatever ``DatabaseManager`` raises during construction;
            in that case the previously stored manager is left in place.
    """
    global _db_manager
    previous = _db_manager
    # Build the replacement first so a failed initialisation does not
    # discard a working manager.
    _db_manager = DatabaseManager(database_url)
    if previous is not None:
        # Release the replaced manager's pooled connections rather than
        # leaving them open until garbage collection.
        previous.close()
    return _db_manager