# Source: SACC / course_catcher/web.py
# Uploader avatar caption: cacode's picture
# Commit message: Deploy updated SCU course catcher
# Commit: e28c9e4 (verified)
from __future__ import annotations
import json
import time
from functools import wraps
from typing import Any, Callable
from flask import (
Flask,
Response,
flash,
jsonify,
redirect,
render_template,
request,
session,
stream_with_context,
url_for,
)
from course_catcher.config import AppConfig, load_config
from course_catcher.db import Database
from course_catcher.security import CredentialCipher, mask_secret
from course_catcher.task_manager import TaskManager
def create_app() -> Flask:
    """Build the Flask application together with its backing services.

    Loads the configuration, constructs the credential cipher, database and
    task manager, asks the database to ensure the configured superadmin
    exists, starts the task manager, and publishes the shared objects through
    ``app.extensions``.  Helpers and routes are registered further down in
    this same function and close over ``db`` and ``app``.
    """
    config = load_config()

    # Services are created before the Flask object so they are ready by the
    # time any route is registered.
    cipher = CredentialCipher(config.secret_key)
    db = Database(config.database_path, cipher, config.default_parallelism)
    db.ensure_superadmin(config.admin_username, config.admin_password)

    task_manager = TaskManager(db, config)
    task_manager.start()

    app = Flask(__name__, template_folder="../templates", static_folder="../static")
    app.config.update(SECRET_KEY=config.secret_key)
    app.extensions.update(
        app_config=config,
        db=db,
        task_manager=task_manager,
    )
def current_user() -> dict[str, Any] | None:
    """Return the record for the user stored in the session, or ``None``.

    A missing or falsy ``user_id`` session entry means nobody is logged in as
    a user; otherwise the id is coerced to ``int`` and looked up via ``db``.
    """
    stored_id = session.get("user_id")
    return db.get_user_by_id(int(stored_id)) if stored_id else None
def current_admin() -> dict[str, Any] | None:
    """Return the record for the admin stored in the session, or ``None``.

    A missing or falsy ``admin_id`` session entry means no admin is logged
    in; otherwise the id is coerced to ``int`` and looked up via ``db``.
    """
    stored_id = session.get("admin_id")
    return db.get_admin_by_id(int(stored_id)) if stored_id else None
def require_user(view: Callable):
    """Decorate a view so that it only runs for a logged-in user.

    Requests without a resolvable session user are redirected to the
    ``login`` endpoint.  Otherwise the user record is passed to the view as
    its first positional argument, ahead of the routed arguments.
    """

    @wraps(view)
    def guarded(*args, **kwargs):
        logged_in = current_user()
        if logged_in:
            return view(logged_in, *args, **kwargs)
        return redirect(url_for("login"))

    return guarded
def require_admin(view: Callable):
    """Decorate a view so that it only runs for a logged-in administrator.

    Requests without a resolvable session admin are redirected to the
    ``admin_login`` endpoint.  Otherwise the admin record is passed to the
    view as its first positional argument, ahead of the routed arguments.
    """

    @wraps(view)
    def guarded(*args, **kwargs):
        logged_in = current_admin()
        if logged_in:
            return view(logged_in, *args, **kwargs)
        return redirect(url_for("admin_login"))

    return guarded
def require_superadmin(view: Callable):
    """Decorate a view so that it only runs for an admin with the superadmin role.

    When there is no session admin, or the admin's ``role`` is anything other
    than ``"superadmin"``, an error is flashed and the request is redirected
    to the ``admin_panel`` endpoint.  Otherwise the admin record is passed to
    the view as its first positional argument.
    """

    @wraps(view)
    def guarded(*args, **kwargs):
        candidate = current_admin()
        if candidate and candidate["role"] == "superadmin":
            return view(candidate, *args, **kwargs)
        flash("只有超级管理员可以执行该操作。", "error")
        return redirect(url_for("admin_panel"))

    return guarded
