cam / app /routes /admin.py
cacode's picture
Upload 67 files
cf289c1 verified
from __future__ import annotations
import io
import zipfile
from datetime import datetime, timedelta
from urllib.parse import quote
from pathlib import Path
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from sqlalchemy.orm import Session, joinedload
from app.config import settings
from app.database import get_db
from app.models import Activity, Admin, Group, Submission, Task, User
from app.security import hash_password
from app.services.images import (
delete_file_if_exists,
list_image_files,
resolve_managed_path,
)
from app.services.leaderboard import build_leaderboard
from app.services.presence import ONLINE_WINDOW_SECONDS, is_online, unix_seconds
from app.services.review_queue import online_admins, rebalance_pending_reviews
from app.web import add_flash, local_now, redirect, render
# Router instance that the admin endpoints below register themselves on.
router = APIRouter()
def require_admin(request: Request, db: Session) -> Admin | None:
    """Return the active admin referenced by the session, or ``None``.

    The admin id is read from ``request.session["admin_id"]``; a missing id
    falls back to ``0``. Admins whose ``is_active`` flag is falsy are treated
    as not logged in.
    """
    session_admin_id = request.session.get("admin_id") or 0
    current = db.query(Admin).filter(Admin.id == session_admin_id).first()
    if current and current.is_active:
        return current
    return None
def require_super_admin(request: Request, db: Session) -> Admin | None:
    """Return the logged-in admin only when their role is ``"superadmin"``.

    Returns ``None`` both when nobody is logged in and when the logged-in
    admin has any other role.
    """
    candidate = require_admin(request, db)
    if candidate and candidate.role == "superadmin":
        return candidate
    return None
def parse_optional_group(group_id_value: str | None, db: Session) -> Group | None:
    """Resolve an optional form value to a ``Group`` with its members loaded.

    Args:
        group_id_value: Raw value from the form; may be ``None`` or empty.
        db: Active database session.

    Returns:
        The matching group, or ``None`` when the value is empty, is not a
        decimal number, or no group has that id.
    """
    if not group_id_value:
        return None
    raw_id = str(group_id_value)
    # ``str.isdigit`` also accepts characters such as "²" which ``int`` cannot
    # parse; ``isdecimal`` only lets through strings that ``int`` accepts.
    if not raw_id.isdecimal():
        return None
    return (
        db.query(Group)
        .options(joinedload(Group.members))
        .filter(Group.id == int(raw_id))
        .first()
    )
def ensure_group_capacity(group: Group | None, current_users: list[User] | None = None) -> None:
    """Raise ``ValueError`` if adding ``current_users`` would overfill ``group``.

    Users that already point at the group (through ``group_id`` or through
    their ``group`` attribute) are not counted as newcomers. A falsy ``group``
    means "no group" and is always accepted.
    """
    if not group:
        return

    existing_total = len(group.members)
    newcomers = 0
    for user in current_users or []:
        if user.group_id == group.id:
            continue
        linked_group = getattr(user, "group", None)
        if linked_group is not None and getattr(linked_group, "id", None) == group.id:
            continue
        newcomers += 1

    if existing_total + newcomers > group.max_members:
        raise ValueError(f"{group.name} 的人数上限不足,请调整人数上限或减少选中人数。")
def normalize_image_url(raw_value: str | None, field_label: str, required: bool = False) -> str | None:
    """Trim and validate an image link taken from a form field.

    Args:
        raw_value: Submitted value; ``None`` is treated like an empty string.
        field_label: Human-readable field name used in error messages.
        required: Whether an empty value is an error.

    Returns:
        The trimmed URL, or ``None`` when the field is empty and optional.

    Raises:
        ValueError: If a required value is missing, the value does not start
            with ``http://`` or ``https://``, or it is longer than 1000
            characters.
    """
    cleaned = str(raw_value or "").strip()
    if cleaned:
        if not cleaned.startswith(("http://", "https://")):
            raise ValueError(f"{field_label}必须以 http:// 或 https:// 开头。")
        if len(cleaned) > 1000:
            raise ValueError(f"{field_label}过长,请检查链接是否正确。")
        return cleaned
    if required:
        raise ValueError(f"请填写{field_label}。")
    return None
def parse_activity_fields(form) -> dict:
    """Validate the activity part of a submitted form.

    Args:
        form: Mapping-like form data exposing ``get``.

    Returns:
        A dict with ``title``, ``description``, ``start_at``, ``deadline_at``,
        ``clue_interval_minutes``, ``is_visible`` and ``leaderboard_visible``.

    Raises:
        ValueError: If a mandatory field is missing, a timestamp is not valid
            ISO format, the two timestamps cannot be compared, the deadline is
            not after the start, or the clue interval is not an integer.
    """
    title = str(form.get("title", "")).strip()
    description = str(form.get("description", "")).strip()
    start_raw = str(form.get("start_at", "")).strip()
    deadline_raw = str(form.get("deadline_at", "")).strip()
    clue_interval_raw = str(form.get("clue_interval_minutes", "")).strip()
    is_visible = form.get("is_visible") == "on"
    leaderboard_visible = form.get("leaderboard_visible") == "on"

    if not title or not start_raw or not deadline_raw:
        raise ValueError("请完整填写活动标题、开始时间和截止时间。")

    try:
        start_at = datetime.fromisoformat(start_raw)
        deadline_at = datetime.fromisoformat(deadline_raw)
    except ValueError as exc:
        raise ValueError("时间格式不正确。") from exc

    try:
        deadline_not_after_start = deadline_at <= start_at
    except TypeError as exc:
        # One value carries a UTC offset and the other does not; Python refuses
        # to order them, so report it as a format problem instead of crashing.
        raise ValueError("时间格式不正确。") from exc
    if deadline_not_after_start:
        raise ValueError("截止时间必须晚于开始时间。")

    try:
        clue_interval_minutes = int(clue_interval_raw) if clue_interval_raw else None
    except ValueError as exc:
        raise ValueError("线索发布时间间隔必须是数字。") from exc

    return {
        "title": title,
        "description": description,
        "start_at": start_at,
        "deadline_at": deadline_at,
        "clue_interval_minutes": clue_interval_minutes,
        "is_visible": is_visible,
        "leaderboard_visible": leaderboard_visible,
    }
def compute_clue_release_at(
    start_at: datetime,
    clue_interval_minutes: int | None,
    display_order: int,
    has_clue_image: bool,
) -> datetime | None:
    """Work out when a task's clue image becomes available.

    Returns ``None`` for tasks without a clue image. Without a positive
    interval the clue is released at ``start_at``; otherwise it is released
    ``clue_interval_minutes * display_order`` minutes after ``start_at``.
    """
    if not has_clue_image:
        return None
    interval = clue_interval_minutes or 0
    if interval <= 0:
        return start_at
    delay = timedelta(minutes=interval * display_order)
    return start_at + delay
def apply_task_schedule(activity: Activity, tasks: list[Task]) -> None:
    """Renumber ``tasks`` from 1 and refresh each task's clue release time.

    A task counts as having a clue image when any of its clue URL, clue path
    or clue filename is set.
    """
    for position, task in enumerate(tasks, start=1):
        has_clue = bool(task.clue_image_url or task.clue_image_path or task.clue_image_filename)
        task.display_order = position
        task.clue_release_at = compute_clue_release_at(
            activity.start_at,
            activity.clue_interval_minutes,
            position,
            has_clue,
        )
