# router_users_auth.py # ========================================== # 🔐 用户认证路由模块 # ========================================== # 作用:处理用户登录、注册、密码重置、验证码发送等认证相关接口 # 关联文件: # - 安全认证.py (密码哈希 + JWT 安全模块) 🔒 P0安全增强 # - verify_code_engine.py (验证码缓存与发送) # - 数据库连接.py (JSON数据库读写) # - models.py (Pydantic数据模型定义) # ========================================== from fastapi import APIRouter, HTTPException, BackgroundTasks, Request import time import re import random import json import 数据库连接 as db from models import UserRegister, UserLogin, SendCodeRequest from verify_code_engine import VERIFY_CODES, send_email_code, send_sms_code, cleanup_expired_codes, check_send_cooldown # 🔒 P0安全增强:导入密码哈希和 JWT 工具 from 安全认证 import hash_password, verify_password, create_token, require_password_match # 🚀 P2优化:速率限制 from slowapi import Limiter from slowapi.util import get_remote_address limiter = Limiter(key_func=get_remote_address) # 创建子路由实例 router = APIRouter() # ========================================== # 📤 发送验证码接口(异步后台任务版) # ========================================== # 作用:接收验证码发送请求,后台异步发送邮件/短信 # 关联:verify_code_engine.py 的 send_email_code / send_sms_code # 前端调用:注册表单组件.js、重置密码表单组件.js @router.post("/api/users/send-code") @limiter.limit("5/minute") # 🚀 P2优化:每分钟最多5次 async def send_verify_code(request: Request, req: SendCodeRequest, bg_tasks: BackgroundTasks): """ 发送验证码接口(异步版本) 请求参数: - contact: 邮箱或手机号 - contact_type: "email" 或 "phone" - action_type: "register" 注册 / "reset" 重置密码 - account: (仅重置密码时必填) 用户账号 """ # 如果是重置密码,需要先验证账号与联系方式匹配 if req.action_type == "reset": if not req.account: raise HTTPException(status_code=400, detail="找回密码需先填写当前账号") users_db = db.load_data("users.json", default_data={}) user = users_db.get(req.account) if not user: raise HTTPException(status_code=404, detail="该账号不存在") # 校验联系方式是否与账号绑定的一致 if req.contact_type == "email" and user.get("email") != req.contact: raise HTTPException(status_code=400, detail="填写的邮箱与该账号绑定的邮箱不一致") if req.contact_type == "phone" and user.get("phone") != req.contact: raise HTTPException(status_code=400, detail="填写的手机号与该账号绑定的手机号不一致") # 🔒 检查发送频率限制(同一联系方式60秒内只能发送1条) check_send_cooldown(req.contact) # 生成6位随机验证码 code = str(random.randint(100000, 999999)) # 🚀 P1性能优化:每次写入前触发清理过期验证码 cleanup_expired_codes() # 构建缓存键(联系方式_动作类型) cache_key = f"{req.contact}_{req.action_type}" # 将验证码存入内存缓存,有效期5分钟 VERIFY_CODES[cache_key] = { "code": code, "expires_at": int(time.time()) + 300 # 当前时间 + 300秒(5分钟) } # 根据联系方式类型,添加后台发送任务 if req.contact_type == "email": bg_tasks.add_task(send_email_code, req.contact, code, req.action_type) elif req.contact_type == "phone": bg_tasks.add_task(send_sms_code, req.contact, code, req.action_type) else: raise HTTPException(status_code=400, detail="不支持的验证方式") return {"status": "success", "message": "验证码发送请求已提交"} # ========================================== # 📤 发送验证码接口(同步版本,备用) # ========================================== # 作用:同步方式发送验证码,部分场景需要立即知道发送结果 # 关联:verify_code_engine.py 的 send_email_code @router.post("/api/users/send_code") async def send_code_api(req: SendCodeRequest): """发送验证码接口(同步版本,直接等待发送结果)""" # 🔒 检查发送频率限制(同一联系方式60秒内只能发送1条) check_send_cooldown(req.contact) # 生成6位随机验证码 code = str(random.randint(100000, 999999)) key = f"{req.contact}_{req.action_type}" # 存入缓存,有效期5分钟 VERIFY_CODES[key] = { "code": code, "expires_at": time.time() + 300 # 300秒(5分钟) } # 同步发送(会阻塞等待结果) if req.contact_type == "email": try: send_email_code(req.contact, code, req.action_type) return {"status": "success", "message": "验证码已成功发送至邮箱"} except Exception as e: raise HTTPException(status_code=500, detail=f"邮件发送失败: {str(e)}") elif req.contact_type == "phone": return {"status": "success", "message": "验证码已成功发送至手机"} else: raise HTTPException(status_code=400, detail="不支持的验证方式") # ========================================== # 📝 用户注册接口 # ========================================== # 作用:新用户注册,需要验证邮箱/手机验证码 # 关联: # - verify_code_engine.py 的 VERIFY_CODES (校验验证码) # - 数据库连接.py (保存新用户到 users.json) # - 前端 注册表单组件.js @router.post("/api/users/register") @limiter.limit("3/minute") # 🔒 P0安全优化:注册每分钟最多3次 async def register_user(request: Request, user: UserRegister): """ 用户注册接口 请求参数:(UserRegister 模型) - account: 账号(6-20位字母数字下划线) - password: 密码(至少6位) - name: 昵称 - email: 邮箱(与 phone 二选一) - phone: 手机号(与 email 二选一) - code: 验证码 - intro: 个人介绍(可选,最多100字) """ users_db = db.load_data("users.json", default_data={}) # ========== 第一步:查重检查 ========== # 检查账号是否已存在 if user.account in users_db: raise HTTPException(status_code=400, detail="该账号已被注册,请更换一个") # 检查邮箱和手机号是否已被其他用户绑定 for existing_user in users_db.values(): if user.email and existing_user.get("email") == user.email: raise HTTPException(status_code=400, detail="此邮箱已注册,请直接登录或找回密码") if user.phone and existing_user.get("phone") == user.phone: raise HTTPException(status_code=400, detail="该手机号已被绑定") # ========== 第二步:验证码校验(原子性获取+删除) ========== # 根据注册方式构建缓存键 cache_key = f"{user.email}_register" if user.email else f"{user.phone}_register" # 🔒 P0安全修复:验证码一次性使用,原子性pop防止并发重用 cached = VERIFY_CODES.pop(cache_key, None) # 兼容新老缓存格式(expires_at 或 expires) expire_time = cached.get("expires_at", cached.get("expires", 0)) if cached else 0 # 校验验证码是否正确且未过期 if not cached or cached["code"] != user.code or time.time() > expire_time: raise HTTPException(status_code=400, detail="验证码不正确或已过期") # ========== 第三步:格式校验 ========== if len(user.account) <= 5: raise HTTPException(status_code=400, detail="账号必须大于5个字符") if not re.match(r'^[a-zA-Z0-9_]{6,20}$', user.account): raise HTTPException(status_code=400, detail="账号仅支持大小写英文字母、数字及下划线") if len(user.password) < 6: raise HTTPException(status_code=400, detail="密码必须大于等于6个字符") if not re.match(r'^[a-zA-Z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,}$', user.password): raise HTTPException(status_code=400, detail="密码包含不支持的特殊字符") if user.intro and len(user.intro) > 100: raise HTTPException(status_code=400, detail="个人介绍不能超过100个字符") # ========== 第四步:保存新用户 ========== # 构建用户数据对象 new_user = user.dict() new_user.pop("code", None) # 移除验证码字段,不存入数据库 # 🔒 安全防护:拒绝 base64 头像数据 if (new_user.get("avatarDataUrl") or "").startswith("data:"): new_user["avatarDataUrl"] = "" # 🔒 P0安全增强:密码哈希化存储(不再存储明文密码) new_user["password"] = hash_password(new_user["password"]) new_user.update({ "created_at": int(time.time()), # 注册时间戳 "followers": [], # 粉丝列表 "following": [], # 关注列表 "privacy": {"follows": False, "likes": False, "favorites": False, "downloads": False} # 隐私设置 }) # 保存到数据库 users_db[user.account] = new_user db.save_data("users.json", users_db) # 返回用户信息(排除密码) return {"status": "success", "message": "注册成功", "data": {k: v for k, v in new_user.items() if k != "password"}} # ========================================== # 🔑 用户登录接口 # ========================================== # 作用:验证账号密码,返回登录凭证 # 关联: # - 数据库连接.py (读取 users.json 校验密码) # - 前端 登录表单组件.js @router.post("/api/users/login") @limiter.limit("10/minute") # 🚀 P2优化:每分钟最多10次登录尝试 async def login_user(request: Request, user: UserLogin): """ 用户登录接口 请求参数:(UserLogin 模型) - account: 账号 - password: 密码 """ users_db = db.load_data("users.json", default_data={}) # 检查账号是否存在 if user.account not in users_db: raise HTTPException(status_code=404, detail="账号不存在") user_data = users_db[user.account] stored_password = user_data.get("password", "") # 🔒 P0安全增强:密码哈希验证(使用统一验证函数) if not require_password_match(stored_password, user.password): raise HTTPException(status_code=401, detail="密码错误") # 🔒 P0安全增强:登录成功后,检查是否需要迁移旧密码为bcrypt if not user_data["password"].startswith('$2b$') and not user_data["password"].startswith('$2a$'): # 旧版SHA256密码,自动迁移为bcrypt user_data["password"] = hash_password(user.password) db.save_data("users.json", users_db) # 🔒 P0安全增强:生成 JWT Token(替代 mock_token) # 获取password_version用于Token生成(如不存在则默认为0) password_version = user_data.get("password_version", 0) # 根据用户选择的"保持登录"选项设置Token有效期 token = create_token(user.account, extra_data={"pwd_ver": password_version}, remember=user.remember) return { "status": "success", "token": token, # 🔒 JWT Token "account": user.account, "name": user_data["name"], "avatar": user_data.get("avatarDataUrl", "") } # ========================================== # 🔄 重置密码接口(万能兼容版) # ========================================== # 作用:通过邮箱/手机验证码重置用户密码 # 关联: # - verify_code_engine.py 的 VERIFY_CODES (校验验证码) # - 数据库连接.py (更新 users.json 中的密码) # - 前端 重置密码表单组件.js # 特点:万能解析器,兼容各种前端数据格式 @router.post("/api/users/reset_password") @limiter.limit("3/minute") # 🔒 P0安全优化:重置密码每分钟最多3次 async def reset_password(request: Request): """ 重置密码接口(万能兼容版) 支持的请求格式: - 标准 JSON - 双重字符串化 JSON - FormData """ # ========== 第一步:万能数据解析器 ========== # 作用:兼容各种前端可能发送的数据格式 try: data = await request.json() # 处理前端可能造成的"双重字符串化"问题 if isinstance(data, str): data = json.loads(data) except Exception: # 降级尝试 FormData 格式 try: form = await request.form() data = dict(form) except Exception: raise HTTPException(status_code=400, detail="请求数据解析失败,请检查网络") if not isinstance(data, dict): raise HTTPException(status_code=400, detail=f"前端数据格式异常,收到的是: {type(data).__name__}") # ========== 第二步:万能字段提取器 ========== # 作用:兼容前端可能使用的各种字段命名 account = data.get("account") new_password = data.get("new_password") or data.get("password") verify_contact = data.get("verifyContact") or data.get("verify_contact") or data.get("email") or data.get("phone") verify_type = data.get("verifyType") or data.get("verify_type") or data.get("contact_type") code = data.get("code") # 参数完整性校验 if not all([account, new_password, verify_contact, verify_type, code]): raise HTTPException(status_code=400, detail="缺失必要参数 (账号/密码/验证码/联系方式),请检查表单") # ========== 第三步:核心业务逻辑 ========== users_db = db.load_data("users.json", default_data={}) # 检查用户是否存在 if account not in users_db: raise HTTPException(status_code=404, detail="该用户不存在") user = users_db[account] # 校验联系方式是否与账号绑定的一致 if verify_type == "email" and user.get("email") != verify_contact: raise HTTPException(status_code=400, detail="填写的邮箱与该账号绑定的邮箱不匹配") if verify_type == "phone" and user.get("phone") != verify_contact: raise HTTPException(status_code=400, detail="填写的手机号与该账号绑定的手机号不匹配") # 校验验证码(原子性获取+删除) cache_key = f"{verify_contact}_reset" # 🔒 P0安全修复:验证码一次性使用,原子性pop防止并发重用 cached = VERIFY_CODES.pop(cache_key, None) expire_time = cached.get("expires_at", cached.get("expires", 0)) if cached else 0 if not cached or cached["code"] != code or time.time() > expire_time: raise HTTPException(status_code=400, detail="验证码不正确或已过期") # 校验新密码格式 if len(new_password) < 6: raise HTTPException(status_code=400, detail="新密码必须大于等于6个字符") if not re.match(r'^[a-zA-Z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,}$', new_password): raise HTTPException(status_code=400, detail="新密码包含不支持的特殊字符") # ========== 第四步:更新密码并保存 ========== # 🔒 P0安全增强:新密码哈希化存储 user["password"] = hash_password(new_password) # 🔒 P0安全增强:更新password_version使旧Token失效 import time as time_module user["password_version"] = int(time_module.time()) db.save_data("users.json", users_db) # 🔒 P0安全增强:生成新Token返回给前端替换旧Token new_token = create_token(account, extra_data={"pwd_ver": user["password_version"]}) return {"status": "success", "message": "密码修改成功", "token": new_token}