cam / app /routes /auth.py
cacode's picture
Upload 67 files
cf289c1 verified
from __future__ import annotations
from fastapi import APIRouter, Depends, Form, Request
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session, joinedload
from app.auth import get_current_admin, get_current_user, sign_in_admin, sign_in_user, sign_out
from app.database import get_db
from app.models import Admin, Group, User
from app.security import hash_password, verify_password
from app.services.presence import is_online, should_write_presence, unix_seconds
from app.services.review_queue import rebalance_pending_reviews
from app.web import add_flash, local_now, redirect, render
router = APIRouter()
def load_account_context(request: Request, db: Session):
admin = get_current_admin(request, db)
user = get_current_user(request, db)
if admin:
groups = db.query(Group).options(joinedload(Group.members)).order_by(Group.name.asc()).all()
group_cards = [
{
"name": group.name,
"members": [member.full_name for member in group.members],
}
for group in groups
]
return {
"admin": admin,
"user": None,
"identity_name": admin.display_name,
"identity_label": "管理员账号",
"group_cards": group_cards,
}
if user:
user = (
db.query(User)
.options(joinedload(User.group).joinedload(Group.members))
.filter(User.id == user.id)
.first()
)
group_cards = []
if user and user.group:
group_cards.append(
{
"name": user.group.name,
"members": [member.full_name for member in user.group.members],
}
)
return {
"admin": None,
"user": user,
"identity_name": user.full_name if user else "",
"identity_label": "用户账号",
"group_cards": group_cards,
}
return None
@router.get("/login")
def login_page(request: Request, db: Session = Depends(get_db)):
if get_current_user(request, db):
return redirect("/dashboard")
return render(request, "login.html", {"page_title": "用户登录"})
@router.post("/login")
def login_submit(
request: Request,
student_id: str = Form(...),
password: str = Form(...),
db: Session = Depends(get_db),
):
user = db.query(User).filter(User.student_id == student_id.strip()).first()
if not user or not user.is_active or not verify_password(password, user.password_hash):
add_flash(request, "error", "学号或密码错误。")
return redirect("/login")
sign_in_user(request, user)
add_flash(request, "success", f"欢迎回来,{user.full_name}。")
return redirect("/dashboard")
@router.get("/logout")
def user_logout(request: Request):
sign_out(request)
return redirect("/login")
@router.get("/admin")
def admin_login_page(request: Request, db: Session = Depends(get_db)):
if get_current_admin(request, db):
return redirect("/admin/dashboard")
return render(request, "admin_login.html", {"page_title": "管理员登录"})
@router.post("/admin")
def admin_login_submit(
request: Request,
username: str = Form(...),
password: str = Form(...),
db: Session = Depends(get_db),
):
admin = db.query(Admin).filter(Admin.username == username.strip()).first()
if not admin or not admin.is_active or not verify_password(password, admin.password_hash):
add_flash(request, "error", "管理员账号或密码错误。")
return redirect("/admin")
admin.last_seen_at = local_now()
db.add(admin)
db.commit()
sign_in_admin(request, admin)
rebalance_pending_reviews(db)
add_flash(request, "success", f"管理员 {admin.display_name} 已登录。")
return redirect("/admin/dashboard")
@router.get("/admin/logout")
def admin_logout(request: Request):
sign_out(request)
return redirect("/admin")
@router.get("/account")
def account_page(request: Request, db: Session = Depends(get_db)):
account_context = load_account_context(request, db)
if not account_context:
return redirect("/login")
return render(
request,
"account.html",
{
"page_title": "账号中心",
**account_context,
},
)
@router.post("/account/password")
def change_password(
request: Request,
current_password: str = Form(...),
new_password: str = Form(...),
confirm_password: str = Form(...),
db: Session = Depends(get_db),
):
admin = get_current_admin(request, db)
user = get_current_user(request, db)
if not admin and not user:
return redirect("/login")
identity = admin or user
password_hash = identity.password_hash
if not verify_password(current_password, password_hash):
add_flash(request, "error", "当前密码不正确。")
return redirect("/account")
if len(new_password.strip()) < 6:
add_flash(request, "error", "新密码至少需要 6 位。")
return redirect("/account")
if new_password != confirm_password:
add_flash(request, "error", "两次输入的新密码不一致。")
return redirect("/account")
identity.password_hash = hash_password(new_password)
db.add(identity)
db.commit()
add_flash(request, "success", "密码已更新,下次登录请使用新密码。")
return redirect("/account")
@router.api_route("/api/presence/ping", methods=["GET", "POST"])
def presence_ping(request: Request, db: Session = Depends(get_db)):
admin = get_current_admin(request, db)
if not admin:
return JSONResponse(
{"ok": False, "skipped": True},
headers={"Cache-Control": "no-store"},
)
now = local_now()
wrote = False
admin_was_online = bool(is_online(admin.last_seen_at, now))
if should_write_presence(request.session, now):
admin.last_seen_at = now
db.add(admin)
db.commit()
wrote = True
# Only rebalance on an offline -> online transition. Fresh uploads are handled
# at submit time, so this keeps the 5s heartbeat lightweight.
if not admin_was_online:
rebalance_pending_reviews(db)
return JSONResponse(
{
"ok": True,
"wrote": wrote,
"server_ts": unix_seconds(now),
},
headers={"Cache-Control": "no-store"},
)