def cleanup_submission_files(submissions: list[Submission]) -> None:
    """Delete the stored file of every submission that has one.

    Submissions without a ``file_path`` and paths that no longer exist are
    skipped.
    """
    stored_paths = (Path(item.file_path) for item in submissions if item.file_path)
    for stored in stored_paths:
        if stored.exists():
            stored.unlink(missing_ok=True)
def serialize_submission(submission: Submission) -> dict:
    """Flatten a submission and its related rows into a JSON-friendly dict.

    Missing relations fall back to placeholder text (or ``None`` for the
    reviewer and assignee names) so the result can always be rendered.
    """
    minute_format = "%Y-%m-%d %H:%M"
    task = submission.task
    activity = task.activity if task else None
    uploader = submission.user
    group = submission.group
    reviewer = submission.reviewed_by
    assignee = submission.assigned_admin
    media_url = f"/media/submissions/{submission.id}"

    if submission.created_at:
        created_label = submission.created_at.strftime(minute_format)
    else:
        created_label = "-"
    if submission.reviewed_at:
        reviewed_label = submission.reviewed_at.strftime(minute_format)
    else:
        reviewed_label = None

    return {
        "id": submission.id,
        "task_title": task.title if task else "",
        "activity_title": activity.title if activity else "",
        "group_name": group.name if group else "未分组",
        "uploader_name": uploader.full_name if uploader else "未知成员",
        "student_id": uploader.student_id if uploader else "",
        "status": submission.status,
        "feedback": submission.feedback,
        "created_at": created_label,
        "reviewed_at": reviewed_label,
        "reviewed_by_name": reviewer.display_name if reviewer else None,
        "assigned_admin_name": assignee.display_name if assignee else None,
        "image_url": media_url,
        "download_url": f"{media_url}?download=1",
    }
def display_admin_role(role: str) -> str:
    """Return the display label for an admin role value."""
    if role == "superadmin":
        return "超级管理员"
    return "管理员"
def serialize_presence_entry(name: str, role_label: str, last_seen_at, now: datetime) -> dict:
    """Build the presence record for one account.

    ``last_seen_at`` may be falsy for accounts that were never seen; the text
    label is then ``"-"``. The timestamp and online flag are delegated to
    ``unix_seconds`` and ``is_online``.
    """
    if last_seen_at:
        seen_label = last_seen_at.strftime("%Y-%m-%d %H:%M:%S")
    else:
        seen_label = "-"
    entry = {"name": name, "role_label": role_label, "last_seen_at": seen_label}
    entry["last_seen_ts"] = unix_seconds(last_seen_at)
    entry["is_online"] = is_online(last_seen_at, now)
    return entry
def cleanup_task_files(task: Task) -> None:
    """Remove a task's main image and clue image from the task media root."""
    media_root = settings.task_media_root
    for stored_path in (task.image_path, task.clue_image_path):
        delete_file_if_exists(stored_path, media_root)
def cleanup_activity_files(activity: Activity) -> None:
    """Remove every file that belongs to an activity.

    Task images are removed task by task; the submission files of all tasks
    are collected and removed afterwards in one call.
    """
    collected: list[Submission] = []
    for task in activity.tasks:
        collected += list(task.submissions)
        cleanup_task_files(task)
    cleanup_submission_files(collected)
def normalize_path_key(file_path: str | Path) -> str:
    """Return the resolved path as a string, for use as a lookup key."""
    resolved = Path(file_path).resolve()
    return str(resolved)
def format_file_size(file_size: int) -> str:
    """Format a byte count as ``B``, ``KB`` (one decimal) or ``MB`` (two decimals)."""
    kib = 1024
    mib = kib * kib
    if file_size < kib:
        return f"{file_size} B"
    if file_size < mib:
        return f"{file_size / kib:.1f} KB"
    return f"{file_size / mib:.2f} MB"
def build_image_reference_index(db: Session) -> dict[str, list[dict]]:
    """Map each stored image path to the records that reference it.

    Keys are produced by ``normalize_path_key``. Every value is a list of
    ``{"type", "label"}`` dicts describing a task main image, a task clue
    image, or a user submission that points at that file.
    """
    index: dict[str, list[dict]] = {}

    def _register(stored_path, kind: str, label: str) -> None:
        index.setdefault(normalize_path_key(stored_path), []).append(
            {"type": kind, "label": label}
        )

    for task in db.query(Task).options(joinedload(Task.activity)).all():
        if task.image_path:
            _register(
                task.image_path,
                "task",
                f"任务主图 · {task.activity.title if task.activity else '未知活动'} / {task.title}",
            )
        if task.clue_image_path:
            _register(
                task.clue_image_path,
                "task",
                f"线索图 · {task.activity.title if task.activity else '未知活动'} / {task.title}",
            )

    submission_query = db.query(Submission).options(
        joinedload(Submission.user),
        joinedload(Submission.group),
        joinedload(Submission.task).joinedload(Task.activity),
    )
    for submission in submission_query.all():
        if not submission.file_path:
            continue
        owning_task = submission.task
        activity_title = (
            owning_task.activity.title if owning_task and owning_task.activity else '未知活动'
        )
        task_title = owning_task.title if owning_task else '未知任务'
        group_name = submission.group.name if submission.group else '未分组'
        _register(
            submission.file_path,
            "submission",
            f"用户提交图 · {activity_title} / {task_title} / {group_name}",
        )
    return index
def build_image_inventory(db: Session, scope: str) -> tuple[list[dict], dict]:
    """List image files under ``settings.docker_root`` with reference info.

    Args:
        db: Active database session, used to build the reference index.
        scope: Filter for the returned items. ``"tasks"`` / ``"submissions"``
            keep files whose top-level folder has that name, ``"referenced"``
            / ``"orphan"`` filter on whether any record points at the file,
            and any other value keeps everything.

    Returns:
        ``(items, summary)``. ``items`` honours ``scope``; the counts in
        ``summary`` always cover every file found, regardless of ``scope``.
    """
    reference_index = build_image_reference_index(db)
    image_paths = list_image_files(settings.docker_root)
    items: list[dict] = []
    referenced_total = 0

    for image_path in image_paths:
        relative_path = image_path.relative_to(settings.docker_root).as_posix()
        # Resolve the lookup key once per file; it feeds both the summary and the item.
        references = reference_index.get(normalize_path_key(image_path), [])
        is_referenced = bool(references)
        if is_referenced:
            referenced_total += 1

        # Without a "/" the split returns the whole string, i.e. the file name itself.
        top_level = relative_path.split("/", 1)[0]
        if scope in ("tasks", "submissions") and top_level != scope:
            continue
        if scope == "referenced" and not is_referenced:
            continue
        if scope == "orphan" and is_referenced:
            continue

        file_stat = image_path.stat()
        encoded_path = quote(relative_path, safe="/")
        items.append(
            {
                "relative_path": relative_path,
                "file_name": image_path.name,
                "category": top_level,
                "size_label": format_file_size(file_stat.st_size),
                "modified_at": datetime.fromtimestamp(file_stat.st_mtime).strftime("%Y-%m-%d %H:%M"),
                "is_referenced": is_referenced,
                "reference_labels": [item["label"] for item in references],
                "preview_url": f"/media/library/{encoded_path}",
                "download_url": f"/media/library/{encoded_path}?download=1",
            }
        )

    total_count = len(image_paths)
    summary = {
        "total_count": total_count,
        "referenced_count": referenced_total,
        "orphan_count": total_count - referenced_total,
        "docker_root": str(settings.docker_root),
    }
    return items, summary
