from __future__ import annotations import atexit import json import re import time from datetime import date as date_cls, time as time_cls from functools import wraps from typing import Callable from urllib.parse import urlparse from flask import Flask, Response, flash, g, jsonify, redirect, render_template, request, session, stream_with_context, url_for from core.config import AppConfig from core.db import ( DEFAULT_REGISTRATION_CODE_MAX_USES, Database, MAX_REFRESH_INTERVAL_SECONDS, MIN_REFRESH_INTERVAL_SECONDS, normalize_registration_code, ) from core.runtime_logging import configure_logging, get_logger from core.security import SecretBox, hash_password, verify_password from core.task_manager import TaskManager configure_logging() APP_LOGGER = get_logger("sacc.web") config = AppConfig.load() store = Database( config.db_path, default_parallel_limit=config.default_parallel_limit, mysql_ssl_ca_path=config.mysql_ssl_ca_path, ) store.init_db() secret_box = SecretBox(config.encryption_key) task_manager = TaskManager(config=config, store=store, secret_box=secret_box) def _seed_legacy_user() -> None: if store.list_users(): return legacy_path = config.root_dir / "user_data.json" if not legacy_path.exists(): return try: payload = json.loads(legacy_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): return student_id = str(payload.get("std_id", "")).strip() password = str(payload.get("password", "")).strip() if not student_id or not password: return user_id = store.create_user(student_id, secret_box.encrypt(password), "Legacy User") for source_key, category in (("a", "plan"), ("b", "free")): for course in payload.get("course", {}).get(source_key, []): course_id = str(course.get("course_id", "")).strip() course_index = str(course.get("course_index", "")).strip() if course_id and course_index: store.add_course(user_id, category, course_id, course_index) _seed_legacy_user() task_manager.start() atexit.register(task_manager.shutdown) app = Flask(__name__, 
template_folder="templates", static_folder="static") app.secret_key = config.session_secret app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SAMESITE="Lax") APP_LOGGER.info( "Application bootstrap complete | data_dir=%s db_path=%s backend=%s chrome_binary=%s chromedriver_path=%s default_parallel_limit=%s schedule_timezone=%s", config.data_dir, config.db_path, config.database_backend, config.chrome_binary, config.chromedriver_path, config.default_parallel_limit, config.schedule_timezone, ) CATEGORY_LABELS = { "plan": "方案选课", "free": "自由选课", } TASK_LABELS = { "pending": "排队中", "running": "执行中", "cancel_requested": "停止中", "completed": "已完成", "stopped": "已停止", "failed": "失败", } SKIPPED_REQUEST_LOG_PREFIXES = ( "/static/", "/api/", ) SKIPPED_REQUEST_LOG_PATHS = { "/favicon.ico", } COURSE_ID_PATTERN = re.compile(r"^[0-9A-Za-z]{1,32}$") COURSE_INDEX_PATTERN = re.compile(r"^[0-9A-Za-z]{1,8}$") REGISTRATION_CODE_PATTERN = re.compile(r"^[A-Z0-9-]{6,64}$") def _current_actor_label() -> str: role = session.get("role", "guest") if role == "user": return f"user:{session.get('user_id', '-')}" if role == "admin": return f"admin:{session.get('admin_username', '-')}" return "guest" @app.before_request def before_request_logging() -> None: g.request_started_at = time.perf_counter() @app.after_request def after_request_logging(response): if request.path in SKIPPED_REQUEST_LOG_PATHS: return response if any(request.path.startswith(prefix) for prefix in SKIPPED_REQUEST_LOG_PREFIXES): return response started_at = getattr(g, "request_started_at", None) duration_ms = 0.0 if started_at is None else (time.perf_counter() - started_at) * 1000 remote_addr = request.headers.get("x-forwarded-for") or request.remote_addr or "-" APP_LOGGER.info( "HTTP %s %s -> %s in %.1fms | actor=%s remote=%s", request.method, request.path, response.status_code, duration_ms, _current_actor_label(), remote_addr, ) return response @app.context_processor def inject_globals() -> dict: return { 
"category_labels": CATEGORY_LABELS, "task_labels": TASK_LABELS, "session_role": session.get("role", "guest"), "refresh_interval_min": MIN_REFRESH_INTERVAL_SECONDS, "refresh_interval_max": MAX_REFRESH_INTERVAL_SECONDS, "default_refresh_interval_seconds": config.poll_interval_seconds, "default_registration_code_max_uses": DEFAULT_REGISTRATION_CODE_MAX_USES, } def _login_required(role: str) -> Callable: def decorator(view: Callable) -> Callable: @wraps(view) def wrapped(*args, **kwargs): current_role = session.get("role") if role == "user" and current_role != "user": flash("请先登录学生账号。", "warning") return redirect(url_for("login")) if role == "admin" and current_role != "admin": flash("请先登录管理员账号。", "warning") return redirect(url_for("admin_login")) return view(*args, **kwargs) return wrapped return decorator def _get_current_user() -> dict | None: user_id = session.get("user_id") if not user_id: return None return store.get_user(int(user_id)) def _get_admin_identity() -> dict: return { "username": session.get("admin_username", ""), "is_super_admin": bool(session.get("is_super_admin", False)), } def _normalize_course_token(raw_value: str) -> str: return re.sub(r"\s+", "", str(raw_value or "")).upper() def _validate_course_target(course_id: str, course_index: str) -> tuple[str, str] | None: normalized_course_id = _normalize_course_token(course_id) normalized_course_index = _normalize_course_token(course_index) if not COURSE_ID_PATTERN.fullmatch(normalized_course_id): return None if not COURSE_INDEX_PATTERN.fullmatch(normalized_course_index): return None return normalized_course_id, normalized_course_index def _parse_refresh_interval(raw_value: str | None, *, default: int) -> int: raw_text = str(raw_value or "").strip() if not raw_text: return default try: interval = int(raw_text) except ValueError as exc: raise ValueError(f"刷新间隔必须是 {MIN_REFRESH_INTERVAL_SECONDS} 到 {MAX_REFRESH_INTERVAL_SECONDS} 之间的整数。") from exc if interval < MIN_REFRESH_INTERVAL_SECONDS or interval > 
MAX_REFRESH_INTERVAL_SECONDS: raise ValueError(f"刷新间隔必须在 {MIN_REFRESH_INTERVAL_SECONDS} 到 {MAX_REFRESH_INTERVAL_SECONDS} 秒之间。") return interval def _parse_registration_code_max_uses(raw_value: str | None) -> int: raw_text = str(raw_value or "").strip() if not raw_text: return DEFAULT_REGISTRATION_CODE_MAX_USES try: value = int(raw_text) except ValueError as exc: raise ValueError("注册码可用次数必须是 1 到 99 之间的整数。") from exc if value < 1 or value > 99: raise ValueError("注册码可用次数必须在 1 到 99 之间。") return value def _parse_iso_date(raw_value: str | None, label: str) -> str: raw_text = str(raw_value or "").strip() if not raw_text: raise ValueError(f"{label}不能为空。") try: return date_cls.fromisoformat(raw_text).isoformat() except ValueError as exc: raise ValueError(f"{label}格式无效,请使用 YYYY-MM-DD。") from exc def _parse_clock_time(raw_value: str | None, label: str) -> str: raw_text = str(raw_value or "").strip() if not raw_text: raise ValueError(f"{label}不能为空。") try: return time_cls.fromisoformat(raw_text).strftime("%H:%M") except ValueError as exc: raise ValueError(f"{label}格式无效,请使用 HH:MM。") from exc def _parse_schedule_form(form) -> dict: enabled = str(form.get("schedule_enabled", "")).lower() in {"1", "true", "on", "yes"} start_date_raw = form.get("start_date", "") end_date_raw = form.get("end_date", "") daily_start_time_raw = form.get("daily_start_time", "") daily_stop_time_raw = form.get("daily_stop_time", "") has_any_value = enabled or any(str(value or "").strip() for value in (start_date_raw, end_date_raw, daily_start_time_raw, daily_stop_time_raw)) if not has_any_value: return { "is_enabled": False, "start_date": None, "end_date": None, "daily_start_time": None, "daily_stop_time": None, } start_date = _parse_iso_date(start_date_raw, "开始日期") end_date = _parse_iso_date(end_date_raw, "结束日期") daily_start_time = _parse_clock_time(daily_start_time_raw, "每日启动时间") daily_stop_time = _parse_clock_time(daily_stop_time_raw, "每日停止时间") if end_date < start_date: raise ValueError("结束日期不能早于开始日期。") 
if daily_stop_time <= daily_start_time: raise ValueError("每日停止时间必须晚于每日启动时间。") return { "is_enabled": enabled, "start_date": start_date, "end_date": end_date, "daily_start_time": daily_start_time, "daily_stop_time": daily_stop_time, } def _user_owns_course(user_id: int, course_target_id: int) -> bool: return any(course["id"] == course_target_id for course in store.list_courses_for_user(user_id)) def _build_user_dashboard_context(user: dict) -> dict: return { "current_user": user, "courses": store.list_courses_for_user(user["id"]), "task": store.get_latest_task_for_user(user["id"]), "schedule": store.get_user_schedule(user["id"]), "recent_logs": store.list_recent_logs(user_id=user["id"], limit=config.logs_page_size), } def _load_admin_users(*, include_courses: bool = True, include_latest_task: bool = True, include_schedule: bool = True) -> list[dict]: users = store.list_users() for user in users: if include_courses: user["courses"] = store.list_courses_for_user(user["id"]) if include_latest_task: user["latest_task"] = store.get_latest_task_for_user(user["id"]) if include_schedule: user["schedule"] = store.get_user_schedule(user["id"]) return users def _build_admin_base_context(active_page: str, *, page_title: str, page_description: str) -> dict: admin_identity = _get_admin_identity() return { "admin_identity": admin_identity, "is_super_admin": admin_identity["is_super_admin"], "admin_page": active_page, "page_title": page_title, "page_description": page_description, } def _build_admin_dashboard_context() -> dict: context = _build_admin_base_context( "overview", page_title="Overview", page_description="Review system metrics, recent tasks, and admin-level settings.", ) context.update( { "stats": store.get_admin_stats(), "recent_tasks": store.list_recent_tasks(limit=18), "parallel_limit": store.get_parallel_limit(), "admins": store.list_admins(), "status_url": url_for("admin_status"), } ) return context def _build_admin_users_context() -> dict: context = 
_build_admin_base_context( "users", page_title="Users", page_description="Manage user accounts, course targets, and task operations.", ) context.update( { "users": _load_admin_users(include_courses=True, include_latest_task=True, include_schedule=True), } ) return context def _build_admin_schedules_context() -> dict: context = _build_admin_base_context( "schedules", page_title="Schedules", page_description="Configure per-user auto start and stop windows by date and time.", ) users = _load_admin_users(include_courses=False, include_latest_task=True, include_schedule=True) context.update( { "users": users, "stats": store.get_admin_stats(), } ) return context def _build_admin_registration_codes_context() -> dict: context = _build_admin_base_context( "registration_codes", page_title="Codes", page_description="Create registration codes and inspect their usage state.", ) context.update( { "registration_codes": store.list_registration_codes(limit=60), "stats": store.get_admin_stats(), } ) return context def _build_admin_logs_context() -> dict: recent_logs = store.list_recent_logs(limit=config.logs_page_size) context = _build_admin_base_context( "logs", page_title="Logs", page_description="Review live global logs for task execution and troubleshooting.", ) context.update( { "recent_logs": recent_logs, "log_stream_url": url_for("stream_admin_logs", last_id=recent_logs[-1]["id"] if recent_logs else 0), } ) return context def _queue_task_for_user(user: dict, *, requested_by: str, requested_by_role: str) -> tuple[dict, bool]: return task_manager.queue_task(user["id"], requested_by=requested_by, requested_by_role=requested_by_role) def _latest_log_id(logs: list[dict]) -> int: if not logs: return 0 return int(logs[-1]["id"]) def _admin_post_redirect(default_endpoint: str): target = (request.form.get("next") or request.referrer or "").strip() if target: parsed = urlparse(target) same_host = not parsed.netloc or parsed.netloc == request.host if same_host and (parsed.path or 
"").startswith("/admin"): safe_target = parsed.path or url_for(default_endpoint) if parsed.query: safe_target = f"{safe_target}?{parsed.query}" return redirect(safe_target) return redirect(url_for(default_endpoint)) @app.get("/") def index(): if session.get("role") == "user": return redirect(url_for("dashboard")) if session.get("role") == "admin": return redirect(url_for("admin_dashboard")) return redirect(url_for("login")) @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": student_id = request.form.get("student_id", "").strip() password = request.form.get("password", "") user = store.get_user_by_student_id(student_id) if user is None: flash("没有找到该学号对应的账号。如果你有注册码,请先完成注册。", "danger") return render_template("login.html") if not user["is_active"]: flash("该账号已被管理员禁用。", "danger") return render_template("login.html") try: stored_password = secret_box.decrypt(user["password_encrypted"]) except Exception: flash("账号数据损坏,请联系管理员重置密码。", "danger") return render_template("login.html") if stored_password != password: flash("学号或密码不正确。", "danger") return render_template("login.html") session.clear() session["role"] = "user" session["user_id"] = user["id"] return redirect(url_for("dashboard")) return render_template("login.html") @app.route("/register", methods=["GET", "POST"]) def register(): if request.method == "POST": registration_code = normalize_registration_code(request.form.get("registration_code", "")) student_id = request.form.get("student_id", "").strip() password = request.form.get("password", "").strip() display_name = request.form.get("display_name", "").strip() if not REGISTRATION_CODE_PATTERN.fullmatch(registration_code): flash("请输入有效的注册码。", "danger") return render_template("register.html") if not student_id.isdigit() or not password: flash("请填写学号和教务处密码。", "danger") return render_template("register.html") try: store.register_user_with_code( registration_code, student_id, secret_box.encrypt(password), display_name, 
refresh_interval_seconds=config.poll_interval_seconds, ) except ValueError as exc: flash(str(exc), "danger") return render_template("register.html") flash("注册成功,请使用学号和教务处密码登录。", "success") return redirect(url_for("login")) return render_template("register.html") @app.post("/logout") def logout(): session.clear() return redirect(url_for("login")) @app.route("/admin", methods=["GET", "POST"]) def admin_login(): if request.method == "POST": username = request.form.get("username", "").strip() password = request.form.get("password", "") is_super_admin = username == config.super_admin_username and password == config.super_admin_password admin_row = store.get_admin_by_username(username) is_regular_admin = bool(admin_row and verify_password(admin_row["password_hash"], password)) if not is_super_admin and not is_regular_admin: flash("管理员账号或密码错误。", "danger") return render_template("admin_login.html") session.clear() session["role"] = "admin" session["admin_username"] = username session["is_super_admin"] = is_super_admin return redirect(url_for("admin_dashboard")) return render_template("admin_login.html") @app.post("/admin/logout") def admin_logout(): session.clear() return redirect(url_for("admin_login")) @app.get("/dashboard") @_login_required("user") def dashboard(): user = _get_current_user() if user is None: session.clear() return redirect(url_for("login")) return render_template("dashboard.html", **_build_user_dashboard_context(user)) @app.post("/dashboard/profile") @_login_required("user") def update_profile(): user = _get_current_user() if user is None: session.clear() return redirect(url_for("login")) password = request.form.get("password", "").strip() display_name = request.form.get("display_name", "").strip() if not password: flash("密码不能为空。", "danger") return redirect(url_for("dashboard")) store.update_user(user["id"], password_encrypted=secret_box.encrypt(password), display_name=display_name) flash("账号信息已更新。", "success") return redirect(url_for("dashboard")) 
@app.post("/dashboard/settings/runtime")
@_login_required("user")
def update_runtime_settings():
    """Update the current user's refresh interval from the submitted form."""
    user = _get_current_user()
    if user is None:
        session.clear()
        return redirect(url_for("login"))

    try:
        refresh_interval_seconds = _parse_refresh_interval(
            request.form.get("refresh_interval_seconds"),
            default=int(user.get("refresh_interval_seconds") or config.poll_interval_seconds),
        )
    except ValueError as exc:
        flash(str(exc), "danger")
        return redirect(url_for("dashboard"))

    store.update_user(user["id"], refresh_interval_seconds=refresh_interval_seconds)
    flash(f"未命中课程后的刷新间隔已更新为 {refresh_interval_seconds} 秒。", "success")
    return redirect(url_for("dashboard"))