def normalize_student_id(student_id: str) -> str:
    """Return *student_id* with surrounding whitespace removed.

    Falsy input (``None`` or an empty string) is normalized to ``""``.
    """
    if not student_id:
        return ""
    return student_id.strip()
def normalize_course_pair(course_id: str, course_index: str) -> tuple[str, str]:
    """Return a cleaned ``(course_id, course_index)`` pair.

    Both values are stripped of surrounding whitespace, with falsy input
    treated as ``""``.  A non-empty course index is left-padded with zeros to
    two characters (``"1"`` becomes ``"01"``).

    A blank course index stays blank instead of being padded to ``"00"``.
    Padding the empty string would turn "nothing entered" into a truthy,
    valid-looking index, which defeats callers that test whether a course
    was supplied and lets validation accept a missing index.
    """
    cleaned_id = (course_id or "").strip()
    cleaned_index = (course_index or "").strip()
    if cleaned_index:
        cleaned_index = cleaned_index.zfill(2)
    return cleaned_id, cleaned_index
def validate_student_login_form(student_id: str, password: str) -> str | None:
    """Validate a student login form and return an error message or ``None``.

    Checks, in order: the student id consists only of ASCII digits, it is at
    least eight characters long, and the password is non-empty.  The first
    failing check determines the returned (user-facing) message.
    """
    # ``str.isdigit`` alone also accepts full-width and superscript digits,
    # which are not valid student ids, so restrict the check to ASCII.
    if not (student_id.isascii() and student_id.isdigit()):
        return "学号必须为纯数字。"
    if len(student_id) < 8:
        return "学号长度看起来不正确。"
    if not password:
        return "密码不能为空。"
    return None
def validate_course_form(course_id: str, course_index: str) -> str | None:
    """Validate a course id / course index pair.

    Returns ``None`` when the course id is made of ASCII digits and the
    course index is exactly two ASCII digits; otherwise returns the
    user-facing message for the first failing check.
    """

    def _ascii_digits(text: str) -> bool:
        # ``str.isdigit`` alone accepts full-width and superscript digits.
        return text.isascii() and text.isdigit()

    if not _ascii_digits(course_id):
        return "课程号必须为纯数字。"
    if not _ascii_digits(course_index):
        return "课序号必须为纯数字。"
    if len(course_index) != 2:
        return "课序号必须是两位数字。"
    return None
def build_user_view_model(user: dict[str, Any]) -> dict[str, Any]:
    """Assemble the template context for a user's dashboard.

    The context holds the user record, the dashboard state, course list and
    recent logs fetched from ``db`` for ``user["id"]``, plus a masked
    placeholder password produced by ``mask_secret``.
    """
    owner_id = user["id"]
    # Dict entries are evaluated top to bottom, so the database is queried in
    # the order: dashboard state, courses, logs.
    return {
        "user": user,
        "dashboard_state": db.get_user_dashboard_state(owner_id),
        "courses": db.list_courses(owner_id),
        "logs": db.get_recent_logs(user_id=owner_id),
        "masked_password": mask_secret("******"),
    }
def build_admin_view_model(admin: dict[str, Any]) -> dict[str, Any]:
    """Assemble the template context for the admin panel.

    The context holds the admin record, the global summary, every user
    (each copied into a plain dict and extended with ``courses`` and
    ``dashboard_state``), the admin list and the recent logs.
    """
    summary = db.get_admin_summary()

    def with_details(row: dict[str, Any]) -> dict[str, Any]:
        # Copy the row so the extra keys never touch the object owned by db.
        return dict(
            row,
            courses=db.list_courses(row["id"]),
            dashboard_state=db.get_user_dashboard_state(row["id"]),
        )

    enriched_users = [with_details(row) for row in db.list_users_with_summary()]
    return {
        "admin": admin,
        "summary": summary,
        "users": enriched_users,
        "admins": db.list_admins(),
        "logs": db.get_recent_logs(),
    }
