jebin2's picture
db services
50c20bf
"""
DBService Database Initialization
Provides utilities for creating and managing database tables.
"""
import logging
from sqlalchemy.ext.asyncio import AsyncEngine
from services.db_service.config import DBServiceConfig
logger = logging.getLogger(__name__)
async def init_database(engine: AsyncEngine) -> None:
"""
Initialize database tables based on registered models.
Creates all tables defined in DBServiceConfig.all_models.
"""
DBServiceConfig.assert_registered()
if not DBServiceConfig.db_base:
raise RuntimeError(
"No database base registered! "
"Pass db_base parameter to DBServiceConfig.register()"
)
logger.info("Creating database tables...")
async with engine.begin() as conn:
await conn.run_sync(DBServiceConfig.db_base.metadata.create_all)
model_count = len(DBServiceConfig.all_models)
logger.info(f"βœ… Database initialized with {model_count} models")
async def drop_database(engine: AsyncEngine) -> None:
"""Drop all database tables. WARNING: Deletes all data!"""
DBServiceConfig.assert_registered()
if not DBServiceConfig.db_base:
raise RuntimeError("No database base registered!")
logger.warning("⚠️ Dropping all database tables...")
async with engine.begin() as conn:
await conn.run_sync(DBServiceConfig.db_base.metadata.drop_all)
logger.info("βœ… All tables dropped")
async def reset_database(engine: AsyncEngine) -> None:
"""Reset database (drop + create). WARNING: Deletes all data!"""
await drop_database(engine)
await init_database(engine)
logger.info("βœ… Database reset complete")
def get_registered_models() -> list:
"""Get list of all registered models."""
DBServiceConfig.assert_registered()
return DBServiceConfig.all_models
def get_model_by_name(model_name: str):
"""Get model class by name."""
DBServiceConfig.assert_registered()
for model in DBServiceConfig.all_models:
if model.__name__ == model_name:
return model
return None