| from sqlmodel import SQLModel, Session, create_engine |
| from sqlalchemy import inspect, text |
| from sqlalchemy.pool import StaticPool |
| from .config import get_settings |
|
|
| settings = get_settings() |
| if settings.meta_bridge_database_url == "sqlite://": |
| engine = create_engine(settings.meta_bridge_database_url, echo=False, connect_args={"check_same_thread": False}, poolclass=StaticPool) |
| else: |
| connect_args = {"check_same_thread": False} if settings.meta_bridge_database_url.startswith("sqlite") else {} |
| engine = create_engine(settings.meta_bridge_database_url, echo=False, connect_args=connect_args) |
|
|
| def init_db() -> None: |
| SQLModel.metadata.create_all(engine) |
| _ensure_draft_media_columns() |
|
|
| def _ensure_draft_media_columns() -> None: |
| inspector = inspect(engine) |
| if "draft" not in inspector.get_table_names(): |
| return |
| existing = {column["name"] for column in inspector.get_columns("draft")} |
| statements = [] |
| if "media_attachments" not in existing: |
| statements.append("ALTER TABLE draft ADD COLUMN media_attachments JSON DEFAULT '[]'") |
| if "media_required" not in existing: |
| statements.append("ALTER TABLE draft ADD COLUMN media_required BOOLEAN DEFAULT 0") |
| if "publish_mode" not in existing: |
| statements.append("ALTER TABLE draft ADD COLUMN publish_mode VARCHAR DEFAULT 'now'") |
| if "scheduled_publish_time" not in existing: |
| statements.append("ALTER TABLE draft ADD COLUMN scheduled_publish_time DATETIME") |
| if not statements: |
| return |
| with engine.begin() as conn: |
| for statement in statements: |
| conn.execute(text(statement)) |
|
|
| def get_session(): |
| with Session(engine) as session: |
| yield session |
|
|