Sadeep Sachintha
feat: implement async database session management with auto-migration and historical data seeding
63bcaad | import logging | |
| from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession | |
| from sqlalchemy.pool import NullPool | |
| from core.config import settings | |
| from db.models import Base | |
| import ssl | |
| logger = logging.getLogger(__name__) | |
| connect_args = {} | |
| if "supabase" in settings.database_url or "postgres" in settings.database_url or "postgresql" in settings.database_url: | |
| ssl_context = ssl.create_default_context() | |
| ssl_context.check_hostname = False | |
| ssl_context.verify_mode = ssl.CERT_NONE | |
| connect_args["ssl"] = ssl_context | |
| connect_args["statement_cache_size"] = 0 | |
| connect_args["prepared_statement_cache_size"] = 0 | |
| # Initialize the async engine | |
| engine = create_async_engine( | |
| settings.database_url, | |
| echo=(settings.log_level == "DEBUG"), | |
| future=True, | |
| poolclass=NullPool, | |
| connect_args=connect_args | |
| ) | |
| # Create an async session factory | |
| async_session = async_sessionmaker( | |
| bind=engine, | |
| class_=AsyncSession, | |
| expire_on_commit=False, | |
| autoflush=False | |
| ) | |
| async def init_db(): | |
| """Initializes the database and creates all tables.""" | |
| try: | |
| async with engine.begin() as conn: | |
| # 1. Create tables defined in models.py (creates users table if not exists) | |
| await conn.run_sync(Base.metadata.create_all) | |
| # 2. Database migrations and cleanups | |
| is_postgres = "postgresql" in engine.url.drivername or "postgres" in engine.url.drivername | |
| from sqlalchemy import text | |
| if is_postgres: | |
| logger.info("Running on PostgreSQL. Ensuring schema matches simplified design...") | |
| # Run each statement individually to comply with asyncpg single-statement execution rules | |
| await conn.execute(text("DROP TABLE IF EXISTS thresholds CASCADE")) | |
| await conn.execute(text("DROP TABLE IF EXISTS subscriptions CASCADE")) | |
| await conn.execute(text("ALTER TABLE users ALTER COLUMN chat_id TYPE BIGINT")) | |
| await conn.execute(text("ALTER TABLE users ADD COLUMN IF NOT EXISTS is_subscribed BOOLEAN DEFAULT TRUE")) | |
| logger.info("PostgreSQL database schema successfully migrated and simplified.") | |
| else: | |
| logger.info("Running on SQLite. Running schema verification...") | |
| # SQLite fallback to add is_subscribed column if it does not exist | |
| try: | |
| await conn.execute(text("ALTER TABLE users ADD COLUMN is_subscribed BOOLEAN DEFAULT 1")) | |
| logger.info("Successfully added is_subscribed column to SQLite users table.") | |
| except Exception: | |
| # Column already exists | |
| pass | |
| # 3. Dynamic Seeding of ExchangeRateHistory if empty | |
| async with async_session() as session: | |
| from sqlalchemy import select | |
| from db.models import ExchangeRateHistory | |
| from datetime import timedelta, timezone, datetime | |
| import random | |
| # Check if history exists | |
| result = await session.execute(select(ExchangeRateHistory.id).limit(1)) | |
| if not result.first(): | |
| logger.info("Exchange rate history table is empty. Generating 15-day random-walk seed data...") | |
| base_rates = { | |
| "USD": 325.0, | |
| "EUR": 375.0, | |
| "GBP": 430.0, | |
| "AUD": 230.0, | |
| "JPY": 2.05, | |
| "AED": 88.5, | |
| "SAR": 86.5, | |
| "INR": 3.40, | |
| "CNY": 47.5, | |
| "QAR": 89.0 | |
| } | |
| now = datetime.now(timezone.utc) | |
| for cur, base_val in base_rates.items(): | |
| current_val = base_val | |
| for day in range(15, -1, -1): | |
| change = current_val * random.uniform(-0.012, 0.012) | |
| current_val = round(current_val + change, 4) | |
| dt = (now - timedelta(days=day)).replace(hour=8, minute=0, second=0, microsecond=0, tzinfo=None) | |
| session.add( | |
| ExchangeRateHistory( | |
| currency=cur, | |
| rate_to_lkr=current_val, | |
| timestamp=dt | |
| ) | |
| ) | |
| await session.commit() | |
| logger.info("Successfully seeded 15-day historical exchange rates.") | |
| logger.info("Database initialized successfully.") | |
| except Exception as e: | |
| logger.error(f"Error initializing database: {e}") | |
| raise | |
| async def get_session() -> AsyncSession: | |
| """Dependency to get a database session.""" | |
| async with async_session() as session: | |
| yield session | |