from __future__ import annotations from pathlib import Path from sqlalchemy import inspect, text from sqlalchemy.orm import Session from app.config import settings from app.database import Base, engine from app.models import Admin from app.security import hash_password, verify_password from app.services.images import persist_task_asset def ensure_column(table_name: str, column_name: str, ddl: str) -> None: inspector = inspect(engine) if table_name not in inspector.get_table_names(): return columns = {column["name"] for column in inspector.get_columns(table_name)} if column_name in columns: return with engine.begin() as connection: connection.execute(text(f"ALTER TABLE {table_name} ADD COLUMN {ddl}")) def submission_indexes() -> tuple[set[str], set[str]]: inspector = inspect(engine) unique_names = { item["name"] for item in inspector.get_unique_constraints("submissions") if item.get("name") } index_names = { item["name"] for item in inspector.get_indexes("submissions") if item.get("name") } return unique_names, index_names def task_column_details() -> dict[str, dict]: inspector = inspect(engine) if "tasks" not in inspector.get_table_names(): return {} return {column["name"]: column for column in inspector.get_columns("tasks")} def ensure_submission_constraints() -> None: inspector = inspect(engine) if "submissions" not in inspector.get_table_names(): return unique_names, index_names = submission_indexes() with engine.begin() as connection: if "uq_user_task_submission" in unique_names or "uq_user_task_submission" in index_names: try: connection.execute(text("ALTER TABLE submissions DROP INDEX uq_user_task_submission")) except Exception: pass unique_names, index_names = submission_indexes() if "uq_group_task_submission" in unique_names or "uq_group_task_submission" in index_names: return with engine.begin() as connection: try: connection.execute( text( "ALTER TABLE submissions ADD CONSTRAINT uq_group_task_submission UNIQUE (group_id, task_id)" ) ) except Exception: pass def relax_legacy_task_media_columns() -> None: if engine.dialect.name != "mysql": return column_details = task_column_details() if not column_details: return legacy_columns = ["image_data", "clue_image_data"] statements: list[str] = [] for column_name in legacy_columns: detail = column_details.get(column_name) if not detail or detail.get("nullable", True): continue column_type = detail.get("type") if column_type is None: continue compiled_type = column_type.compile(dialect=engine.dialect) statements.append( f"ALTER TABLE tasks MODIFY COLUMN {column_name} {compiled_type} NULL" ) if not statements: return with engine.begin() as connection: for statement in statements: connection.execute(text(statement)) def migrate_task_media_to_files() -> None: column_details = task_column_details() if not column_details: return has_image_data = "image_data" in column_details has_clue_image_data = "clue_image_data" in column_details can_clear_image_data = bool(has_image_data and column_details["image_data"].get("nullable", True)) can_clear_clue_image_data = bool( has_clue_image_data and column_details["clue_image_data"].get("nullable", True) ) if not has_image_data and not has_clue_image_data: return settings.task_media_root.mkdir(parents=True, exist_ok=True) select_sql = f""" SELECT id, activity_id, image_path, clue_image_path, image_url, clue_image_url, {'image_data' if has_image_data else 'NULL'} AS image_data, {'clue_image_data' if has_clue_image_data else 'NULL'} AS clue_image_data FROM tasks """ with engine.begin() as connection: rows = connection.execute(text(select_sql)).mappings().all() for row in rows: updates: list[str] = [] params = {"task_id": row["id"]} current_image_path = row.get("image_path") image_exists = bool(current_image_path and Path(current_image_path).exists()) if row.get("image_data") and not row.get("image_url") and not image_exists: image_path = persist_task_asset( settings.task_media_root, row["activity_id"], row["id"], "image", row["image_data"], ) updates.append("image_path = :image_path") params["image_path"] = image_path if can_clear_image_data: updates.append("image_data = NULL") current_clue_path = row.get("clue_image_path") clue_exists = bool(current_clue_path and Path(current_clue_path).exists()) if row.get("clue_image_data") and not row.get("clue_image_url") and not clue_exists: clue_image_path = persist_task_asset( settings.task_media_root, row["activity_id"], row["id"], "clue", row["clue_image_data"], ) updates.append("clue_image_path = :clue_image_path") params["clue_image_path"] = clue_image_path if can_clear_clue_image_data: updates.append("clue_image_data = NULL") if updates: connection.execute( text(f"UPDATE tasks SET {', '.join(updates)} WHERE id = :task_id"), params, ) def upgrade_schema() -> None: ensure_column("admins", "last_seen_at", "last_seen_at DATETIME NULL") ensure_column("users", "last_seen_at", "last_seen_at DATETIME NULL") ensure_column("activities", "is_visible", "is_visible TINYINT(1) NOT NULL DEFAULT 1") ensure_column("activities", "leaderboard_visible", "leaderboard_visible TINYINT(1) NOT NULL DEFAULT 1") ensure_column("activities", "clue_interval_minutes", "clue_interval_minutes INT NULL") ensure_column("tasks", "display_order", "display_order INT NOT NULL DEFAULT 1") ensure_column("tasks", "image_url", "image_url VARCHAR(1000) NULL") ensure_column("tasks", "image_path", "image_path VARCHAR(600) NULL") ensure_column("tasks", "clue_image_url", "clue_image_url VARCHAR(1000) NULL") ensure_column("tasks", "clue_image_path", "clue_image_path VARCHAR(600) NULL") ensure_column("tasks", "clue_image_mime", "clue_image_mime VARCHAR(120) NULL") ensure_column("tasks", "clue_image_filename", "clue_image_filename VARCHAR(255) NULL") ensure_column("tasks", "clue_release_at", "clue_release_at DATETIME NULL") ensure_column("submissions", "group_id", "group_id INT NULL") ensure_column("submissions", "assigned_admin_id", "assigned_admin_id INT NULL") ensure_column("submissions", "assigned_at", "assigned_at DATETIME NULL") ensure_column("submissions", "reviewed_at", "reviewed_at DATETIME NULL") ensure_column("submissions", "approved_at", "approved_at DATETIME NULL") with engine.begin() as connection: if engine.dialect.name == "mysql": connection.execute( text( """ UPDATE submissions AS s JOIN users AS u ON s.user_id = u.id SET s.group_id = u.group_id WHERE s.group_id IS NULL """ ) ) else: connection.execute( text( """ UPDATE submissions SET group_id = ( SELECT users.group_id FROM users WHERE users.id = submissions.user_id ) WHERE group_id IS NULL """ ) ) ensure_submission_constraints() relax_legacy_task_media_columns() migrate_task_media_to_files() def initialize_database() -> None: settings.docker_root.mkdir(parents=True, exist_ok=True) settings.upload_root.mkdir(parents=True, exist_ok=True) settings.task_media_root.mkdir(parents=True, exist_ok=True) Base.metadata.create_all(bind=engine) upgrade_schema() def seed_super_admin(db: Session) -> None: admin = db.query(Admin).filter(Admin.username == settings.admin_username).first() if not admin: admin = Admin( username=settings.admin_username, display_name="超级管理员", role="superadmin", password_hash=hash_password(settings.admin_password), ) db.add(admin) db.commit() return changed = False if admin.role != "superadmin": admin.role = "superadmin" changed = True if not verify_password(settings.admin_password, admin.password_hash): admin.password_hash = hash_password(settings.admin_password) changed = True if changed: db.add(admin) db.commit()