| from __future__ import annotations |
|
|
| import json |
| import time |
| from functools import wraps |
| from typing import Any, Callable |
|
|
| from flask import ( |
| Flask, |
| Response, |
| flash, |
| jsonify, |
| redirect, |
| render_template, |
| request, |
| session, |
| stream_with_context, |
| url_for, |
| ) |
|
|
| from course_catcher.config import AppConfig, load_config |
| from course_catcher.db import Database |
| from course_catcher.security import CredentialCipher, mask_secret |
| from course_catcher.task_manager import TaskManager |
|
|
|
|
| def create_app() -> Flask: |
| config = load_config() |
| cipher = CredentialCipher(config.secret_key) |
| db = Database(config.database_path, cipher, config.default_parallelism) |
| db.ensure_superadmin(config.admin_username, config.admin_password) |
| task_manager = TaskManager(db, config) |
| task_manager.start() |
|
|
| app = Flask( |
| __name__, |
| template_folder="../templates", |
| static_folder="../static", |
| ) |
| app.config["SECRET_KEY"] = config.secret_key |
| app.extensions["app_config"] = config |
| app.extensions["db"] = db |
| app.extensions["task_manager"] = task_manager |
|
|
| def current_user() -> dict[str, Any] | None: |
| user_id = session.get("user_id") |
| if not user_id: |
| return None |
| return db.get_user_by_id(int(user_id)) |
|
|
| def current_admin() -> dict[str, Any] | None: |
| admin_id = session.get("admin_id") |
| if not admin_id: |
| return None |
| return db.get_admin_by_id(int(admin_id)) |
|
|
| def require_user(view: Callable): |
| @wraps(view) |
| def wrapped(*args, **kwargs): |
| user = current_user() |
| if not user: |
| return redirect(url_for("login")) |
| return view(user, *args, **kwargs) |
|
|
| return wrapped |
|
|
| def require_admin(view: Callable): |
| @wraps(view) |
| def wrapped(*args, **kwargs): |
| admin = current_admin() |
| if not admin: |
| return redirect(url_for("admin_login")) |
| return view(admin, *args, **kwargs) |
|
|
| return wrapped |
|
|
| def require_superadmin(view: Callable): |
| @wraps(view) |
| def wrapped(*args, **kwargs): |
| admin = current_admin() |
| if not admin or admin["role"] != "superadmin": |
| flash("只有超级管理员可以执行该操作。", "error") |
| return redirect(url_for("admin_panel")) |
| return view(admin, *args, **kwargs) |
|
|
| return wrapped |
|
|
| def normalize_student_id(student_id: str) -> str: |
| return (student_id or "").strip() |
|
|
| def normalize_course_pair(course_id: str, course_index: str) -> tuple[str, str]: |
| return (course_id or "").strip(), (course_index or "").strip().zfill(2) |
|
|
| def validate_student_login_form(student_id: str, password: str) -> str | None: |
| if not student_id.isdigit(): |
| return "学号必须为纯数字。" |
| if len(student_id) < 8: |
| return "学号长度看起来不正确。" |
| if not password: |
| return "密码不能为空。" |
| return None |
|
|
| def validate_course_form(course_id: str, course_index: str) -> str | None: |
| if not course_id.isdigit(): |
| return "课程号必须为纯数字。" |
| if not course_index.isdigit(): |
| return "课序号必须为纯数字。" |
| if len(course_index) != 2: |
| return "课序号必须是两位数字。" |
| return None |
|
|
| def build_user_view_model(user: dict[str, Any]) -> dict[str, Any]: |
| dashboard_state = db.get_user_dashboard_state(user["id"]) |
| courses = db.list_courses(user["id"]) |
| logs = db.get_recent_logs(user_id=user["id"]) |
| return { |
| "user": user, |
| "dashboard_state": dashboard_state, |
| "courses": courses, |
| "logs": logs, |
| "masked_password": mask_secret("******"), |
| } |
|
|
| def build_admin_view_model(admin: dict[str, Any]) -> dict[str, Any]: |
| summary = db.get_admin_summary() |
| users = db.list_users_with_summary() |
| users_with_courses: list[dict[str, Any]] = [] |
| for user in users: |
| entry = dict(user) |
| entry["courses"] = db.list_courses(user["id"]) |
| entry["dashboard_state"] = db.get_user_dashboard_state(user["id"]) |
| users_with_courses.append(entry) |
| return { |
| "admin": admin, |
| "summary": summary, |
| "users": users_with_courses, |
| "admins": db.list_admins(), |
| "logs": db.get_recent_logs(), |
| } |
|
|
| def event_stream(log_scope_user_id: int | None = None): |
| last_id = int(request.args.get("last_id", "0") or 0) |
| while True: |
| items = db.get_logs_after(last_id=last_id, user_id=log_scope_user_id) |
| if items: |
| for item in items: |
| last_id = item["id"] |
| yield f"id: {item['id']}\n" |
| yield "event: log\n" |
| yield f"data: {json.dumps(item, ensure_ascii=False)}\n\n" |
| else: |
| yield ": keepalive\n\n" |
| time.sleep(1) |
|
|
| @app.context_processor |
| def inject_session_context(): |
| return { |
| "session_user": current_user(), |
| "session_admin": current_admin(), |
| } |
|
|
| @app.get("/") |
| def index(): |
| if current_user(): |
| return redirect(url_for("dashboard")) |
| if current_admin(): |
| return redirect(url_for("admin_panel")) |
| return redirect(url_for("login")) |
|
|
| @app.route("/login", methods=["GET", "POST"]) |
| def login(): |
| if current_user(): |
| return redirect(url_for("dashboard")) |
| if request.method == "POST": |
| student_id = normalize_student_id(request.form.get("student_id", "")) |
| password = request.form.get("password", "") |
| error = validate_student_login_form(student_id, password) |
| if error: |
| flash(error, "error") |
| return render_template("login.html") |
|
|
| user, created = db.verify_or_create_user_login(student_id, password) |
| if not user: |
| flash("学号或密码错误。", "error") |
| return render_template("login.html") |
|
|
| session.clear() |
| session["user_id"] = user["id"] |
| flash("首次登录已自动创建账户。" if created else "登录成功。", "success") |
| return redirect(url_for("dashboard")) |
| return render_template("login.html") |
|
|
| @app.post("/logout") |
| def logout(): |
| session.clear() |
| flash("已退出登录。", "success") |
| return redirect(url_for("login")) |
|
|
| @app.route("/admin", methods=["GET", "POST"]) |
| def admin_login(): |
| if current_admin(): |
| return redirect(url_for("admin_panel")) |
| if request.method == "POST": |
| username = (request.form.get("username") or "").strip() |
| password = request.form.get("password") or "" |
| if not username or not password: |
| flash("管理员账号和密码不能为空。", "error") |
| return render_template("admin_login.html") |
|
|
| admin = db.verify_admin_login(username, password) |
| if not admin: |
| flash("管理员账号或密码错误。", "error") |
| return render_template("admin_login.html") |
|
|
| session.clear() |
| session["admin_id"] = admin["id"] |
| flash("管理员登录成功。", "success") |
| return redirect(url_for("admin_panel")) |
| return render_template("admin_login.html") |
|
|
| @app.post("/admin/logout") |
| def admin_logout(): |
| session.clear() |
| flash("管理员已退出登录。", "success") |
| return redirect(url_for("admin_login")) |
|
|
| @app.get("/dashboard") |
| @require_user |
| def dashboard(user: dict[str, Any]): |
| return render_template("dashboard.html", **build_user_view_model(user)) |
|
|
| @app.post("/dashboard/password") |
| @require_user |
| def update_user_password(user: dict[str, Any]): |
| password = request.form.get("password", "") |
| if not password: |
| flash("密码不能为空。", "error") |
| return redirect(url_for("dashboard")) |
| db.update_user_password(user["id"], password) |
| db.add_log(None, user["id"], "user", "INFO", "用户已更新自己的登录密码。") |
| flash("密码已更新。", "success") |
| return redirect(url_for("dashboard")) |
|
|
| @app.post("/dashboard/courses") |
| @require_user |
| def add_user_course(user: dict[str, Any]): |
| course_id, course_index = normalize_course_pair( |
| request.form.get("course_id", ""), |
| request.form.get("course_index", ""), |
| ) |
| error = validate_course_form(course_id, course_index) |
| if error: |
| flash(error, "error") |
| return redirect(url_for("dashboard")) |
|
|
| db.add_course(user["id"], course_id, course_index) |
| db.add_log(None, user["id"], "user", "INFO", f"新增课程 {course_id}_{course_index}。") |
| flash("课程已添加。", "success") |
| return redirect(url_for("dashboard")) |
|
|
| @app.post("/dashboard/courses/<int:course_record_id>/delete") |
| @require_user |
| def delete_user_course(user: dict[str, Any], course_record_id: int): |
| db.delete_course(course_record_id, user["id"]) |
| db.add_log(None, user["id"], "user", "INFO", f"删除课程记录 #{course_record_id}。") |
| flash("课程已删除。", "success") |
| return redirect(url_for("dashboard")) |
|
|
| @app.post("/dashboard/tasks/start") |
| @require_user |
| def start_user_task(user: dict[str, Any]): |
| task = db.create_task(user["id"], "user", user["student_id"]) |
| db.add_log(task["id"], user["id"], "user", "INFO", "用户发起了新的选课任务。") |
| flash("任务已加入队列。", "success") |
| return redirect(url_for("dashboard")) |
|
|
| @app.post("/dashboard/tasks/stop") |
| @require_user |
| def stop_user_task(user: dict[str, Any]): |
| db.request_stop_task(user["id"]) |
| db.add_log(None, user["id"], "user", "INFO", "用户请求停止当前任务。") |
| flash("停止请求已提交。", "success") |
| return redirect(url_for("dashboard")) |
|
|
| @app.get("/api/dashboard/status") |
| @require_user |
| def dashboard_status(user: dict[str, Any]): |
| return jsonify(db.get_user_dashboard_state(user["id"])) |
|
|
| @app.get("/events/logs/user") |
| @require_user |
| def user_log_stream(user: dict[str, Any]): |
| response = Response(stream_with_context(event_stream(log_scope_user_id=user["id"]))) |
| response.headers["Content-Type"] = "text/event-stream" |
| response.headers["Cache-Control"] = "no-cache" |
| response.headers["X-Accel-Buffering"] = "no" |
| return response |
|
|
| @app.get("/admin/panel") |
| @require_admin |
| def admin_panel(admin: dict[str, Any]): |
| return render_template("admin.html", **build_admin_view_model(admin)) |
|
|
| @app.post("/admin/panel/settings/parallelism") |
| @require_admin |
| def update_parallelism(admin: dict[str, Any]): |
| raw_value = request.form.get("parallelism", "1") |
| try: |
| parallelism = max(1, min(8, int(raw_value))) |
| except ValueError: |
| flash("并行数必须是数字。", "error") |
| return redirect(url_for("admin_panel")) |
|
|
| db.set_setting("max_parallel_tasks", str(parallelism)) |
| db.add_log(None, None, "admin", "INFO", f"管理员 {admin['username']} 将并行数设置为 {parallelism}。") |
| flash("并行数已更新。", "success") |
| return redirect(url_for("admin_panel")) |
|
|
| @app.post("/admin/panel/users") |
| @require_admin |
| def create_or_update_user_from_admin(admin: dict[str, Any]): |
| student_id = normalize_student_id(request.form.get("student_id", "")) |
| password = request.form.get("password", "") |
| error = validate_student_login_form(student_id, password) |
| if error: |
| flash(error, "error") |
| return redirect(url_for("admin_panel")) |
|
|
| user = db.create_or_update_user(student_id, password) |
| db.add_log(None, user["id"], "admin", "INFO", f"管理员 {admin['username']} 录入或更新了用户账号。") |
|
|
| initial_course_id, initial_course_index = normalize_course_pair( |
| request.form.get("course_id", ""), |
| request.form.get("course_index", ""), |
| ) |
| if initial_course_id or initial_course_index: |
| course_error = validate_course_form(initial_course_id, initial_course_index) |
| if course_error: |
| flash(course_error, "error") |
| return redirect(url_for("admin_panel")) |
| db.add_course(user["id"], initial_course_id, initial_course_index) |
| db.add_log( |
| None, |
| user["id"], |
| "admin", |
| "INFO", |
| f"管理员 {admin['username']} 为用户新增课程 {initial_course_id}_{initial_course_index}。", |
| ) |
|
|
| flash("用户信息已保存。", "success") |
| return redirect(url_for("admin_panel")) |
|
|
| @app.post("/admin/panel/users/<int:user_id>/password") |
| @require_admin |
| def update_user_password_from_admin(admin: dict[str, Any], user_id: int): |
| password = request.form.get("password", "") |
| if not password: |
| flash("密码不能为空。", "error") |
| return redirect(url_for("admin_panel")) |
| db.update_user_password(user_id, password) |
| db.add_log(None, user_id, "admin", "INFO", f"管理员 {admin['username']} 更新了用户密码。") |
| flash("用户密码已更新。", "success") |
| return redirect(url_for("admin_panel")) |
|
|
| @app.post("/admin/panel/users/<int:user_id>/courses") |
| @require_admin |
| def add_user_course_from_admin(admin: dict[str, Any], user_id: int): |
| course_id, course_index = normalize_course_pair( |
| request.form.get("course_id", ""), |
| request.form.get("course_index", ""), |
| ) |
| error = validate_course_form(course_id, course_index) |
| if error: |
| flash(error, "error") |
| return redirect(url_for("admin_panel")) |
| db.add_course(user_id, course_id, course_index) |
| db.add_log( |
| None, |
| user_id, |
| "admin", |
| "INFO", |
| f"管理员 {admin['username']} 为用户新增课程 {course_id}_{course_index}。", |
| ) |
| flash("课程已添加到目标用户。", "success") |
| return redirect(url_for("admin_panel")) |
|
|
| @app.post("/admin/panel/users/<int:user_id>/courses/<int:course_record_id>/delete") |
| @require_admin |
| def delete_user_course_from_admin(admin: dict[str, Any], user_id: int, course_record_id: int): |
| db.delete_course(course_record_id, user_id) |
| db.add_log( |
| None, |
| user_id, |
| "admin", |
| "INFO", |
| f"管理员 {admin['username']} 删除了课程记录 #{course_record_id}。", |
| ) |
| flash("课程已删除。", "success") |
| return redirect(url_for("admin_panel")) |
|
|
| @app.post("/admin/panel/users/<int:user_id>/tasks/start") |
| @require_admin |
| def start_user_task_from_admin(admin: dict[str, Any], user_id: int): |
| user = db.get_user_by_id(user_id) |
| if not user: |
| flash("用户不存在。", "error") |
| return redirect(url_for("admin_panel")) |
| task = db.create_task(user_id, "admin", admin["username"]) |
| db.add_log( |
| task["id"], |
| user_id, |
| "admin", |
| "INFO", |
| f"管理员 {admin['username']} 启动了用户 {user['student_id']} 的任务。", |
| ) |
| flash("任务已加入队列。", "success") |
| return redirect(url_for("admin_panel")) |
|
|
| @app.post("/admin/panel/users/<int:user_id>/tasks/stop") |
| @require_admin |
| def stop_user_task_from_admin(admin: dict[str, Any], user_id: int): |
| db.request_stop_task(user_id) |
| db.add_log(None, user_id, "admin", "INFO", f"管理员 {admin['username']} 请求停止用户任务。") |
| flash("停止请求已提交。", "success") |
| return redirect(url_for("admin_panel")) |
|
|
| @app.post("/admin/panel/admins") |
| @require_superadmin |
| def create_admin_account(admin: dict[str, Any]): |
| username = (request.form.get("username") or "").strip() |
| password = request.form.get("password") or "" |
| if not username or not password: |
| flash("管理员账号和密码不能为空。", "error") |
| return redirect(url_for("admin_panel")) |
| try: |
| db.create_admin(username, password) |
| db.add_log(None, None, "admin", "INFO", f"超级管理员 {admin['username']} 创建了管理员 {username}。") |
| flash("管理员已创建。", "success") |
| except Exception as exc: |
| flash(f"管理员创建失败:{exc}", "error") |
| return redirect(url_for("admin_panel")) |
|
|
| @app.get("/api/admin/status") |
| @require_admin |
| def admin_status(admin: dict[str, Any]): |
| return jsonify(db.get_admin_summary()) |
|
|
| @app.get("/events/logs/admin") |
| @require_admin |
| def admin_log_stream(admin: dict[str, Any]): |
| response = Response(stream_with_context(event_stream(log_scope_user_id=None))) |
| response.headers["Content-Type"] = "text/event-stream" |
| response.headers["Cache-Control"] = "no-cache" |
| response.headers["X-Accel-Buffering"] = "no" |
| return response |
|
|
| return app |
|
|