"""Flask application factory for course_catcher: student and admin routes."""

from __future__ import annotations

import json
import time
from functools import wraps
from typing import Any, Callable

from flask import (
    Flask,
    Response,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    session,
    stream_with_context,
    url_for,
)

from course_catcher.config import AppConfig, load_config
from course_catcher.db import Database
from course_catcher.security import CredentialCipher, mask_secret
from course_catcher.task_manager import TaskManager


def create_app() -> Flask:
    """Build the Flask app, its database handle and the background task manager.

    Loads the configuration, opens the database, ensures the configured
    superadmin exists, starts the task manager and registers every route.
    The config, database and task manager are also stored in
    ``app.extensions`` under ``"app_config"``, ``"db"`` and ``"task_manager"``.

    Returns:
        The configured :class:`flask.Flask` application.
    """
    config = load_config()
    cipher = CredentialCipher(config.secret_key)
    db = Database(config.database_path, cipher, config.default_parallelism)
    db.ensure_superadmin(config.admin_username, config.admin_password)
    task_manager = TaskManager(db, config)
    task_manager.start()

    app = Flask(
        __name__,
        template_folder="../templates",
        static_folder="../static",
    )
    app.config["SECRET_KEY"] = config.secret_key
    app.extensions["app_config"] = config
    app.extensions["db"] = db
    app.extensions["task_manager"] = task_manager

    # ------------------------------------------------------------------
    # Session helpers and access-control decorators
    # ------------------------------------------------------------------

    def current_user() -> dict[str, Any] | None:
        """Return the user referenced by the session's ``user_id``, if any."""
        user_id = session.get("user_id")
        if not user_id:
            return None
        return db.get_user_by_id(int(user_id))

    def current_admin() -> dict[str, Any] | None:
        """Return the admin referenced by the session's ``admin_id``, if any."""
        admin_id = session.get("admin_id")
        if not admin_id:
            return None
        return db.get_admin_by_id(int(admin_id))

    def require_user(view: Callable):
        """Decorate a view so it receives the logged-in user as first argument.

        Requests without a valid user session are redirected to ``login``.
        """

        @wraps(view)
        def wrapped(*args, **kwargs):
            user = current_user()
            if not user:
                return redirect(url_for("login"))
            return view(user, *args, **kwargs)

        return wrapped

    def require_admin(view: Callable):
        """Decorate a view so it receives the logged-in admin as first argument.

        Requests without a valid admin session are redirected to
        ``admin_login``.
        """

        @wraps(view)
        def wrapped(*args, **kwargs):
            admin = current_admin()
            if not admin:
                return redirect(url_for("admin_login"))
            return view(admin, *args, **kwargs)

        return wrapped

    def require_superadmin(view: Callable):
        """Decorate a view so only admins whose role is ``superadmin`` may call it.

        Anyone else gets an error flash and a redirect to ``admin_panel``.
        """

        @wraps(view)
        def wrapped(*args, **kwargs):
            admin = current_admin()
            if not admin or admin["role"] != "superadmin":
                flash("只有超级管理员可以执行该操作。", "error")
                return redirect(url_for("admin_panel"))
            return view(admin, *args, **kwargs)

        return wrapped

    # ------------------------------------------------------------------
    # Form normalisation and validation
    # ------------------------------------------------------------------

    def normalize_student_id(student_id: str) -> str:
        """Strip surrounding whitespace; ``None``/empty becomes ``""``."""
        return (student_id or "").strip()

    def normalize_course_pair(course_id: str, course_index: str) -> tuple[str, str]:
        """Strip both values and left-pad the course index to two characters."""
        return (course_id or "").strip(), (course_index or "").strip().zfill(2)

    def validate_student_login_form(student_id: str, password: str) -> str | None:
        """Return an error message for an invalid login form, else ``None``."""
        if not student_id.isdigit():
            return "学号必须为纯数字。"
        if len(student_id) < 8:
            return "学号长度看起来不正确。"
        if not password:
            return "密码不能为空。"
        return None

    def validate_course_form(course_id: str, course_index: str) -> str | None:
        """Return an error message for an invalid course form, else ``None``."""
        if not course_id.isdigit():
            return "课程号必须为纯数字。"
        if not course_index.isdigit():
            return "课序号必须为纯数字。"
        if len(course_index) != 2:
            return "课序号必须是两位数字。"
        return None

    # ------------------------------------------------------------------
    # Template view models
    # ------------------------------------------------------------------

    def build_user_view_model(user: dict[str, Any]) -> dict[str, Any]:
        """Collect the template context for the user dashboard."""
        dashboard_state = db.get_user_dashboard_state(user["id"])
        courses = db.list_courses(user["id"])
        logs = db.get_recent_logs(user_id=user["id"])
        return {
            "user": user,
            "dashboard_state": dashboard_state,
            "courses": courses,
            "logs": logs,
            # A fixed placeholder is masked here, not the user's real password.
            "masked_password": mask_secret("******"),
        }

    def build_admin_view_model(admin: dict[str, Any]) -> dict[str, Any]:
        """Collect the template context for the admin panel.

        Each user summary is copied and extended with that user's courses
        and dashboard state.
        """
        summary = db.get_admin_summary()
        users = db.list_users_with_summary()
        users_with_courses: list[dict[str, Any]] = []
        for user in users:
            entry = dict(user)
            entry["courses"] = db.list_courses(user["id"])
            entry["dashboard_state"] = db.get_user_dashboard_state(user["id"])
            users_with_courses.append(entry)
        return {
            "admin": admin,
            "summary": summary,
            "users": users_with_courses,
            "admins": db.list_admins(),
            "logs": db.get_recent_logs(),
        }

    # ------------------------------------------------------------------
    # Server-sent events
    # ------------------------------------------------------------------

    def _resume_log_id() -> int:
        """Return the log id the current request wants to resume after.

        The ``Last-Event-ID`` header (sent by browsers when an EventSource
        reconnects) takes precedence over the ``last_id`` query parameter so a
        reconnect does not replay events already delivered. Missing or
        non-numeric values fall back to ``0`` instead of raising.
        """
        candidates = (
            request.headers.get("Last-Event-ID"),
            request.args.get("last_id"),
        )
        for raw in candidates:
            if not raw:
                continue
            try:
                return int(raw)
            except ValueError:
                continue
        return 0

    def event_stream(log_scope_user_id: int | None = None):
        """Yield log entries as server-sent events, polling once per second.

        Args:
            log_scope_user_id: Passed to ``db.get_logs_after`` as ``user_id``;
                ``None`` is what the admin stream uses.

        Yields:
            SSE-formatted text chunks: ``id``/``event``/``data`` lines for each
            new log item, or a comment line as keepalive when nothing is new.
        """
        last_id = _resume_log_id()
        while True:
            items = db.get_logs_after(last_id=last_id, user_id=log_scope_user_id)
            if items:
                for item in items:
                    last_id = item["id"]
                    yield f"id: {item['id']}\n"
                    yield "event: log\n"
                    yield f"data: {json.dumps(item, ensure_ascii=False)}\n\n"
            else:
                yield ": keepalive\n\n"
            time.sleep(1)

    def _log_stream_response(log_scope_user_id: int | None) -> Response:
        """Wrap :func:`event_stream` in a streaming ``text/event-stream`` response."""
        response = Response(
            stream_with_context(event_stream(log_scope_user_id=log_scope_user_id))
        )
        response.headers["Content-Type"] = "text/event-stream"
        response.headers["Cache-Control"] = "no-cache"
        # Hint for nginx-style reverse proxies not to buffer the stream.
        response.headers["X-Accel-Buffering"] = "no"
        return response

    @app.context_processor
    def inject_session_context():
        """Expose the current user and admin to every template."""
        return {
            "session_user": current_user(),
            "session_admin": current_admin(),
        }

    # ------------------------------------------------------------------
    # Authentication routes
    # ------------------------------------------------------------------

    @app.get("/")
    def index():
        """Redirect to the dashboard, admin panel or login page by session."""
        if current_user():
            return redirect(url_for("dashboard"))
        if current_admin():
            return redirect(url_for("admin_panel"))
        return redirect(url_for("login"))

    @app.route("/login", methods=["GET", "POST"])
    def login():
        """Show the student login form and process its submission."""
        if current_user():
            return redirect(url_for("dashboard"))

        if request.method == "POST":
            student_id = normalize_student_id(request.form.get("student_id", ""))
            password = request.form.get("password", "")
            error = validate_student_login_form(student_id, password)
            if error:
                flash(error, "error")
                return render_template("login.html")

            user, created = db.verify_or_create_user_login(student_id, password)
            if not user:
                flash("学号或密码错误。", "error")
                return render_template("login.html")

            # Drop any previous session data before storing the new identity.
            session.clear()
            session["user_id"] = user["id"]
            flash("首次登录已自动创建账户。" if created else "登录成功。", "success")
            return redirect(url_for("dashboard"))

        return render_template("login.html")

    @app.post("/logout")
    def logout():
        """Clear the session and return to the student login page."""
        session.clear()
        flash("已退出登录。", "success")
        return redirect(url_for("login"))

    @app.route("/admin", methods=["GET", "POST"])
    def admin_login():
        """Show the admin login form and process its submission."""
        if current_admin():
            return redirect(url_for("admin_panel"))

        if request.method == "POST":
            username = (request.form.get("username") or "").strip()
            password = request.form.get("password") or ""
            if not username or not password:
                flash("管理员账号和密码不能为空。", "error")
                return render_template("admin_login.html")

            admin = db.verify_admin_login(username, password)
            if not admin:
                flash("管理员账号或密码错误。", "error")
                return render_template("admin_login.html")

            # Drop any previous session data before storing the new identity.
            session.clear()
            session["admin_id"] = admin["id"]
            flash("管理员登录成功。", "success")
            return redirect(url_for("admin_panel"))

        return render_template("admin_login.html")

    @app.post("/admin/logout")
    def admin_logout():
        """Clear the session and return to the admin login page."""
        session.clear()
        flash("管理员已退出登录。", "success")
        return redirect(url_for("admin_login"))

    # ------------------------------------------------------------------
    # User dashboard routes
    # ------------------------------------------------------------------

    @app.get("/dashboard")
    @require_user
    def dashboard(user: dict[str, Any]):
        """Render the user dashboard."""
        return render_template("dashboard.html", **build_user_view_model(user))

    @app.post("/dashboard/password")
    @require_user
    def update_user_password(user: dict[str, Any]):
        """Store a new password for the logged-in user."""
        password = request.form.get("password", "")
        if not password:
            flash("密码不能为空。", "error")
            return redirect(url_for("dashboard"))
        db.update_user_password(user["id"], password)
        db.add_log(None, user["id"], "user", "INFO", "用户已更新自己的登录密码。")
        flash("密码已更新。", "success")
        return redirect(url_for("dashboard"))

    @app.post("/dashboard/courses")
    @require_user
    def add_user_course(user: dict[str, Any]):
        """Validate the submitted course pair and add it for the logged-in user."""
        course_id, course_index = normalize_course_pair(
            request.form.get("course_id", ""),
            request.form.get("course_index", ""),
        )
        error = validate_course_form(course_id, course_index)
        if error:
            flash(error, "error")
            return redirect(url_for("dashboard"))
        db.add_course(user["id"], course_id, course_index)
        db.add_log(None, user["id"], "user", "INFO", f"新增课程 {course_id}_{course_index}。")
        flash("课程已添加。", "success")
        return redirect(url_for("dashboard"))

    # The record id must be a URL variable: the view requires it as an argument.
    @app.post("/dashboard/courses/<int:course_record_id>/delete")
    @require_user
    def delete_user_course(user: dict[str, Any], course_record_id: int):
        """Delete one of the logged-in user's course records."""
        db.delete_course(course_record_id, user["id"])
        db.add_log(None, user["id"], "user", "INFO", f"删除课程记录 #{course_record_id}。")
        flash("课程已删除。", "success")
        return redirect(url_for("dashboard"))

    @app.post("/dashboard/tasks/start")
    @require_user
    def start_user_task(user: dict[str, Any]):
        """Create a task for the logged-in user and log who started it."""
        task = db.create_task(user["id"], "user", user["student_id"])
        db.add_log(task["id"], user["id"], "user", "INFO", "用户发起了新的选课任务。")
        flash("任务已加入队列。", "success")
        return redirect(url_for("dashboard"))

    @app.post("/dashboard/tasks/stop")
    @require_user
    def stop_user_task(user: dict[str, Any]):
        """Request that the logged-in user's task be stopped."""
        db.request_stop_task(user["id"])
        db.add_log(None, user["id"], "user", "INFO", "用户请求停止当前任务。")
        flash("停止请求已提交。", "success")
        return redirect(url_for("dashboard"))

    @app.get("/api/dashboard/status")
    @require_user
    def dashboard_status(user: dict[str, Any]):
        """Return the logged-in user's dashboard state as JSON."""
        return jsonify(db.get_user_dashboard_state(user["id"]))

    @app.get("/events/logs/user")
    @require_user
    def user_log_stream(user: dict[str, Any]):
        """Stream the logged-in user's log entries as server-sent events."""
        return _log_stream_response(user["id"])

    # ------------------------------------------------------------------
    # Admin panel routes
    # ------------------------------------------------------------------

    @app.get("/admin/panel")
    @require_admin
    def admin_panel(admin: dict[str, Any]):
        """Render the admin panel."""
        return render_template("admin.html", **build_admin_view_model(admin))

    @app.post("/admin/panel/settings/parallelism")
    @require_admin
    def update_parallelism(admin: dict[str, Any]):
        """Save the ``max_parallel_tasks`` setting, clamped to the range 1..8."""
        raw_value = request.form.get("parallelism", "1")
        try:
            parallelism = max(1, min(8, int(raw_value)))
        except ValueError:
            flash("并行数必须是数字。", "error")
            return redirect(url_for("admin_panel"))
        db.set_setting("max_parallel_tasks", str(parallelism))
        db.add_log(
            None,
            None,
            "admin",
            "INFO",
            f"管理员 {admin['username']} 将并行数设置为 {parallelism}。",
        )
        flash("并行数已更新。", "success")
        return redirect(url_for("admin_panel"))

    @app.post("/admin/panel/users")
    @require_admin
    def create_or_update_user_from_admin(admin: dict[str, Any]):
        """Create or update a user, optionally adding one initial course.

        The course part is processed only when at least one of the two course
        fields is non-empty after normalisation.
        """
        student_id = normalize_student_id(request.form.get("student_id", ""))
        password = request.form.get("password", "")
        error = validate_student_login_form(student_id, password)
        if error:
            flash(error, "error")
            return redirect(url_for("admin_panel"))

        user = db.create_or_update_user(student_id, password)
        db.add_log(
            None,
            user["id"],
            "admin",
            "INFO",
            f"管理员 {admin['username']} 录入或更新了用户账号。",
        )

        initial_course_id, initial_course_index = normalize_course_pair(
            request.form.get("course_id", ""),
            request.form.get("course_index", ""),
        )
        if initial_course_id or initial_course_index:
            course_error = validate_course_form(initial_course_id, initial_course_index)
            if course_error:
                flash(course_error, "error")
                return redirect(url_for("admin_panel"))
            db.add_course(user["id"], initial_course_id, initial_course_index)
            db.add_log(
                None,
                user["id"],
                "admin",
                "INFO",
                f"管理员 {admin['username']} 为用户新增课程 {initial_course_id}_{initial_course_index}。",
            )

        flash("用户信息已保存。", "success")
        return redirect(url_for("admin_panel"))

    @app.post("/admin/panel/users/<int:user_id>/password")
    @require_admin
    def update_user_password_from_admin(admin: dict[str, Any], user_id: int):
        """Store a new password for the given user on behalf of an admin."""
        password = request.form.get("password", "")
        if not password:
            flash("密码不能为空。", "error")
            return redirect(url_for("admin_panel"))
        db.update_user_password(user_id, password)
        db.add_log(
            None,
            user_id,
            "admin",
            "INFO",
            f"管理员 {admin['username']} 更新了用户密码。",
        )
        flash("用户密码已更新。", "success")
        return redirect(url_for("admin_panel"))

    @app.post("/admin/panel/users/<int:user_id>/courses")
    @require_admin
    def add_user_course_from_admin(admin: dict[str, Any], user_id: int):
        """Validate the submitted course pair and add it for the given user."""
        course_id, course_index = normalize_course_pair(
            request.form.get("course_id", ""),
            request.form.get("course_index", ""),
        )
        error = validate_course_form(course_id, course_index)
        if error:
            flash(error, "error")
            return redirect(url_for("admin_panel"))
        db.add_course(user_id, course_id, course_index)
        db.add_log(
            None,
            user_id,
            "admin",
            "INFO",
            f"管理员 {admin['username']} 为用户新增课程 {course_id}_{course_index}。",
        )
        flash("课程已添加到目标用户。", "success")
        return redirect(url_for("admin_panel"))

    @app.post("/admin/panel/users/<int:user_id>/courses/<int:course_record_id>/delete")
    @require_admin
    def delete_user_course_from_admin(
        admin: dict[str, Any], user_id: int, course_record_id: int
    ):
        """Delete a course record belonging to the given user."""
        db.delete_course(course_record_id, user_id)
        db.add_log(
            None,
            user_id,
            "admin",
            "INFO",
            f"管理员 {admin['username']} 删除了课程记录 #{course_record_id}。",
        )
        flash("课程已删除。", "success")
        return redirect(url_for("admin_panel"))

    @app.post("/admin/panel/users/<int:user_id>/tasks/start")
    @require_admin
    def start_user_task_from_admin(admin: dict[str, Any], user_id: int):
        """Create a task for the given user, if that user exists."""
        user = db.get_user_by_id(user_id)
        if not user:
            flash("用户不存在。", "error")
            return redirect(url_for("admin_panel"))
        task = db.create_task(user_id, "admin", admin["username"])
        db.add_log(
            task["id"],
            user_id,
            "admin",
            "INFO",
            f"管理员 {admin['username']} 启动了用户 {user['student_id']} 的任务。",
        )
        flash("任务已加入队列。", "success")
        return redirect(url_for("admin_panel"))

    @app.post("/admin/panel/users/<int:user_id>/tasks/stop")
    @require_admin
    def stop_user_task_from_admin(admin: dict[str, Any], user_id: int):
        """Request that the given user's task be stopped."""
        db.request_stop_task(user_id)
        db.add_log(
            None,
            user_id,
            "admin",
            "INFO",
            f"管理员 {admin['username']} 请求停止用户任务。",
        )
        flash("停止请求已提交。", "success")
        return redirect(url_for("admin_panel"))

    @app.post("/admin/panel/admins")
    @require_superadmin
    def create_admin_account(admin: dict[str, Any]):
        """Create another admin account; restricted to superadmins."""
        username = (request.form.get("username") or "").strip()
        password = request.form.get("password") or ""
        if not username or not password:
            flash("管理员账号和密码不能为空。", "error")
            return redirect(url_for("admin_panel"))
        try:
            db.create_admin(username, password)
            db.add_log(
                None,
                None,
                "admin",
                "INFO",
                f"超级管理员 {admin['username']} 创建了管理员 {username}。",
            )
            flash("管理员已创建。", "success")
        except Exception as exc:
            # Route-level boundary: any failure is reported to the operator
            # as a flash message instead of producing an error page.
            flash(f"管理员创建失败:{exc}", "error")
        return redirect(url_for("admin_panel"))

    @app.get("/api/admin/status")
    @require_admin
    def admin_status(admin: dict[str, Any]):
        """Return the admin summary as JSON."""
        return jsonify(db.get_admin_summary())

    @app.get("/events/logs/admin")
    @require_admin
    def admin_log_stream(admin: dict[str, Any]):
        """Stream log entries without a user filter as server-sent events."""
        return _log_stream_response(None)

    return app