| from __future__ import annotations |
|
|
| from pathlib import Path |
|
|
| from sqlalchemy import inspect, text |
| from sqlalchemy.orm import Session |
|
|
| from app.config import settings |
| from app.database import Base, engine |
| from app.models import Admin |
| from app.security import hash_password, verify_password |
| from app.services.images import persist_task_asset |
|
|
|
|
| def ensure_column(table_name: str, column_name: str, ddl: str) -> None: |
| inspector = inspect(engine) |
| if table_name not in inspector.get_table_names(): |
| return |
| columns = {column["name"] for column in inspector.get_columns(table_name)} |
| if column_name in columns: |
| return |
| with engine.begin() as connection: |
| connection.execute(text(f"ALTER TABLE {table_name} ADD COLUMN {ddl}")) |
|
|
|
|
| def submission_indexes() -> tuple[set[str], set[str]]: |
| inspector = inspect(engine) |
| unique_names = { |
| item["name"] |
| for item in inspector.get_unique_constraints("submissions") |
| if item.get("name") |
| } |
| index_names = { |
| item["name"] |
| for item in inspector.get_indexes("submissions") |
| if item.get("name") |
| } |
| return unique_names, index_names |
|
|
|
|
| def task_column_details() -> dict[str, dict]: |
| inspector = inspect(engine) |
| if "tasks" not in inspector.get_table_names(): |
| return {} |
| return {column["name"]: column for column in inspector.get_columns("tasks")} |
|
|
|
|
| def ensure_submission_constraints() -> None: |
| inspector = inspect(engine) |
| if "submissions" not in inspector.get_table_names(): |
| return |
|
|
| unique_names, index_names = submission_indexes() |
| with engine.begin() as connection: |
| if "uq_user_task_submission" in unique_names or "uq_user_task_submission" in index_names: |
| try: |
| connection.execute(text("ALTER TABLE submissions DROP INDEX uq_user_task_submission")) |
| except Exception: |
| pass |
|
|
| unique_names, index_names = submission_indexes() |
| if "uq_group_task_submission" in unique_names or "uq_group_task_submission" in index_names: |
| return |
|
|
| with engine.begin() as connection: |
| try: |
| connection.execute( |
| text( |
| "ALTER TABLE submissions ADD CONSTRAINT uq_group_task_submission UNIQUE (group_id, task_id)" |
| ) |
| ) |
| except Exception: |
| pass |
|
|
|
|
| def relax_legacy_task_media_columns() -> None: |
| if engine.dialect.name != "mysql": |
| return |
|
|
| column_details = task_column_details() |
| if not column_details: |
| return |
|
|
| legacy_columns = ["image_data", "clue_image_data"] |
| statements: list[str] = [] |
| for column_name in legacy_columns: |
| detail = column_details.get(column_name) |
| if not detail or detail.get("nullable", True): |
| continue |
| column_type = detail.get("type") |
| if column_type is None: |
| continue |
| compiled_type = column_type.compile(dialect=engine.dialect) |
| statements.append( |
| f"ALTER TABLE tasks MODIFY COLUMN {column_name} {compiled_type} NULL" |
| ) |
|
|
| if not statements: |
| return |
|
|
| with engine.begin() as connection: |
| for statement in statements: |
| connection.execute(text(statement)) |
|
|
| def migrate_task_media_to_files() -> None: |
| column_details = task_column_details() |
| if not column_details: |
| return |
|
|
| has_image_data = "image_data" in column_details |
| has_clue_image_data = "clue_image_data" in column_details |
| can_clear_image_data = bool(has_image_data and column_details["image_data"].get("nullable", True)) |
| can_clear_clue_image_data = bool( |
| has_clue_image_data and column_details["clue_image_data"].get("nullable", True) |
| ) |
| if not has_image_data and not has_clue_image_data: |
| return |
|
|
| settings.task_media_root.mkdir(parents=True, exist_ok=True) |
|
|
| select_sql = f""" |
| SELECT |
| id, |
| activity_id, |
| image_path, |
| clue_image_path, |
| image_url, |
| clue_image_url, |
| {'image_data' if has_image_data else 'NULL'} AS image_data, |
| {'clue_image_data' if has_clue_image_data else 'NULL'} AS clue_image_data |
| FROM tasks |
| """ |
|
|
| with engine.begin() as connection: |
| rows = connection.execute(text(select_sql)).mappings().all() |
| for row in rows: |
| updates: list[str] = [] |
| params = {"task_id": row["id"]} |
|
|
| current_image_path = row.get("image_path") |
| image_exists = bool(current_image_path and Path(current_image_path).exists()) |
| if row.get("image_data") and not row.get("image_url") and not image_exists: |
| image_path = persist_task_asset( |
| settings.task_media_root, |
| row["activity_id"], |
| row["id"], |
| "image", |
| row["image_data"], |
| ) |
| updates.append("image_path = :image_path") |
| params["image_path"] = image_path |
| if can_clear_image_data: |
| updates.append("image_data = NULL") |
|
|
| current_clue_path = row.get("clue_image_path") |
| clue_exists = bool(current_clue_path and Path(current_clue_path).exists()) |
| if row.get("clue_image_data") and not row.get("clue_image_url") and not clue_exists: |
| clue_image_path = persist_task_asset( |
| settings.task_media_root, |
| row["activity_id"], |
| row["id"], |
| "clue", |
| row["clue_image_data"], |
| ) |
| updates.append("clue_image_path = :clue_image_path") |
| params["clue_image_path"] = clue_image_path |
| if can_clear_clue_image_data: |
| updates.append("clue_image_data = NULL") |
|
|
| if updates: |
| connection.execute( |
| text(f"UPDATE tasks SET {', '.join(updates)} WHERE id = :task_id"), |
| params, |
| ) |
|
|
|
|
| def upgrade_schema() -> None: |
| ensure_column("admins", "last_seen_at", "last_seen_at DATETIME NULL") |
| ensure_column("users", "last_seen_at", "last_seen_at DATETIME NULL") |
| ensure_column("activities", "is_visible", "is_visible TINYINT(1) NOT NULL DEFAULT 1") |
| ensure_column("activities", "leaderboard_visible", "leaderboard_visible TINYINT(1) NOT NULL DEFAULT 1") |
| ensure_column("activities", "clue_interval_minutes", "clue_interval_minutes INT NULL") |
| ensure_column("tasks", "display_order", "display_order INT NOT NULL DEFAULT 1") |
| ensure_column("tasks", "image_url", "image_url VARCHAR(1000) NULL") |
| ensure_column("tasks", "image_path", "image_path VARCHAR(600) NULL") |
| ensure_column("tasks", "clue_image_url", "clue_image_url VARCHAR(1000) NULL") |
| ensure_column("tasks", "clue_image_path", "clue_image_path VARCHAR(600) NULL") |
| ensure_column("tasks", "clue_image_mime", "clue_image_mime VARCHAR(120) NULL") |
| ensure_column("tasks", "clue_image_filename", "clue_image_filename VARCHAR(255) NULL") |
| ensure_column("tasks", "clue_release_at", "clue_release_at DATETIME NULL") |
| ensure_column("submissions", "group_id", "group_id INT NULL") |
| ensure_column("submissions", "assigned_admin_id", "assigned_admin_id INT NULL") |
| ensure_column("submissions", "assigned_at", "assigned_at DATETIME NULL") |
| ensure_column("submissions", "reviewed_at", "reviewed_at DATETIME NULL") |
| ensure_column("submissions", "approved_at", "approved_at DATETIME NULL") |
|
|
| with engine.begin() as connection: |
| if engine.dialect.name == "mysql": |
| connection.execute( |
| text( |
| """ |
| UPDATE submissions AS s |
| JOIN users AS u ON s.user_id = u.id |
| SET s.group_id = u.group_id |
| WHERE s.group_id IS NULL |
| """ |
| ) |
| ) |
| else: |
| connection.execute( |
| text( |
| """ |
| UPDATE submissions |
| SET group_id = ( |
| SELECT users.group_id |
| FROM users |
| WHERE users.id = submissions.user_id |
| ) |
| WHERE group_id IS NULL |
| """ |
| ) |
| ) |
|
|
| ensure_submission_constraints() |
| relax_legacy_task_media_columns() |
| migrate_task_media_to_files() |
|
|
|
|
| def initialize_database() -> None: |
| settings.docker_root.mkdir(parents=True, exist_ok=True) |
| settings.upload_root.mkdir(parents=True, exist_ok=True) |
| settings.task_media_root.mkdir(parents=True, exist_ok=True) |
| Base.metadata.create_all(bind=engine) |
| upgrade_schema() |
|
|
|
|
| def seed_super_admin(db: Session) -> None: |
| admin = db.query(Admin).filter(Admin.username == settings.admin_username).first() |
| if not admin: |
| admin = Admin( |
| username=settings.admin_username, |
| display_name="超级管理员", |
| role="superadmin", |
| password_hash=hash_password(settings.admin_password), |
| ) |
| db.add(admin) |
| db.commit() |
| return |
|
|
| changed = False |
| if admin.role != "superadmin": |
| admin.role = "superadmin" |
| changed = True |
| if not verify_password(settings.admin_password, admin.password_hash): |
| admin.password_hash = hash_password(settings.admin_password) |
| changed = True |
| if changed: |
| db.add(admin) |
| db.commit() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|