ComfyUI-Ranking-API / router_users_auth.py
ZHIWEI666's picture
同步登录认证
e3fab5f verified
# router_users_auth.py
# ==========================================
# 🔐 用户认证路由模块
# ==========================================
# 作用:处理用户登录、注册、密码重置、验证码发送等认证相关接口
# 关联文件:
# - 安全认证.py (密码哈希 + JWT 安全模块) 🔒 P0安全增强
# - verify_code_engine.py (验证码缓存与发送)
# - 数据库连接.py (JSON数据库读写)
# - models.py (Pydantic数据模型定义)
# ==========================================
from fastapi import APIRouter, HTTPException, BackgroundTasks, Request
import time
import re
import random
import json
import 数据库连接 as db
from models import UserRegister, UserLogin, SendCodeRequest
from verify_code_engine import VERIFY_CODES, send_email_code, send_sms_code, cleanup_expired_codes, check_send_cooldown
# 🔒 P0安全增强:导入密码哈希和 JWT 工具
from 安全认证 import hash_password, verify_password, create_token, require_password_match
# 🚀 P2优化:速率限制
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
# 创建子路由实例
router = APIRouter()
# ==========================================
# 📤 发送验证码接口(异步后台任务版)
# ==========================================
# 作用:接收验证码发送请求,后台异步发送邮件/短信
# 关联:verify_code_engine.py 的 send_email_code / send_sms_code
# 前端调用:注册表单组件.js、重置密码表单组件.js
@router.post("/api/users/send-code")
@limiter.limit("5/minute") # 🚀 P2优化:每分钟最多5次
async def send_verify_code(request: Request, req: SendCodeRequest, bg_tasks: BackgroundTasks):
"""
发送验证码接口(异步版本)
请求参数:
- contact: 邮箱或手机号
- contact_type: "email" 或 "phone"
- action_type: "register" 注册 / "reset" 重置密码
- account: (仅重置密码时必填) 用户账号
"""
# 如果是重置密码,需要先验证账号与联系方式匹配
if req.action_type == "reset":
if not req.account:
raise HTTPException(status_code=400, detail="找回密码需先填写当前账号")
users_db = db.load_data("users.json", default_data={})
user = users_db.get(req.account)
if not user:
raise HTTPException(status_code=404, detail="该账号不存在")
# 校验联系方式是否与账号绑定的一致
if req.contact_type == "email" and user.get("email") != req.contact:
raise HTTPException(status_code=400, detail="填写的邮箱与该账号绑定的邮箱不一致")
if req.contact_type == "phone" and user.get("phone") != req.contact:
raise HTTPException(status_code=400, detail="填写的手机号与该账号绑定的手机号不一致")
# 🔒 检查发送频率限制(同一联系方式60秒内只能发送1条)
check_send_cooldown(req.contact)
# 生成6位随机验证码
code = str(random.randint(100000, 999999))
# 🚀 P1性能优化:每次写入前触发清理过期验证码
cleanup_expired_codes()
# 构建缓存键(联系方式_动作类型)
cache_key = f"{req.contact}_{req.action_type}"
# 将验证码存入内存缓存,有效期5分钟
VERIFY_CODES[cache_key] = {
"code": code,
"expires_at": int(time.time()) + 300 # 当前时间 + 300秒(5分钟)
}
# 根据联系方式类型,添加后台发送任务
if req.contact_type == "email":
bg_tasks.add_task(send_email_code, req.contact, code, req.action_type)
elif req.contact_type == "phone":
bg_tasks.add_task(send_sms_code, req.contact, code, req.action_type)
else:
raise HTTPException(status_code=400, detail="不支持的验证方式")
return {"status": "success", "message": "验证码发送请求已提交"}
# ==========================================
# 📤 发送验证码接口(同步版本,备用)
# ==========================================
# 作用:同步方式发送验证码,部分场景需要立即知道发送结果
# 关联:verify_code_engine.py 的 send_email_code
@router.post("/api/users/send_code")
async def send_code_api(req: SendCodeRequest):
"""发送验证码接口(同步版本,直接等待发送结果)"""
# 🔒 检查发送频率限制(同一联系方式60秒内只能发送1条)
check_send_cooldown(req.contact)
# 生成6位随机验证码
code = str(random.randint(100000, 999999))
key = f"{req.contact}_{req.action_type}"
# 存入缓存,有效期5分钟
VERIFY_CODES[key] = {
"code": code,
"expires_at": time.time() + 300 # 300秒(5分钟)
}
# 同步发送(会阻塞等待结果)
if req.contact_type == "email":
try:
send_email_code(req.contact, code, req.action_type)
return {"status": "success", "message": "验证码已成功发送至邮箱"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"邮件发送失败: {str(e)}")
elif req.contact_type == "phone":
return {"status": "success", "message": "验证码已成功发送至手机"}
else:
raise HTTPException(status_code=400, detail="不支持的验证方式")
# ==========================================
# 📝 用户注册接口
# ==========================================
# 作用:新用户注册,需要验证邮箱/手机验证码
# 关联:
# - verify_code_engine.py 的 VERIFY_CODES (校验验证码)
# - 数据库连接.py (保存新用户到 users.json)
# - 前端 注册表单组件.js
@router.post("/api/users/register")
@limiter.limit("3/minute") # 🔒 P0安全优化:注册每分钟最多3次
async def register_user(request: Request, user: UserRegister):
"""
用户注册接口
请求参数:(UserRegister 模型)
- account: 账号(6-20位字母数字下划线)
- password: 密码(至少6位)
- name: 昵称
- email: 邮箱(与 phone 二选一)
- phone: 手机号(与 email 二选一)
- code: 验证码
- intro: 个人介绍(可选,最多100字)
"""
users_db = db.load_data("users.json", default_data={})
# ========== 第一步:查重检查 ==========
# 检查账号是否已存在
if user.account in users_db:
raise HTTPException(status_code=400, detail="该账号已被注册,请更换一个")
# 检查邮箱和手机号是否已被其他用户绑定
for existing_user in users_db.values():
if user.email and existing_user.get("email") == user.email:
raise HTTPException(status_code=400, detail="此邮箱已注册,请直接登录或找回密码")
if user.phone and existing_user.get("phone") == user.phone:
raise HTTPException(status_code=400, detail="该手机号已被绑定")
# ========== 第二步:验证码校验(原子性获取+删除) ==========
# 根据注册方式构建缓存键
cache_key = f"{user.email}_register" if user.email else f"{user.phone}_register"
# 🔒 P0安全修复:验证码一次性使用,原子性pop防止并发重用
cached = VERIFY_CODES.pop(cache_key, None)
# 兼容新老缓存格式(expires_at 或 expires)
expire_time = cached.get("expires_at", cached.get("expires", 0)) if cached else 0
# 校验验证码是否正确且未过期
if not cached or cached["code"] != user.code or time.time() > expire_time:
raise HTTPException(status_code=400, detail="验证码不正确或已过期")
# ========== 第三步:格式校验 ==========
if len(user.account) <= 5:
raise HTTPException(status_code=400, detail="账号必须大于5个字符")
if not re.match(r'^[a-zA-Z0-9_]{6,20}$', user.account):
raise HTTPException(status_code=400, detail="账号仅支持大小写英文字母、数字及下划线")
if len(user.password) < 6:
raise HTTPException(status_code=400, detail="密码必须大于等于6个字符")
if not re.match(r'^[a-zA-Z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,}$', user.password):
raise HTTPException(status_code=400, detail="密码包含不支持的特殊字符")
if user.intro and len(user.intro) > 100:
raise HTTPException(status_code=400, detail="个人介绍不能超过100个字符")
# ========== 第四步:保存新用户 ==========
# 构建用户数据对象
new_user = user.dict()
new_user.pop("code", None) # 移除验证码字段,不存入数据库
# 🔒 安全防护:拒绝 base64 头像数据
if (new_user.get("avatarDataUrl") or "").startswith("data:"):
new_user["avatarDataUrl"] = ""
# 🔒 P0安全增强:密码哈希化存储(不再存储明文密码)
new_user["password"] = hash_password(new_user["password"])
new_user.update({
"created_at": int(time.time()), # 注册时间戳
"followers": [], # 粉丝列表
"following": [], # 关注列表
"privacy": {"follows": False, "likes": False, "favorites": False, "downloads": False} # 隐私设置
})
# 保存到数据库
users_db[user.account] = new_user
db.save_data("users.json", users_db)
# 返回用户信息(排除密码)
return {"status": "success", "message": "注册成功", "data": {k: v for k, v in new_user.items() if k != "password"}}
# ==========================================
# 🔑 用户登录接口
# ==========================================
# 作用:验证账号密码,返回登录凭证
# 关联:
# - 数据库连接.py (读取 users.json 校验密码)
# - 前端 登录表单组件.js
@router.post("/api/users/login")
@limiter.limit("10/minute") # 🚀 P2优化:每分钟最多10次登录尝试
async def login_user(request: Request, user: UserLogin):
"""
用户登录接口
请求参数:(UserLogin 模型)
- account: 账号
- password: 密码
"""
users_db = db.load_data("users.json", default_data={})
# 检查账号是否存在
if user.account not in users_db:
raise HTTPException(status_code=404, detail="账号不存在")
user_data = users_db[user.account]
stored_password = user_data.get("password", "")
# 🔒 P0安全增强:密码哈希验证(使用统一验证函数)
if not require_password_match(stored_password, user.password):
raise HTTPException(status_code=401, detail="密码错误")
# 🔒 P0安全增强:登录成功后,检查是否需要迁移旧密码为bcrypt
if not user_data["password"].startswith('$2b$') and not user_data["password"].startswith('$2a$'):
# 旧版SHA256密码,自动迁移为bcrypt
user_data["password"] = hash_password(user.password)
db.save_data("users.json", users_db)
# 🔒 P0安全增强:生成 JWT Token(替代 mock_token)
# 获取password_version用于Token生成(如不存在则默认为0)
password_version = user_data.get("password_version", 0)
# 根据用户选择的"保持登录"选项设置Token有效期
token = create_token(user.account, extra_data={"pwd_ver": password_version}, remember=user.remember)
return {
"status": "success",
"token": token, # 🔒 JWT Token
"account": user.account,
"name": user_data["name"],
"avatar": user_data.get("avatarDataUrl", "")
}
# ==========================================
# 🔄 重置密码接口(万能兼容版)
# ==========================================
# 作用:通过邮箱/手机验证码重置用户密码
# 关联:
# - verify_code_engine.py 的 VERIFY_CODES (校验验证码)
# - 数据库连接.py (更新 users.json 中的密码)
# - 前端 重置密码表单组件.js
# 特点:万能解析器,兼容各种前端数据格式
@router.post("/api/users/reset_password")
@limiter.limit("3/minute") # 🔒 P0安全优化:重置密码每分钟最多3次
async def reset_password(request: Request):
"""
重置密码接口(万能兼容版)
支持的请求格式:
- 标准 JSON
- 双重字符串化 JSON
- FormData
"""
# ========== 第一步:万能数据解析器 ==========
# 作用:兼容各种前端可能发送的数据格式
try:
data = await request.json()
# 处理前端可能造成的"双重字符串化"问题
if isinstance(data, str):
data = json.loads(data)
except Exception:
# 降级尝试 FormData 格式
try:
form = await request.form()
data = dict(form)
except Exception:
raise HTTPException(status_code=400, detail="请求数据解析失败,请检查网络")
if not isinstance(data, dict):
raise HTTPException(status_code=400, detail=f"前端数据格式异常,收到的是: {type(data).__name__}")
# ========== 第二步:万能字段提取器 ==========
# 作用:兼容前端可能使用的各种字段命名
account = data.get("account")
new_password = data.get("new_password") or data.get("password")
verify_contact = data.get("verifyContact") or data.get("verify_contact") or data.get("email") or data.get("phone")
verify_type = data.get("verifyType") or data.get("verify_type") or data.get("contact_type")
code = data.get("code")
# 参数完整性校验
if not all([account, new_password, verify_contact, verify_type, code]):
raise HTTPException(status_code=400, detail="缺失必要参数 (账号/密码/验证码/联系方式),请检查表单")
# ========== 第三步:核心业务逻辑 ==========
users_db = db.load_data("users.json", default_data={})
# 检查用户是否存在
if account not in users_db:
raise HTTPException(status_code=404, detail="该用户不存在")
user = users_db[account]
# 校验联系方式是否与账号绑定的一致
if verify_type == "email" and user.get("email") != verify_contact:
raise HTTPException(status_code=400, detail="填写的邮箱与该账号绑定的邮箱不匹配")
if verify_type == "phone" and user.get("phone") != verify_contact:
raise HTTPException(status_code=400, detail="填写的手机号与该账号绑定的手机号不匹配")
# 校验验证码(原子性获取+删除)
cache_key = f"{verify_contact}_reset"
# 🔒 P0安全修复:验证码一次性使用,原子性pop防止并发重用
cached = VERIFY_CODES.pop(cache_key, None)
expire_time = cached.get("expires_at", cached.get("expires", 0)) if cached else 0
if not cached or cached["code"] != code or time.time() > expire_time:
raise HTTPException(status_code=400, detail="验证码不正确或已过期")
# 校验新密码格式
if len(new_password) < 6:
raise HTTPException(status_code=400, detail="新密码必须大于等于6个字符")
if not re.match(r'^[a-zA-Z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,}$', new_password):
raise HTTPException(status_code=400, detail="新密码包含不支持的特殊字符")
# ========== 第四步:更新密码并保存 ==========
# 🔒 P0安全增强:新密码哈希化存储
user["password"] = hash_password(new_password)
# 🔒 P0安全增强:更新password_version使旧Token失效
import time as time_module
user["password_version"] = int(time_module.time())
db.save_data("users.json", users_db)
# 🔒 P0安全增强:生成新Token返回给前端替换旧Token
new_token = create_token(account, extra_data={"pwd_ver": user["password_version"]})
return {"status": "success", "message": "密码修改成功", "token": new_token}