@app.post("/dashboard/courses")
@_login_required("user")
def add_course():
    """Validate the submitted course target and add it to the current user."""
    user = _get_current_user()
    if user is None:
        session.clear()
        return redirect(url_for("login"))

    category = request.form.get("category", "free")
    course_id = request.form.get("course_id", "")
    course_index = request.form.get("course_index", "")

    normalized_target = _validate_course_target(course_id, course_index)
    if normalized_target is None:
        flash("课程号需要使用 1-32 位字母或数字,课序号需要使用 1-8 位字母或数字。", "danger")
        return redirect(url_for("dashboard"))

    normalized_course_id, normalized_course_index = normalized_target
    store.add_course(user["id"], category, normalized_course_id, normalized_course_index)
    flash("课程已加入任务列表。", "success")
    return redirect(url_for("dashboard"))


# The view requires ``course_target_id``, so the rule must capture it; with an
# empty path segment Flask would call the view without the argument.
@app.post("/dashboard/courses/<int:course_target_id>/delete")
@_login_required("user")
def delete_course(course_target_id: int):
    """Delete one of the current user's courses; other users' courses are refused."""
    user = _get_current_user()
    if user is None:
        session.clear()
        return redirect(url_for("login"))

    if not _user_owns_course(user["id"], course_target_id):
        flash("不能删除不属于当前账号的课程。", "danger")
        return redirect(url_for("dashboard"))

    store.delete_course(course_target_id)
    flash("课程已移除。", "success")
    return redirect(url_for("dashboard"))