def recent_reviews_query(db: Session, activity_id: int | None = None):
    """Build the query for already-reviewed submissions, newest review first.

    Only ``approved`` and ``rejected`` submissions are included. When
    ``activity_id`` is given the query is further restricted to tasks of that
    activity. The caller decides on limit and execution.
    """
    eager_loads = (
        joinedload(Submission.user),
        joinedload(Submission.group),
        joinedload(Submission.task).joinedload(Task.activity),
        joinedload(Submission.reviewed_by),
        joinedload(Submission.assigned_admin),
    )
    reviewed = (
        db.query(Submission)
        .options(*eager_loads)
        .filter(Submission.status.in_(["approved", "rejected"]))
        .order_by(Submission.reviewed_at.desc(), Submission.id.desc())
    )
    if activity_id is None:
        return reviewed
    return reviewed.join(Submission.task).filter(Task.activity_id == activity_id)
def pick_replacement_user(
    db: Session,
    group_id: int | None,
    task_id: int,
    excluded_ids: set[int],
) -> User | None:
    """Find a group member who can take over a submission for ``task_id``.

    Members are tried in ascending id order, skipping ``excluded_ids``. The
    first one without a submission of their own for the task is returned;
    ``None`` if there is no group or no such member.
    """
    if not group_id:
        return None

    teammates = (
        db.query(User)
        .filter(User.group_id == group_id, User.id.notin_(excluded_ids))
        .order_by(User.id.asc())
        .all()
    )
    for teammate in teammates:
        own_submission = (
            db.query(Submission.id)
            .filter(Submission.user_id == teammate.id, Submission.task_id == task_id)
            .first()
        )
        if own_submission:
            continue
        return teammate
    return None
def delete_users_and_handle_submissions(db: Session, users: list[User]) -> tuple[int, int]:
    """Delete ``users``, handing their submissions to teammates where possible.

    Each submission is reassigned to a replacement picked by
    ``pick_replacement_user`` (never one of the users being deleted). When no
    replacement exists, the submission and its file are removed. Nothing is
    committed here.

    Returns:
        ``(number of users deleted, number of submissions removed)``.
    """
    doomed_ids = {user.id for user in users}
    dropped_submissions = 0

    for user in users:
        owned = db.query(Submission).filter(Submission.user_id == user.id).all()
        for submission in owned:
            heir = pick_replacement_user(db, submission.group_id, submission.task_id, doomed_ids)
            if not heir:
                cleanup_submission_files([submission])
                db.delete(submission)
                dropped_submissions += 1
            else:
                submission.user_id = heir.id
                db.add(submission)
        db.delete(user)

    return len(users), dropped_submissions
async def collect_task_payloads(
    form,
    title_key: str,
    description_key: str,
    image_key: str,
    clue_key: str,
) -> list[dict]:
    """Collect new-task rows from parallel form lists.

    Rows are matched by position across the four lists named by the ``*_key``
    arguments. A row with neither a title nor a main image link is skipped.

    Returns:
        One dict per task with ``title``, ``description``, ``image_url`` and
        ``clue_image_url``.

    Raises:
        ValueError: If a link is invalid, a titled row has no main image link,
            or a row has a main image link but no title.
    """
    titles = form.getlist(title_key)
    descriptions = form.getlist(description_key)
    image_links = form.getlist(image_key)
    clue_links = form.getlist(clue_key)

    payloads = []
    for index, raw_title in enumerate(titles):
        row_label = f"第 {index + 1} 个新增任务"
        task_title = str(raw_title).strip()
        task_description = str(descriptions[index]).strip() if index < len(descriptions) else ""
        # The main image is mandatory as soon as the row has a title, so a
        # titled row can never reach the append below without an image link.
        image_url = normalize_image_url(
            image_links[index] if index < len(image_links) else None,
            f"{row_label}的主图链接",
            required=bool(task_title),
        )
        clue_image_url = normalize_image_url(
            clue_links[index] if index < len(clue_links) else None,
            f"{row_label}的线索图链接",
            required=False,
        )
        if not task_title:
            if not image_url:
                continue
            raise ValueError(f"{row_label}需要填写标题。")
        payloads.append(
            {
                "title": task_title,
                "description": task_description,
                "image_url": image_url,
                "clue_image_url": clue_image_url,
            }
        )
    return payloads
@router.get("/admin/dashboard")
def admin_dashboard(request: Request, db: Session = Depends(get_db)):
    """Render the admin overview page.

    Shows headline counters and the five most recently created activities.
    Super admins additionally get a presence list of all admins and users.
    Visitors who are not logged in are redirected to ``/admin``.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    now = local_now()
    all_admins = db.query(Admin).order_by(Admin.id.asc()).all()
    all_users = db.query(User).order_by(User.id.asc()).all()

    def _online_total(rows) -> int:
        return sum(1 for row in rows if is_online(row.last_seen_at, now))

    stats = {
        "user_count": len(all_users),
        "group_count": db.query(Group).count(),
        "activity_count": db.query(Activity).count(),
        "pending_count": db.query(Submission).filter(Submission.status == "pending").count(),
        "admin_count": len(all_admins),
        "online_admin_count": _online_total(all_admins),
        "online_user_count": _online_total(all_users),
    }

    latest_activities = (
        db.query(Activity)
        .options(joinedload(Activity.tasks))
        .order_by(Activity.created_at.desc())
        .limit(5)
        .all()
    )

    if admin.role == "superadmin":
        online_overview = {
            "admins": [
                serialize_presence_entry(row.display_name, "管理员", row.last_seen_at, now)
                for row in all_admins
            ],
            "users": [
                serialize_presence_entry(row.full_name, "用户", row.last_seen_at, now)
                for row in all_users
            ],
        }
    else:
        online_overview = None

    context = {
        "page_title": "管理员总览",
        "admin": admin,
        "stats": stats,
        "recent_activities": latest_activities,
        "online_overview": online_overview,
    }
    return render(request, "admin_dashboard.html", context)
@router.get("/admin/users")
def admin_users(request: Request, db: Session = Depends(get_db)):
    """Render the user management page with all users and all groups."""
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    user_query = db.query(User).options(joinedload(User.group)).order_by(User.student_id.asc())
    group_query = db.query(Group).options(joinedload(Group.members)).order_by(Group.name.asc())
    context = {
        "page_title": "用户管理",
        "admin": admin,
        "users": user_query.all(),
        "groups": group_query.all(),
    }
    return render(request, "admin_users.html", context)
@router.post("/admin/users")
async def create_user(request: Request, db: Session = Depends(get_db)):
    """Create a single user from the submitted form.

    Rejects incomplete forms, duplicate student ids, and target groups that
    have no free seat. Always redirects back to the user list with a flash
    message describing the outcome.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    form = await request.form()
    student_id = str(form.get("student_id", "")).strip()
    full_name = str(form.get("full_name", "")).strip()
    password = str(form.get("password", "")).strip()
    group = parse_optional_group(form.get("group_id"), db)

    if not student_id or not full_name or not password:
        add_flash(request, "error", "请完整填写学号、姓名和密码。")
        return redirect("/admin/users")
    if db.query(User).filter(User.student_id == student_id).first():
        add_flash(request, "error", "该学号已存在。")
        return redirect("/admin/users")

    # Build the user without a group first so the capacity check counts it as
    # an incoming member; checking with an empty list would let a group that is
    # already full accept one more user.
    user = User(
        student_id=student_id,
        full_name=full_name,
        password_hash=hash_password(password),
    )
    try:
        ensure_group_capacity(group, [user])
    except ValueError as exc:
        add_flash(request, "error", str(exc))
        return redirect("/admin/users")

    user.group = group
    db.add(user)
    db.commit()
    add_flash(request, "success", f"用户 {full_name} 已创建。")
    return redirect("/admin/users")
