Spaces:
Paused
Paused
| from __future__ import annotations | |
| import json | |
| import time | |
| from functools import wraps | |
| from typing import Any, Callable | |
| from flask import ( | |
| Flask, | |
| Response, | |
| flash, | |
| jsonify, | |
| redirect, | |
| render_template, | |
| request, | |
| session, | |
| stream_with_context, | |
| url_for, | |
| ) | |
| from course_catcher.config import AppConfig, load_config | |
| from course_catcher.db import Database | |
| from course_catcher.security import CredentialCipher, mask_secret | |
| from course_catcher.task_manager import TaskManager | |
| def create_app() -> Flask: | |
| config = load_config() | |
| cipher = CredentialCipher(config.secret_key) | |
| db = Database(config.database_path, cipher, config.default_parallelism) | |
| db.ensure_superadmin(config.admin_username, config.admin_password) | |
| task_manager = TaskManager(db, config) | |
| task_manager.start() | |
| app = Flask( | |
| __name__, | |
| template_folder="../templates", | |
| static_folder="../static", | |
| ) | |
| app.config["SECRET_KEY"] = config.secret_key | |
| app.extensions["app_config"] = config | |
| app.extensions["db"] = db | |
| app.extensions["task_manager"] = task_manager | |
| def current_user() -> dict[str, Any] | None: | |
| user_id = session.get("user_id") | |
| if not user_id: | |
| return None | |
| return db.get_user_by_id(int(user_id)) | |
| def current_admin() -> dict[str, Any] | None: | |
| admin_id = session.get("admin_id") | |
| if not admin_id: | |
| return None | |
| return db.get_admin_by_id(int(admin_id)) | |
| def require_user(view: Callable): | |
| def wrapped(*args, **kwargs): | |
| user = current_user() | |
| if not user: | |
| return redirect(url_for("login")) | |
| return view(user, *args, **kwargs) | |
| return wrapped | |
| def require_admin(view: Callable): | |
| def wrapped(*args, **kwargs): | |
| admin = current_admin() | |
| if not admin: | |
| return redirect(url_for("admin_login")) | |
| return view(admin, *args, **kwargs) | |
| return wrapped | |
| def require_superadmin(view: Callable): | |
| def wrapped(*args, **kwargs): | |
| admin = current_admin() | |
| if not admin or admin["role"] != "superadmin": | |
| flash("只有超级管理员可以执行该操作。", "error") | |
| return redirect(url_for("admin_panel")) | |
| return view(admin, *args, **kwargs) | |
| return wrapped | |
| def normalize_student_id(student_id: str) -> str: | |
| return (student_id or "").strip() | |
| def normalize_course_pair(course_id: str, course_index: str) -> tuple[str, str]: | |
| return (course_id or "").strip(), (course_index or "").strip().zfill(2) | |
| def validate_student_login_form(student_id: str, password: str) -> str | None: | |
| if not student_id.isdigit(): | |
| return "学号必须为纯数字。" | |
| if len(student_id) < 8: | |
| return "学号长度看起来不正确。" | |
| if not password: | |
| return "密码不能为空。" | |
| return None | |
| def validate_course_form(course_id: str, course_index: str) -> str | None: | |
| if not course_id.isdigit(): | |
| return "课程号必须为纯数字。" | |
| if not course_index.isdigit(): | |
| return "课序号必须为纯数字。" | |
| if len(course_index) != 2: | |
| return "课序号必须是两位数字。" | |
| return None | |
| def build_user_view_model(user: dict[str, Any]) -> dict[str, Any]: | |
| dashboard_state = db.get_user_dashboard_state(user["id"]) | |
| courses = db.list_courses(user["id"]) | |
| logs = db.get_recent_logs(user_id=user["id"]) | |
| return { | |
| "user": user, | |
| "dashboard_state": dashboard_state, | |
| "courses": courses, | |
| "logs": logs, | |
| "masked_password": mask_secret("******"), | |
| } | |
| def build_admin_view_model(admin: dict[str, Any]) -> dict[str, Any]: | |
| summary = db.get_admin_summary() | |
| users = db.list_users_with_summary() | |
| users_with_courses: list[dict[str, Any]] = [] | |
| for user in users: | |
| entry = dict(user) | |
| entry["courses"] = db.list_courses(user["id"]) | |
| entry["dashboard_state"] = db.get_user_dashboard_state(user["id"]) | |
| users_with_courses.append(entry) | |
| return { | |
| "admin": admin, | |
| "summary": summary, | |
| "users": users_with_courses, | |
| "admins": db.list_admins(), | |
| "logs": db.get_recent_logs(), | |
| } | |
| def event_stream(log_scope_user_id: int | None = None): | |
| last_id = int(request.args.get("last_id", "0") or 0) | |
| while True: | |
| items = db.get_logs_after(last_id=last_id, user_id=log_scope_user_id) | |
| if items: | |
| for item in items: | |
| last_id = item["id"] | |
| yield f"id: {item['id']}\n" | |
| yield "event: log\n" | |
| yield f"data: {json.dumps(item, ensure_ascii=False)}\n\n" | |
| else: | |
| yield ": keepalive\n\n" | |
| time.sleep(1) | |
| def inject_session_context(): | |
| return { | |
| "session_user": current_user(), | |
| "session_admin": current_admin(), | |
| } | |
| def index(): | |
| if current_user(): | |
| return redirect(url_for("dashboard")) | |
| if current_admin(): | |
| return redirect(url_for("admin_panel")) | |
| return redirect(url_for("login")) | |
| def login(): | |
| if current_user(): | |
| return redirect(url_for("dashboard")) | |
| if request.method == "POST": | |
| student_id = normalize_student_id(request.form.get("student_id", "")) | |
| password = request.form.get("password", "") | |
| error = validate_student_login_form(student_id, password) | |
| if error: | |
| flash(error, "error") | |
| return render_template("login.html") | |
| user, created = db.verify_or_create_user_login(student_id, password) | |
| if not user: | |
| flash("学号或密码错误。", "error") | |
| return render_template("login.html") | |
| session.clear() | |
| session["user_id"] = user["id"] | |
| flash("首次登录已自动创建账户。" if created else "登录成功。", "success") | |
| return redirect(url_for("dashboard")) | |
| return render_template("login.html") | |
| def logout(): | |
| session.clear() | |
| flash("已退出登录。", "success") | |
| return redirect(url_for("login")) | |
| def admin_login(): | |
| if current_admin(): | |
| return redirect(url_for("admin_panel")) | |
| if request.method == "POST": | |
| username = (request.form.get("username") or "").strip() | |
| password = request.form.get("password") or "" | |
| if not username or not password: | |
| flash("管理员账号和密码不能为空。", "error") | |
| return render_template("admin_login.html") | |
| admin = db.verify_admin_login(username, password) | |
| if not admin: | |
| flash("管理员账号或密码错误。", "error") | |
| return render_template("admin_login.html") | |
| session.clear() | |
| session["admin_id"] = admin["id"] | |
| flash("管理员登录成功。", "success") | |
| return redirect(url_for("admin_panel")) | |
| return render_template("admin_login.html") | |
| def admin_logout(): | |
| session.clear() | |
| flash("管理员已退出登录。", "success") | |
| return redirect(url_for("admin_login")) | |
| def dashboard(user: dict[str, Any]): | |
| return render_template("dashboard.html", **build_user_view_model(user)) | |
| def update_user_password(user: dict[str, Any]): | |
| password = request.form.get("password", "") | |
| if not password: | |
| flash("密码不能为空。", "error") | |
| return redirect(url_for("dashboard")) | |
| db.update_user_password(user["id"], password) | |
| db.add_log(None, user["id"], "user", "INFO", "用户已更新自己的登录密码。") | |
| flash("密码已更新。", "success") | |
| return redirect(url_for("dashboard")) | |
| def add_user_course(user: dict[str, Any]): | |
| course_id, course_index = normalize_course_pair( | |
| request.form.get("course_id", ""), | |
| request.form.get("course_index", ""), | |
| ) | |
| error = validate_course_form(course_id, course_index) | |
| if error: | |
| flash(error, "error") | |
| return redirect(url_for("dashboard")) | |
| db.add_course(user["id"], course_id, course_index) | |
| db.add_log(None, user["id"], "user", "INFO", f"新增课程 {course_id}_{course_index}。") | |
| flash("课程已添加。", "success") | |
| return redirect(url_for("dashboard")) | |
| def delete_user_course(user: dict[str, Any], course_record_id: int): | |
| db.delete_course(course_record_id, user["id"]) | |
| db.add_log(None, user["id"], "user", "INFO", f"删除课程记录 #{course_record_id}。") | |
| flash("课程已删除。", "success") | |
| return redirect(url_for("dashboard")) | |
| def start_user_task(user: dict[str, Any]): | |
| task = db.create_task(user["id"], "user", user["student_id"]) | |
| db.add_log(task["id"], user["id"], "user", "INFO", "用户发起了新的选课任务。") | |
| flash("任务已加入队列。", "success") | |
| return redirect(url_for("dashboard")) | |
| def stop_user_task(user: dict[str, Any]): | |
| db.request_stop_task(user["id"]) | |
| db.add_log(None, user["id"], "user", "INFO", "用户请求停止当前任务。") | |
| flash("停止请求已提交。", "success") | |
| return redirect(url_for("dashboard")) | |
| def dashboard_status(user: dict[str, Any]): | |
| return jsonify(db.get_user_dashboard_state(user["id"])) | |
| def user_log_stream(user: dict[str, Any]): | |
| response = Response(stream_with_context(event_stream(log_scope_user_id=user["id"]))) | |
| response.headers["Content-Type"] = "text/event-stream" | |
| response.headers["Cache-Control"] = "no-cache" | |
| response.headers["X-Accel-Buffering"] = "no" | |
| return response | |
| def admin_panel(admin: dict[str, Any]): | |
| return render_template("admin.html", **build_admin_view_model(admin)) | |
| def update_parallelism(admin: dict[str, Any]): | |
| raw_value = request.form.get("parallelism", "1") | |
| try: | |
| parallelism = max(1, min(8, int(raw_value))) | |
| except ValueError: | |
| flash("并行数必须是数字。", "error") | |
| return redirect(url_for("admin_panel")) | |
| db.set_setting("max_parallel_tasks", str(parallelism)) | |
| db.add_log(None, None, "admin", "INFO", f"管理员 {admin['username']} 将并行数设置为 {parallelism}。") | |
| flash("并行数已更新。", "success") | |
| return redirect(url_for("admin_panel")) | |
| def create_or_update_user_from_admin(admin: dict[str, Any]): | |
| student_id = normalize_student_id(request.form.get("student_id", "")) | |
| password = request.form.get("password", "") | |
| error = validate_student_login_form(student_id, password) | |
| if error: | |
| flash(error, "error") | |
| return redirect(url_for("admin_panel")) | |
| user = db.create_or_update_user(student_id, password) | |
| db.add_log(None, user["id"], "admin", "INFO", f"管理员 {admin['username']} 录入或更新了用户账号。") | |
| initial_course_id, initial_course_index = normalize_course_pair( | |
| request.form.get("course_id", ""), | |
| request.form.get("course_index", ""), | |
| ) | |
| if initial_course_id or initial_course_index: | |
| course_error = validate_course_form(initial_course_id, initial_course_index) | |
| if course_error: | |
| flash(course_error, "error") | |
| return redirect(url_for("admin_panel")) | |
| db.add_course(user["id"], initial_course_id, initial_course_index) | |
| db.add_log( | |
| None, | |
| user["id"], | |
| "admin", | |
| "INFO", | |
| f"管理员 {admin['username']} 为用户新增课程 {initial_course_id}_{initial_course_index}。", | |
| ) | |
| flash("用户信息已保存。", "success") | |
| return redirect(url_for("admin_panel")) | |
| def update_user_password_from_admin(admin: dict[str, Any], user_id: int): | |
| password = request.form.get("password", "") | |
| if not password: | |
| flash("密码不能为空。", "error") | |
| return redirect(url_for("admin_panel")) | |
| db.update_user_password(user_id, password) | |
| db.add_log(None, user_id, "admin", "INFO", f"管理员 {admin['username']} 更新了用户密码。") | |
| flash("用户密码已更新。", "success") | |
| return redirect(url_for("admin_panel")) | |
| def add_user_course_from_admin(admin: dict[str, Any], user_id: int): | |
| course_id, course_index = normalize_course_pair( | |
| request.form.get("course_id", ""), | |
| request.form.get("course_index", ""), | |
| ) | |
| error = validate_course_form(course_id, course_index) | |
| if error: | |
| flash(error, "error") | |
| return redirect(url_for("admin_panel")) | |
| db.add_course(user_id, course_id, course_index) | |
| db.add_log( | |
| None, | |
| user_id, | |
| "admin", | |
| "INFO", | |
| f"管理员 {admin['username']} 为用户新增课程 {course_id}_{course_index}。", | |
| ) | |
| flash("课程已添加到目标用户。", "success") | |
| return redirect(url_for("admin_panel")) | |
| def delete_user_course_from_admin(admin: dict[str, Any], user_id: int, course_record_id: int): | |
| db.delete_course(course_record_id, user_id) | |
| db.add_log( | |
| None, | |
| user_id, | |
| "admin", | |
| "INFO", | |
| f"管理员 {admin['username']} 删除了课程记录 #{course_record_id}。", | |
| ) | |
| flash("课程已删除。", "success") | |
| return redirect(url_for("admin_panel")) | |
| def start_user_task_from_admin(admin: dict[str, Any], user_id: int): | |
| user = db.get_user_by_id(user_id) | |
| if not user: | |
| flash("用户不存在。", "error") | |
| return redirect(url_for("admin_panel")) | |
| task = db.create_task(user_id, "admin", admin["username"]) | |
| db.add_log( | |
| task["id"], | |
| user_id, | |
| "admin", | |
| "INFO", | |
| f"管理员 {admin['username']} 启动了用户 {user['student_id']} 的任务。", | |
| ) | |
| flash("任务已加入队列。", "success") | |
| return redirect(url_for("admin_panel")) | |
| def stop_user_task_from_admin(admin: dict[str, Any], user_id: int): | |
| db.request_stop_task(user_id) | |
| db.add_log(None, user_id, "admin", "INFO", f"管理员 {admin['username']} 请求停止用户任务。") | |
| flash("停止请求已提交。", "success") | |
| return redirect(url_for("admin_panel")) | |
| def create_admin_account(admin: dict[str, Any]): | |
| username = (request.form.get("username") or "").strip() | |
| password = request.form.get("password") or "" | |
| if not username or not password: | |
| flash("管理员账号和密码不能为空。", "error") | |
| return redirect(url_for("admin_panel")) | |
| try: | |
| db.create_admin(username, password) | |
| db.add_log(None, None, "admin", "INFO", f"超级管理员 {admin['username']} 创建了管理员 {username}。") | |
| flash("管理员已创建。", "success") | |
| except Exception as exc: | |
| flash(f"管理员创建失败:{exc}", "error") | |
| return redirect(url_for("admin_panel")) | |
| def admin_status(admin: dict[str, Any]): | |
| return jsonify(db.get_admin_summary()) | |
| def admin_log_stream(admin: dict[str, Any]): | |
| response = Response(stream_with_context(event_stream(log_scope_user_id=None))) | |
| response.headers["Content-Type"] = "text/event-stream" | |
| response.headers["Cache-Control"] = "no-cache" | |
| response.headers["X-Accel-Buffering"] = "no" | |
| return response | |
| return app | |