@app.post("/dashboard/task/start")
@_login_required("user")
def start_task():
    """Queue a task for the current user, or report the one already active."""
    user = _get_current_user()
    if user is None:
        session.clear()
        return redirect(url_for("login"))

    task, created = _queue_task_for_user(user, requested_by=user["student_id"], requested_by_role="user")
    if created:
        flash("任务已启动。", "success")
    else:
        flash(f"已有任务处于 {TASK_LABELS.get(task['status'], task['status'])} 状态。", "warning")
    return redirect(url_for("dashboard"))


@app.post("/dashboard/task/stop")
@_login_required("user")
def stop_task():
    """Request a stop of the current user's active task, if there is one."""
    user = _get_current_user()
    if user is None:
        session.clear()
        return redirect(url_for("login"))

    active_task = store.find_active_task_for_user(user["id"])
    if active_task and task_manager.stop_task(active_task["id"]):
        flash("停止请求已发送。", "success")
    else:
        flash("当前没有可停止的任务。", "warning")
    return redirect(url_for("dashboard"))


@app.get("/admin/dashboard")
@_login_required("admin")
def admin_dashboard():
    """Render the admin overview page."""
    return render_template("admin_dashboard.html", **_build_admin_dashboard_context())


@app.get("/admin/users")
@_login_required("admin")
def admin_users():
    """Render the admin users page."""
    return render_template("admin_users.html", **_build_admin_users_context())