@router.post("/admin/users/import")
async def import_users(request: Request, db: Session = Depends(get_db)):
    """Bulk-create users from pasted text, one ``<name> <student id>`` per line.

    The last whitespace-separated token of a line is the student id; the rest
    is the name. The initial password is the last six characters of the
    student id (the whole id when it is shorter). Lines that are malformed or
    whose student id already exists are skipped and reported. The import is
    rejected as a whole if the target group cannot hold all new users.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    form = await request.form()
    raw_lines = str(form.get("import_text", "")).splitlines()
    group = parse_optional_group(form.get("group_id"), db)

    parsed_users = []
    skipped = []
    seen_student_ids = set()
    for index, line in enumerate(raw_lines, start=1):
        line = line.strip()
        if not line:
            continue
        # ``split()`` never yields empty or padded tokens, so two or more parts
        # guarantee a non-empty name and a non-empty student id.
        parts = line.split()
        if len(parts) < 2:
            skipped.append(f"第 {index} 行格式不正确")
            continue
        student_id = parts[-1]
        full_name = " ".join(parts[:-1])
        if student_id in seen_student_ids or db.query(User).filter(User.student_id == student_id).first():
            skipped.append(f"{student_id} 已存在,已跳过")
            continue
        seen_student_ids.add(student_id)
        # The group is attached only after the capacity check below; users that
        # already reference the group would be counted as existing members and
        # the check would let the import overfill it.
        parsed_users.append(
            User(
                student_id=student_id,
                full_name=full_name,
                password_hash=hash_password(student_id[-6:]),
            )
        )

    try:
        ensure_group_capacity(group, parsed_users)
    except ValueError as exc:
        add_flash(request, "error", str(exc))
        return redirect("/admin/users")

    for user in parsed_users:
        user.group = group
        db.add(user)
    db.commit()

    message = f"成功导入 {len(parsed_users)} 位用户。"
    if skipped:
        # Only the first five skip reasons are shown to keep the flash short.
        message += " 跳过:" + ";".join(skipped[:5])
    add_flash(request, "success", message)
    return redirect("/admin/users")
@router.post("/admin/users/bulk")
async def bulk_manage_users(request: Request, db: Session = Depends(get_db)):
    """Apply a bulk action to the users ticked in the user list.

    Supported ``bulk_action`` values are ``assign_group`` (move the users to
    the submitted group, or to no group) and ``delete``. Every outcome ends in
    a redirect to the user list with a flash message.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    form = await request.form()
    # ``isdecimal`` rather than ``isdigit``: the latter accepts characters such
    # as "²" that ``int`` rejects, which would turn a bad id into a crash.
    selected_ids = [int(value) for value in form.getlist("user_ids") if str(value).isdecimal()]
    action = str(form.get("bulk_action", "")).strip()
    if not selected_ids:
        add_flash(request, "error", "请先勾选用户。")
        return redirect("/admin/users")

    users = db.query(User).options(joinedload(User.group)).filter(User.id.in_(selected_ids)).all()
    if not users:
        add_flash(request, "error", "未找到选中的用户。")
        return redirect("/admin/users")

    if action == "assign_group":
        group = parse_optional_group(form.get("group_id"), db)
        try:
            ensure_group_capacity(group, users)
        except ValueError as exc:
            add_flash(request, "error", str(exc))
            return redirect("/admin/users")
        for user in users:
            user.group = group
            db.add(user)
        db.commit()
        add_flash(request, "success", f"已批量更新 {len(users)} 位用户的小组。")
        return redirect("/admin/users")

    if action == "delete":
        removed_user_count, removed_submission_count = delete_users_and_handle_submissions(db, users)
        db.commit()
        add_flash(
            request,
            "success",
            f"已删除 {removed_user_count} 位用户,清理 {removed_submission_count} 条无可继承的打卡记录。",
        )
        return redirect("/admin/users")

    add_flash(request, "error", "批量操作无效。")
    return redirect("/admin/users")
