# app/services/bootstrap.py
# Database bootstrap: directory creation, additive schema migrations, and
# super-admin seeding, run at application startup.
from __future__ import annotations
from pathlib import Path
from sqlalchemy import inspect, text
from sqlalchemy.orm import Session
from app.config import settings
from app.database import Base, engine
from app.models import Admin
from app.security import hash_password, verify_password
from app.services.images import persist_task_asset
def ensure_column(table_name: str, column_name: str, ddl: str) -> None:
    """Add *column_name* to *table_name* with raw DDL if it is missing.

    No-op when the table does not exist yet or the column is already present,
    which makes the call safe to repeat on every startup.
    """
    insp = inspect(engine)
    if table_name not in insp.get_table_names():
        return
    present = {col["name"] for col in insp.get_columns(table_name)}
    if column_name not in present:
        with engine.begin() as conn:
            conn.execute(text(f"ALTER TABLE {table_name} ADD COLUMN {ddl}"))
def submission_indexes() -> tuple[set[str], set[str]]:
    """Return (named unique constraints, named indexes) on ``submissions``.

    Unnamed entries (``name`` missing or falsy) are skipped.
    """
    insp = inspect(engine)

    def _named(entries: list[dict]) -> set[str]:
        # Keep only entries that actually carry a name.
        return {entry["name"] for entry in entries if entry.get("name")}

    uniques = _named(insp.get_unique_constraints("submissions"))
    indexes = _named(insp.get_indexes("submissions"))
    return uniques, indexes
def task_column_details() -> dict[str, dict]:
    """Map column name -> reflected column info for ``tasks``.

    Returns an empty dict when the ``tasks`` table has not been created yet.
    """
    insp = inspect(engine)
    if "tasks" in insp.get_table_names():
        return {col["name"]: col for col in insp.get_columns("tasks")}
    return {}
def ensure_submission_constraints() -> None:
    """Replace the legacy per-user unique key on ``submissions`` with a
    per-group one (``uq_group_task_submission`` on (group_id, task_id)).

    Both DDL statements are best-effort: failures are swallowed so that a
    dialect quirk or pre-existing duplicate rows never block startup.
    """
    if "submissions" not in inspect(engine).get_table_names():
        return

    legacy_name = "uq_user_task_submission"
    target_name = "uq_group_task_submission"

    uniques, indexes = submission_indexes()
    with engine.begin() as conn:
        if legacy_name in uniques or legacy_name in indexes:
            try:
                conn.execute(text("ALTER TABLE submissions DROP INDEX uq_user_task_submission"))
            except Exception:
                pass  # deliberate best-effort; keep startup resilient

    # Re-inspect: the DROP above may have changed the picture.
    uniques, indexes = submission_indexes()
    if target_name in uniques or target_name in indexes:
        return
    with engine.begin() as conn:
        try:
            conn.execute(
                text(
                    "ALTER TABLE submissions ADD CONSTRAINT uq_group_task_submission UNIQUE (group_id, task_id)"
                )
            )
        except Exception:
            pass  # may fail on duplicate legacy rows; tolerated
def relax_legacy_task_media_columns() -> None:
    """On MySQL, lift NOT NULL from the legacy blob columns on ``tasks``.

    Task media now lives on disk (see ``migrate_task_media_to_files``), so the
    old ``image_data`` / ``clue_image_data`` columns must accept NULL before
    new rows can be written without inline binary payloads.
    """
    if engine.dialect.name != "mysql":
        return
    details = task_column_details()
    if not details:
        return

    alter_statements: list[str] = []
    for name in ("image_data", "clue_image_data"):
        info = details.get(name)
        if not info or info.get("nullable", True):
            continue  # column absent or already nullable
        sa_type = info.get("type")
        if sa_type is None:
            continue
        rendered = sa_type.compile(dialect=engine.dialect)
        alter_statements.append(
            f"ALTER TABLE tasks MODIFY COLUMN {name} {rendered} NULL"
        )

    if alter_statements:
        with engine.begin() as conn:
            for stmt in alter_statements:
                conn.execute(text(stmt))
def migrate_task_media_to_files() -> None:
    """One-shot migration: move inline task image blobs out of the DB to disk.

    For every task row that still carries binary data in the legacy
    ``image_data`` / ``clue_image_data`` columns — and has neither an external
    URL nor an already-existing file on disk — the blob is written under
    ``settings.task_media_root`` via ``persist_task_asset`` and the row is
    updated to point at the new file.  The blob column itself is cleared only
    when it is nullable (see ``relax_legacy_task_media_columns``).
    Idempotent: rows with an existing path/URL are skipped on later runs.
    """
    column_details = task_column_details()
    if not column_details:
        # ``tasks`` table not created yet — nothing to migrate.
        return
    has_image_data = "image_data" in column_details
    has_clue_image_data = "clue_image_data" in column_details
    # Blobs can only be NULLed out once their columns accept NULL.
    can_clear_image_data = bool(has_image_data and column_details["image_data"].get("nullable", True))
    can_clear_clue_image_data = bool(
        has_clue_image_data and column_details["clue_image_data"].get("nullable", True)
    )
    if not has_image_data and not has_clue_image_data:
        # Fresh schema without the legacy columns: migration already complete.
        return
    settings.task_media_root.mkdir(parents=True, exist_ok=True)
    # Substitute literal NULL for columns that no longer exist so the SELECT
    # stays valid against both old and new schemas.
    select_sql = f"""
        SELECT
            id,
            activity_id,
            image_path,
            clue_image_path,
            image_url,
            clue_image_url,
            {'image_data' if has_image_data else 'NULL'} AS image_data,
            {'clue_image_data' if has_clue_image_data else 'NULL'} AS clue_image_data
        FROM tasks
    """
    with engine.begin() as connection:
        rows = connection.execute(text(select_sql)).mappings().all()
        for row in rows:
            updates: list[str] = []
            params = {"task_id": row["id"]}
            current_image_path = row.get("image_path")
            image_exists = bool(current_image_path and Path(current_image_path).exists())
            # Materialise the blob only when no URL and no on-disk file exist.
            if row.get("image_data") and not row.get("image_url") and not image_exists:
                image_path = persist_task_asset(
                    settings.task_media_root,
                    row["activity_id"],
                    row["id"],
                    "image",
                    row["image_data"],
                )
                updates.append("image_path = :image_path")
                params["image_path"] = image_path
                if can_clear_image_data:
                    updates.append("image_data = NULL")
            current_clue_path = row.get("clue_image_path")
            clue_exists = bool(current_clue_path and Path(current_clue_path).exists())
            # Same treatment for the clue image.
            if row.get("clue_image_data") and not row.get("clue_image_url") and not clue_exists:
                clue_image_path = persist_task_asset(
                    settings.task_media_root,
                    row["activity_id"],
                    row["id"],
                    "clue",
                    row["clue_image_data"],
                )
                updates.append("clue_image_path = :clue_image_path")
                params["clue_image_path"] = clue_image_path
                if can_clear_clue_image_data:
                    updates.append("clue_image_data = NULL")
            if updates:
                connection.execute(
                    text(f"UPDATE tasks SET {', '.join(updates)} WHERE id = :task_id"),
                    params,
                )