@app.get("/admin/schedules")
@_login_required("admin")
def admin_schedules():
    """Render the admin schedules page."""
    return render_template("admin_schedules.html", **_build_admin_schedules_context())


@app.get("/admin/registration-codes")
@_login_required("admin")
def admin_registration_codes():
    """Render the admin registration codes page."""
    return render_template("admin_registration_codes.html", **_build_admin_registration_codes_context())


@app.get("/admin/logs")
@_login_required("admin")
def admin_logs():
    """Render the admin logs page."""
    return render_template("admin_logs.html", **_build_admin_logs_context())


@app.post("/admin/settings/parallel-limit")
@_login_required("admin")
def update_parallel_limit():
    """Store a new parallel limit; integer input is clamped to the range 1-8."""
    raw_limit = request.form.get("parallel_limit", str(config.default_parallel_limit))
    try:
        parallel_limit = max(1, min(8, int(raw_limit)))
    except ValueError:
        flash("并行数必须是 1 到 8 的整数。", "danger")
        return _admin_post_redirect("admin_dashboard")

    store.set_parallel_limit(parallel_limit)
    flash(f"并行数已更新为 {parallel_limit}。", "success")
    return _admin_post_redirect("admin_dashboard")


@app.post("/admin/users")
@_login_required("admin")
def create_user():
    """Create a user account from the admin form."""
    student_id = request.form.get("student_id", "").strip()
    password = request.form.get("password", "").strip()
    display_name = request.form.get("display_name", "").strip()

    try:
        refresh_interval_seconds = _parse_refresh_interval(
            request.form.get("refresh_interval_seconds"),
            default=config.poll_interval_seconds,
        )
    except ValueError as exc:
        flash(str(exc), "danger")
        return _admin_post_redirect("admin_users")

    if not student_id.isdigit() or not password:
        flash("请填写有效的学号和密码。", "danger")
        return _admin_post_redirect("admin_users")
    if store.get_user_by_student_id(student_id):
        flash("该学号已经存在。", "warning")
        return _admin_post_redirect("admin_users")

    store.create_user(
        student_id,
        secret_box.encrypt(password),
        display_name,
        refresh_interval_seconds=refresh_interval_seconds,
    )
    flash("用户已创建。", "success")
    return _admin_post_redirect("admin_users")