@router.post("/admin/users/{user_id}/group")
async def assign_user_group(user_id: int, request: Request, db: Session = Depends(get_db)):
    """Move one user to the submitted group (or to no group).

    Responds with 404 when the user does not exist and refuses the move when
    the target group has no free seat.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    target_user = (
        db.query(User)
        .options(joinedload(User.group))
        .filter(User.id == user_id)
        .first()
    )
    if not target_user:
        raise HTTPException(status_code=404, detail="用户不存在")

    form = await request.form()
    new_group = parse_optional_group(form.get("group_id"), db)
    try:
        ensure_group_capacity(new_group, [target_user])
    except ValueError as exc:
        add_flash(request, "error", str(exc))
        return redirect("/admin/users")

    target_user.group = new_group
    db.add(target_user)
    db.commit()
    add_flash(request, "success", f"已更新 {target_user.full_name} 的小组。")
    return redirect("/admin/users")
@router.get("/admin/admins")
def admin_admins(request: Request, db: Session = Depends(get_db)):
    """Render the admin-account management page (super admins only).

    Everyone else is sent back to the dashboard with an error flash. The page
    receives the admin rows plus a presence record for each of them.
    """
    admin = require_super_admin(request, db)
    if not admin:
        add_flash(request, "error", "只有超级管理员可以管理管理员账号。")
        return redirect("/admin/dashboard")

    now = local_now()
    server_ts = unix_seconds(now)
    admins = db.query(Admin).order_by(Admin.created_at.desc()).all()

    admin_statuses = []
    for account in admins:
        status = serialize_presence_entry(
            account.display_name,
            display_admin_role(account.role),
            account.last_seen_at,
            now,
        )
        status["username"] = account.username
        admin_statuses.append(status)

    context = {
        "page_title": "管理员管理",
        "admin": admin,
        "admins": admins,
        "admin_statuses": admin_statuses,
        "presence_server_ts": server_ts,
        "online_window_seconds": ONLINE_WINDOW_SECONDS,
    }
    return render(request, "admin_admins.html", context)
@router.get("/api/admin/presence/overview")
def presence_overview(request: Request, db: Session = Depends(get_db)):
    """Return the presence state of every admin as JSON (super admins only).

    Responds with 403 for anyone else. The response is sent with
    ``Cache-Control: no-store``.
    """
    if not require_super_admin(request, db):
        return JSONResponse({"error": "forbidden"}, status_code=403)

    now = local_now()
    server_ts = unix_seconds(now)

    admin_entries = []
    for account in db.query(Admin).order_by(Admin.display_name.asc()).all():
        entry = serialize_presence_entry(
            account.display_name,
            display_admin_role(account.role),
            account.last_seen_at,
            now,
        )
        entry["username"] = account.username
        admin_entries.append(entry)

    payload = {
        "server_ts": server_ts,
        "online_window_seconds": ONLINE_WINDOW_SECONDS,
        "admins": admin_entries,
    }
    return JSONResponse(payload, headers={"Cache-Control": "no-store"})
@router.post("/admin/admins")
async def create_admin(request: Request, db: Session = Depends(get_db)):
    """Create an admin account (super admins only).

    Any submitted role other than ``"superadmin"`` is stored as ``"admin"``.
    Incomplete forms and duplicate usernames are rejected with a flash.
    """
    admin = require_super_admin(request, db)
    if not admin:
        add_flash(request, "error", "只有超级管理员可以新增管理员。")
        return redirect("/admin/dashboard")

    form = await request.form()
    username = str(form.get("username", "")).strip()
    display_name = str(form.get("display_name", "")).strip()
    password = str(form.get("password", "")).strip()
    requested_role = str(form.get("role", "admin")).strip() or "admin"

    if not (username and display_name and password):
        add_flash(request, "error", "请完整填写管理员信息。")
        return redirect("/admin/admins")

    username_taken = db.query(Admin).filter(Admin.username == username).first()
    if username_taken:
        add_flash(request, "error", "管理员账号已存在。")
        return redirect("/admin/admins")

    if requested_role == "superadmin":
        granted_role = "superadmin"
    else:
        granted_role = "admin"

    db.add(
        Admin(
            username=username,
            display_name=display_name,
            password_hash=hash_password(password),
            role=granted_role,
        )
    )
    db.commit()
    add_flash(request, "success", f"管理员 {display_name} 已创建。")
    return redirect("/admin/admins")
@router.get("/admin/groups")
def admin_groups(request: Request, db: Session = Depends(get_db)):
    """Render the group management page with all groups and ungrouped users."""
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    group_query = (
        db.query(Group)
        .options(joinedload(Group.members))
        .order_by(Group.created_at.asc())
    )
    ungrouped_query = (
        db.query(User)
        .filter(User.group_id.is_(None))
        .order_by(User.student_id.asc())
    )
    context = {
        "page_title": "小组管理",
        "admin": admin,
        "groups": group_query.all(),
        "ungrouped_users": ungrouped_query.all(),
    }
    return render(request, "admin_groups.html", context)
@router.post("/admin/groups")
async def create_group(request: Request, db: Session = Depends(get_db)):
    """Create a group from the submitted form.

    ``max_members`` is clamped to at least 1. When no name is submitted a
    ``第N组`` name is generated, starting at the current group count plus one
    and moving on to the next number while that name is already taken. An
    explicitly submitted name that already exists is rejected.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    form = await request.form()
    try:
        max_members = max(1, int(str(form.get("max_members", "1"))))
    except ValueError:
        add_flash(request, "error", "小组人数上限需要是数字。")
        return redirect("/admin/groups")

    def _name_taken(candidate: str) -> bool:
        return db.query(Group).filter(Group.name == candidate).first() is not None

    name = str(form.get("name", "")).strip()
    if not name:
        # The count-based default can collide with an existing name (custom
        # names, gaps in the numbering); an admin who typed no name should not
        # be told the name already exists, so advance to the next free number.
        sequence = db.query(Group).count() + 1
        name = f"第{sequence}组"
        while _name_taken(name):
            sequence += 1
            name = f"第{sequence}组"
    elif _name_taken(name):
        add_flash(request, "error", "小组名称已存在。")
        return redirect("/admin/groups")

    group = Group(name=name, max_members=max_members)
    db.add(group)
    db.commit()
    add_flash(request, "success", f"小组 {name} 已创建。")
    return redirect("/admin/groups")
@router.post("/admin/groups/{group_id}/capacity")
async def update_group_capacity(group_id: int, request: Request, db: Session = Depends(get_db)):
    """Change a group's member limit.

    The new limit is clamped to at least 1 and may not be lower than the
    current number of members. Responds with 404 for an unknown group.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    target = (
        db.query(Group)
        .options(joinedload(Group.members))
        .filter(Group.id == group_id)
        .first()
    )
    if not target:
        raise HTTPException(status_code=404, detail="小组不存在")

    form = await request.form()
    submitted_limit = str(form.get("max_members", target.max_members))
    try:
        new_limit = max(1, int(submitted_limit))
    except ValueError:
        add_flash(request, "error", "小组人数上限需要是数字。")
        return redirect("/admin/groups")

    if len(target.members) > new_limit:
        add_flash(request, "error", "人数上限不能小于当前成员数。")
        return redirect("/admin/groups")

    target.max_members = new_limit
    db.add(target)
    db.commit()
    add_flash(request, "success", f"{target.name} 的人数上限已更新。")
    return redirect("/admin/groups")
@router.get("/admin/activities")
def admin_activities(request: Request, db: Session = Depends(get_db)):
    """Render the activity list, latest start time first."""
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    activity_query = (
        db.query(Activity)
        .options(joinedload(Activity.tasks), joinedload(Activity.created_by))
        .order_by(Activity.start_at.desc())
    )
    context = {
        "page_title": "活动发布",
        "admin": admin,
        "activities": activity_query.all(),
    }
    return render(request, "admin_activities.html", context)
@router.get("/admin/activities/{activity_id}/edit")
def edit_activity_page(activity_id: int, request: Request, db: Session = Depends(get_db)):
    """Render the edit page of one activity together with its leaderboard.

    Responds with 404 when the activity does not exist.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    eager_loads = (
        joinedload(Activity.created_by),
        joinedload(Activity.tasks).joinedload(Task.submissions),
    )
    activity = (
        db.query(Activity)
        .options(*eager_loads)
        .filter(Activity.id == activity_id)
        .first()
    )
    if not activity:
        raise HTTPException(status_code=404, detail="活动不存在")

    context = {
        "page_title": f"编辑活动 · {activity.title}",
        "admin": admin,
        "activity": activity,
        "leaderboard": build_leaderboard(db, activity.id),
    }
    return render(request, "admin_activity_edit.html", context)
