File size: 9,583 Bytes
ce0719e 3d6b7f2 02a8414 ce0719e 3d6b7f2 ce0719e 02a8414 28da773 3d6b7f2 28da773 3d6b7f2 02a8414 7db57c0 3d6b7f2 28da773 3d6b7f2 28da773 3d6b7f2 28da773 3d6b7f2 28da773 3d6b7f2 02a8414 4fb3744 02a8414 3d6b7f2 02a8414 3d6b7f2 02a8414 7db57c0 3d6b7f2 02a8414 ce0719e 3d6b7f2 ce0719e 3d6b7f2 ce0719e 02a8414 ce0719e 3d6b7f2 02a8414 28da773 4fb3744 7db57c0 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 | from __future__ import annotations
from pathlib import Path
from sqlalchemy import inspect, text
from sqlalchemy.orm import Session
from app.config import settings
from app.database import Base, engine
from app.models import Admin
from app.security import hash_password, verify_password
from app.services.images import persist_task_asset
def ensure_column(table_name: str, column_name: str, ddl: str) -> None:
inspector = inspect(engine)
if table_name not in inspector.get_table_names():
return
columns = {column["name"] for column in inspector.get_columns(table_name)}
if column_name in columns:
return
with engine.begin() as connection:
connection.execute(text(f"ALTER TABLE {table_name} ADD COLUMN {ddl}"))
def submission_indexes() -> tuple[set[str], set[str]]:
inspector = inspect(engine)
unique_names = {
item["name"]
for item in inspector.get_unique_constraints("submissions")
if item.get("name")
}
index_names = {
item["name"]
for item in inspector.get_indexes("submissions")
if item.get("name")
}
return unique_names, index_names
def task_column_details() -> dict[str, dict]:
inspector = inspect(engine)
if "tasks" not in inspector.get_table_names():
return {}
return {column["name"]: column for column in inspector.get_columns("tasks")}
def ensure_submission_constraints() -> None:
inspector = inspect(engine)
if "submissions" not in inspector.get_table_names():
return
unique_names, index_names = submission_indexes()
with engine.begin() as connection:
if "uq_user_task_submission" in unique_names or "uq_user_task_submission" in index_names:
try:
connection.execute(text("ALTER TABLE submissions DROP INDEX uq_user_task_submission"))
except Exception:
pass
unique_names, index_names = submission_indexes()
if "uq_group_task_submission" in unique_names or "uq_group_task_submission" in index_names:
return
with engine.begin() as connection:
try:
connection.execute(
text(
"ALTER TABLE submissions ADD CONSTRAINT uq_group_task_submission UNIQUE (group_id, task_id)"
)
)
except Exception:
pass
def relax_legacy_task_media_columns() -> None:
if engine.dialect.name != "mysql":
return
column_details = task_column_details()
if not column_details:
return
legacy_columns = ["image_data", "clue_image_data"]
statements: list[str] = []
for column_name in legacy_columns:
detail = column_details.get(column_name)
if not detail or detail.get("nullable", True):
continue
column_type = detail.get("type")
if column_type is None:
continue
compiled_type = column_type.compile(dialect=engine.dialect)
statements.append(
f"ALTER TABLE tasks MODIFY COLUMN {column_name} {compiled_type} NULL"
)
if not statements:
return
with engine.begin() as connection:
for statement in statements:
connection.execute(text(statement))
def migrate_task_media_to_files() -> None:
column_details = task_column_details()
if not column_details:
return
has_image_data = "image_data" in column_details
has_clue_image_data = "clue_image_data" in column_details
can_clear_image_data = bool(has_image_data and column_details["image_data"].get("nullable", True))
can_clear_clue_image_data = bool(
has_clue_image_data and column_details["clue_image_data"].get("nullable", True)
)
if not has_image_data and not has_clue_image_data:
return
settings.task_media_root.mkdir(parents=True, exist_ok=True)
select_sql = f"""
SELECT
id,
activity_id,
image_path,
clue_image_path,
image_url,
clue_image_url,
{'image_data' if has_image_data else 'NULL'} AS image_data,
{'clue_image_data' if has_clue_image_data else 'NULL'} AS clue_image_data
FROM tasks
"""
with engine.begin() as connection:
rows = connection.execute(text(select_sql)).mappings().all()
for row in rows:
updates: list[str] = []
params = {"task_id": row["id"]}
current_image_path = row.get("image_path")
image_exists = bool(current_image_path and Path(current_image_path).exists())
if row.get("image_data") and not row.get("image_url") and not image_exists:
image_path = persist_task_asset(
settings.task_media_root,
row["activity_id"],
row["id"],
"image",
row["image_data"],
)
updates.append("image_path = :image_path")
params["image_path"] = image_path
if can_clear_image_data:
updates.append("image_data = NULL")
current_clue_path = row.get("clue_image_path")
clue_exists = bool(current_clue_path and Path(current_clue_path).exists())
if row.get("clue_image_data") and not row.get("clue_image_url") and not clue_exists:
clue_image_path = persist_task_asset(
settings.task_media_root,
row["activity_id"],
row["id"],
"clue",
row["clue_image_data"],
)
updates.append("clue_image_path = :clue_image_path")
params["clue_image_path"] = clue_image_path
if can_clear_clue_image_data:
updates.append("clue_image_data = NULL")
if updates:
connection.execute(
text(f"UPDATE tasks SET {', '.join(updates)} WHERE id = :task_id"),
params,
)
def upgrade_schema() -> None:
ensure_column("admins", "last_seen_at", "last_seen_at DATETIME NULL")
ensure_column("users", "last_seen_at", "last_seen_at DATETIME NULL")
ensure_column("activities", "is_visible", "is_visible TINYINT(1) NOT NULL DEFAULT 1")
ensure_column("activities", "leaderboard_visible", "leaderboard_visible TINYINT(1) NOT NULL DEFAULT 1")
ensure_column("activities", "clue_interval_minutes", "clue_interval_minutes INT NULL")
ensure_column("tasks", "display_order", "display_order INT NOT NULL DEFAULT 1")
ensure_column("tasks", "image_url", "image_url VARCHAR(1000) NULL")
ensure_column("tasks", "image_path", "image_path VARCHAR(600) NULL")
ensure_column("tasks", "clue_image_url", "clue_image_url VARCHAR(1000) NULL")
ensure_column("tasks", "clue_image_path", "clue_image_path VARCHAR(600) NULL")
ensure_column("tasks", "clue_image_mime", "clue_image_mime VARCHAR(120) NULL")
ensure_column("tasks", "clue_image_filename", "clue_image_filename VARCHAR(255) NULL")
ensure_column("tasks", "clue_release_at", "clue_release_at DATETIME NULL")
ensure_column("submissions", "group_id", "group_id INT NULL")
ensure_column("submissions", "assigned_admin_id", "assigned_admin_id INT NULL")
ensure_column("submissions", "assigned_at", "assigned_at DATETIME NULL")
ensure_column("submissions", "reviewed_at", "reviewed_at DATETIME NULL")
ensure_column("submissions", "approved_at", "approved_at DATETIME NULL")
with engine.begin() as connection:
if engine.dialect.name == "mysql":
connection.execute(
text(
"""
UPDATE submissions AS s
JOIN users AS u ON s.user_id = u.id
SET s.group_id = u.group_id
WHERE s.group_id IS NULL
"""
)
)
else:
connection.execute(
text(
"""
UPDATE submissions
SET group_id = (
SELECT users.group_id
FROM users
WHERE users.id = submissions.user_id
)
WHERE group_id IS NULL
"""
)
)
ensure_submission_constraints()
relax_legacy_task_media_columns()
migrate_task_media_to_files()
def initialize_database() -> None:
settings.docker_root.mkdir(parents=True, exist_ok=True)
settings.upload_root.mkdir(parents=True, exist_ok=True)
settings.task_media_root.mkdir(parents=True, exist_ok=True)
Base.metadata.create_all(bind=engine)
upgrade_schema()
def seed_super_admin(db: Session) -> None:
admin = db.query(Admin).filter(Admin.username == settings.admin_username).first()
if not admin:
admin = Admin(
username=settings.admin_username,
display_name="超级管理员",
role="superadmin",
password_hash=hash_password(settings.admin_password),
)
db.add(admin)
db.commit()
return
changed = False
if admin.role != "superadmin":
admin.role = "superadmin"
changed = True
if not verify_password(settings.admin_password, admin.password_hash):
admin.password_hash = hash_password(settings.admin_password)
changed = True
if changed:
db.add(admin)
db.commit()
|