def event_stream(log_scope_user_id: int | None = None):
    """Yield Server-Sent Events chunks carrying new log rows, indefinitely.

    Args:
        log_scope_user_id: Passed to ``db.get_logs_after`` as ``user_id``;
            ``None`` is forwarded unchanged.

    Yields:
        For every new log row an ``id:`` line, an ``event: log`` line and a
        ``data:`` line with the row as JSON; when a poll finds nothing, an
        SSE comment line as keepalive.

    The starting cursor comes from the ``Last-Event-ID`` request header when
    present, otherwise from the ``last_id`` query parameter, otherwise 0.  An
    unparsable cursor falls back to 0 rather than raising mid-response.  The
    generator reads ``request`` lazily, so callers must wrap it in
    ``stream_with_context``.
    """
    # Browsers resend the last received ``id:`` in this header on automatic
    # reconnect while reusing the original URL, so the header is fresher
    # than the query parameter.
    raw_cursor = request.headers.get("Last-Event-ID") or request.args.get("last_id") or "0"
    try:
        last_id = int(raw_cursor)
    except ValueError:
        last_id = 0

    while True:
        items = db.get_logs_after(last_id=last_id, user_id=log_scope_user_id)
        if items:
            for item in items:
                last_id = item["id"]
                yield f"id: {item['id']}\n"
                yield "event: log\n"
                yield f"data: {json.dumps(item, ensure_ascii=False)}\n\n"
        else:
            # Comment line keeps idle connections from being closed.
            yield ": keepalive\n\n"
        # Poll interval between database checks.
        time.sleep(1)
@app.context_processor
def inject_session_context():
    """Expose the session's user and admin records to every template.

    Registered as a Flask context processor; the returned keys are
    ``session_user`` and ``session_admin`` (each may be ``None``).
    """
    user_record = current_user()
    admin_record = current_admin()
    return dict(session_user=user_record, session_admin=admin_record)
