Spaces:
Runtime error
Runtime error
File size: 4,164 Bytes
f0322a6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
import os
from contextlib import contextmanager
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.pool import NullPool, QueuePool
from typing import Generator, Optional
import logging
logger = logging.getLogger(__name__)
class DatabaseManager:
"""Manager para conexão e operações com PostgreSQL."""
def __init__(self, database_url: Optional[str] = None):
"""
Inicializa o DatabaseManager.
Args:
database_url: String de conexão PostgreSQL. Se None, usa variável de ambiente.
"""
self.database_url = database_url or os.getenv(
'DATABASE_URL',
'postgresql://user:password@localhost:5432/para_ai'
)
self.engine = None
self.SessionLocal = None
self._initialize_engine()
def _initialize_engine(self):
"""Inicializa a engine SQLAlchemy com pooling otimizado."""
try:
# Configuração de pool para produção
pool_size = int(os.getenv('DB_POOL_SIZE', 20))
max_overflow = int(os.getenv('DB_MAX_OVERFLOW', 40))
self.engine = create_engine(
self.database_url,
poolclass=QueuePool,
pool_size=pool_size,
max_overflow=max_overflow,
pool_pre_ping=True, # Verifica conexão antes de usar
echo=os.getenv('SQL_ECHO', 'false').lower() == 'true',
connect_args={
'connect_timeout': 10,
'application_name': 'para_ai'
}
)
self.SessionLocal = sessionmaker(
bind=self.engine,
autocommit=False,
autoflush=False,
expire_on_commit=False
)
# Event listener para logging de conexões
@event.listens_for(self.engine, "connect")
def receive_connect(dbapi_conn, connection_record):
logger.debug(f"Conexão PostgreSQL estabelecida: {dbapi_conn}")
logger.info("DatabaseManager inicializado com sucesso")
except Exception as e:
logger.error(f"Erro ao inicializar engine: {e}")
raise
@contextmanager
def get_session(self) -> Generator[Session, None, None]:
"""
Context manager para obter uma sessão do banco.
Yields:
Session: Sessão SQLAlchemy.
"""
session = self.SessionLocal()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
logger.error(f"Erro na transação: {e}")
raise
finally:
session.close()
def health_check(self) -> bool:
"""
Verifica se a conexão com banco está ok.
Returns:
bool: True se conexão ok, False caso contrário.
"""
try:
with self.get_session() as session:
session.execute("SELECT 1")
logger.info("Health check OK")
return True
except Exception as e:
logger.error(f"Health check FAILED: {e}")
return False
def close(self):
"""Fecha todas as conexões da pool."""
if self.engine:
self.engine.dispose()
logger.info("Engine disposed")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
# Instância global
_db_manager: Optional[DatabaseManager] = None
def get_db_manager() -> DatabaseManager:
"""Retorna a instância global do DatabaseManager."""
global _db_manager
if _db_manager is None:
_db_manager = DatabaseManager()
return _db_manager
def init_db_manager(database_url: Optional[str] = None) -> DatabaseManager:
"""Inicializa o DatabaseManager global."""
global _db_manager
_db_manager = DatabaseManager(database_url)
return _db_manager
|