muzakkirhussain011's picture
Add application files (text files only)
8bab08d
"""
Enterprise-Grade Database Engine with Connection Pooling and Async Support
"""
import os
import logging
from typing import Optional, AsyncGenerator
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import (
create_async_engine,
AsyncSession,
AsyncEngine,
async_sessionmaker
)
from sqlalchemy.pool import NullPool, QueuePool
from sqlalchemy import event, text
from .models import Base
logger = logging.getLogger(__name__)
class DatabaseConfig:
"""Database configuration with environment variable support"""
def __init__(self):
# Database URL (supports SQLite, PostgreSQL, MySQL)
self.database_url = os.getenv(
"DATABASE_URL",
"sqlite+aiosqlite:///./data/cx_agent.db"
)
# Convert postgres:// to postgresql:// for SQLAlchemy
if self.database_url.startswith("postgres://"):
self.database_url = self.database_url.replace(
"postgres://", "postgresql+asyncpg://", 1
)
# Connection pool settings
self.pool_size = int(os.getenv("DB_POOL_SIZE", "20"))
self.max_overflow = int(os.getenv("DB_MAX_OVERFLOW", "10"))
self.pool_timeout = int(os.getenv("DB_POOL_TIMEOUT", "30"))
self.pool_recycle = int(os.getenv("DB_POOL_RECYCLE", "3600"))
self.pool_pre_ping = os.getenv("DB_POOL_PRE_PING", "true").lower() == "true"
# Echo SQL for debugging
self.echo = os.getenv("DB_ECHO", "false").lower() == "true"
# Enable SQLite WAL mode for better concurrency
self.enable_wal = os.getenv("SQLITE_WAL", "true").lower() == "true"
def is_sqlite(self) -> bool:
"""Check if using SQLite"""
return "sqlite" in self.database_url
def is_postgres(self) -> bool:
"""Check if using PostgreSQL"""
return "postgresql" in self.database_url
class DatabaseManager:
"""Singleton database manager with connection pooling"""
_instance: Optional["DatabaseManager"] = None
_engine: Optional[AsyncEngine] = None
_session_factory: Optional[async_sessionmaker[AsyncSession]] = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if self._engine is None:
self._initialize()
def _initialize(self):
"""Initialize database engine and session factory"""
config = DatabaseConfig()
# Engine kwargs
engine_kwargs = {
"echo": config.echo,
"future": True,
}
# Configure connection pool based on database type
if config.is_sqlite():
# SQLite specific settings
logger.info(f"Initializing SQLite database: {config.database_url}")
engine_kwargs.update({
"poolclass": NullPool, # SQLite doesn't need pooling in the same way
"connect_args": {
"check_same_thread": False,
"timeout": 30,
}
})
# Enable WAL mode for better concurrency
if config.enable_wal:
engine_kwargs["connect_args"]["pragmas"] = {
"journal_mode": "WAL",
"synchronous": "NORMAL",
"cache_size": -64000, # 64MB cache
"foreign_keys": 1,
"busy_timeout": 5000,
}
else:
# PostgreSQL/MySQL settings
logger.info(f"Initializing database: {config.database_url}")
engine_kwargs.update({
"poolclass": QueuePool,
"pool_size": config.pool_size,
"max_overflow": config.max_overflow,
"pool_timeout": config.pool_timeout,
"pool_recycle": config.pool_recycle,
"pool_pre_ping": config.pool_pre_ping,
})
# Create async engine
self._engine = create_async_engine(
config.database_url,
**engine_kwargs
)
# Create session factory
self._session_factory = async_sessionmaker(
self._engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False,
autoflush=False
)
# Register event listeners
self._register_event_listeners()
logger.info("Database engine initialized successfully")
def _register_event_listeners(self):
"""Register SQLAlchemy event listeners"""
@event.listens_for(self._engine.sync_engine, "connect")
def receive_connect(dbapi_conn, connection_record):
"""Event listener for new connections"""
logger.debug("New database connection established")
@event.listens_for(self._engine.sync_engine, "close")
def receive_close(dbapi_conn, connection_record):
"""Event listener for closed connections"""
logger.debug("Database connection closed")
@property
def engine(self) -> AsyncEngine:
"""Get the database engine"""
if self._engine is None:
raise RuntimeError("Database engine not initialized")
return self._engine
@property
def session_factory(self) -> async_sessionmaker[AsyncSession]:
"""Get the session factory"""
if self._session_factory is None:
raise RuntimeError("Session factory not initialized")
return self._session_factory
async def create_tables(self):
"""Create all database tables"""
logger.info("Creating database tables...")
async with self._engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
logger.info("Database tables created successfully")
async def drop_tables(self):
"""Drop all database tables (use with caution!)"""
logger.warning("Dropping all database tables...")
async with self._engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
logger.info("Database tables dropped")
async def health_check(self) -> bool:
"""Check database health"""
try:
async with self.get_session() as session:
await session.execute(text("SELECT 1"))
return True
except Exception as e:
logger.error(f"Database health check failed: {e}")
return False
@asynccontextmanager
async def get_session(self) -> AsyncGenerator[AsyncSession, None]:
"""Get a database session with automatic cleanup"""
session = self.session_factory()
try:
yield session
await session.commit()
except Exception as e:
await session.rollback()
logger.error(f"Database session error: {e}")
raise
finally:
await session.close()
async def close(self):
"""Close database engine and connections"""
if self._engine is not None:
await self._engine.dispose()
logger.info("Database engine closed")
# Global database manager instance
_db_manager: Optional[DatabaseManager] = None
def get_db_manager() -> DatabaseManager:
"""Get or create the global database manager instance"""
global _db_manager
if _db_manager is None:
_db_manager = DatabaseManager()
return _db_manager
async def get_session() -> AsyncGenerator[AsyncSession, None]:
"""Convenience function to get a database session"""
db_manager = get_db_manager()
async with db_manager.get_session() as session:
yield session
async def init_database():
"""Initialize database (create tables if needed)"""
db_manager = get_db_manager()
await db_manager.create_tables()
logger.info("Database initialized")
async def close_database():
"""Close database connections"""
db_manager = get_db_manager()
await db_manager.close()
logger.info("Database closed")