@app.get("/")
def index():
    """Redirect the root URL to the landing page that fits the session.

    A logged-in user goes to the dashboard, otherwise a logged-in admin goes
    to the admin panel, otherwise the request goes to the login page.
    """
    if current_user():
        endpoint = "dashboard"
    elif current_admin():
        endpoint = "admin_panel"
    else:
        endpoint = "login"
    return redirect(url_for(endpoint))
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the student login form and process its submission.

    An already logged-in user is redirected to the dashboard.  On POST the
    form is validated and the credentials are handed to
    ``db.verify_or_create_user_login``; on success the session is reset and
    bound to the returned user before redirecting to the dashboard.  Every
    failure re-renders the login page with a flashed error.
    """
    if current_user():
        return redirect(url_for("dashboard"))
    if request.method != "POST":
        return render_template("login.html")

    student_id = normalize_student_id(request.form.get("student_id", ""))
    password = request.form.get("password", "")

    problem = validate_student_login_form(student_id, password)
    if problem:
        flash(problem, "error")
        return render_template("login.html")

    account, is_new = db.verify_or_create_user_login(student_id, password)
    if not account:
        flash("学号或密码错误。", "error")
        return render_template("login.html")

    # Drop whatever the previous session held before binding the new identity.
    session.clear()
    session["user_id"] = account["id"]
    if is_new:
        greeting = "首次登录已自动创建账户。"
    else:
        greeting = "登录成功。"
    flash(greeting, "success")
    return redirect(url_for("dashboard"))
@app.post("/logout")
def logout():
    """Clear the session and send the visitor back to the login page."""
    # Clear first: the flash message lives in the session and must survive.
    session.clear()
    flash("已退出登录。", "success")
    login_url = url_for("login")
    return redirect(login_url)
@app.route("/admin", methods=["GET", "POST"])
def admin_login():
    """Show the administrator login form and process its submission.

    An already logged-in admin is redirected to the admin panel.  On POST
    the username (stripped) and password must both be non-empty and are then
    checked with ``db.verify_admin_login``; on success the session is reset
    and bound to the admin.  Failures re-render the form with a flashed
    error.
    """
    if current_admin():
        return redirect(url_for("admin_panel"))
    if request.method != "POST":
        return render_template("admin_login.html")

    username = (request.form.get("username") or "").strip()
    password = request.form.get("password") or ""
    if not (username and password):
        flash("管理员账号和密码不能为空。", "error")
        return render_template("admin_login.html")

    account = db.verify_admin_login(username, password)
    if not account:
        flash("管理员账号或密码错误。", "error")
        return render_template("admin_login.html")

    # Drop whatever the previous session held before binding the new identity.
    session.clear()
    session["admin_id"] = account["id"]
    flash("管理员登录成功。", "success")
    return redirect(url_for("admin_panel"))
@app.post("/admin/logout")
def admin_logout():
    """Clear the session and send the visitor back to the admin login page."""
    # Clear first: the flash message lives in the session and must survive.
    session.clear()
    flash("管理员已退出登录。", "success")
    login_url = url_for("admin_login")
    return redirect(login_url)
@app.get("/dashboard")
@require_user
def dashboard(user: dict[str, Any]):
    """Render the dashboard page for the logged-in user."""
    context = build_user_view_model(user)
    return render_template("dashboard.html", **context)
@app.post("/dashboard/password")
@require_user
def update_user_password(user: dict[str, Any]):
    """Store a new password for the logged-in user.

    An empty ``password`` form field is rejected with a flashed error.
    Otherwise the password is saved, the change is logged, and a success
    message is flashed.  Either way the response redirects to the dashboard.
    """
    new_password = request.form.get("password", "")
    if new_password:
        db.update_user_password(user["id"], new_password)
        db.add_log(None, user["id"], "user", "INFO", "用户已更新自己的登录密码。")
        flash("密码已更新。", "success")
    else:
        flash("密码不能为空。", "error")
    return redirect(url_for("dashboard"))
@app.post("/dashboard/courses")
@require_user
def add_user_course(user: dict[str, Any]):
    """Add a course to the logged-in user's list.

    The ``course_id`` and ``course_index`` form fields are normalized and
    validated.  A validation error is flashed; otherwise the course is
    stored, the action is logged, and a success message is flashed.  Either
    way the response redirects to the dashboard.
    """
    form = request.form
    course_id, course_index = normalize_course_pair(
        form.get("course_id", ""),
        form.get("course_index", ""),
    )
    problem = validate_course_form(course_id, course_index)
    if problem:
        flash(problem, "error")
    else:
        db.add_course(user["id"], course_id, course_index)
        db.add_log(None, user["id"], "user", "INFO", f"新增课程 {course_id}_{course_index}。")
        flash("课程已添加。", "success")
    return redirect(url_for("dashboard"))
@app.post("/dashboard/courses/<int:course_record_id>/delete")
@require_user
def delete_user_course(user: dict[str, Any], course_record_id: int):
    """Delete one of the logged-in user's course records.

    The record id comes from the URL; the user's id is passed along to
    ``db.delete_course`` as the second argument.  The action is logged and
    the response redirects to the dashboard.
    """
    owner_id = user["id"]
    db.delete_course(course_record_id, owner_id)
    db.add_log(None, owner_id, "user", "INFO", f"删除课程记录 #{course_record_id}。")
    flash("课程已删除。", "success")
    return redirect(url_for("dashboard"))
@app.post("/dashboard/tasks/start")
@require_user
def start_user_task(user: dict[str, Any]):
    """Create a new task for the logged-in user.

    The task is created with ``"user"`` and the user's ``student_id`` as the
    second and third arguments to ``db.create_task``; a log entry tied to the
    new task id is written before redirecting to the dashboard.
    """
    owner_id = user["id"]
    new_task = db.create_task(owner_id, "user", user["student_id"])
    db.add_log(new_task["id"], owner_id, "user", "INFO", "用户发起了新的选课任务。")
    flash("任务已加入队列。", "success")
    return redirect(url_for("dashboard"))
@app.post("/dashboard/tasks/stop")
@require_user
def stop_user_task(user: dict[str, Any]):
    """Request that the logged-in user's task be stopped.

    Calls ``db.request_stop_task`` with the user's id, logs the request and
    redirects to the dashboard.
    """
    owner_id = user["id"]
    db.request_stop_task(owner_id)
    db.add_log(None, owner_id, "user", "INFO", "用户请求停止当前任务。")
    flash("停止请求已提交。", "success")
    return redirect(url_for("dashboard"))
@app.get("/api/dashboard/status")
@require_user
def dashboard_status(user: dict[str, Any]):
    """Return the logged-in user's dashboard state as a JSON response."""
    state = db.get_user_dashboard_state(user["id"])
    return jsonify(state)