@app.post("/admin/users/<int:user_id>/update")
@_login_required("admin")
def update_user(user_id: int):
    """Update a user's display name and refresh interval, and optionally the password.

    The stored password is replaced only when a non-blank password is submitted.
    """
    user = store.get_user(user_id)
    if user is None:
        flash("用户不存在。", "danger")
        return _admin_post_redirect("admin_users")

    # The stored display name may be None when the form omits the field.
    display_name = (request.form.get("display_name", user["display_name"]) or "").strip()
    password = request.form.get("password", "").strip()

    try:
        refresh_interval_seconds = _parse_refresh_interval(
            request.form.get("refresh_interval_seconds"),
            default=int(user.get("refresh_interval_seconds") or config.poll_interval_seconds),
        )
    except ValueError as exc:
        flash(str(exc), "danger")
        return _admin_post_redirect("admin_users")

    if password:
        store.update_user(
            user_id,
            display_name=display_name,
            password_encrypted=secret_box.encrypt(password),
            refresh_interval_seconds=refresh_interval_seconds,
        )
    else:
        store.update_user(user_id, display_name=display_name, refresh_interval_seconds=refresh_interval_seconds)

    flash("用户信息已更新。", "success")
    return _admin_post_redirect("admin_users")


@app.post("/admin/users/<int:user_id>/toggle")
@_login_required("admin")
def toggle_user(user_id: int):
    """Toggle a user's active flag."""
    updated = store.toggle_user_active(user_id)
    if updated is None:
        flash("用户不存在。", "danger")
    else:
        flash("用户状态已切换。", "success")
    return _admin_post_redirect("admin_users")