@router.post("/admin/activities")
async def create_activity(request: Request, db: Session = Depends(get_db)):
    """Publish a new activity together with its task cards.

    The form must contain valid activity fields and at least one task.
    Validation errors are reported through a flash message on the list page.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    form = await request.form()
    try:
        activity_fields = parse_activity_fields(form)
        task_rows = await collect_task_payloads(
            form,
            "task_title",
            "task_description",
            "task_image_url",
            "task_clue_image_url",
        )
    except ValueError as exc:
        add_flash(request, "error", str(exc))
        return redirect("/admin/activities")

    if not task_rows:
        add_flash(request, "error", "至少需要添加一个任务卡片。")
        return redirect("/admin/activities")

    activity = Activity(created_by_id=admin.id, **activity_fields)
    db.add(activity)
    db.flush()

    created_tasks = []
    for row in task_rows:
        main_link = row["image_url"]
        clue_link = row["clue_image_url"]
        # The stored filename is the last path segment of the link, capped at 255 characters.
        clue_name = clue_link.split("/")[-1][:255] if clue_link else None
        new_task = Task(
            activity=activity,
            title=row["title"],
            description=row["description"],
            image_url=main_link,
            clue_image_url=clue_link,
            image_filename=main_link.split("/")[-1][:255] or "task-link",
            clue_image_filename=clue_name,
        )
        created_tasks.append(new_task)
        db.add(new_task)

    apply_task_schedule(activity, created_tasks)
    db.commit()
    add_flash(request, "success", f"活动 {activity.title} 已发布。")
    return redirect("/admin/activities")
@router.post("/admin/activities/{activity_id}/edit")
async def update_activity(activity_id: int, request: Request, db: Session = Depends(get_db)):
    """Update an activity, its existing tasks, and append newly added tasks.

    The request is processed in two phases. First the whole form is validated
    without touching any row or file. Only when everything is valid are the
    changes applied and committed, and only after the commit are replaced
    local image files removed. A rejected form therefore never leaves tasks
    half-edited or pointing at files that were already deleted.

    The form must list every existing task exactly once; the order of that
    list becomes the new task order, followed by the new tasks. Responds with
    404 when the activity does not exist.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    activity = (
        db.query(Activity)
        .options(joinedload(Activity.tasks))
        .filter(Activity.id == activity_id)
        .first()
    )
    if not activity:
        raise HTTPException(status_code=404, detail="活动不存在")

    edit_url = f"/admin/activities/{activity_id}/edit"
    form = await request.form()
    try:
        activity_fields = parse_activity_fields(form)
    except ValueError as exc:
        add_flash(request, "error", str(exc))
        return redirect(edit_url)

    existing_task_ids = form.getlist("existing_task_id")
    existing_task_titles = form.getlist("existing_task_title")
    existing_task_descriptions = form.getlist("existing_task_description")
    existing_task_image_urls = form.getlist("existing_task_image_url")
    existing_task_clue_urls = form.getlist("existing_task_clue_image_url")
    # ``isdecimal`` rather than ``isdigit``: the latter accepts characters such
    # as "²" that ``int`` cannot parse.
    remove_clue_ids = {
        int(value)
        for value in form.getlist("existing_task_remove_clue")
        if str(value).isdecimal()
    }

    tasks_by_id = {task.id: task for task in activity.tasks}
    seen_task_ids: set[int] = set()
    # Phase 1: validated edits as (task, title, description, image_url, clue_image_url).
    planned_edits: list[tuple] = []
    try:
        for index, raw_task_id in enumerate(existing_task_ids):
            if not str(raw_task_id).isdecimal():
                raise ValueError("任务数据无效,请刷新页面后重试。")
            task_id = int(raw_task_id)
            task = tasks_by_id.get(task_id)
            if not task or task_id in seen_task_ids:
                raise ValueError("任务数据无效,请刷新页面后重试。")
            seen_task_ids.add(task_id)

            title = str(existing_task_titles[index]).strip() if index < len(existing_task_titles) else ""
            description = (
                str(existing_task_descriptions[index]).strip()
                if index < len(existing_task_descriptions)
                else ""
            )
            if not title:
                raise ValueError(f"第 {index + 1} 个已有任务标题不能为空。")
            image_url = normalize_image_url(
                existing_task_image_urls[index] if index < len(existing_task_image_urls) else None,
                f"第 {index + 1} 个已有任务的主图链接",
                required=True,
            )
            clue_image_url = normalize_image_url(
                existing_task_clue_urls[index] if index < len(existing_task_clue_urls) else None,
                f"第 {index + 1} 个已有任务的线索图链接",
                required=False,
            )
            if task_id in remove_clue_ids:
                clue_image_url = None
            planned_edits.append((task, title, description, image_url, clue_image_url))

        new_tasks_payload = await collect_task_payloads(
            form,
            "new_task_title",
            "new_task_description",
            "new_task_image_url",
            "new_task_clue_image_url",
        )
    except ValueError as exc:
        add_flash(request, "error", str(exc))
        return redirect(edit_url)

    if len(seen_task_ids) != len(tasks_by_id):
        add_flash(request, "error", "任务列表不完整,请刷新页面后重试。")
        return redirect(edit_url)
    if not planned_edits and not new_tasks_payload:
        add_flash(request, "error", "活动至少需要保留一个任务。")
        return redirect(edit_url)

    # Phase 2: everything validated, apply the changes.
    for field_name, field_value in activity_fields.items():
        setattr(activity, field_name, field_value)

    stale_files: list[str] = []
    all_tasks: list[Task] = []
    for task, title, description, image_url, clue_image_url in planned_edits:
        task.title = title
        task.description = description
        task.image_url = image_url
        task.image_filename = image_url.split("/")[-1][:255] or task.image_filename
        # Images are link-based after an edit, so local copies are discarded.
        if task.image_path:
            stale_files.append(task.image_path)
            task.image_path = None
        if task.clue_image_path:
            stale_files.append(task.clue_image_path)
        if clue_image_url:
            task.clue_image_url = clue_image_url
            task.clue_image_filename = clue_image_url.split("/")[-1][:255] or task.clue_image_filename
            if task.clue_image_path:
                task.clue_image_path = None
        else:
            task.clue_image_url = None
            task.clue_image_path = None
            task.clue_image_mime = None
            task.clue_image_filename = None
        all_tasks.append(task)

    for payload in new_tasks_payload:
        task = Task(
            activity=activity,
            title=payload["title"],
            description=payload["description"],
            image_url=payload["image_url"],
            clue_image_url=payload["clue_image_url"],
            image_filename=payload["image_url"].split("/")[-1][:255] or "task-link",
            clue_image_filename=(
                payload["clue_image_url"].split("/")[-1][:255] if payload["clue_image_url"] else None
            ),
        )
        db.add(task)
        all_tasks.append(task)

    apply_task_schedule(activity, all_tasks)
    db.add(activity)
    db.commit()

    # Remove replaced files only once the database no longer references them.
    for stale_path in stale_files:
        delete_file_if_exists(stale_path, settings.task_media_root)

    add_flash(request, "success", f"活动 {activity.title} 已更新。")
    return redirect(edit_url)
