| from __future__ import annotations
|
|
|
| import atexit
|
| import json
|
| import re
|
| import time
|
| from datetime import date as date_cls, time as time_cls
|
| from functools import wraps
|
| from typing import Callable
|
| from urllib.parse import urlparse
|
|
|
| from flask import Flask, Response, flash, g, jsonify, redirect, render_template, request, session, stream_with_context, url_for
|
|
|
| from core.config import AppConfig
|
| from core.db import (
|
| DEFAULT_REGISTRATION_CODE_MAX_USES,
|
| Database,
|
| MAX_REFRESH_INTERVAL_SECONDS,
|
| MIN_REFRESH_INTERVAL_SECONDS,
|
| normalize_registration_code,
|
| )
|
| from core.runtime_logging import configure_logging, get_logger
|
| from core.security import SecretBox, hash_password, verify_password
|
| from core.task_manager import TaskManager
|
|
|
|
|
| configure_logging()
|
| APP_LOGGER = get_logger("sacc.web")
|
|
|
| config = AppConfig.load()
|
| store = Database(
|
| config.db_path,
|
| default_parallel_limit=config.default_parallel_limit,
|
| mysql_ssl_ca_path=config.mysql_ssl_ca_path,
|
| )
|
| store.init_db()
|
| secret_box = SecretBox(config.encryption_key)
|
| task_manager = TaskManager(config=config, store=store, secret_box=secret_box)
|
|
|
|
|
| def _seed_legacy_user() -> None:
|
| if store.list_users():
|
| return
|
|
|
| legacy_path = config.root_dir / "user_data.json"
|
| if not legacy_path.exists():
|
| return
|
|
|
| try:
|
| payload = json.loads(legacy_path.read_text(encoding="utf-8"))
|
| except (OSError, json.JSONDecodeError):
|
| return
|
|
|
| student_id = str(payload.get("std_id", "")).strip()
|
| password = str(payload.get("password", "")).strip()
|
| if not student_id or not password:
|
| return
|
|
|
| user_id = store.create_user(student_id, secret_box.encrypt(password), "Legacy User")
|
| for source_key, category in (("a", "plan"), ("b", "free")):
|
| for course in payload.get("course", {}).get(source_key, []):
|
| course_id = str(course.get("course_id", "")).strip()
|
| course_index = str(course.get("course_index", "")).strip()
|
| if course_id and course_index:
|
| store.add_course(user_id, category, course_id, course_index)
|
|
|
|
|
| _seed_legacy_user()
|
| task_manager.start()
|
| atexit.register(task_manager.shutdown)
|
|
|
| app = Flask(__name__, template_folder="templates", static_folder="static")
|
| app.secret_key = config.session_secret
|
| app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SAMESITE="Lax")
|
| APP_LOGGER.info(
|
| "Application bootstrap complete | data_dir=%s db_path=%s backend=%s chrome_binary=%s chromedriver_path=%s default_parallel_limit=%s schedule_timezone=%s",
|
| config.data_dir,
|
| config.db_path,
|
| config.database_backend,
|
| config.chrome_binary,
|
| config.chromedriver_path,
|
| config.default_parallel_limit,
|
| config.schedule_timezone,
|
| )
|
|
|
| CATEGORY_LABELS = {
|
| "plan": "方案选课",
|
| "free": "自由选课",
|
| }
|
| TASK_LABELS = {
|
| "pending": "排队中",
|
| "running": "执行中",
|
| "cancel_requested": "停止中",
|
| "completed": "已完成",
|
| "stopped": "已停止",
|
| "failed": "失败",
|
| }
|
|
|
| SKIPPED_REQUEST_LOG_PREFIXES = (
|
| "/static/",
|
| "/api/",
|
| )
|
| SKIPPED_REQUEST_LOG_PATHS = {
|
| "/favicon.ico",
|
| }
|
| COURSE_ID_PATTERN = re.compile(r"^[0-9A-Za-z]{1,32}$")
|
| COURSE_INDEX_PATTERN = re.compile(r"^[0-9A-Za-z]{1,8}$")
|
| REGISTRATION_CODE_PATTERN = re.compile(r"^[A-Z0-9-]{6,64}$")
|
|
|
|
|
| def _current_actor_label() -> str:
|
| role = session.get("role", "guest")
|
| if role == "user":
|
| return f"user:{session.get('user_id', '-')}"
|
| if role == "admin":
|
| return f"admin:{session.get('admin_username', '-')}"
|
| return "guest"
|
|
|
|
|
| @app.before_request
|
| def before_request_logging() -> None:
|
| g.request_started_at = time.perf_counter()
|
|
|
|
|
| @app.after_request
|
| def after_request_logging(response):
|
| if request.path in SKIPPED_REQUEST_LOG_PATHS:
|
| return response
|
| if any(request.path.startswith(prefix) for prefix in SKIPPED_REQUEST_LOG_PREFIXES):
|
| return response
|
|
|
| started_at = getattr(g, "request_started_at", None)
|
| duration_ms = 0.0 if started_at is None else (time.perf_counter() - started_at) * 1000
|
| remote_addr = request.headers.get("x-forwarded-for") or request.remote_addr or "-"
|
| APP_LOGGER.info(
|
| "HTTP %s %s -> %s in %.1fms | actor=%s remote=%s",
|
| request.method,
|
| request.path,
|
| response.status_code,
|
| duration_ms,
|
| _current_actor_label(),
|
| remote_addr,
|
| )
|
| return response
|
|
|
|
|
| @app.context_processor
|
| def inject_globals() -> dict:
|
| return {
|
| "category_labels": CATEGORY_LABELS,
|
| "task_labels": TASK_LABELS,
|
| "session_role": session.get("role", "guest"),
|
| "refresh_interval_min": MIN_REFRESH_INTERVAL_SECONDS,
|
| "refresh_interval_max": MAX_REFRESH_INTERVAL_SECONDS,
|
| "default_refresh_interval_seconds": config.poll_interval_seconds,
|
| "default_registration_code_max_uses": DEFAULT_REGISTRATION_CODE_MAX_USES,
|
| }
|
|
|
|
|
| def _login_required(role: str) -> Callable:
|
| def decorator(view: Callable) -> Callable:
|
| @wraps(view)
|
| def wrapped(*args, **kwargs):
|
| current_role = session.get("role")
|
| if role == "user" and current_role != "user":
|
| flash("请先登录学生账号。", "warning")
|
| return redirect(url_for("login"))
|
| if role == "admin" and current_role != "admin":
|
| flash("请先登录管理员账号。", "warning")
|
| return redirect(url_for("admin_login"))
|
| return view(*args, **kwargs)
|
|
|
| return wrapped
|
|
|
| return decorator
|
|
|
|
|
| def _get_current_user() -> dict | None:
|
| user_id = session.get("user_id")
|
| if not user_id:
|
| return None
|
| return store.get_user(int(user_id))
|
|
|
|
|
| def _get_admin_identity() -> dict:
|
| return {
|
| "username": session.get("admin_username", ""),
|
| "is_super_admin": bool(session.get("is_super_admin", False)),
|
| }
|
|
|
|
|
| def _normalize_course_token(raw_value: str) -> str:
|
| return re.sub(r"\s+", "", str(raw_value or "")).upper()
|
|
|
|
|
| def _validate_course_target(course_id: str, course_index: str) -> tuple[str, str] | None:
|
| normalized_course_id = _normalize_course_token(course_id)
|
| normalized_course_index = _normalize_course_token(course_index)
|
| if not COURSE_ID_PATTERN.fullmatch(normalized_course_id):
|
| return None
|
| if not COURSE_INDEX_PATTERN.fullmatch(normalized_course_index):
|
| return None
|
| return normalized_course_id, normalized_course_index
|
|
|
|
|
| def _parse_refresh_interval(raw_value: str | None, *, default: int) -> int:
|
| raw_text = str(raw_value or "").strip()
|
| if not raw_text:
|
| return default
|
| try:
|
| interval = int(raw_text)
|
| except ValueError as exc:
|
| raise ValueError(f"刷新间隔必须是 {MIN_REFRESH_INTERVAL_SECONDS} 到 {MAX_REFRESH_INTERVAL_SECONDS} 之间的整数。") from exc
|
| if interval < MIN_REFRESH_INTERVAL_SECONDS or interval > MAX_REFRESH_INTERVAL_SECONDS:
|
| raise ValueError(f"刷新间隔必须在 {MIN_REFRESH_INTERVAL_SECONDS} 到 {MAX_REFRESH_INTERVAL_SECONDS} 秒之间。")
|
| return interval
|
|
|
|
|
| def _parse_registration_code_max_uses(raw_value: str | None) -> int:
|
| raw_text = str(raw_value or "").strip()
|
| if not raw_text:
|
| return DEFAULT_REGISTRATION_CODE_MAX_USES
|
| try:
|
| value = int(raw_text)
|
| except ValueError as exc:
|
| raise ValueError("注册码可用次数必须是 1 到 99 之间的整数。") from exc
|
| if value < 1 or value > 99:
|
| raise ValueError("注册码可用次数必须在 1 到 99 之间。")
|
| return value
|
|
|
|
|
| def _parse_iso_date(raw_value: str | None, label: str) -> str:
|
| raw_text = str(raw_value or "").strip()
|
| if not raw_text:
|
| raise ValueError(f"{label}不能为空。")
|
| try:
|
| return date_cls.fromisoformat(raw_text).isoformat()
|
| except ValueError as exc:
|
| raise ValueError(f"{label}格式无效,请使用 YYYY-MM-DD。") from exc
|
|
|
|
|
| def _parse_clock_time(raw_value: str | None, label: str) -> str:
|
| raw_text = str(raw_value or "").strip()
|
| if not raw_text:
|
| raise ValueError(f"{label}不能为空。")
|
| try:
|
| return time_cls.fromisoformat(raw_text).strftime("%H:%M")
|
| except ValueError as exc:
|
| raise ValueError(f"{label}格式无效,请使用 HH:MM。") from exc
|
|
|
|
|
| def _parse_schedule_form(form) -> dict:
|
| enabled = str(form.get("schedule_enabled", "")).lower() in {"1", "true", "on", "yes"}
|
| start_date_raw = form.get("start_date", "")
|
| end_date_raw = form.get("end_date", "")
|
| daily_start_time_raw = form.get("daily_start_time", "")
|
| daily_stop_time_raw = form.get("daily_stop_time", "")
|
| has_any_value = enabled or any(str(value or "").strip() for value in (start_date_raw, end_date_raw, daily_start_time_raw, daily_stop_time_raw))
|
| if not has_any_value:
|
| return {
|
| "is_enabled": False,
|
| "start_date": None,
|
| "end_date": None,
|
| "daily_start_time": None,
|
| "daily_stop_time": None,
|
| }
|
|
|
| start_date = _parse_iso_date(start_date_raw, "开始日期")
|
| end_date = _parse_iso_date(end_date_raw, "结束日期")
|
| daily_start_time = _parse_clock_time(daily_start_time_raw, "每日启动时间")
|
| daily_stop_time = _parse_clock_time(daily_stop_time_raw, "每日停止时间")
|
| if end_date < start_date:
|
| raise ValueError("结束日期不能早于开始日期。")
|
| if daily_stop_time <= daily_start_time:
|
| raise ValueError("每日停止时间必须晚于每日启动时间。")
|
| return {
|
| "is_enabled": enabled,
|
| "start_date": start_date,
|
| "end_date": end_date,
|
| "daily_start_time": daily_start_time,
|
| "daily_stop_time": daily_stop_time,
|
| }
|
|
|
|
|
| def _user_owns_course(user_id: int, course_target_id: int) -> bool:
|
| return any(course["id"] == course_target_id for course in store.list_courses_for_user(user_id))
|
|
|
|
|
| def _build_user_dashboard_context(user: dict) -> dict:
|
| return {
|
| "current_user": user,
|
| "courses": store.list_courses_for_user(user["id"]),
|
| "task": store.get_latest_task_for_user(user["id"]),
|
| "schedule": store.get_user_schedule(user["id"]),
|
| "recent_logs": store.list_recent_logs(user_id=user["id"], limit=config.logs_page_size),
|
| }
|
|
|
|
|
| def _load_admin_users(*, include_courses: bool = True, include_latest_task: bool = True, include_schedule: bool = True) -> list[dict]:
|
| users = store.list_users()
|
| for user in users:
|
| if include_courses:
|
| user["courses"] = store.list_courses_for_user(user["id"])
|
| if include_latest_task:
|
| user["latest_task"] = store.get_latest_task_for_user(user["id"])
|
| if include_schedule:
|
| user["schedule"] = store.get_user_schedule(user["id"])
|
| return users
|
|
|
|
|
| def _build_admin_base_context(active_page: str, *, page_title: str, page_description: str) -> dict:
|
| admin_identity = _get_admin_identity()
|
| return {
|
| "admin_identity": admin_identity,
|
| "is_super_admin": admin_identity["is_super_admin"],
|
| "admin_page": active_page,
|
| "page_title": page_title,
|
| "page_description": page_description,
|
| }
|
|
|
|
|
| def _build_admin_dashboard_context() -> dict:
|
| context = _build_admin_base_context(
|
| "overview",
|
| page_title="Overview",
|
| page_description="Review system metrics, recent tasks, and admin-level settings.",
|
| )
|
| context.update(
|
| {
|
| "stats": store.get_admin_stats(),
|
| "recent_tasks": store.list_recent_tasks(limit=18),
|
| "parallel_limit": store.get_parallel_limit(),
|
| "admins": store.list_admins(),
|
| "status_url": url_for("admin_status"),
|
| }
|
| )
|
| return context
|
|
|
|
|
| def _build_admin_users_context() -> dict:
|
| context = _build_admin_base_context(
|
| "users",
|
| page_title="Users",
|
| page_description="Manage user accounts, course targets, and task operations.",
|
| )
|
| context.update(
|
| {
|
| "users": _load_admin_users(include_courses=True, include_latest_task=True, include_schedule=True),
|
| }
|
| )
|
| return context
|
|
|
|
|
| def _build_admin_schedules_context() -> dict:
|
| context = _build_admin_base_context(
|
| "schedules",
|
| page_title="Schedules",
|
| page_description="Configure per-user auto start and stop windows by date and time.",
|
| )
|
| users = _load_admin_users(include_courses=False, include_latest_task=True, include_schedule=True)
|
| context.update(
|
| {
|
| "users": users,
|
| "stats": store.get_admin_stats(),
|
| }
|
| )
|
| return context
|
|
|
|
|
| def _build_admin_registration_codes_context() -> dict:
|
| context = _build_admin_base_context(
|
| "registration_codes",
|
| page_title="Codes",
|
| page_description="Create registration codes and inspect their usage state.",
|
| )
|
| context.update(
|
| {
|
| "registration_codes": store.list_registration_codes(limit=60),
|
| "stats": store.get_admin_stats(),
|
| }
|
| )
|
| return context
|
|
|
|
|
| def _build_admin_logs_context() -> dict:
|
| recent_logs = store.list_recent_logs(limit=config.logs_page_size)
|
| context = _build_admin_base_context(
|
| "logs",
|
| page_title="Logs",
|
| page_description="Review live global logs for task execution and troubleshooting.",
|
| )
|
| context.update(
|
| {
|
| "recent_logs": recent_logs,
|
| "log_stream_url": url_for("stream_admin_logs", last_id=recent_logs[-1]["id"] if recent_logs else 0),
|
| }
|
| )
|
| return context
|
|
|
|
|
| def _queue_task_for_user(user: dict, *, requested_by: str, requested_by_role: str) -> tuple[dict, bool]:
|
| return task_manager.queue_task(user["id"], requested_by=requested_by, requested_by_role=requested_by_role)
|
|
|
|
|
| def _latest_log_id(logs: list[dict]) -> int:
|
| if not logs:
|
| return 0
|
| return int(logs[-1]["id"])
|
|
|
|
|
| def _admin_post_redirect(default_endpoint: str):
|
| target = (request.form.get("next") or request.referrer or "").strip()
|
| if target:
|
| parsed = urlparse(target)
|
| same_host = not parsed.netloc or parsed.netloc == request.host
|
| if same_host and (parsed.path or "").startswith("/admin"):
|
| safe_target = parsed.path or url_for(default_endpoint)
|
| if parsed.query:
|
| safe_target = f"{safe_target}?{parsed.query}"
|
| return redirect(safe_target)
|
| return redirect(url_for(default_endpoint))
|
|
|
|
|
| @app.get("/")
|
| def index():
|
| if session.get("role") == "user":
|
| return redirect(url_for("dashboard"))
|
| if session.get("role") == "admin":
|
| return redirect(url_for("admin_dashboard"))
|
| return redirect(url_for("login"))
|
|
|
|
|
| @app.route("/login", methods=["GET", "POST"])
|
| def login():
|
| if request.method == "POST":
|
| student_id = request.form.get("student_id", "").strip()
|
| password = request.form.get("password", "")
|
| user = store.get_user_by_student_id(student_id)
|
| if user is None:
|
| flash("没有找到该学号对应的账号。如果你有注册码,请先完成注册。", "danger")
|
| return render_template("login.html")
|
| if not user["is_active"]:
|
| flash("该账号已被管理员禁用。", "danger")
|
| return render_template("login.html")
|
| try:
|
| stored_password = secret_box.decrypt(user["password_encrypted"])
|
| except Exception:
|
| flash("账号数据损坏,请联系管理员重置密码。", "danger")
|
| return render_template("login.html")
|
| if stored_password != password:
|
| flash("学号或密码不正确。", "danger")
|
| return render_template("login.html")
|
|
|
| session.clear()
|
| session["role"] = "user"
|
| session["user_id"] = user["id"]
|
| return redirect(url_for("dashboard"))
|
|
|
| return render_template("login.html")
|
|
|
|
|
| @app.route("/register", methods=["GET", "POST"])
|
| def register():
|
| if request.method == "POST":
|
| registration_code = normalize_registration_code(request.form.get("registration_code", ""))
|
| student_id = request.form.get("student_id", "").strip()
|
| password = request.form.get("password", "").strip()
|
| display_name = request.form.get("display_name", "").strip()
|
|
|
| if not REGISTRATION_CODE_PATTERN.fullmatch(registration_code):
|
| flash("请输入有效的注册码。", "danger")
|
| return render_template("register.html")
|
| if not student_id.isdigit() or not password:
|
| flash("请填写学号和教务处密码。", "danger")
|
| return render_template("register.html")
|
|
|
| try:
|
| store.register_user_with_code(
|
| registration_code,
|
| student_id,
|
| secret_box.encrypt(password),
|
| display_name,
|
| refresh_interval_seconds=config.poll_interval_seconds,
|
| )
|
| except ValueError as exc:
|
| flash(str(exc), "danger")
|
| return render_template("register.html")
|
|
|
| flash("注册成功,请使用学号和教务处密码登录。", "success")
|
| return redirect(url_for("login"))
|
|
|
| return render_template("register.html")
|
|
|
|
|
| @app.post("/logout")
|
| def logout():
|
| session.clear()
|
| return redirect(url_for("login"))
|
|
|
|
|
| @app.route("/admin", methods=["GET", "POST"])
|
| def admin_login():
|
| if request.method == "POST":
|
| username = request.form.get("username", "").strip()
|
| password = request.form.get("password", "")
|
| is_super_admin = username == config.super_admin_username and password == config.super_admin_password
|
| admin_row = store.get_admin_by_username(username)
|
| is_regular_admin = bool(admin_row and verify_password(admin_row["password_hash"], password))
|
| if not is_super_admin and not is_regular_admin:
|
| flash("管理员账号或密码错误。", "danger")
|
| return render_template("admin_login.html")
|
|
|
| session.clear()
|
| session["role"] = "admin"
|
| session["admin_username"] = username
|
| session["is_super_admin"] = is_super_admin
|
| return redirect(url_for("admin_dashboard"))
|
|
|
| return render_template("admin_login.html")
|
|
|
|
|
| @app.post("/admin/logout")
|
| def admin_logout():
|
| session.clear()
|
| return redirect(url_for("admin_login"))
|
|
|
| @app.get("/dashboard")
|
| @_login_required("user")
|
| def dashboard():
|
| user = _get_current_user()
|
| if user is None:
|
| session.clear()
|
| return redirect(url_for("login"))
|
| return render_template("dashboard.html", **_build_user_dashboard_context(user))
|
|
|
|
|
| @app.post("/dashboard/profile")
|
| @_login_required("user")
|
| def update_profile():
|
| user = _get_current_user()
|
| if user is None:
|
| session.clear()
|
| return redirect(url_for("login"))
|
|
|
| password = request.form.get("password", "").strip()
|
| display_name = request.form.get("display_name", "").strip()
|
| if not password:
|
| flash("密码不能为空。", "danger")
|
| return redirect(url_for("dashboard"))
|
|
|
| store.update_user(user["id"], password_encrypted=secret_box.encrypt(password), display_name=display_name)
|
| flash("账号信息已更新。", "success")
|
| return redirect(url_for("dashboard"))
|
|
|
|
|
| @app.post("/dashboard/settings/runtime")
|
| @_login_required("user")
|
| def update_runtime_settings():
|
| user = _get_current_user()
|
| if user is None:
|
| session.clear()
|
| return redirect(url_for("login"))
|
|
|
| try:
|
| refresh_interval_seconds = _parse_refresh_interval(
|
| request.form.get("refresh_interval_seconds"),
|
| default=int(user.get("refresh_interval_seconds") or config.poll_interval_seconds),
|
| )
|
| except ValueError as exc:
|
| flash(str(exc), "danger")
|
| return redirect(url_for("dashboard"))
|
|
|
| store.update_user(user["id"], refresh_interval_seconds=refresh_interval_seconds)
|
| flash(f"未命中课程后的刷新间隔已更新为 {refresh_interval_seconds} 秒。", "success")
|
| return redirect(url_for("dashboard"))
|
|
|
|
|
| @app.post("/dashboard/courses")
|
| @_login_required("user")
|
| def add_course():
|
| user = _get_current_user()
|
| if user is None:
|
| session.clear()
|
| return redirect(url_for("login"))
|
|
|
| category = request.form.get("category", "free")
|
| course_id = request.form.get("course_id", "")
|
| course_index = request.form.get("course_index", "")
|
| normalized_target = _validate_course_target(course_id, course_index)
|
| if normalized_target is None:
|
| flash("课程号需要使用 1-32 位字母或数字,课序号需要使用 1-8 位字母或数字。", "danger")
|
| return redirect(url_for("dashboard"))
|
|
|
| normalized_course_id, normalized_course_index = normalized_target
|
| store.add_course(user["id"], category, normalized_course_id, normalized_course_index)
|
| flash("课程已加入任务列表。", "success")
|
| return redirect(url_for("dashboard"))
|
|
|
|
|
| @app.post("/dashboard/courses/<int:course_target_id>/delete")
|
| @_login_required("user")
|
| def delete_course(course_target_id: int):
|
| user = _get_current_user()
|
| if user is None:
|
| session.clear()
|
| return redirect(url_for("login"))
|
| if not _user_owns_course(user["id"], course_target_id):
|
| flash("不能删除不属于当前账号的课程。", "danger")
|
| return redirect(url_for("dashboard"))
|
| store.delete_course(course_target_id)
|
| flash("课程已移除。", "success")
|
| return redirect(url_for("dashboard"))
|
|
|
|
|
| @app.post("/dashboard/task/start")
|
| @_login_required("user")
|
| def start_task():
|
| user = _get_current_user()
|
| if user is None:
|
| session.clear()
|
| return redirect(url_for("login"))
|
| task, created = _queue_task_for_user(user, requested_by=user["student_id"], requested_by_role="user")
|
| flash("任务已启动。" if created else f"已有任务处于 {TASK_LABELS.get(task['status'], task['status'])} 状态。", "success" if created else "warning")
|
| return redirect(url_for("dashboard"))
|
|
|
|
|
| @app.post("/dashboard/task/stop")
|
| @_login_required("user")
|
| def stop_task():
|
| user = _get_current_user()
|
| if user is None:
|
| session.clear()
|
| return redirect(url_for("login"))
|
| active_task = store.find_active_task_for_user(user["id"])
|
| if active_task and task_manager.stop_task(active_task["id"]):
|
| flash("停止请求已发送。", "success")
|
| else:
|
| flash("当前没有可停止的任务。", "warning")
|
| return redirect(url_for("dashboard"))
|
|
|
|
|
| @app.get("/admin/dashboard")
|
| @_login_required("admin")
|
| def admin_dashboard():
|
| return render_template("admin_dashboard.html", **_build_admin_dashboard_context())
|
|
|
|
|
| @app.get("/admin/users")
|
| @_login_required("admin")
|
| def admin_users():
|
| return render_template("admin_users.html", **_build_admin_users_context())
|
|
|
|
|
| @app.get("/admin/schedules")
|
| @_login_required("admin")
|
| def admin_schedules():
|
| return render_template("admin_schedules.html", **_build_admin_schedules_context())
|
|
|
|
|
| @app.get("/admin/registration-codes")
|
| @_login_required("admin")
|
| def admin_registration_codes():
|
| return render_template("admin_registration_codes.html", **_build_admin_registration_codes_context())
|
|
|
|
|
| @app.get("/admin/logs")
|
| @_login_required("admin")
|
| def admin_logs():
|
| return render_template("admin_logs.html", **_build_admin_logs_context())
|
|
|
|
|
| @app.post("/admin/settings/parallel-limit")
|
| @_login_required("admin")
|
| def update_parallel_limit():
|
| try:
|
| parallel_limit = max(1, min(8, int(request.form.get("parallel_limit", str(config.default_parallel_limit)))))
|
| except ValueError:
|
| flash("并行数必须是 1 到 8 的整数。", "danger")
|
| return _admin_post_redirect("admin_dashboard")
|
| store.set_parallel_limit(parallel_limit)
|
| flash(f"并行数已更新为 {parallel_limit}。", "success")
|
| return _admin_post_redirect("admin_dashboard")
|
|
|
|
|
| @app.post("/admin/users")
|
| @_login_required("admin")
|
| def create_user():
|
| student_id = request.form.get("student_id", "").strip()
|
| password = request.form.get("password", "").strip()
|
| display_name = request.form.get("display_name", "").strip()
|
| try:
|
| refresh_interval_seconds = _parse_refresh_interval(
|
| request.form.get("refresh_interval_seconds"),
|
| default=config.poll_interval_seconds,
|
| )
|
| except ValueError as exc:
|
| flash(str(exc), "danger")
|
| return _admin_post_redirect("admin_users")
|
| if not student_id.isdigit() or not password:
|
| flash("请填写有效的学号和密码。", "danger")
|
| return _admin_post_redirect("admin_users")
|
| if store.get_user_by_student_id(student_id):
|
| flash("该学号已经存在。", "warning")
|
| return _admin_post_redirect("admin_users")
|
| store.create_user(
|
| student_id,
|
| secret_box.encrypt(password),
|
| display_name,
|
| refresh_interval_seconds=refresh_interval_seconds,
|
| )
|
| flash("用户已创建。", "success")
|
| return _admin_post_redirect("admin_users")
|
|
|
|
|
| @app.post("/admin/users/<int:user_id>/update")
|
| @_login_required("admin")
|
| def update_user(user_id: int):
|
| user = store.get_user(user_id)
|
| if user is None:
|
| flash("用户不存在。", "danger")
|
| return _admin_post_redirect("admin_users")
|
| display_name = request.form.get("display_name", user["display_name"]).strip()
|
| password = request.form.get("password", "").strip()
|
| try:
|
| refresh_interval_seconds = _parse_refresh_interval(
|
| request.form.get("refresh_interval_seconds"),
|
| default=int(user.get("refresh_interval_seconds") or config.poll_interval_seconds),
|
| )
|
| except ValueError as exc:
|
| flash(str(exc), "danger")
|
| return _admin_post_redirect("admin_users")
|
| if password:
|
| store.update_user(
|
| user_id,
|
| display_name=display_name,
|
| password_encrypted=secret_box.encrypt(password),
|
| refresh_interval_seconds=refresh_interval_seconds,
|
| )
|
| else:
|
| store.update_user(user_id, display_name=display_name, refresh_interval_seconds=refresh_interval_seconds)
|
| flash("用户信息已更新。", "success")
|
| return _admin_post_redirect("admin_users")
|
|
|
|
|
| @app.post("/admin/users/<int:user_id>/toggle")
|
| @_login_required("admin")
|
| def toggle_user(user_id: int):
|
| updated = store.toggle_user_active(user_id)
|
| if updated is None:
|
| flash("用户不存在。", "danger")
|
| else:
|
| flash("用户状态已切换。", "success")
|
| return _admin_post_redirect("admin_users")
|
|
|
|
|
| @app.post("/admin/users/<int:user_id>/delete")
|
| @_login_required("admin")
|
| def delete_user_by_admin(user_id: int):
|
| user = store.get_user(user_id)
|
| if user is None:
|
| flash("用户不存在。", "danger")
|
| return _admin_post_redirect("admin_users")
|
| active_task = store.find_active_task_for_user(user_id)
|
| if active_task is not None:
|
| flash("请先停止该用户当前任务,再删除用户。", "danger")
|
| return _admin_post_redirect("admin_users")
|
| store.delete_user(user_id)
|
| flash("用户及其课程、日志、定时设置已删除。", "success")
|
| return _admin_post_redirect("admin_users")
|
|
|
|
|
| @app.post("/admin/users/<int:user_id>/schedule")
|
| @_login_required("admin")
|
| def update_user_schedule(user_id: int):
|
| if store.get_user(user_id) is None:
|
| flash("用户不存在。", "danger")
|
| return _admin_post_redirect("admin_schedules")
|
| try:
|
| schedule_payload = _parse_schedule_form(request.form)
|
| except ValueError as exc:
|
| flash(str(exc), "danger")
|
| return _admin_post_redirect("admin_schedules")
|
| store.upsert_user_schedule(user_id, **schedule_payload)
|
| flash("定时启动终止设置已更新。", "success")
|
| return _admin_post_redirect("admin_schedules")
|
|
|
|
|
| @app.post("/admin/users/<int:user_id>/courses")
|
| @_login_required("admin")
|
| def admin_add_course(user_id: int):
|
| if store.get_user(user_id) is None:
|
| flash("用户不存在。", "danger")
|
| return _admin_post_redirect("admin_users")
|
| category = request.form.get("category", "free")
|
| course_id = request.form.get("course_id", "")
|
| course_index = request.form.get("course_index", "")
|
| normalized_target = _validate_course_target(course_id, course_index)
|
| if normalized_target is None:
|
| flash("课程号需要使用 1-32 位字母或数字,课序号需要使用 1-8 位字母或数字。", "danger")
|
| return _admin_post_redirect("admin_users")
|
| normalized_course_id, normalized_course_index = normalized_target
|
| store.add_course(user_id, category, normalized_course_id, normalized_course_index)
|
| flash("课程已添加到对应用户。", "success")
|
| return _admin_post_redirect("admin_users")
|
|
|
|
|
| @app.post("/admin/courses/<int:course_target_id>/delete")
|
| @_login_required("admin")
|
| def admin_delete_course(course_target_id: int):
|
| store.delete_course(course_target_id)
|
| flash("课程已删除。", "success")
|
| return _admin_post_redirect("admin_users")
|
|
|
|
|
| @app.post("/admin/users/<int:user_id>/task/start")
|
| @_login_required("admin")
|
| def admin_start_user_task(user_id: int):
|
| user = store.get_user(user_id)
|
| if user is None:
|
| flash("用户不存在。", "danger")
|
| return _admin_post_redirect("admin_users")
|
| admin_identity = _get_admin_identity()
|
| task, created = _queue_task_for_user(user, requested_by=admin_identity["username"], requested_by_role="admin")
|
| flash("任务已加入队列。" if created else f"该用户已有任务处于 {TASK_LABELS.get(task['status'], task['status'])} 状态。", "success" if created else "warning")
|
| return _admin_post_redirect("admin_users")
|
|
|
|
|
| @app.post("/admin/users/<int:user_id>/task/stop")
|
| @_login_required("admin")
|
| def admin_stop_user_task(user_id: int):
|
| active_task = store.find_active_task_for_user(user_id)
|
| if active_task and task_manager.stop_task(active_task["id"]):
|
| flash("已发送停止请求。", "success")
|
| else:
|
| flash("当前没有可停止任务。", "warning")
|
| return _admin_post_redirect("admin_users")
|
|
|
|
|
| @app.post("/admin/admins")
|
| @_login_required("admin")
|
| def create_admin():
|
| if not session.get("is_super_admin", False):
|
| flash("只有超级管理员可以新增管理员。", "danger")
|
| return _admin_post_redirect("admin_dashboard")
|
| username = request.form.get("username", "").strip()
|
| password = request.form.get("password", "").strip()
|
| if not username or not password:
|
| flash("请填写管理员账号和密码。", "danger")
|
| return _admin_post_redirect("admin_dashboard")
|
| if username == config.super_admin_username or store.get_admin_by_username(username):
|
| flash("该管理员账号已存在。", "warning")
|
| return _admin_post_redirect("admin_dashboard")
|
| store.create_admin(username, hash_password(password))
|
| flash("管理员已创建。", "success")
|
| return _admin_post_redirect("admin_dashboard")
|
|
|
|
|
| @app.post("/admin/registration-codes")
|
| @_login_required("admin")
|
| def create_registration_code():
|
| note = request.form.get("note", "").strip()
|
| try:
|
| max_uses = _parse_registration_code_max_uses(request.form.get("max_uses"))
|
| except ValueError as exc:
|
| flash(str(exc), "danger")
|
| return _admin_post_redirect("admin_registration_codes")
|
| admin_identity = _get_admin_identity()
|
| created = store.create_registration_code(created_by=admin_identity["username"], note=note, max_uses=max_uses)
|
| flash(f"注册码已创建:{created['code']}", "success")
|
| return _admin_post_redirect("admin_registration_codes")
|
|
|
|
|
| @app.post("/admin/registration-codes/<int:registration_code_id>/toggle")
|
| @_login_required("admin")
|
| def toggle_registration_code(registration_code_id: int):
|
| updated = store.toggle_registration_code_active(registration_code_id)
|
| if updated is None:
|
| flash("注册码不存在。", "danger")
|
| else:
|
| flash("注册码状态已更新。", "success")
|
| return _admin_post_redirect("admin_registration_codes")
|
|
|
| @app.get("/api/user/status")
|
| @_login_required("user")
|
| def user_status():
|
| user = _get_current_user()
|
| if user is None:
|
| return jsonify({"ok": False}), 401
|
| task = store.get_latest_task_for_user(user["id"])
|
| return jsonify(
|
| {
|
| "ok": True,
|
| "task": task,
|
| "courses": store.list_courses_for_user(user["id"]),
|
| "user": {
|
| "refresh_interval_seconds": int(user.get("refresh_interval_seconds") or config.poll_interval_seconds),
|
| },
|
| }
|
| )
|
|
|
|
|
| @app.get("/api/admin/status")
|
| @_login_required("admin")
|
| def admin_status():
|
| return jsonify(
|
| {
|
| "ok": True,
|
| "stats": store.get_admin_stats(),
|
| "parallel_limit": store.get_parallel_limit(),
|
| "recent_tasks": store.list_recent_tasks(limit=12),
|
| }
|
| )
|
|
|
|
|
| @app.get("/api/user/logs/stream")
|
| @_login_required("user")
|
| def stream_user_logs():
|
| user = _get_current_user()
|
| if user is None:
|
| return jsonify({"ok": False}), 401
|
| last_id = int(request.args.get("last_id", _latest_log_id(store.list_recent_logs(user_id=user["id"], limit=1))))
|
|
|
| @stream_with_context
|
| def generate():
|
| current_last_id = last_id
|
| while True:
|
| logs = store.list_logs_after(current_last_id, user_id=user["id"], limit=60)
|
| if logs:
|
| for log in logs:
|
| current_last_id = int(log["id"])
|
| yield f"id: {log['id']}\ndata: {json.dumps(log, ensure_ascii=False)}\n\n"
|
| else:
|
| yield ": keep-alive\n\n"
|
| time.sleep(1)
|
|
|
| response = Response(generate(), mimetype="text/event-stream")
|
| response.headers["Cache-Control"] = "no-cache"
|
| response.headers["X-Accel-Buffering"] = "no"
|
| return response
|
|
|
|
|
| @app.get("/api/admin/logs/stream")
|
| @_login_required("admin")
|
| def stream_admin_logs():
|
| last_id = int(request.args.get("last_id", _latest_log_id(store.list_recent_logs(limit=1))))
|
|
|
| @stream_with_context
|
| def generate():
|
| current_last_id = last_id
|
| while True:
|
| logs = store.list_logs_after(current_last_id, limit=80)
|
| if logs:
|
| for log in logs:
|
| current_last_id = int(log["id"])
|
| yield f"id: {log['id']}\ndata: {json.dumps(log, ensure_ascii=False)}\n\n"
|
| else:
|
| yield ": keep-alive\n\n"
|
| time.sleep(1)
|
|
|
| response = Response(generate(), mimetype="text/event-stream")
|
| response.headers["Cache-Control"] = "no-cache"
|
| response.headers["X-Accel-Buffering"] = "no"
|
| return response
|
|
|