@app.get("/events/logs/user")
@require_user
def user_log_stream(user: dict[str, Any]):
    """Stream the logged-in user's log entries as Server-Sent Events.

    The generator is wrapped in ``stream_with_context`` because it reads the
    request while the response body is being produced.
    """
    body = stream_with_context(event_stream(log_scope_user_id=user["id"]))
    sse = Response(body)
    for header, value in (
        ("Content-Type", "text/event-stream"),
        ("Cache-Control", "no-cache"),
        ("X-Accel-Buffering", "no"),
    ):
        sse.headers[header] = value
    return sse
@app.get("/admin/panel")
@require_admin
def admin_panel(admin: dict[str, Any]):
    """Render the administrator panel for the logged-in admin."""
    context = build_admin_view_model(admin)
    return render_template("admin.html", **context)
@app.post("/admin/panel/settings/parallelism")
@require_admin
def update_parallelism(admin: dict[str, Any]):
    """Update the ``max_parallel_tasks`` setting from the admin panel.

    The ``parallelism`` form field (default ``"1"``) must parse as an
    integer; it is then clamped into the range 1..8, stored as a string
    setting, and the change is logged.  A non-integer value flashes an error
    instead.  Either way the response redirects to the admin panel.
    """
    panel_url = url_for("admin_panel")
    submitted = request.form.get("parallelism", "1")
    try:
        requested = int(submitted)
    except ValueError:
        flash("并行数必须是数字。", "error")
        return redirect(panel_url)

    # Clamp into the supported 1..8 window.
    parallelism = min(8, max(1, requested))
    db.set_setting("max_parallel_tasks", str(parallelism))
    db.add_log(None, None, "admin", "INFO", f"管理员 {admin['username']} 将并行数设置为 {parallelism}。")
    flash("并行数已更新。", "success")
    return redirect(panel_url)
@app.post("/admin/panel/users")
@require_admin
def create_or_update_user_from_admin(admin: dict[str, Any]):
    """Create or update a user account from the admin panel, optionally with a course.

    Form fields: ``student_id`` and ``password`` (required), ``course_id``
    and ``course_index`` (optional; a course is added only when at least one
    of them is non-blank).

    All input is validated before anything is written, so an invalid course
    no longer leaves the user saved while only an error is shown.  Whether a
    course was supplied is decided from the raw, stripped form values:
    ``normalize_course_pair`` may pad the index, which would otherwise make a
    blank form look like it contained a course.

    Always redirects to the admin panel, flashing either the first
    validation error or a success message.
    """
    panel_url = url_for("admin_panel")

    student_id = normalize_student_id(request.form.get("student_id", ""))
    password = request.form.get("password", "")
    error = validate_student_login_form(student_id, password)
    if error:
        flash(error, "error")
        return redirect(panel_url)

    raw_course_id = (request.form.get("course_id") or "").strip()
    raw_course_index = (request.form.get("course_index") or "").strip()
    course_supplied = bool(raw_course_id or raw_course_index)

    initial_course_id = initial_course_index = ""
    if course_supplied:
        initial_course_id, initial_course_index = normalize_course_pair(
            raw_course_id,
            raw_course_index,
        )
        course_error = validate_course_form(initial_course_id, initial_course_index)
        if course_error:
            flash(course_error, "error")
            return redirect(panel_url)

    user = db.create_or_update_user(student_id, password)
    db.add_log(None, user["id"], "admin", "INFO", f"管理员 {admin['username']} 录入或更新了用户账号。")

    if course_supplied:
        db.add_course(user["id"], initial_course_id, initial_course_index)
        db.add_log(
            None,
            user["id"],
            "admin",
            "INFO",
            f"管理员 {admin['username']} 为用户新增课程 {initial_course_id}_{initial_course_index}。",
        )

    flash("用户信息已保存。", "success")
    return redirect(panel_url)