def _backfill_submission_group_ids() -> None:
    """Copy ``users.group_id`` onto legacy submissions that predate the column.

    Only rows with ``group_id IS NULL`` are touched, so the backfill is
    idempotent and cheap on subsequent startups.
    """
    with engine.begin() as connection:
        if engine.dialect.name == "mysql":
            # MySQL supports multi-table UPDATE via JOIN.
            connection.execute(
                text(
                    """
                    UPDATE submissions AS s
                    JOIN users AS u ON s.user_id = u.id
                    SET s.group_id = u.group_id
                    WHERE s.group_id IS NULL
                    """
                )
            )
        else:
            # Portable fallback (e.g. SQLite): correlated subquery.
            connection.execute(
                text(
                    """
                    UPDATE submissions
                    SET group_id = (
                        SELECT users.group_id
                        FROM users
                        WHERE users.id = submissions.user_id
                    )
                    WHERE group_id IS NULL
                    """
                )
            )


def upgrade_schema() -> None:
    """Apply all additive, idempotent schema migrations at startup.

    Order matters: columns are added first, then the group-id backfill and
    constraint swap (which need ``submissions.group_id``), then the task media
    column relaxation and blob-to-file migration.
    """
    ensure_column("admins", "last_seen_at", "last_seen_at DATETIME NULL")
    ensure_column("users", "last_seen_at", "last_seen_at DATETIME NULL")
    ensure_column("activities", "is_visible", "is_visible TINYINT(1) NOT NULL DEFAULT 1")
    ensure_column("activities", "leaderboard_visible", "leaderboard_visible TINYINT(1) NOT NULL DEFAULT 1")
    ensure_column("activities", "clue_interval_minutes", "clue_interval_minutes INT NULL")
    ensure_column("tasks", "display_order", "display_order INT NOT NULL DEFAULT 1")
    ensure_column("tasks", "image_url", "image_url VARCHAR(1000) NULL")
    ensure_column("tasks", "image_path", "image_path VARCHAR(600) NULL")
    ensure_column("tasks", "clue_image_url", "clue_image_url VARCHAR(1000) NULL")
    ensure_column("tasks", "clue_image_path", "clue_image_path VARCHAR(600) NULL")
    ensure_column("tasks", "clue_image_mime", "clue_image_mime VARCHAR(120) NULL")
    ensure_column("tasks", "clue_image_filename", "clue_image_filename VARCHAR(255) NULL")
    ensure_column("tasks", "clue_release_at", "clue_release_at DATETIME NULL")
    ensure_column("submissions", "group_id", "group_id INT NULL")
    ensure_column("submissions", "assigned_admin_id", "assigned_admin_id INT NULL")
    ensure_column("submissions", "assigned_at", "assigned_at DATETIME NULL")
    ensure_column("submissions", "reviewed_at", "reviewed_at DATETIME NULL")
    ensure_column("submissions", "approved_at", "approved_at DATETIME NULL")
    _backfill_submission_group_ids()
    ensure_submission_constraints()
    relax_legacy_task_media_columns()
    migrate_task_media_to_files()
def initialize_database() -> None:
    """Create storage directories and ORM tables, then run in-place migrations."""
    for root in (settings.docker_root, settings.upload_root, settings.task_media_root):
        root.mkdir(parents=True, exist_ok=True)
    Base.metadata.create_all(bind=engine)
    upgrade_schema()
def seed_super_admin(db: Session) -> None:
    """Create the configured super-admin account, or bring it back in line.

    NOTE(review): when the stored hash no longer verifies against
    ``settings.admin_password``, the password is force-reset from config on
    every startup — confirm this is intended, as it overwrites any manually
    changed password.
    """
    admin = db.query(Admin).filter(Admin.username == settings.admin_username).first()

    if admin is None:
        db.add(
            Admin(
                username=settings.admin_username,
                display_name="超级管理员",
                role="superadmin",
                password_hash=hash_password(settings.admin_password),
            )
        )
        db.commit()
        return

    dirty = False
    if admin.role != "superadmin":
        admin.role = "superadmin"
        dirty = True
    if not verify_password(settings.admin_password, admin.password_hash):
        admin.password_hash = hash_password(settings.admin_password)
        dirty = True
    if dirty:
        db.add(admin)
        db.commit()