@app.post("/admin/users/<int:user_id>/delete")
@_login_required("admin")
def delete_user_by_admin(user_id: int):
    """Delete a user; refused while the user still has an active task."""
    user = store.get_user(user_id)
    if user is None:
        flash("用户不存在。", "danger")
        return _admin_post_redirect("admin_users")

    active_task = store.find_active_task_for_user(user_id)
    if active_task is not None:
        flash("请先停止该用户当前任务,再删除用户。", "danger")
        return _admin_post_redirect("admin_users")

    store.delete_user(user_id)
    flash("用户及其课程、日志、定时设置已删除。", "success")
    return _admin_post_redirect("admin_users")


@app.post("/admin/users/<int:user_id>/schedule")
@_login_required("admin")
def update_user_schedule(user_id: int):
    """Validate the schedule form and store it for the given user."""
    if store.get_user(user_id) is None:
        flash("用户不存在。", "danger")
        return _admin_post_redirect("admin_schedules")

    try:
        schedule_payload = _parse_schedule_form(request.form)
    except ValueError as exc:
        flash(str(exc), "danger")
        return _admin_post_redirect("admin_schedules")

    store.upsert_user_schedule(user_id, **schedule_payload)
    flash("定时启动终止设置已更新。", "success")
    return _admin_post_redirect("admin_schedules")