@router.post("/admin/tasks/{task_id}/delete")
async def delete_task(task_id: int, request: Request, db: Session = Depends(get_db)):
    """Delete one task and renumber the remaining tasks of its activity.

    The last remaining task of an activity cannot be deleted. The task's
    images and the files of its submissions are removed as part of the
    deletion. Responds with 404 when the task or its activity is missing.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    doomed = (
        db.query(Task)
        .options(joinedload(Task.submissions))
        .filter(Task.id == task_id)
        .first()
    )
    if not doomed:
        raise HTTPException(status_code=404, detail="任务不存在")

    activity = (
        db.query(Activity)
        .options(joinedload(Activity.tasks))
        .filter(Activity.id == doomed.activity_id)
        .first()
    )
    if not activity:
        raise HTTPException(status_code=404, detail="活动不存在")

    # Computed up front so the redirect target is available after the commit.
    activity_id = activity.id
    edit_url = f"/admin/activities/{activity_id}/edit"
    if len(activity.tasks) <= 1:
        add_flash(request, "error", "活动至少需要保留一个任务,不能删除最后一个任务。")
        return redirect(edit_url)

    cleanup_submission_files(doomed.submissions)
    cleanup_task_files(doomed)
    db.delete(doomed)
    db.flush()

    survivors = (
        db.query(Task)
        .filter(Task.activity_id == activity_id)
        .order_by(Task.display_order.asc(), Task.id.asc())
        .all()
    )
    apply_task_schedule(activity, survivors)
    db.commit()
    add_flash(request, "success", "任务已删除,剩余任务顺序和线索发布时间已自动更新。")
    return redirect(edit_url)
@router.post("/admin/activities/{activity_id}/delete")
async def delete_activity(activity_id: int, request: Request, db: Session = Depends(get_db)):
    """Delete an activity along with its task images and submission files.

    Responds with 404 when the activity does not exist.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    doomed = (
        db.query(Activity)
        .options(joinedload(Activity.tasks).joinedload(Task.submissions))
        .filter(Activity.id == activity_id)
        .first()
    )
    if not doomed:
        raise HTTPException(status_code=404, detail="活动不存在")

    # Keep the title: it is needed for the flash message after the row is gone.
    removed_title = doomed.title
    cleanup_activity_files(doomed)
    db.delete(doomed)
    db.commit()
    add_flash(request, "success", f"活动 {removed_title} 已删除,相关任务与本地提交图片也已清理。")
    return redirect("/admin/activities")
@router.post("/admin/activities/{activity_id}/visibility")
async def update_activity_visibility(activity_id: int, request: Request, db: Session = Depends(get_db)):
    """Set an activity's visibility and leaderboard flags from checkbox values.

    A flag is switched on only when its form field equals ``"on"``; a missing
    field switches it off. Responds with 404 for an unknown activity.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    target = db.get(Activity, activity_id)
    if not target:
        raise HTTPException(status_code=404, detail="活动不存在")

    form = await request.form()
    show_activity = form.get("is_visible") == "on"
    show_leaderboard = form.get("leaderboard_visible") == "on"
    target.is_visible = show_activity
    target.leaderboard_visible = show_leaderboard
    db.add(target)
    db.commit()
    add_flash(request, "success", f"已更新 {target.title} 的活动可见性和排行榜设置。")
    return redirect("/admin/activities")
@router.get("/admin/reviews")
def admin_reviews(request: Request, db: Session = Depends(get_db)):
    """Render the review center page for the signed-in admin.

    The optional ``activity_id`` query parameter narrows the data to one
    activity; any value that is not a plain decimal number is treated as
    "no filter". The page receives the pending submissions assigned to the
    current admin, up to 20 recent reviews, every activity (newest
    ``start_at`` first) and the number of online admins.

    Callers without an active admin session are redirected to ``/admin``.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    activity_filter = request.query_params.get("activity_id", "")
    # isdecimal() instead of isdigit(): isdigit() also accepts characters such
    # as "²" that int() cannot parse, which would raise ValueError and surface
    # as a 500 for a crafted query string.
    activity_id = int(activity_filter) if activity_filter.isdecimal() else None

    pending_submissions = rebalance_pending_reviews(db, activity_id)
    # Only the submissions assigned to this admin are shown in their queue.
    assigned_submissions = [
        submission for submission in pending_submissions if submission.assigned_admin_id == admin.id
    ]
    recent_submissions = recent_reviews_query(db, activity_id).limit(20).all()
    activities = db.query(Activity).order_by(Activity.start_at.desc()).all()

    return render(
        request,
        "admin_reviews.html",
        {
            "page_title": "审核中心",
            "admin": admin,
            "activities": activities,
            # The raw query value is passed through so the template can
            # reflect the current filter selection.
            "activity_filter": activity_filter,
            "assigned_submissions": assigned_submissions,
            "recent_submissions": recent_submissions,
            "online_admin_count": len(online_admins(db)),
        },
    )
@router.get("/api/admin/reviews/feed")
def review_feed(request: Request, db: Session = Depends(get_db)):
    """Return the signed-in admin's review queue as JSON.

    The payload contains ``online_admin_count``, ``assigned_submissions``
    (pending submissions assigned to the current admin) and
    ``recent_submissions`` (up to 20 entries), each submission passed through
    ``serialize_submission``. The optional ``activity_id`` query parameter
    narrows the data to one activity; any value that is not a plain decimal
    number is treated as "no filter".

    Responds with ``{"error": "forbidden"}`` and status 403 when there is no
    active admin session.
    """
    admin = require_admin(request, db)
    if not admin:
        return JSONResponse({"error": "forbidden"}, status_code=403)

    activity_filter = request.query_params.get("activity_id", "")
    # isdecimal() instead of isdigit(): isdigit() also accepts characters such
    # as "²" that int() cannot parse, which would raise ValueError and turn a
    # malformed query string into a 500.
    activity_id = int(activity_filter) if activity_filter.isdecimal() else None

    pending_submissions = rebalance_pending_reviews(db, activity_id)
    # Only the submissions assigned to this admin belong in their feed.
    assigned_submissions = [
        submission for submission in pending_submissions if submission.assigned_admin_id == admin.id
    ]
    recent_submissions = recent_reviews_query(db, activity_id).limit(20).all()

    return JSONResponse(
        {
            "online_admin_count": len(online_admins(db)),
            "assigned_submissions": [serialize_submission(submission) for submission in assigned_submissions],
            "recent_submissions": [serialize_submission(submission) for submission in recent_submissions],
        }
    )
