SACC / space_app.py
cacode's picture
Split admin features into separate pages
f256f5b verified
from __future__ import annotations
import atexit
import json
import re
import time
from datetime import date as date_cls, time as time_cls
from functools import wraps
from typing import Callable
from urllib.parse import urlparse
from flask import Flask, Response, flash, g, jsonify, redirect, render_template, request, session, stream_with_context, url_for
from core.config import AppConfig
from core.db import (
DEFAULT_REGISTRATION_CODE_MAX_USES,
Database,
MAX_REFRESH_INTERVAL_SECONDS,
MIN_REFRESH_INTERVAL_SECONDS,
normalize_registration_code,
)
from core.runtime_logging import configure_logging, get_logger
from core.security import SecretBox, hash_password, verify_password
from core.task_manager import TaskManager
configure_logging()
APP_LOGGER = get_logger("sacc.web")
config = AppConfig.load()
store = Database(
config.db_path,
default_parallel_limit=config.default_parallel_limit,
mysql_ssl_ca_path=config.mysql_ssl_ca_path,
)
store.init_db()
secret_box = SecretBox(config.encryption_key)
task_manager = TaskManager(config=config, store=store, secret_box=secret_box)
def _seed_legacy_user() -> None:
if store.list_users():
return
legacy_path = config.root_dir / "user_data.json"
if not legacy_path.exists():
return
try:
payload = json.loads(legacy_path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return
student_id = str(payload.get("std_id", "")).strip()
password = str(payload.get("password", "")).strip()
if not student_id or not password:
return
user_id = store.create_user(student_id, secret_box.encrypt(password), "Legacy User")
for source_key, category in (("a", "plan"), ("b", "free")):
for course in payload.get("course", {}).get(source_key, []):
course_id = str(course.get("course_id", "")).strip()
course_index = str(course.get("course_index", "")).strip()
if course_id and course_index:
store.add_course(user_id, category, course_id, course_index)
_seed_legacy_user()
task_manager.start()
atexit.register(task_manager.shutdown)
app = Flask(__name__, template_folder="templates", static_folder="static")
app.secret_key = config.session_secret
app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SAMESITE="Lax")
APP_LOGGER.info(
"Application bootstrap complete | data_dir=%s db_path=%s backend=%s chrome_binary=%s chromedriver_path=%s default_parallel_limit=%s schedule_timezone=%s",
config.data_dir,
config.db_path,
config.database_backend,
config.chrome_binary,
config.chromedriver_path,
config.default_parallel_limit,
config.schedule_timezone,
)
CATEGORY_LABELS = {
"plan": "方案选课",
"free": "自由选课",
}
TASK_LABELS = {
"pending": "排队中",
"running": "执行中",
"cancel_requested": "停止中",
"completed": "已完成",
"stopped": "已停止",
"failed": "失败",
}
SKIPPED_REQUEST_LOG_PREFIXES = (
"/static/",
"/api/",
)
SKIPPED_REQUEST_LOG_PATHS = {
"/favicon.ico",
}
COURSE_ID_PATTERN = re.compile(r"^[0-9A-Za-z]{1,32}$")
COURSE_INDEX_PATTERN = re.compile(r"^[0-9A-Za-z]{1,8}$")
REGISTRATION_CODE_PATTERN = re.compile(r"^[A-Z0-9-]{6,64}$")
def _current_actor_label() -> str:
role = session.get("role", "guest")
if role == "user":
return f"user:{session.get('user_id', '-')}"
if role == "admin":
return f"admin:{session.get('admin_username', '-')}"
return "guest"
@app.before_request
def before_request_logging() -> None:
g.request_started_at = time.perf_counter()
@app.after_request
def after_request_logging(response):
if request.path in SKIPPED_REQUEST_LOG_PATHS:
return response
if any(request.path.startswith(prefix) for prefix in SKIPPED_REQUEST_LOG_PREFIXES):
return response
started_at = getattr(g, "request_started_at", None)
duration_ms = 0.0 if started_at is None else (time.perf_counter() - started_at) * 1000
remote_addr = request.headers.get("x-forwarded-for") or request.remote_addr or "-"
APP_LOGGER.info(
"HTTP %s %s -> %s in %.1fms | actor=%s remote=%s",
request.method,
request.path,
response.status_code,
duration_ms,
_current_actor_label(),
remote_addr,
)
return response
@app.context_processor
def inject_globals() -> dict:
return {
"category_labels": CATEGORY_LABELS,
"task_labels": TASK_LABELS,
"session_role": session.get("role", "guest"),
"refresh_interval_min": MIN_REFRESH_INTERVAL_SECONDS,
"refresh_interval_max": MAX_REFRESH_INTERVAL_SECONDS,
"default_refresh_interval_seconds": config.poll_interval_seconds,
"default_registration_code_max_uses": DEFAULT_REGISTRATION_CODE_MAX_USES,
}
def _login_required(role: str) -> Callable:
def decorator(view: Callable) -> Callable:
@wraps(view)
def wrapped(*args, **kwargs):
current_role = session.get("role")
if role == "user" and current_role != "user":
flash("请先登录学生账号。", "warning")
return redirect(url_for("login"))
if role == "admin" and current_role != "admin":
flash("请先登录管理员账号。", "warning")
return redirect(url_for("admin_login"))
return view(*args, **kwargs)
return wrapped
return decorator
def _get_current_user() -> dict | None:
user_id = session.get("user_id")
if not user_id:
return None
return store.get_user(int(user_id))
def _get_admin_identity() -> dict:
return {
"username": session.get("admin_username", ""),
"is_super_admin": bool(session.get("is_super_admin", False)),
}
def _normalize_course_token(raw_value: str) -> str:
return re.sub(r"\s+", "", str(raw_value or "")).upper()
def _validate_course_target(course_id: str, course_index: str) -> tuple[str, str] | None:
normalized_course_id = _normalize_course_token(course_id)
normalized_course_index = _normalize_course_token(course_index)
if not COURSE_ID_PATTERN.fullmatch(normalized_course_id):
return None
if not COURSE_INDEX_PATTERN.fullmatch(normalized_course_index):
return None
return normalized_course_id, normalized_course_index
def _parse_refresh_interval(raw_value: str | None, *, default: int) -> int:
raw_text = str(raw_value or "").strip()
if not raw_text:
return default
try:
interval = int(raw_text)
except ValueError as exc:
raise ValueError(f"刷新间隔必须是 {MIN_REFRESH_INTERVAL_SECONDS}{MAX_REFRESH_INTERVAL_SECONDS} 之间的整数。") from exc
if interval < MIN_REFRESH_INTERVAL_SECONDS or interval > MAX_REFRESH_INTERVAL_SECONDS:
raise ValueError(f"刷新间隔必须在 {MIN_REFRESH_INTERVAL_SECONDS}{MAX_REFRESH_INTERVAL_SECONDS} 秒之间。")
return interval
def _parse_registration_code_max_uses(raw_value: str | None) -> int:
raw_text = str(raw_value or "").strip()
if not raw_text:
return DEFAULT_REGISTRATION_CODE_MAX_USES
try:
value = int(raw_text)
except ValueError as exc:
raise ValueError("注册码可用次数必须是 1 到 99 之间的整数。") from exc
if value < 1 or value > 99:
raise ValueError("注册码可用次数必须在 1 到 99 之间。")
return value
def _parse_iso_date(raw_value: str | None, label: str) -> str:
raw_text = str(raw_value or "").strip()
if not raw_text:
raise ValueError(f"{label}不能为空。")
try:
return date_cls.fromisoformat(raw_text).isoformat()
except ValueError as exc:
raise ValueError(f"{label}格式无效,请使用 YYYY-MM-DD。") from exc
def _parse_clock_time(raw_value: str | None, label: str) -> str:
raw_text = str(raw_value or "").strip()
if not raw_text:
raise ValueError(f"{label}不能为空。")
try:
return time_cls.fromisoformat(raw_text).strftime("%H:%M")
except ValueError as exc:
raise ValueError(f"{label}格式无效,请使用 HH:MM。") from exc
def _parse_schedule_form(form) -> dict:
enabled = str(form.get("schedule_enabled", "")).lower() in {"1", "true", "on", "yes"}
start_date_raw = form.get("start_date", "")
end_date_raw = form.get("end_date", "")
daily_start_time_raw = form.get("daily_start_time", "")
daily_stop_time_raw = form.get("daily_stop_time", "")
has_any_value = enabled or any(str(value or "").strip() for value in (start_date_raw, end_date_raw, daily_start_time_raw, daily_stop_time_raw))
if not has_any_value:
return {
"is_enabled": False,
"start_date": None,
"end_date": None,
"daily_start_time": None,
"daily_stop_time": None,
}
start_date = _parse_iso_date(start_date_raw, "开始日期")
end_date = _parse_iso_date(end_date_raw, "结束日期")
daily_start_time = _parse_clock_time(daily_start_time_raw, "每日启动时间")
daily_stop_time = _parse_clock_time(daily_stop_time_raw, "每日停止时间")
if end_date < start_date:
raise ValueError("结束日期不能早于开始日期。")
if daily_stop_time <= daily_start_time:
raise ValueError("每日停止时间必须晚于每日启动时间。")
return {
"is_enabled": enabled,
"start_date": start_date,
"end_date": end_date,
"daily_start_time": daily_start_time,
"daily_stop_time": daily_stop_time,
}
def _user_owns_course(user_id: int, course_target_id: int) -> bool:
return any(course["id"] == course_target_id for course in store.list_courses_for_user(user_id))
def _build_user_dashboard_context(user: dict) -> dict:
return {
"current_user": user,
"courses": store.list_courses_for_user(user["id"]),
"task": store.get_latest_task_for_user(user["id"]),
"schedule": store.get_user_schedule(user["id"]),
"recent_logs": store.list_recent_logs(user_id=user["id"], limit=config.logs_page_size),
}
def _load_admin_users(*, include_courses: bool = True, include_latest_task: bool = True, include_schedule: bool = True) -> list[dict]:
users = store.list_users()
for user in users:
if include_courses:
user["courses"] = store.list_courses_for_user(user["id"])
if include_latest_task:
user["latest_task"] = store.get_latest_task_for_user(user["id"])
if include_schedule:
user["schedule"] = store.get_user_schedule(user["id"])
return users
def _build_admin_base_context(active_page: str, *, page_title: str, page_description: str) -> dict:
admin_identity = _get_admin_identity()
return {
"admin_identity": admin_identity,
"is_super_admin": admin_identity["is_super_admin"],
"admin_page": active_page,
"page_title": page_title,
"page_description": page_description,
}
def _build_admin_dashboard_context() -> dict:
context = _build_admin_base_context(
"overview",
page_title="Overview",
page_description="Review system metrics, recent tasks, and admin-level settings.",
)
context.update(
{
"stats": store.get_admin_stats(),
"recent_tasks": store.list_recent_tasks(limit=18),
"parallel_limit": store.get_parallel_limit(),
"admins": store.list_admins(),
"status_url": url_for("admin_status"),
}
)
return context
def _build_admin_users_context() -> dict:
context = _build_admin_base_context(
"users",
page_title="Users",
page_description="Manage user accounts, course targets, and task operations.",
)
context.update(
{
"users": _load_admin_users(include_courses=True, include_latest_task=True, include_schedule=True),
}
)
return context
def _build_admin_schedules_context() -> dict:
context = _build_admin_base_context(
"schedules",
page_title="Schedules",
page_description="Configure per-user auto start and stop windows by date and time.",
)
users = _load_admin_users(include_courses=False, include_latest_task=True, include_schedule=True)
context.update(
{
"users": users,
"stats": store.get_admin_stats(),
}
)
return context
def _build_admin_registration_codes_context() -> dict:
context = _build_admin_base_context(
"registration_codes",
page_title="Codes",
page_description="Create registration codes and inspect their usage state.",
)
context.update(
{
"registration_codes": store.list_registration_codes(limit=60),
"stats": store.get_admin_stats(),
}
)
return context
def _build_admin_logs_context() -> dict:
recent_logs = store.list_recent_logs(limit=config.logs_page_size)
context = _build_admin_base_context(
"logs",
page_title="Logs",
page_description="Review live global logs for task execution and troubleshooting.",
)
context.update(
{
"recent_logs": recent_logs,
"log_stream_url": url_for("stream_admin_logs", last_id=recent_logs[-1]["id"] if recent_logs else 0),
}
)
return context
def _queue_task_for_user(user: dict, *, requested_by: str, requested_by_role: str) -> tuple[dict, bool]:
return task_manager.queue_task(user["id"], requested_by=requested_by, requested_by_role=requested_by_role)
def _latest_log_id(logs: list[dict]) -> int:
if not logs:
return 0
return int(logs[-1]["id"])
def _admin_post_redirect(default_endpoint: str):
target = (request.form.get("next") or request.referrer or "").strip()
if target:
parsed = urlparse(target)
same_host = not parsed.netloc or parsed.netloc == request.host
if same_host and (parsed.path or "").startswith("/admin"):
safe_target = parsed.path or url_for(default_endpoint)
if parsed.query:
safe_target = f"{safe_target}?{parsed.query}"
return redirect(safe_target)
return redirect(url_for(default_endpoint))
@app.get("/")
def index():
if session.get("role") == "user":
return redirect(url_for("dashboard"))
if session.get("role") == "admin":
return redirect(url_for("admin_dashboard"))
return redirect(url_for("login"))
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
student_id = request.form.get("student_id", "").strip()
password = request.form.get("password", "")
user = store.get_user_by_student_id(student_id)
if user is None:
flash("没有找到该学号对应的账号。如果你有注册码,请先完成注册。", "danger")
return render_template("login.html")
if not user["is_active"]:
flash("该账号已被管理员禁用。", "danger")
return render_template("login.html")
try:
stored_password = secret_box.decrypt(user["password_encrypted"])
except Exception:
flash("账号数据损坏,请联系管理员重置密码。", "danger")
return render_template("login.html")
if stored_password != password:
flash("学号或密码不正确。", "danger")
return render_template("login.html")
session.clear()
session["role"] = "user"
session["user_id"] = user["id"]
return redirect(url_for("dashboard"))
return render_template("login.html")
@app.route("/register", methods=["GET", "POST"])
def register():
if request.method == "POST":
registration_code = normalize_registration_code(request.form.get("registration_code", ""))
student_id = request.form.get("student_id", "").strip()
password = request.form.get("password", "").strip()
display_name = request.form.get("display_name", "").strip()
if not REGISTRATION_CODE_PATTERN.fullmatch(registration_code):
flash("请输入有效的注册码。", "danger")
return render_template("register.html")
if not student_id.isdigit() or not password:
flash("请填写学号和教务处密码。", "danger")
return render_template("register.html")
try:
store.register_user_with_code(
registration_code,
student_id,
secret_box.encrypt(password),
display_name,
refresh_interval_seconds=config.poll_interval_seconds,
)
except ValueError as exc:
flash(str(exc), "danger")
return render_template("register.html")
flash("注册成功,请使用学号和教务处密码登录。", "success")
return redirect(url_for("login"))
return render_template("register.html")
@app.post("/logout")
def logout():
session.clear()
return redirect(url_for("login"))
@app.route("/admin", methods=["GET", "POST"])
def admin_login():
if request.method == "POST":
username = request.form.get("username", "").strip()
password = request.form.get("password", "")
is_super_admin = username == config.super_admin_username and password == config.super_admin_password
admin_row = store.get_admin_by_username(username)
is_regular_admin = bool(admin_row and verify_password(admin_row["password_hash"], password))
if not is_super_admin and not is_regular_admin:
flash("管理员账号或密码错误。", "danger")
return render_template("admin_login.html")
session.clear()
session["role"] = "admin"
session["admin_username"] = username
session["is_super_admin"] = is_super_admin
return redirect(url_for("admin_dashboard"))
return render_template("admin_login.html")
@app.post("/admin/logout")
def admin_logout():
session.clear()
return redirect(url_for("admin_login"))
@app.get("/dashboard")
@_login_required("user")
def dashboard():
user = _get_current_user()
if user is None:
session.clear()
return redirect(url_for("login"))
return render_template("dashboard.html", **_build_user_dashboard_context(user))
@app.post("/dashboard/profile")
@_login_required("user")
def update_profile():
user = _get_current_user()
if user is None:
session.clear()
return redirect(url_for("login"))
password = request.form.get("password", "").strip()
display_name = request.form.get("display_name", "").strip()
if not password:
flash("密码不能为空。", "danger")
return redirect(url_for("dashboard"))
store.update_user(user["id"], password_encrypted=secret_box.encrypt(password), display_name=display_name)
flash("账号信息已更新。", "success")
return redirect(url_for("dashboard"))
@app.post("/dashboard/settings/runtime")
@_login_required("user")
def update_runtime_settings():
user = _get_current_user()
if user is None:
session.clear()
return redirect(url_for("login"))
try:
refresh_interval_seconds = _parse_refresh_interval(
request.form.get("refresh_interval_seconds"),
default=int(user.get("refresh_interval_seconds") or config.poll_interval_seconds),
)
except ValueError as exc:
flash(str(exc), "danger")
return redirect(url_for("dashboard"))
store.update_user(user["id"], refresh_interval_seconds=refresh_interval_seconds)
flash(f"未命中课程后的刷新间隔已更新为 {refresh_interval_seconds} 秒。", "success")
return redirect(url_for("dashboard"))
@app.post("/dashboard/courses")
@_login_required("user")
def add_course():
user = _get_current_user()
if user is None:
session.clear()
return redirect(url_for("login"))
category = request.form.get("category", "free")
course_id = request.form.get("course_id", "")
course_index = request.form.get("course_index", "")
normalized_target = _validate_course_target(course_id, course_index)
if normalized_target is None:
flash("课程号需要使用 1-32 位字母或数字,课序号需要使用 1-8 位字母或数字。", "danger")
return redirect(url_for("dashboard"))
normalized_course_id, normalized_course_index = normalized_target
store.add_course(user["id"], category, normalized_course_id, normalized_course_index)
flash("课程已加入任务列表。", "success")
return redirect(url_for("dashboard"))
@app.post("/dashboard/courses/<int:course_target_id>/delete")
@_login_required("user")
def delete_course(course_target_id: int):
user = _get_current_user()
if user is None:
session.clear()
return redirect(url_for("login"))
if not _user_owns_course(user["id"], course_target_id):
flash("不能删除不属于当前账号的课程。", "danger")
return redirect(url_for("dashboard"))
store.delete_course(course_target_id)
flash("课程已移除。", "success")
return redirect(url_for("dashboard"))
@app.post("/dashboard/task/start")
@_login_required("user")
def start_task():
user = _get_current_user()
if user is None:
session.clear()
return redirect(url_for("login"))
task, created = _queue_task_for_user(user, requested_by=user["student_id"], requested_by_role="user")
flash("任务已启动。" if created else f"已有任务处于 {TASK_LABELS.get(task['status'], task['status'])} 状态。", "success" if created else "warning")
return redirect(url_for("dashboard"))
@app.post("/dashboard/task/stop")
@_login_required("user")
def stop_task():
user = _get_current_user()
if user is None:
session.clear()
return redirect(url_for("login"))
active_task = store.find_active_task_for_user(user["id"])
if active_task and task_manager.stop_task(active_task["id"]):
flash("停止请求已发送。", "success")
else:
flash("当前没有可停止的任务。", "warning")
return redirect(url_for("dashboard"))
@app.get("/admin/dashboard")
@_login_required("admin")
def admin_dashboard():
return render_template("admin_dashboard.html", **_build_admin_dashboard_context())
@app.get("/admin/users")
@_login_required("admin")
def admin_users():
return render_template("admin_users.html", **_build_admin_users_context())
@app.get("/admin/schedules")
@_login_required("admin")
def admin_schedules():
return render_template("admin_schedules.html", **_build_admin_schedules_context())
@app.get("/admin/registration-codes")
@_login_required("admin")
def admin_registration_codes():
return render_template("admin_registration_codes.html", **_build_admin_registration_codes_context())
@app.get("/admin/logs")
@_login_required("admin")
def admin_logs():
return render_template("admin_logs.html", **_build_admin_logs_context())
@app.post("/admin/settings/parallel-limit")
@_login_required("admin")
def update_parallel_limit():
try:
parallel_limit = max(1, min(8, int(request.form.get("parallel_limit", str(config.default_parallel_limit)))))
except ValueError:
flash("并行数必须是 1 到 8 的整数。", "danger")
return _admin_post_redirect("admin_dashboard")
store.set_parallel_limit(parallel_limit)
flash(f"并行数已更新为 {parallel_limit}。", "success")
return _admin_post_redirect("admin_dashboard")
@app.post("/admin/users")
@_login_required("admin")
def create_user():
student_id = request.form.get("student_id", "").strip()
password = request.form.get("password", "").strip()
display_name = request.form.get("display_name", "").strip()
try:
refresh_interval_seconds = _parse_refresh_interval(
request.form.get("refresh_interval_seconds"),
default=config.poll_interval_seconds,
)
except ValueError as exc:
flash(str(exc), "danger")
return _admin_post_redirect("admin_users")
if not student_id.isdigit() or not password:
flash("请填写有效的学号和密码。", "danger")
return _admin_post_redirect("admin_users")
if store.get_user_by_student_id(student_id):
flash("该学号已经存在。", "warning")
return _admin_post_redirect("admin_users")
store.create_user(
student_id,
secret_box.encrypt(password),
display_name,
refresh_interval_seconds=refresh_interval_seconds,
)
flash("用户已创建。", "success")
return _admin_post_redirect("admin_users")
@app.post("/admin/users/<int:user_id>/update")
@_login_required("admin")
def update_user(user_id: int):
user = store.get_user(user_id)
if user is None:
flash("用户不存在。", "danger")
return _admin_post_redirect("admin_users")
display_name = request.form.get("display_name", user["display_name"]).strip()
password = request.form.get("password", "").strip()
try:
refresh_interval_seconds = _parse_refresh_interval(
request.form.get("refresh_interval_seconds"),
default=int(user.get("refresh_interval_seconds") or config.poll_interval_seconds),
)
except ValueError as exc:
flash(str(exc), "danger")
return _admin_post_redirect("admin_users")
if password:
store.update_user(
user_id,
display_name=display_name,
password_encrypted=secret_box.encrypt(password),
refresh_interval_seconds=refresh_interval_seconds,
)
else:
store.update_user(user_id, display_name=display_name, refresh_interval_seconds=refresh_interval_seconds)
flash("用户信息已更新。", "success")
return _admin_post_redirect("admin_users")
@app.post("/admin/users/<int:user_id>/toggle")
@_login_required("admin")
def toggle_user(user_id: int):
updated = store.toggle_user_active(user_id)
if updated is None:
flash("用户不存在。", "danger")
else:
flash("用户状态已切换。", "success")
return _admin_post_redirect("admin_users")
@app.post("/admin/users/<int:user_id>/delete")
@_login_required("admin")
def delete_user_by_admin(user_id: int):
user = store.get_user(user_id)
if user is None:
flash("用户不存在。", "danger")
return _admin_post_redirect("admin_users")
active_task = store.find_active_task_for_user(user_id)
if active_task is not None:
flash("请先停止该用户当前任务,再删除用户。", "danger")
return _admin_post_redirect("admin_users")
store.delete_user(user_id)
flash("用户及其课程、日志、定时设置已删除。", "success")
return _admin_post_redirect("admin_users")
@app.post("/admin/users/<int:user_id>/schedule")
@_login_required("admin")
def update_user_schedule(user_id: int):
if store.get_user(user_id) is None:
flash("用户不存在。", "danger")
return _admin_post_redirect("admin_schedules")
try:
schedule_payload = _parse_schedule_form(request.form)
except ValueError as exc:
flash(str(exc), "danger")
return _admin_post_redirect("admin_schedules")
store.upsert_user_schedule(user_id, **schedule_payload)
flash("定时启动终止设置已更新。", "success")
return _admin_post_redirect("admin_schedules")
@app.post("/admin/users/<int:user_id>/courses")
@_login_required("admin")
def admin_add_course(user_id: int):
if store.get_user(user_id) is None:
flash("用户不存在。", "danger")
return _admin_post_redirect("admin_users")
category = request.form.get("category", "free")
course_id = request.form.get("course_id", "")
course_index = request.form.get("course_index", "")
normalized_target = _validate_course_target(course_id, course_index)
if normalized_target is None:
flash("课程号需要使用 1-32 位字母或数字,课序号需要使用 1-8 位字母或数字。", "danger")
return _admin_post_redirect("admin_users")
normalized_course_id, normalized_course_index = normalized_target
store.add_course(user_id, category, normalized_course_id, normalized_course_index)
flash("课程已添加到对应用户。", "success")
return _admin_post_redirect("admin_users")
@app.post("/admin/courses/<int:course_target_id>/delete")
@_login_required("admin")
def admin_delete_course(course_target_id: int):
store.delete_course(course_target_id)
flash("课程已删除。", "success")
return _admin_post_redirect("admin_users")
@app.post("/admin/users/<int:user_id>/task/start")
@_login_required("admin")
def admin_start_user_task(user_id: int):
user = store.get_user(user_id)
if user is None:
flash("用户不存在。", "danger")
return _admin_post_redirect("admin_users")
admin_identity = _get_admin_identity()
task, created = _queue_task_for_user(user, requested_by=admin_identity["username"], requested_by_role="admin")
flash("任务已加入队列。" if created else f"该用户已有任务处于 {TASK_LABELS.get(task['status'], task['status'])} 状态。", "success" if created else "warning")
return _admin_post_redirect("admin_users")
@app.post("/admin/users/<int:user_id>/task/stop")
@_login_required("admin")
def admin_stop_user_task(user_id: int):
active_task = store.find_active_task_for_user(user_id)
if active_task and task_manager.stop_task(active_task["id"]):
flash("已发送停止请求。", "success")
else:
flash("当前没有可停止任务。", "warning")
return _admin_post_redirect("admin_users")
@app.post("/admin/admins")
@_login_required("admin")
def create_admin():
if not session.get("is_super_admin", False):
flash("只有超级管理员可以新增管理员。", "danger")
return _admin_post_redirect("admin_dashboard")
username = request.form.get("username", "").strip()
password = request.form.get("password", "").strip()
if not username or not password:
flash("请填写管理员账号和密码。", "danger")
return _admin_post_redirect("admin_dashboard")
if username == config.super_admin_username or store.get_admin_by_username(username):
flash("该管理员账号已存在。", "warning")
return _admin_post_redirect("admin_dashboard")
store.create_admin(username, hash_password(password))
flash("管理员已创建。", "success")
return _admin_post_redirect("admin_dashboard")
@app.post("/admin/registration-codes")
@_login_required("admin")
def create_registration_code():
note = request.form.get("note", "").strip()
try:
max_uses = _parse_registration_code_max_uses(request.form.get("max_uses"))
except ValueError as exc:
flash(str(exc), "danger")
return _admin_post_redirect("admin_registration_codes")
admin_identity = _get_admin_identity()
created = store.create_registration_code(created_by=admin_identity["username"], note=note, max_uses=max_uses)
flash(f"注册码已创建:{created['code']}", "success")
return _admin_post_redirect("admin_registration_codes")
@app.post("/admin/registration-codes/<int:registration_code_id>/toggle")
@_login_required("admin")
def toggle_registration_code(registration_code_id: int):
updated = store.toggle_registration_code_active(registration_code_id)
if updated is None:
flash("注册码不存在。", "danger")
else:
flash("注册码状态已更新。", "success")
return _admin_post_redirect("admin_registration_codes")
@app.get("/api/user/status")
@_login_required("user")
def user_status():
user = _get_current_user()
if user is None:
return jsonify({"ok": False}), 401
task = store.get_latest_task_for_user(user["id"])
return jsonify(
{
"ok": True,
"task": task,
"courses": store.list_courses_for_user(user["id"]),
"user": {
"refresh_interval_seconds": int(user.get("refresh_interval_seconds") or config.poll_interval_seconds),
},
}
)
@app.get("/api/admin/status")
@_login_required("admin")
def admin_status():
return jsonify(
{
"ok": True,
"stats": store.get_admin_stats(),
"parallel_limit": store.get_parallel_limit(),
"recent_tasks": store.list_recent_tasks(limit=12),
}
)
@app.get("/api/user/logs/stream")
@_login_required("user")
def stream_user_logs():
user = _get_current_user()
if user is None:
return jsonify({"ok": False}), 401
last_id = int(request.args.get("last_id", _latest_log_id(store.list_recent_logs(user_id=user["id"], limit=1))))
@stream_with_context
def generate():
current_last_id = last_id
while True:
logs = store.list_logs_after(current_last_id, user_id=user["id"], limit=60)
if logs:
for log in logs:
current_last_id = int(log["id"])
yield f"id: {log['id']}\ndata: {json.dumps(log, ensure_ascii=False)}\n\n"
else:
yield ": keep-alive\n\n"
time.sleep(1)
response = Response(generate(), mimetype="text/event-stream")
response.headers["Cache-Control"] = "no-cache"
response.headers["X-Accel-Buffering"] = "no"
return response
@app.get("/api/admin/logs/stream")
@_login_required("admin")
def stream_admin_logs():
last_id = int(request.args.get("last_id", _latest_log_id(store.list_recent_logs(limit=1))))
@stream_with_context
def generate():
current_last_id = last_id
while True:
logs = store.list_logs_after(current_last_id, limit=80)
if logs:
for log in logs:
current_last_id = int(log["id"])
yield f"id: {log['id']}\ndata: {json.dumps(log, ensure_ascii=False)}\n\n"
else:
yield ": keep-alive\n\n"
time.sleep(1)
response = Response(generate(), mimetype="text/event-stream")
response.headers["Cache-Control"] = "no-cache"
response.headers["X-Accel-Buffering"] = "no"
return response