@app.post("/admin/users/<int:user_id>/courses")
@_login_required("admin")
def admin_add_course(user_id: int):
    """Validate the submitted course target and add it to the given user."""
    if store.get_user(user_id) is None:
        flash("用户不存在。", "danger")
        return _admin_post_redirect("admin_users")

    category = request.form.get("category", "free")
    course_id = request.form.get("course_id", "")
    course_index = request.form.get("course_index", "")

    normalized_target = _validate_course_target(course_id, course_index)
    if normalized_target is None:
        flash("课程号需要使用 1-32 位字母或数字,课序号需要使用 1-8 位字母或数字。", "danger")
        return _admin_post_redirect("admin_users")

    normalized_course_id, normalized_course_index = normalized_target
    store.add_course(user_id, category, normalized_course_id, normalized_course_index)
    flash("课程已添加到对应用户。", "success")
    return _admin_post_redirect("admin_users")


@app.post("/admin/courses/<int:course_target_id>/delete")
@_login_required("admin")
def admin_delete_course(course_target_id: int):
    """Delete a course target by id, regardless of which user owns it."""
    store.delete_course(course_target_id)
    flash("课程已删除。", "success")
    return _admin_post_redirect("admin_users")


@app.post("/admin/users/<int:user_id>/task/start")
@_login_required("admin")
def admin_start_user_task(user_id: int):
    """Queue a task for the given user on behalf of the logged-in admin."""
    user = store.get_user(user_id)
    if user is None:
        flash("用户不存在。", "danger")
        return _admin_post_redirect("admin_users")

    admin_identity = _get_admin_identity()
    task, created = _queue_task_for_user(user, requested_by=admin_identity["username"], requested_by_role="admin")
    if created:
        flash("任务已加入队列。", "success")
    else:
        flash(f"该用户已有任务处于 {TASK_LABELS.get(task['status'], task['status'])} 状态。", "warning")
    return _admin_post_redirect("admin_users")


@app.post("/admin/users/<int:user_id>/task/stop")
@_login_required("admin")
def admin_stop_user_task(user_id: int):
    """Request a stop of the given user's active task, if there is one."""
    active_task = store.find_active_task_for_user(user_id)
    if active_task and task_manager.stop_task(active_task["id"]):
        flash("已发送停止请求。", "success")
    else:
        flash("当前没有可停止任务。", "warning")
    return _admin_post_redirect("admin_users")


@app.post("/admin/admins")
@_login_required("admin")
def create_admin():
    """Create an admin account; only a super-admin session may do this."""
    if not session.get("is_super_admin", False):
        flash("只有超级管理员可以新增管理员。", "danger")
        return _admin_post_redirect("admin_dashboard")

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "").strip()
    if not username or not password:
        flash("请填写管理员账号和密码。", "danger")
        return _admin_post_redirect("admin_dashboard")

    # The configured super-admin name is reserved as well as stored admin names.
    if username == config.super_admin_username or store.get_admin_by_username(username):
        flash("该管理员账号已存在。", "warning")
        return _admin_post_redirect("admin_dashboard")

    store.create_admin(username, hash_password(password))
    flash("管理员已创建。", "success")
    return _admin_post_redirect("admin_dashboard")


@app.post("/admin/registration-codes")
@_login_required("admin")
def create_registration_code():
    """Create a registration code and flash the generated code to the admin."""
    note = request.form.get("note", "").strip()
    try:
        max_uses = _parse_registration_code_max_uses(request.form.get("max_uses"))
    except ValueError as exc:
        flash(str(exc), "danger")
        return _admin_post_redirect("admin_registration_codes")

    admin_identity = _get_admin_identity()
    created = store.create_registration_code(created_by=admin_identity["username"], note=note, max_uses=max_uses)
    flash(f"注册码已创建:{created['code']}", "success")
    return _admin_post_redirect("admin_registration_codes")


