PARA.AI / database /db_manager.py
Carlex22's picture
Revert "ParaAIV3.1"
1f24745
import os
from contextlib import contextmanager
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import NullPool, QueuePool
from typing import Generator, Optional
import logging
logger = logging.getLogger(__name__)
class DatabaseManager:
"""Manager para conexão e operações com PostgreSQL."""
def __init__(self, database_url: Optional[str] = None):
"""
Inicializa o DatabaseManager.
Args:
database_url: String de conexão PostgreSQL. Se None, usa variável de ambiente.
"""
self.database_url = database_url or os.getenv(
'DATABASE_URL',
'postgresql://user:password@localhost:5432/para_ai'
)
self.engine = None
self.SessionLocal = None
self._initialize_engine()
def _initialize_engine(self):
"""Inicializa a engine SQLAlchemy com pooling otimizado."""
try:
# Configuração de pool para produção
pool_size = int(os.getenv('DB_POOL_SIZE', 20))
max_overflow = int(os.getenv('DB_MAX_OVERFLOW', 40))
self.engine = create_engine(
self.database_url,
poolclass=QueuePool,
pool_size=pool_size,
max_overflow=max_overflow,
pool_pre_ping=True, # Verifica conexão antes de usar
echo=os.getenv('SQL_ECHO', 'false').lower() == 'true',
connect_args={
'connect_timeout': 10,
'application_name': 'para_ai'
}
)
self.SessionLocal = sessionmaker(
bind=self.engine,
autocommit=False,
autoflush=False,
expire_on_commit=False
)
# Event listener para logging de conexões
@event.listens_for(self.engine, "connect")
def receive_connect(dbapi_conn, connection_record):
logger.debug(f"Conexão PostgreSQL estabelecida: {dbapi_conn}")
logger.info("DatabaseManager inicializado com sucesso")
except Exception as e:
logger.error(f"Erro ao inicializar engine: {e}")
raise
@contextmanager
def get_session(self) -> Generator[Session, None, None]:
"""
Context manager para obter uma sessão do banco.
Yields:
Session: Sessão SQLAlchemy.
"""
session = self.SessionLocal()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
logger.error(f"Erro na transação: {e}")
raise
finally:
session.close()
def health_check(self) -> bool:
"""
Verifica se a conexão com banco está ok.
Returns:
bool: True se conexão ok, False caso contrário.
"""
try:
with self.get_session() as session:
session.execute("SELECT 1")
logger.info("Health check OK")
return True
except Exception as e:
logger.error(f"Health check FAILED: {e}")
return False
def close(self):
"""Fecha todas as conexões da pool."""
if self.engine:
self.engine.dispose()
logger.info("Engine disposed")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
# Instância global
_db_manager: Optional[DatabaseManager] = None
def get_db_manager() -> DatabaseManager:
"""Retorna a instância global do DatabaseManager."""
global _db_manager
if _db_manager is None:
_db_manager = DatabaseManager()
return _db_manager
def init_db_manager(database_url: Optional[str] = None) -> DatabaseManager:
"""Inicializa o DatabaseManager global."""
global _db_manager
_db_manager = DatabaseManager(database_url)
return _db_manager