LIBRE / migrations /env.py
RyZ
feat: adding full working local ETL Pipeline
e391a84
Raw
History Blame Contribute Delete
2.68 kB
"""
migrations/env.py
──────────────────
Alembic async environment configuration.
Supports:
• Online mode (direct DB connection) — used by ``alembic upgrade head``
• Offline mode (SQL script generation) — used by ``alembic upgrade head --sql``
"""
from __future__ import annotations
import asyncio
from logging.config import fileConfig
from alembic import context
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
# ── Import ORM models so Alembic sees them in metadata ───────────────────────
from src.infrastructure.database.models.base import Base
import src.infrastructure.database.models.ppg_model # noqa: F401
import src.infrastructure.database.models.prediction_model # noqa: F401
from src.shared.config import get_settings
# Alembic Config object, giving access to the values in alembic.ini.
config = context.config

# Set up logging from the alembic.ini logging sections, but only when Alembic
# was actually started with a config file.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Provide the target metadata to Alembic for auto-generation.
target_metadata = Base.metadata

# Override sqlalchemy.url from our application settings.
# set_main_option() stores the value through ConfigParser, which treats "%" as
# the start of an interpolation token. A literal percent sign in the URL
# (e.g. a URL-encoded password character such as "%40") must therefore be
# doubled, otherwise reading the option back raises an interpolation error.
# get_main_option()/get_section() return the value with "%%" collapsed to "%".
settings = get_settings()
config.set_main_option(
    "sqlalchemy.url",
    settings.database_url.replace("%", "%%"),
)
def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode — emit a SQL script without a DB connection.

    The Alembic context is configured with the database URL only (no live
    connection), then the migrations are run inside Alembic's transaction
    context manager.
    """
    offline_options = {
        "url": config.get_main_option("sqlalchemy.url"),
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
        # Batch mode is required for SQLite ALTER TABLE support.
        "render_as_batch": True,
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
    """Configure the Alembic context on *connection* and run the migrations.

    Args:
        connection: The connection Alembic should bind its migration context
            to. In this file it is supplied by ``run_sync`` from the async
            runner.
    """
    online_options = dict(
        connection=connection,
        target_metadata=target_metadata,
        render_as_batch=True,
    )
    context.configure(**online_options)

    with context.begin_transaction():
        context.run_migrations()
async def run_async_migrations() -> None:
    """Run migrations in 'online' mode using an async engine.

    Builds an async engine from the ``sqlalchemy.``-prefixed options of the
    main alembic.ini section, opens a connection, and runs the synchronous
    migration routine on it via ``run_sync``.

    The engine is disposed in a ``finally`` clause so that it is released even
    when connecting or migrating raises; the exception still propagates to the
    caller.
    """
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    try:
        async with connectable.connect() as connection:
            # Alembic's migration API is synchronous; run_sync bridges it onto
            # the async connection.
            await connection.run_sync(do_run_migrations)
    finally:
        await connectable.dispose()
def run_migrations_online() -> None:
    """Run migrations in 'online' mode by driving the async runner to completion."""
    migration_coro = run_async_migrations()
    asyncio.run(migration_coro)
# Entry point: Alembic executes this module, so choose the runner that matches
# the mode it was invoked in.
if not context.is_offline_mode():
    run_migrations_online()
else:
    run_migrations_offline()