@app.post("/admin/registration-codes/<int:registration_code_id>/toggle")
@_login_required("admin")
def toggle_registration_code(registration_code_id: int):
    """Toggle a registration code's active flag."""
    updated = store.toggle_registration_code_active(registration_code_id)
    if updated is None:
        flash("注册码不存在。", "danger")
    else:
        flash("注册码状态已更新。", "success")
    return _admin_post_redirect("admin_registration_codes")


@app.get("/api/user/status")
@_login_required("user")
def user_status():
    """Return the current user's latest task, courses and refresh interval as JSON."""
    user = _get_current_user()
    if user is None:
        return jsonify({"ok": False}), 401

    task = store.get_latest_task_for_user(user["id"])
    return jsonify(
        {
            "ok": True,
            "task": task,
            "courses": store.list_courses_for_user(user["id"]),
            "user": {
                "refresh_interval_seconds": int(user.get("refresh_interval_seconds") or config.poll_interval_seconds),
            },
        }
    )


@app.get("/api/admin/status")
@_login_required("admin")
def admin_status():
    """Return admin stats, the parallel limit and recent tasks as JSON."""
    return jsonify(
        {
            "ok": True,
            "stats": store.get_admin_stats(),
            "parallel_limit": store.get_parallel_limit(),
            "recent_tasks": store.list_recent_tasks(limit=12),
        }
    )


def _parse_int_or_none(raw_value: str | None) -> int | None:
    """Return ``raw_value`` as an int, or None when it is missing or not an integer."""
    if raw_value is None:
        return None
    try:
        return int(raw_value)
    except ValueError:
        return None


def _resolve_stream_start_id(load_latest_id: Callable[[], int]) -> int:
    """Pick the log id after which a log stream should resume.

    Order of preference: the ``Last-Event-ID`` header (sent by a reconnecting
    EventSource client, so already-delivered logs are not replayed), then the
    ``last_id`` query parameter, then ``load_latest_id()``. The fallback is
    only called when the client supplied no usable id, and a malformed id
    falls through instead of raising.
    """
    candidates = (request.headers.get("Last-Event-ID"), request.args.get("last_id"))
    for raw_value in candidates:
        parsed = _parse_int_or_none(raw_value)
        if parsed is not None:
            return parsed
    return load_latest_id()


def _stream_logs_response(start_id: int, fetch_after: Callable[[int], list]) -> Response:
    """Build a ``text/event-stream`` response that polls for new logs.

    ``fetch_after(last_id)`` must return the logs with an id above ``last_id``.
    Each log is sent as one event carrying its id; an idle poll sends a comment
    line as keep-alive. The generator runs until the client disconnects.
    """

    @stream_with_context
    def generate():
        current_last_id = start_id
        while True:
            logs = fetch_after(current_last_id)
            if logs:
                for log in logs:
                    current_last_id = int(log["id"])
                    yield f"id: {log['id']}\ndata: {json.dumps(log, ensure_ascii=False)}\n\n"
            else:
                yield ": keep-alive\n\n"
            # Poll the store about once per second.
            time.sleep(1)

    response = Response(generate(), mimetype="text/event-stream")
    response.headers["Cache-Control"] = "no-cache"
    # Ask a buffering reverse proxy to pass events through immediately.
    response.headers["X-Accel-Buffering"] = "no"
    return response


@app.get("/api/user/logs/stream")
@_login_required("user")
def stream_user_logs():
    """Stream the current user's new log entries as server-sent events."""
    user = _get_current_user()
    if user is None:
        return jsonify({"ok": False}), 401

    user_id = user["id"]
    start_id = _resolve_stream_start_id(
        lambda: _latest_log_id(store.list_recent_logs(user_id=user_id, limit=1))
    )
    return _stream_logs_response(
        start_id,
        lambda last_id: store.list_logs_after(last_id, user_id=user_id, limit=60),
    )


@app.get("/api/admin/logs/stream")
@_login_required("admin")
def stream_admin_logs():
    """Stream new log entries for all users as server-sent events."""
    start_id = _resolve_stream_start_id(
        lambda: _latest_log_id(store.list_recent_logs(limit=1))
    )
    return _stream_logs_response(
        start_id,
        lambda last_id: store.list_logs_after(last_id, limit=80),
    )