Spaces:
Running
Running
| # router_users_auth.py | |
| # ========================================== | |
| # 🔐 用户认证路由模块 | |
| # ========================================== | |
| # 作用:处理用户登录、注册、密码重置、验证码发送等认证相关接口 | |
| # 关联文件: | |
| # - 安全认证.py (密码哈希 + JWT 安全模块) 🔒 P0安全增强 | |
| # - verify_code_engine.py (验证码缓存与发送) | |
| # - 数据库连接.py (JSON数据库读写) | |
| # - models.py (Pydantic数据模型定义) | |
| # ========================================== | |
| from fastapi import APIRouter, HTTPException, BackgroundTasks, Request | |
| import time | |
| import re | |
| import random | |
| import json | |
| import 数据库连接 as db | |
| from models import UserRegister, UserLogin, SendCodeRequest | |
| from verify_code_engine import VERIFY_CODES, send_email_code, send_sms_code, cleanup_expired_codes, check_send_cooldown | |
| # 🔒 P0安全增强:导入密码哈希和 JWT 工具 | |
| from 安全认证 import hash_password, verify_password, create_token, require_password_match | |
| # 🚀 P2优化:速率限制 | |
| from slowapi import Limiter | |
| from slowapi.util import get_remote_address | |
| limiter = Limiter(key_func=get_remote_address) | |
| # 创建子路由实例 | |
| router = APIRouter() | |
| # ========================================== | |
| # 📤 发送验证码接口(异步后台任务版) | |
| # ========================================== | |
| # 作用:接收验证码发送请求,后台异步发送邮件/短信 | |
| # 关联:verify_code_engine.py 的 send_email_code / send_sms_code | |
| # 前端调用:注册表单组件.js、重置密码表单组件.js | |
| # 🚀 P2优化:每分钟最多5次 | |
| async def send_verify_code(request: Request, req: SendCodeRequest, bg_tasks: BackgroundTasks): | |
| """ | |
| 发送验证码接口(异步版本) | |
| 请求参数: | |
| - contact: 邮箱或手机号 | |
| - contact_type: "email" 或 "phone" | |
| - action_type: "register" 注册 / "reset" 重置密码 | |
| - account: (仅重置密码时必填) 用户账号 | |
| """ | |
| # 如果是重置密码,需要先验证账号与联系方式匹配 | |
| if req.action_type == "reset": | |
| if not req.account: | |
| raise HTTPException(status_code=400, detail="找回密码需先填写当前账号") | |
| users_db = db.load_data("users.json", default_data={}) | |
| user = users_db.get(req.account) | |
| if not user: | |
| raise HTTPException(status_code=404, detail="该账号不存在") | |
| # 校验联系方式是否与账号绑定的一致 | |
| if req.contact_type == "email" and user.get("email") != req.contact: | |
| raise HTTPException(status_code=400, detail="填写的邮箱与该账号绑定的邮箱不一致") | |
| if req.contact_type == "phone" and user.get("phone") != req.contact: | |
| raise HTTPException(status_code=400, detail="填写的手机号与该账号绑定的手机号不一致") | |
| # 🔒 检查发送频率限制(同一联系方式60秒内只能发送1条) | |
| check_send_cooldown(req.contact) | |
| # 生成6位随机验证码 | |
| code = str(random.randint(100000, 999999)) | |
| # 🚀 P1性能优化:每次写入前触发清理过期验证码 | |
| cleanup_expired_codes() | |
| # 构建缓存键(联系方式_动作类型) | |
| cache_key = f"{req.contact}_{req.action_type}" | |
| # 将验证码存入内存缓存,有效期5分钟 | |
| VERIFY_CODES[cache_key] = { | |
| "code": code, | |
| "expires_at": int(time.time()) + 300 # 当前时间 + 300秒(5分钟) | |
| } | |
| # 根据联系方式类型,添加后台发送任务 | |
| if req.contact_type == "email": | |
| bg_tasks.add_task(send_email_code, req.contact, code, req.action_type) | |
| elif req.contact_type == "phone": | |
| bg_tasks.add_task(send_sms_code, req.contact, code, req.action_type) | |
| else: | |
| raise HTTPException(status_code=400, detail="不支持的验证方式") | |
| return {"status": "success", "message": "验证码发送请求已提交"} | |
| # ========================================== | |
| # 📤 发送验证码接口(同步版本,备用) | |
| # ========================================== | |
| # 作用:同步方式发送验证码,部分场景需要立即知道发送结果 | |
| # 关联:verify_code_engine.py 的 send_email_code | |
| async def send_code_api(req: SendCodeRequest): | |
| """发送验证码接口(同步版本,直接等待发送结果)""" | |
| # 🔒 检查发送频率限制(同一联系方式60秒内只能发送1条) | |
| check_send_cooldown(req.contact) | |
| # 生成6位随机验证码 | |
| code = str(random.randint(100000, 999999)) | |
| key = f"{req.contact}_{req.action_type}" | |
| # 存入缓存,有效期5分钟 | |
| VERIFY_CODES[key] = { | |
| "code": code, | |
| "expires_at": time.time() + 300 # 300秒(5分钟) | |
| } | |
| # 同步发送(会阻塞等待结果) | |
| if req.contact_type == "email": | |
| try: | |
| send_email_code(req.contact, code, req.action_type) | |
| return {"status": "success", "message": "验证码已成功发送至邮箱"} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"邮件发送失败: {str(e)}") | |
| elif req.contact_type == "phone": | |
| return {"status": "success", "message": "验证码已成功发送至手机"} | |
| else: | |
| raise HTTPException(status_code=400, detail="不支持的验证方式") | |
| # ========================================== | |
| # 📝 用户注册接口 | |
| # ========================================== | |
| # 作用:新用户注册,需要验证邮箱/手机验证码 | |
| # 关联: | |
| # - verify_code_engine.py 的 VERIFY_CODES (校验验证码) | |
| # - 数据库连接.py (保存新用户到 users.json) | |
| # - 前端 注册表单组件.js | |
| # 🔒 P0安全优化:注册每分钟最多3次 | |
| async def register_user(request: Request, user: UserRegister): | |
| """ | |
| 用户注册接口 | |
| 请求参数:(UserRegister 模型) | |
| - account: 账号(6-20位字母数字下划线) | |
| - password: 密码(至少6位) | |
| - name: 昵称 | |
| - email: 邮箱(与 phone 二选一) | |
| - phone: 手机号(与 email 二选一) | |
| - code: 验证码 | |
| - intro: 个人介绍(可选,最多100字) | |
| """ | |
| users_db = db.load_data("users.json", default_data={}) | |
| # ========== 第一步:查重检查 ========== | |
| # 检查账号是否已存在 | |
| if user.account in users_db: | |
| raise HTTPException(status_code=400, detail="该账号已被注册,请更换一个") | |
| # 检查邮箱和手机号是否已被其他用户绑定 | |
| for existing_user in users_db.values(): | |
| if user.email and existing_user.get("email") == user.email: | |
| raise HTTPException(status_code=400, detail="此邮箱已注册,请直接登录或找回密码") | |
| if user.phone and existing_user.get("phone") == user.phone: | |
| raise HTTPException(status_code=400, detail="该手机号已被绑定") | |
| # ========== 第二步:验证码校验(原子性获取+删除) ========== | |
| # 根据注册方式构建缓存键 | |
| cache_key = f"{user.email}_register" if user.email else f"{user.phone}_register" | |
| # 🔒 P0安全修复:验证码一次性使用,原子性pop防止并发重用 | |
| cached = VERIFY_CODES.pop(cache_key, None) | |
| # 兼容新老缓存格式(expires_at 或 expires) | |
| expire_time = cached.get("expires_at", cached.get("expires", 0)) if cached else 0 | |
| # 校验验证码是否正确且未过期 | |
| if not cached or cached["code"] != user.code or time.time() > expire_time: | |
| raise HTTPException(status_code=400, detail="验证码不正确或已过期") | |
| # ========== 第三步:格式校验 ========== | |
| if len(user.account) <= 5: | |
| raise HTTPException(status_code=400, detail="账号必须大于5个字符") | |
| if not re.match(r'^[a-zA-Z0-9_]{6,20}$', user.account): | |
| raise HTTPException(status_code=400, detail="账号仅支持大小写英文字母、数字及下划线") | |
| if len(user.password) < 6: | |
| raise HTTPException(status_code=400, detail="密码必须大于等于6个字符") | |
| if not re.match(r'^[a-zA-Z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,}$', user.password): | |
| raise HTTPException(status_code=400, detail="密码包含不支持的特殊字符") | |
| if user.intro and len(user.intro) > 100: | |
| raise HTTPException(status_code=400, detail="个人介绍不能超过100个字符") | |
| # ========== 第四步:保存新用户 ========== | |
| # 构建用户数据对象 | |
| new_user = user.dict() | |
| new_user.pop("code", None) # 移除验证码字段,不存入数据库 | |
| # 🔒 安全防护:拒绝 base64 头像数据 | |
| if (new_user.get("avatarDataUrl") or "").startswith("data:"): | |
| new_user["avatarDataUrl"] = "" | |
| # 🔒 P0安全增强:密码哈希化存储(不再存储明文密码) | |
| new_user["password"] = hash_password(new_user["password"]) | |
| new_user.update({ | |
| "created_at": int(time.time()), # 注册时间戳 | |
| "followers": [], # 粉丝列表 | |
| "following": [], # 关注列表 | |
| "privacy": {"follows": False, "likes": False, "favorites": False, "downloads": False} # 隐私设置 | |
| }) | |
| # 保存到数据库 | |
| users_db[user.account] = new_user | |
| db.save_data("users.json", users_db) | |
| # 返回用户信息(排除密码) | |
| return {"status": "success", "message": "注册成功", "data": {k: v for k, v in new_user.items() if k != "password"}} | |
| # ========================================== | |
| # 🔑 用户登录接口 | |
| # ========================================== | |
| # 作用:验证账号密码,返回登录凭证 | |
| # 关联: | |
| # - 数据库连接.py (读取 users.json 校验密码) | |
| # - 前端 登录表单组件.js | |
| # 🚀 P2优化:每分钟最多10次登录尝试 | |
| async def login_user(request: Request, user: UserLogin): | |
| """ | |
| 用户登录接口 | |
| 请求参数:(UserLogin 模型) | |
| - account: 账号 | |
| - password: 密码 | |
| """ | |
| users_db = db.load_data("users.json", default_data={}) | |
| # 检查账号是否存在 | |
| if user.account not in users_db: | |
| raise HTTPException(status_code=404, detail="账号不存在") | |
| user_data = users_db[user.account] | |
| stored_password = user_data.get("password", "") | |
| # 🔒 P0安全增强:密码哈希验证(使用统一验证函数) | |
| if not require_password_match(stored_password, user.password): | |
| raise HTTPException(status_code=401, detail="密码错误") | |
| # 🔒 P0安全增强:登录成功后,检查是否需要迁移旧密码为bcrypt | |
| if not user_data["password"].startswith('$2b$') and not user_data["password"].startswith('$2a$'): | |
| # 旧版SHA256密码,自动迁移为bcrypt | |
| user_data["password"] = hash_password(user.password) | |
| db.save_data("users.json", users_db) | |
| # 🔒 P0安全增强:生成 JWT Token(替代 mock_token) | |
| # 获取password_version用于Token生成(如不存在则默认为0) | |
| password_version = user_data.get("password_version", 0) | |
| # 根据用户选择的"保持登录"选项设置Token有效期 | |
| token = create_token(user.account, extra_data={"pwd_ver": password_version}, remember=user.remember) | |
| return { | |
| "status": "success", | |
| "token": token, # 🔒 JWT Token | |
| "account": user.account, | |
| "name": user_data["name"], | |
| "avatar": user_data.get("avatarDataUrl", "") | |
| } | |
| # ========================================== | |
| # 🔄 重置密码接口(万能兼容版) | |
| # ========================================== | |
| # 作用:通过邮箱/手机验证码重置用户密码 | |
| # 关联: | |
| # - verify_code_engine.py 的 VERIFY_CODES (校验验证码) | |
| # - 数据库连接.py (更新 users.json 中的密码) | |
| # - 前端 重置密码表单组件.js | |
| # 特点:万能解析器,兼容各种前端数据格式 | |
| # 🔒 P0安全优化:重置密码每分钟最多3次 | |
| async def reset_password(request: Request): | |
| """ | |
| 重置密码接口(万能兼容版) | |
| 支持的请求格式: | |
| - 标准 JSON | |
| - 双重字符串化 JSON | |
| - FormData | |
| """ | |
| # ========== 第一步:万能数据解析器 ========== | |
| # 作用:兼容各种前端可能发送的数据格式 | |
| try: | |
| data = await request.json() | |
| # 处理前端可能造成的"双重字符串化"问题 | |
| if isinstance(data, str): | |
| data = json.loads(data) | |
| except Exception: | |
| # 降级尝试 FormData 格式 | |
| try: | |
| form = await request.form() | |
| data = dict(form) | |
| except Exception: | |
| raise HTTPException(status_code=400, detail="请求数据解析失败,请检查网络") | |
| if not isinstance(data, dict): | |
| raise HTTPException(status_code=400, detail=f"前端数据格式异常,收到的是: {type(data).__name__}") | |
| # ========== 第二步:万能字段提取器 ========== | |
| # 作用:兼容前端可能使用的各种字段命名 | |
| account = data.get("account") | |
| new_password = data.get("new_password") or data.get("password") | |
| verify_contact = data.get("verifyContact") or data.get("verify_contact") or data.get("email") or data.get("phone") | |
| verify_type = data.get("verifyType") or data.get("verify_type") or data.get("contact_type") | |
| code = data.get("code") | |
| # 参数完整性校验 | |
| if not all([account, new_password, verify_contact, verify_type, code]): | |
| raise HTTPException(status_code=400, detail="缺失必要参数 (账号/密码/验证码/联系方式),请检查表单") | |
| # ========== 第三步:核心业务逻辑 ========== | |
| users_db = db.load_data("users.json", default_data={}) | |
| # 检查用户是否存在 | |
| if account not in users_db: | |
| raise HTTPException(status_code=404, detail="该用户不存在") | |
| user = users_db[account] | |
| # 校验联系方式是否与账号绑定的一致 | |
| if verify_type == "email" and user.get("email") != verify_contact: | |
| raise HTTPException(status_code=400, detail="填写的邮箱与该账号绑定的邮箱不匹配") | |
| if verify_type == "phone" and user.get("phone") != verify_contact: | |
| raise HTTPException(status_code=400, detail="填写的手机号与该账号绑定的手机号不匹配") | |
| # 校验验证码(原子性获取+删除) | |
| cache_key = f"{verify_contact}_reset" | |
| # 🔒 P0安全修复:验证码一次性使用,原子性pop防止并发重用 | |
| cached = VERIFY_CODES.pop(cache_key, None) | |
| expire_time = cached.get("expires_at", cached.get("expires", 0)) if cached else 0 | |
| if not cached or cached["code"] != code or time.time() > expire_time: | |
| raise HTTPException(status_code=400, detail="验证码不正确或已过期") | |
| # 校验新密码格式 | |
| if len(new_password) < 6: | |
| raise HTTPException(status_code=400, detail="新密码必须大于等于6个字符") | |
| if not re.match(r'^[a-zA-Z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,}$', new_password): | |
| raise HTTPException(status_code=400, detail="新密码包含不支持的特殊字符") | |
| # ========== 第四步:更新密码并保存 ========== | |
| # 🔒 P0安全增强:新密码哈希化存储 | |
| user["password"] = hash_password(new_password) | |
| # 🔒 P0安全增强:更新password_version使旧Token失效 | |
| import time as time_module | |
| user["password_version"] = int(time_module.time()) | |
| db.save_data("users.json", users_db) | |
| # 🔒 P0安全增强:生成新Token返回给前端替换旧Token | |
| new_token = create_token(account, extra_data={"pwd_ver": user["password_version"]}) | |
| return {"status": "success", "message": "密码修改成功", "token": new_token} | |