@router.post("/admin/submissions/{submission_id}/review")
async def review_submission(submission_id: int, request: Request, db: Session = Depends(get_db)):
    """Record an approve/reject decision on a pending submission.

    The handler serves two kinds of clients. Requests carrying the header
    ``X-Requested-With: fetch`` get JSON responses (403 / 404 / 400 / 409 on
    failure, ``{"ok": True, "submission": ...}`` on success); all other
    requests get flash messages plus a redirect, except for an unknown
    submission id, which raises a 404 ``HTTPException``.

    Form fields read: ``decision`` (must be ``approved`` or ``rejected``) and
    ``feedback`` (optional; blank is stored as ``None``).
    """
    wants_json = request.headers.get("X-Requested-With") == "fetch"

    def json_error(code: str, status: int, **extra):
        """Build the JSON error payload used for fetch-style callers."""
        return JSONResponse({"error": code, **extra}, status_code=status)

    admin = require_admin(request, db)
    if not admin:
        return json_error("forbidden", 403) if wants_json else redirect("/admin")

    eager_options = (
        joinedload(Submission.task).joinedload(Task.activity),
        joinedload(Submission.user),
        joinedload(Submission.group),
        joinedload(Submission.reviewed_by),
        joinedload(Submission.assigned_admin),
    )
    submission = db.query(Submission).options(*eager_options).filter(Submission.id == submission_id).first()
    if not submission:
        if wants_json:
            return json_error("not_found", 404)
        raise HTTPException(status_code=404, detail="提交记录不存在")

    form = await request.form()
    decision = str(form.get("decision", "")).strip()
    feedback = str(form.get("feedback", "")).strip() or None

    if decision not in {"approved", "rejected"}:
        if wants_json:
            return json_error("invalid_decision", 400)
        add_flash(request, "error", "审核操作无效。")
        return redirect("/admin/reviews")

    # A submission that is no longer pending has already been decided;
    # report a conflict rather than overwrite the stored result.
    if submission.status != "pending":
        message = "该提交已被其他管理员处理,页面即将刷新到最新状态。"
        if wants_json:
            return json_error("already_reviewed", 409, message=message)
        add_flash(request, "info", message)
        return redirect("/admin/reviews")

    submission.status = decision
    submission.feedback = feedback
    # The reviewing admin is recorded as both reviewer and assignee.
    submission.reviewed_by_id = admin.id
    submission.assigned_admin_id = admin.id
    # Keep an existing assignment timestamp; only fill it in when missing.
    submission.assigned_at = submission.assigned_at or local_now()
    submission.reviewed_at = local_now()
    # An approval is stamped with the submission's creation time, not the
    # time of the review.
    if decision == "approved":
        submission.approved_at = submission.created_at
    else:
        submission.approved_at = None

    db.add(submission)
    db.commit()
    db.refresh(submission)

    if wants_json:
        return JSONResponse({"ok": True, "submission": serialize_submission(submission)})
    add_flash(request, "success", "审核结果已保存。")
    return redirect("/admin/reviews")
@router.post("/admin/reviews/download")
async def download_selected_submissions(request: Request, db: Session = Depends(get_db)):
    """Bundle the files of the selected submissions into a ZIP download.

    Reads every ``submission_ids`` form value, ignores anything that is not a
    plain decimal number, and packs the matching submissions' files into an
    in-memory archive laid out as
    ``<activity>/<group>/<task>_<uploader><suffix>``. Submissions without a
    stored path, or whose file is missing on disk, are skipped silently.

    Callers without an active admin session are redirected to ``/admin``.
    When no valid id is submitted, an error flash is queued and the browser is
    redirected to ``/admin/reviews``.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    form = await request.form()
    # isdecimal() instead of isdigit(): isdigit() also accepts characters such
    # as "²" that int() cannot parse, which would raise ValueError.
    selected_ids = [
        int(text) for text in map(str, form.getlist("submission_ids")) if text.isdecimal()
    ]
    if not selected_ids:
        add_flash(request, "error", "请先勾选需要下载的图片。")
        return redirect("/admin/reviews")

    submissions = (
        db.query(Submission)
        .options(
            joinedload(Submission.user),
            joinedload(Submission.group),
            joinedload(Submission.task).joinedload(Task.activity),
        )
        .filter(Submission.id.in_(selected_ids))
        .all()
    )

    archive = io.BytesIO()
    # Member names already written, so two submissions that map to the same
    # path do not overwrite each other when the archive is extracted.
    used_names: set[str] = set()
    with zipfile.ZipFile(archive, mode="w", compression=zipfile.ZIP_DEFLATED) as zip_file:
        for submission in submissions:
            # Guard against an empty path before handing it to Path().
            if not submission.file_path:
                continue
            file_path = Path(submission.file_path)
            # is_file() also skips directories, which are never an uploaded image.
            if not file_path.is_file():
                continue

            suffix = file_path.suffix or ".jpg"
            # "/" is the ZIP path separator, so it is replaced inside each component.
            activity_title = submission.task.activity.title.replace("/", "_")
            task_title = submission.task.title.replace("/", "_")
            group_name = (submission.group.name if submission.group else "未分组").replace("/", "_")
            uploader_name = (submission.user.full_name if submission.user else "unknown").replace("/", "_")

            stem = f"{activity_title}/{group_name}/{task_title}_{uploader_name}"
            archive_name = f"{stem}{suffix}"
            if archive_name in used_names:
                # Same activity/group/task/uploader as an earlier entry:
                # disambiguate with the submission id.
                archive_name = f"{stem}_{submission.id}{suffix}"
            used_names.add(archive_name)

            zip_file.write(file_path, archive_name)

    # Rewind so the response streams the archive from its first byte.
    archive.seek(0)
    filename = f"checkin_submissions_{local_now().strftime('%Y%m%d_%H%M%S')}.zip"
    return StreamingResponse(
        archive,
        media_type="application/zip",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
@router.get("/admin/images")
def admin_images(request: Request, db: Session = Depends(get_db)):
    """Render the image management page for the requested scope.

    The ``scope`` query parameter must be one of ``all``, ``tasks``,
    ``submissions``, ``referenced`` or ``orphan``; anything else falls back to
    ``all``. Callers without an active admin session are redirected to
    ``/admin``.
    """
    admin = require_admin(request, db)
    if not admin:
        return redirect("/admin")

    allowed_scopes = {"all", "tasks", "submissions", "referenced", "orphan"}
    requested_scope = request.query_params.get("scope", "all").strip()
    scope = requested_scope if requested_scope in allowed_scopes else "all"

    image_items, summary = build_image_inventory(db, scope)

    context = {
        "page_title": "图片管理",
        "admin": admin,
        "scope": scope,
        "image_items": image_items,
        "summary": summary,
    }
    return render(request, "admin_images.html", context)
@router.post("/admin/images/delete")
async def delete_managed_image(request: Request, db: Session = Depends(get_db)):
    """Delete one managed image file, provided nothing references it.

    Form fields read: ``relative_path`` (the image to delete) and ``scope``
    (used only to return to the same list view; unknown values fall back to
    ``all``). Every outcome queues a flash message and redirects to
    ``/admin/images?scope=<scope>``. Callers without an active admin session
    are redirected to ``/admin``.
    """
    if not require_admin(request, db):
        return redirect("/admin")

    form = await request.form()
    relative_path = str(form.get("relative_path", "")).strip()

    submitted_scope = str(form.get("scope", "all")).strip()
    scope = submitted_scope if submitted_scope in {"all", "tasks", "submissions", "referenced", "orphan"} else "all"
    back_url = f"/admin/images?scope={scope}"

    def finish(level: str, message: str):
        """Queue a flash message and return to the image list."""
        add_flash(request, level, message)
        return redirect(back_url)

    if not relative_path:
        return finish("error", "图片路径不能为空。")

    try:
        file_path = resolve_managed_path(settings.docker_root, relative_path)
    except ValueError as exc:
        # The resolver's own message is shown to the admin as-is.
        return finish("error", str(exc))

    if not (file_path.exists() and file_path.is_file()):
        return finish("error", "图片不存在或已被删除。")

    # Refuse to delete a file that still appears in the reference index.
    reference_index = build_image_reference_index(db)
    references = reference_index.get(normalize_path_key(file_path), [])
    if references:
        return finish("error", "该图片仍被系统引用,暂时不能直接删除。")

    delete_file_if_exists(str(file_path), settings.docker_root)
    return finish("success", f"已删除图片:{relative_path}")