from __future__ import annotations import atexit import json import time from functools import wraps from typing import Callable from flask import Flask, Response, flash, g, jsonify, redirect, render_template, request, session, stream_with_context, url_for from core.config import AppConfig from core.db import Database from core.security import SecretBox, hash_password, verify_password from core.task_manager import TaskManager from core.runtime_logging import configure_logging, get_logger configure_logging() APP_LOGGER = get_logger("sacc.web") config = AppConfig.load() store = Database(config.db_path, default_parallel_limit=config.default_parallel_limit) store.init_db() secret_box = SecretBox(config.encryption_key) task_manager = TaskManager(config=config, store=store, secret_box=secret_box) def _seed_legacy_user() -> None: if store.list_users(): return legacy_path = config.root_dir / "user_data.json" if not legacy_path.exists(): return try: payload = json.loads(legacy_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): return student_id = str(payload.get("std_id", "")).strip() password = str(payload.get("password", "")).strip() if not student_id or not password: return user_id = store.create_user(student_id, secret_box.encrypt(password), "Legacy User") for source_key, category in (("a", "plan"), ("b", "free")): for course in payload.get("course", {}).get(source_key, []): course_id = str(course.get("course_id", "")).strip() course_index = str(course.get("course_index", "")).strip() if course_id and course_index: store.add_course(user_id, category, course_id, course_index) _seed_legacy_user() task_manager.start() atexit.register(task_manager.shutdown) app = Flask(__name__, template_folder="templates", static_folder="static") app.secret_key = config.session_secret app.config.update(SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SAMESITE="Lax") APP_LOGGER.info( "Application bootstrap complete | data_dir=%s db_path=%s chrome_binary=%s chromedriver_path=%s default_parallel_limit=%s", config.data_dir, config.db_path, config.chrome_binary, config.chromedriver_path, config.default_parallel_limit, ) CATEGORY_LABELS = { "plan": "方案选课", "free": "自由选课", } TASK_LABELS = { "pending": "排队中", "running": "执行中", "cancel_requested": "停止中", "completed": "已完成", "stopped": "已停止", "failed": "失败", } SKIPPED_REQUEST_LOG_PREFIXES = ( "/static/", "/api/", ) SKIPPED_REQUEST_LOG_PATHS = { "/favicon.ico", } def _current_actor_label() -> str: role = session.get("role", "guest") if role == "user": return f"user:{session.get('user_id', '-')}" if role == "admin": return f"admin:{session.get('admin_username', '-')}" return "guest" @app.before_request def before_request_logging() -> None: g.request_started_at = time.perf_counter() @app.after_request def after_request_logging(response): if request.path in SKIPPED_REQUEST_LOG_PATHS: return response if any(request.path.startswith(prefix) for prefix in SKIPPED_REQUEST_LOG_PREFIXES): return response started_at = getattr(g, "request_started_at", None) duration_ms = 0.0 if started_at is None else (time.perf_counter() - started_at) * 1000 remote_addr = request.headers.get("x-forwarded-for") or request.remote_addr or "-" APP_LOGGER.info( "HTTP %s %s -> %s in %.1fms | actor=%s remote=%s", request.method, request.path, response.status_code, duration_ms, _current_actor_label(), remote_addr, ) return response @app.context_processor def inject_globals() -> dict: return { "category_labels": CATEGORY_LABELS, "task_labels": TASK_LABELS, "session_role": session.get("role", "guest"), } def _login_required(role: str) -> Callable: def decorator(view: Callable) -> Callable: @wraps(view) def wrapped(*args, **kwargs): current_role = session.get("role") if role == "user" and current_role != "user": flash("请先登录学生账号。", "warning") return redirect(url_for("login")) if role == "admin" and current_role != "admin": flash("请先登录管理员账号。", "warning") return redirect(url_for("admin_login")) return view(*args, **kwargs) return wrapped return decorator def _get_current_user() -> dict | None: user_id = session.get("user_id") if not user_id: return None return store.get_user(int(user_id)) def _get_admin_identity() -> dict: return { "username": session.get("admin_username", ""), "is_super_admin": bool(session.get("is_super_admin", False)), } def _user_owns_course(user_id: int, course_target_id: int) -> bool: return any(course["id"] == course_target_id for course in store.list_courses_for_user(user_id)) def _build_user_dashboard_context(user: dict) -> dict: return { "current_user": user, "courses": store.list_courses_for_user(user["id"]), "task": store.get_latest_task_for_user(user["id"]), "recent_logs": store.list_recent_logs(user_id=user["id"], limit=config.logs_page_size), } def _build_admin_dashboard_context() -> dict: users = store.list_users() for user in users: user["courses"] = store.list_courses_for_user(user["id"]) user["latest_task"] = store.get_latest_task_for_user(user["id"]) admin_identity = _get_admin_identity() return { "users": users, "admins": store.list_admins(), "stats": store.get_admin_stats(), "recent_tasks": store.list_recent_tasks(limit=18), "recent_logs": store.list_recent_logs(limit=config.logs_page_size), "parallel_limit": store.get_parallel_limit(), "is_super_admin": admin_identity["is_super_admin"], "admin_identity": admin_identity, } def _queue_task_for_user(user: dict, *, requested_by: str, requested_by_role: str) -> tuple[dict, bool]: return task_manager.queue_task(user["id"], requested_by=requested_by, requested_by_role=requested_by_role) def _latest_log_id(logs: list[dict]) -> int: if not logs: return 0 return int(logs[-1]["id"]) @app.get("/") def index(): if session.get("role") == "user": return redirect(url_for("dashboard")) if session.get("role") == "admin": return redirect(url_for("admin_dashboard")) return redirect(url_for("login")) @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": student_id = request.form.get("student_id", "").strip() password = request.form.get("password", "") user = store.get_user_by_student_id(student_id) if user is None: flash("没有找到该学号对应的账号,请联系管理员录入。", "danger") return render_template("login.html") if not user["is_active"]: flash("该账号已被管理员禁用。", "danger") return render_template("login.html") try: stored_password = secret_box.decrypt(user["password_encrypted"]) except Exception: flash("账号数据损坏,请联系管理员重置密码。", "danger") return render_template("login.html") if stored_password != password: flash("学号或密码不正确。", "danger") return render_template("login.html") session.clear() session["role"] = "user" session["user_id"] = user["id"] return redirect(url_for("dashboard")) return render_template("login.html") @app.post("/logout") def logout(): session.clear() return redirect(url_for("login")) @app.route("/admin", methods=["GET", "POST"]) def admin_login(): if request.method == "POST": username = request.form.get("username", "").strip() password = request.form.get("password", "") is_super_admin = username == config.super_admin_username and password == config.super_admin_password admin_row = store.get_admin_by_username(username) is_regular_admin = bool(admin_row and verify_password(admin_row["password_hash"], password)) if not is_super_admin and not is_regular_admin: flash("管理员账号或密码错误。", "danger") return render_template("admin_login.html") session.clear() session["role"] = "admin" session["admin_username"] = username session["is_super_admin"] = is_super_admin return redirect(url_for("admin_dashboard")) return render_template("admin_login.html") @app.post("/admin/logout") def admin_logout(): session.clear() return redirect(url_for("admin_login")) @app.get("/dashboard") @_login_required("user") def dashboard(): user = _get_current_user() if user is None: session.clear() return redirect(url_for("login")) return render_template("dashboard.html", **_build_user_dashboard_context(user)) @app.post("/dashboard/profile") @_login_required("user") def update_profile(): user = _get_current_user() if user is None: session.clear() return redirect(url_for("login")) password = request.form.get("password", "").strip() display_name = request.form.get("display_name", "").strip() if not password: flash("密码不能为空。", "danger") return redirect(url_for("dashboard")) store.update_user(user["id"], password_encrypted=secret_box.encrypt(password), display_name=display_name) flash("账号信息已更新。", "success") return redirect(url_for("dashboard")) @app.post("/dashboard/courses") @_login_required("user") def add_course(): user = _get_current_user() if user is None: session.clear() return redirect(url_for("login")) category = request.form.get("category", "free") course_id = request.form.get("course_id", "").strip() course_index = request.form.get("course_index", "").strip() if not course_id.isdigit() or not course_index.isdigit() or len(course_index) != 2: flash("课程号必须为数字,课序号必须是 2 位数字。", "danger") return redirect(url_for("dashboard")) store.add_course(user["id"], category, course_id, course_index) flash("课程已加入任务列表。", "success") return redirect(url_for("dashboard")) @app.post("/dashboard/courses//delete") @_login_required("user") def delete_course(course_target_id: int): user = _get_current_user() if user is None: session.clear() return redirect(url_for("login")) if not _user_owns_course(user["id"], course_target_id): flash("不能删除不属于当前账号的课程。", "danger") return redirect(url_for("dashboard")) store.delete_course(course_target_id) flash("课程已移除。", "success") return redirect(url_for("dashboard")) @app.post("/dashboard/task/start") @_login_required("user") def start_task(): user = _get_current_user() if user is None: session.clear() return redirect(url_for("login")) task, created = _queue_task_for_user(user, requested_by=user["student_id"], requested_by_role="user") flash("任务已启动。" if created else f"已有任务处于 {TASK_LABELS.get(task['status'], task['status'])} 状态。", "success" if created else "warning") return redirect(url_for("dashboard")) @app.post("/dashboard/task/stop") @_login_required("user") def stop_task(): user = _get_current_user() if user is None: session.clear() return redirect(url_for("login")) active_task = store.find_active_task_for_user(user["id"]) if active_task and task_manager.stop_task(active_task["id"]): flash("停止请求已发送。", "success") else: flash("当前没有可停止的任务。", "warning") return redirect(url_for("dashboard")) @app.get("/admin/dashboard") @_login_required("admin") def admin_dashboard(): return render_template("admin_dashboard.html", **_build_admin_dashboard_context()) @app.post("/admin/settings/parallel-limit") @_login_required("admin") def update_parallel_limit(): try: parallel_limit = max(1, min(8, int(request.form.get("parallel_limit", "2")))) except ValueError: flash("并行数必须是 1 到 8 的整数。", "danger") return redirect(url_for("admin_dashboard")) store.set_parallel_limit(parallel_limit) flash(f"并行数已更新为 {parallel_limit}。", "success") return redirect(url_for("admin_dashboard")) @app.post("/admin/users") @_login_required("admin") def create_user(): student_id = request.form.get("student_id", "").strip() password = request.form.get("password", "").strip() display_name = request.form.get("display_name", "").strip() if not student_id.isdigit() or not password: flash("请填写有效的学号和密码。", "danger") return redirect(url_for("admin_dashboard")) if store.get_user_by_student_id(student_id): flash("该学号已经存在。", "warning") return redirect(url_for("admin_dashboard")) store.create_user(student_id, secret_box.encrypt(password), display_name) flash("用户已创建。", "success") return redirect(url_for("admin_dashboard")) @app.post("/admin/users//update") @_login_required("admin") def update_user(user_id: int): user = store.get_user(user_id) if user is None: flash("用户不存在。", "danger") return redirect(url_for("admin_dashboard")) display_name = request.form.get("display_name", user["display_name"]).strip() password = request.form.get("password", "").strip() if password: store.update_user(user_id, display_name=display_name, password_encrypted=secret_box.encrypt(password)) else: store.update_user(user_id, display_name=display_name) flash("用户信息已更新。", "success") return redirect(url_for("admin_dashboard")) @app.post("/admin/users//toggle") @_login_required("admin") def toggle_user(user_id: int): updated = store.toggle_user_active(user_id) if updated is None: flash("用户不存在。", "danger") else: flash("用户状态已切换。", "success") return redirect(url_for("admin_dashboard")) @app.post("/admin/users//courses") @_login_required("admin") def admin_add_course(user_id: int): if store.get_user(user_id) is None: flash("用户不存在。", "danger") return redirect(url_for("admin_dashboard")) category = request.form.get("category", "free") course_id = request.form.get("course_id", "").strip() course_index = request.form.get("course_index", "").strip() if not course_id.isdigit() or not course_index.isdigit() or len(course_index) != 2: flash("课程号必须为数字,课序号必须是 2 位数字。", "danger") return redirect(url_for("admin_dashboard")) store.add_course(user_id, category, course_id, course_index) flash("课程已添加到对应用户。", "success") return redirect(url_for("admin_dashboard")) @app.post("/admin/courses//delete") @_login_required("admin") def admin_delete_course(course_target_id: int): store.delete_course(course_target_id) flash("课程已删除。", "success") return redirect(url_for("admin_dashboard")) @app.post("/admin/users//task/start") @_login_required("admin") def admin_start_user_task(user_id: int): user = store.get_user(user_id) if user is None: flash("用户不存在。", "danger") return redirect(url_for("admin_dashboard")) admin_identity = _get_admin_identity() task, created = _queue_task_for_user(user, requested_by=admin_identity["username"], requested_by_role="admin") flash("任务已加入队列。" if created else f"该用户已有任务处于 {TASK_LABELS.get(task['status'], task['status'])} 状态。", "success" if created else "warning") return redirect(url_for("admin_dashboard")) @app.post("/admin/users//task/stop") @_login_required("admin") def admin_stop_user_task(user_id: int): active_task = store.find_active_task_for_user(user_id) if active_task and task_manager.stop_task(active_task["id"]): flash("已发送停止请求。", "success") else: flash("当前没有可停止任务。", "warning") return redirect(url_for("admin_dashboard")) @app.post("/admin/admins") @_login_required("admin") def create_admin(): if not session.get("is_super_admin", False): flash("只有超级管理员可以新增管理员。", "danger") return redirect(url_for("admin_dashboard")) username = request.form.get("username", "").strip() password = request.form.get("password", "").strip() if not username or not password: flash("请填写管理员账号和密码。", "danger") return redirect(url_for("admin_dashboard")) if username == config.super_admin_username or store.get_admin_by_username(username): flash("该管理员账号已存在。", "warning") return redirect(url_for("admin_dashboard")) store.create_admin(username, hash_password(password)) flash("管理员已创建。", "success") return redirect(url_for("admin_dashboard")) @app.get("/api/user/status") @_login_required("user") def user_status(): user = _get_current_user() if user is None: return jsonify({"ok": False}), 401 task = store.get_latest_task_for_user(user["id"]) return jsonify({"ok": True, "task": task, "courses": store.list_courses_for_user(user["id"])}) @app.get("/api/admin/status") @_login_required("admin") def admin_status(): return jsonify( { "ok": True, "stats": store.get_admin_stats(), "parallel_limit": store.get_parallel_limit(), "recent_tasks": store.list_recent_tasks(limit=12), } ) @app.get("/api/user/logs/stream") @_login_required("user") def stream_user_logs(): user = _get_current_user() if user is None: return jsonify({"ok": False}), 401 last_id = int(request.args.get("last_id", _latest_log_id(store.list_recent_logs(user_id=user["id"], limit=1)))) @stream_with_context def generate(): current_last_id = last_id while True: logs = store.list_logs_after(current_last_id, user_id=user["id"], limit=60) if logs: for log in logs: current_last_id = int(log["id"]) yield f"id: {log['id']}\ndata: {json.dumps(log, ensure_ascii=False)}\n\n" else: yield ": keep-alive\n\n" time.sleep(1) response = Response(generate(), mimetype="text/event-stream") response.headers["Cache-Control"] = "no-cache" response.headers["X-Accel-Buffering"] = "no" return response @app.get("/api/admin/logs/stream") @_login_required("admin") def stream_admin_logs(): last_id = int(request.args.get("last_id", _latest_log_id(store.list_recent_logs(limit=1)))) @stream_with_context def generate(): current_last_id = last_id while True: logs = store.list_logs_after(current_last_id, limit=80) if logs: for log in logs: current_last_id = int(log["id"]) yield f"id: {log['id']}\ndata: {json.dumps(log, ensure_ascii=False)}\n\n" else: yield ": keep-alive\n\n" time.sleep(1) response = Response(generate(), mimetype="text/event-stream") response.headers["Cache-Control"] = "no-cache" response.headers["X-Accel-Buffering"] = "no" return response