# FlyRates/db/session.py
# Sadeep Sachintha
# feat: implement async database session management with auto-migration and historical data seeding
# 63bcaad
import logging
import ssl
from typing import AsyncGenerator

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.pool import NullPool

from core.config import settings
from db.models import Base
logger = logging.getLogger(__name__)

# Driver-level connection options handed to create_async_engine(). They stay
# empty unless the configured URL looks like a PostgreSQL / Supabase target.
connect_args = {}
if any(marker in settings.database_url for marker in ("supabase", "postgres", "postgresql")):
    # NOTE(review): hostname checking and certificate verification are both
    # switched off below, so the link is encrypted but the server identity is
    # never validated. Confirm this is intentional for the target deployment.
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    connect_args.update(
        {
            "ssl": ssl_context,
            # Both statement caches are set to 0 (disabled) -- presumably
            # because the database sits behind a transaction-level pooler;
            # TODO confirm.
            "statement_cache_size": 0,
            "prepared_statement_cache_size": 0,
        }
    )
# Module-wide async engine. NullPool opens a fresh connection for every
# checkout instead of keeping a pool, and SQL echoing is enabled only when the
# configured log level is DEBUG.
engine = create_async_engine(
    settings.database_url,
    connect_args=connect_args,
    poolclass=NullPool,
    future=True,
    echo=settings.log_level == "DEBUG",
)
# Factory producing AsyncSession objects bound to the engine above. Objects
# are not expired on commit and pending changes are not flushed automatically
# before queries.
async_session = async_sessionmaker(
    engine,
    class_=AsyncSession,
    autoflush=False,
    expire_on_commit=False,
)
async def _migrate_schema(conn):
    """Apply the schema fix-ups for the backend behind *conn*.

    PostgreSQL drivers get the legacy tables dropped and the ``users`` columns
    adjusted. Every other driver is treated as SQLite and only gains the
    ``is_subscribed`` column when it is missing.

    Raises:
        sqlalchemy.exc.OperationalError: if the SQLite ``ALTER TABLE`` fails
            for any reason other than the column already existing.
    """
    from sqlalchemy import text
    from sqlalchemy.exc import OperationalError

    # "postgres" is a substring of "postgresql", so one test covers both.
    if "postgres" in engine.url.drivername:
        logger.info("Running on PostgreSQL. Ensuring schema matches simplified design...")
        # Run each statement individually to comply with asyncpg
        # single-statement execution rules.
        statements = (
            "DROP TABLE IF EXISTS thresholds CASCADE",
            "DROP TABLE IF EXISTS subscriptions CASCADE",
            "ALTER TABLE users ALTER COLUMN chat_id TYPE BIGINT",
            "ALTER TABLE users ADD COLUMN IF NOT EXISTS is_subscribed BOOLEAN DEFAULT TRUE",
        )
        for statement in statements:
            await conn.execute(text(statement))
        logger.info("PostgreSQL database schema successfully migrated and simplified.")
        return

    logger.info("Running on SQLite. Running schema verification...")
    # The statement below has no IF NOT EXISTS guard, so the ALTER is attempted
    # and only the "column already exists" failure is tolerated.
    try:
        await conn.execute(text("ALTER TABLE users ADD COLUMN is_subscribed BOOLEAN DEFAULT 1"))
    except OperationalError as exc:
        if "duplicate column" not in str(exc).lower():
            # Anything else is a real migration failure; let init_db report it.
            raise
        logger.debug("is_subscribed column already present on SQLite users table.")
    else:
        logger.info("Successfully added is_subscribed column to SQLite users table.")


async def _seed_exchange_rate_history():
    """Fill ExchangeRateHistory with random-walk data when the table is empty.

    For each currency one row per day is generated from 15 days ago through
    today (16 rows), each stamped 08:00 as a naive datetime derived from the
    current UTC time. Each day's rate moves by at most +/-1.2% from the
    previous one. Nothing is written when at least one row already exists.
    """
    from datetime import datetime, timedelta, timezone
    import random

    from sqlalchemy import select
    from db.models import ExchangeRateHistory

    async with async_session() as session:
        existing = await session.execute(select(ExchangeRateHistory.id).limit(1))
        if existing.first():
            return

        logger.info("Exchange rate history table is empty. Generating 15-day random-walk seed data...")
        base_rates = {
            "USD": 325.0,
            "EUR": 375.0,
            "GBP": 430.0,
            "AUD": 230.0,
            "JPY": 2.05,
            "AED": 88.5,
            "SAR": 86.5,
            "INR": 3.40,
            "CNY": 47.5,
            "QAR": 89.0,
        }
        # Today's 08:00 slot with the tzinfo stripped; computed once because it
        # does not change inside the loops.
        anchor = datetime.now(timezone.utc).replace(
            hour=8, minute=0, second=0, microsecond=0, tzinfo=None
        )
        for cur, base_val in base_rates.items():
            current_val = base_val
            for day in range(15, -1, -1):
                change = current_val * random.uniform(-0.012, 0.012)
                current_val = round(current_val + change, 4)
                session.add(
                    ExchangeRateHistory(
                        currency=cur,
                        rate_to_lkr=current_val,
                        timestamp=anchor - timedelta(days=day),
                    )
                )
        await session.commit()
        logger.info("Successfully seeded 15-day historical exchange rates.")


async def init_db():
    """Initialize the database: create tables, migrate, then seed history.

    Steps, in order:
      1. Create every table declared on ``Base.metadata``.
      2. Run the backend-specific schema fix-ups in the same transaction.
      3. Seed ExchangeRateHistory if it holds no rows.

    Raises:
        Exception: any failure is logged and re-raised unchanged.
    """
    try:
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
            await _migrate_schema(conn)

        # Seeding uses its own session, so it runs only after the DDL
        # transaction above has been committed.
        await _seed_exchange_rate_history()

        logger.info("Database initialized successfully.")
    except Exception as e:
        logger.error("Error initializing database: %s", e)
        raise
async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session for use as a dependency.

    This is an async generator: it opens a session from the module-level
    ``async_session`` factory, yields it to the caller, and leaves the
    ``async with`` block (closing the session) when the generator is resumed
    or closed.

    Yields:
        AsyncSession: a session bound to the module's engine.
    """
    async with async_session() as session:
        yield session