@app.post("/admin/panel/users/<int:user_id>/password")
@require_admin
def update_user_password_from_admin(admin: dict[str, Any], user_id: int):
    """Set a new password for the user identified by ``user_id``.

    Flashes an error when the user cannot be found (the same check the admin
    task-start view performs) or when the ``password`` form field is empty.
    Otherwise the password is saved and the action is logged.  Always
    redirects to the admin panel.
    """
    panel_url = url_for("admin_panel")

    # Without this check an unknown id would still report success and write
    # an audit entry pointing at a user that does not exist.
    if not db.get_user_by_id(user_id):
        flash("用户不存在。", "error")
        return redirect(panel_url)

    password = request.form.get("password", "")
    if not password:
        flash("密码不能为空。", "error")
        return redirect(panel_url)

    db.update_user_password(user_id, password)
    db.add_log(None, user_id, "admin", "INFO", f"管理员 {admin['username']} 更新了用户密码。")
    flash("用户密码已更新。", "success")
    return redirect(panel_url)
@app.post("/admin/panel/users/<int:user_id>/courses")
@require_admin
def add_user_course_from_admin(admin: dict[str, Any], user_id: int):
    """Add a course to the user identified by ``user_id``.

    Flashes an error when the user cannot be found (the same check the admin
    task-start view performs) or when the normalized ``course_id`` /
    ``course_index`` form fields fail validation.  Otherwise the course is
    stored and the action is logged.  Always redirects to the admin panel.
    """
    panel_url = url_for("admin_panel")

    # Without this check a course and an audit entry could be written for a
    # user id that does not exist.
    if not db.get_user_by_id(user_id):
        flash("用户不存在。", "error")
        return redirect(panel_url)

    course_id, course_index = normalize_course_pair(
        request.form.get("course_id", ""),
        request.form.get("course_index", ""),
    )
    error = validate_course_form(course_id, course_index)
    if error:
        flash(error, "error")
        return redirect(panel_url)

    db.add_course(user_id, course_id, course_index)
    db.add_log(
        None,
        user_id,
        "admin",
        "INFO",
        f"管理员 {admin['username']} 为用户新增课程 {course_id}_{course_index}。",
    )
    flash("课程已添加到目标用户。", "success")
    return redirect(panel_url)
@app.post("/admin/panel/users/<int:user_id>/courses/<int:course_record_id>/delete")
@require_admin
def delete_user_course_from_admin(admin: dict[str, Any], user_id: int, course_record_id: int):
    """Delete a course record belonging to the user identified by ``user_id``.

    Flashes an error when the user cannot be found (the same check the admin
    task-start view performs).  Otherwise ``db.delete_course`` is called
    with the record id and user id, and the action is logged.  Always
    redirects to the admin panel.
    """
    panel_url = url_for("admin_panel")

    # Without this check an unknown id would still report success and write
    # an audit entry pointing at a user that does not exist.
    if not db.get_user_by_id(user_id):
        flash("用户不存在。", "error")
        return redirect(panel_url)

    db.delete_course(course_record_id, user_id)
    db.add_log(
        None,
        user_id,
        "admin",
        "INFO",
        f"管理员 {admin['username']} 删除了课程记录 #{course_record_id}。",
    )
    flash("课程已删除。", "success")
    return redirect(panel_url)
@app.post("/admin/panel/users/<int:user_id>/tasks/start")
@require_admin
def start_user_task_from_admin(admin: dict[str, Any], user_id: int):
    """Create a task for the user identified by ``user_id`` on an admin's behalf.

    Flashes an error when the user cannot be found.  Otherwise a task is
    created with ``"admin"`` and the admin's username as the second and
    third arguments to ``db.create_task``, and a log entry tied to the new
    task is written.  Always redirects to the admin panel.
    """
    user = db.get_user_by_id(user_id)
    if user:
        task = db.create_task(user_id, "admin", admin["username"])
        audit_message = f"管理员 {admin['username']} 启动了用户 {user['student_id']} 的任务。"
        db.add_log(task["id"], user_id, "admin", "INFO", audit_message)
        flash("任务已加入队列。", "success")
    else:
        flash("用户不存在。", "error")
    return redirect(url_for("admin_panel"))
@app.post("/admin/panel/users/<int:user_id>/tasks/stop")
@require_admin
def stop_user_task_from_admin(admin: dict[str, Any], user_id: int):
    """Request that the task of the user identified by ``user_id`` be stopped.

    Flashes an error when the user cannot be found (the same check the admin
    task-start view performs).  Otherwise ``db.request_stop_task`` is called
    and the request is logged.  Always redirects to the admin panel.
    """
    panel_url = url_for("admin_panel")

    # Without this check an unknown id would still report success and write
    # an audit entry pointing at a user that does not exist.
    if not db.get_user_by_id(user_id):
        flash("用户不存在。", "error")
        return redirect(panel_url)

    db.request_stop_task(user_id)
    db.add_log(None, user_id, "admin", "INFO", f"管理员 {admin['username']} 请求停止用户任务。")
    flash("停止请求已提交。", "success")
    return redirect(panel_url)
@app.post("/admin/panel/admins")
@require_superadmin
def create_admin_account(admin: dict[str, Any]):
    """Create a new administrator account (superadmin only).

    The ``username`` (stripped) and ``password`` form fields must both be
    non-empty.  Any exception raised while creating the account, logging it
    or flashing success is caught and reported to the operator through a
    flashed error.  Always redirects to the admin panel.
    """
    panel_url = url_for("admin_panel")
    username = (request.form.get("username") or "").strip()
    password = request.form.get("password") or ""

    if not (username and password):
        flash("管理员账号和密码不能为空。", "error")
        return redirect(panel_url)

    try:
        db.create_admin(username, password)
        audit_message = f"超级管理员 {admin['username']} 创建了管理员 {username}。"
        db.add_log(None, None, "admin", "INFO", audit_message)
        flash("管理员已创建。", "success")
    except Exception as exc:
        # Boundary handler: show the failure in the panel rather than a 500.
        flash(f"管理员创建失败：{exc}", "error")
    return redirect(panel_url)
@app.get("/api/admin/status")
@require_admin
def admin_status(admin: dict[str, Any]):
    """Return the admin summary from ``db`` as a JSON response."""
    summary = db.get_admin_summary()
    return jsonify(summary)
@app.get("/events/logs/admin")
@require_admin
def admin_log_stream(admin: dict[str, Any]):
    """Stream log entries to an administrator as Server-Sent Events.

    The stream is started with ``log_scope_user_id=None``.  The generator is
    wrapped in ``stream_with_context`` because it reads the request while
    the response body is being produced.
    """
    body = stream_with_context(event_stream(log_scope_user_id=None))
    sse = Response(body)
    for header, value in (
        ("Content-Type", "text/event-stream"),
        ("Cache-Control", "no-cache"),
        ("X-Accel-Buffering", "no"),
    ):
        sse.headers[header] = value
    return sse
# Hand the configured application, with all routes registered above, back to the caller.
return app