Spaces:
Running
Running
File size: 16,533 Bytes
f68778c 7d1c681 f68778c 8f9d15a f68778c a0ab3de f68778c a0ab3de f68778c 7d1c681 f68778c a0ab3de f68778c 7d1c681 f68778c 7d1c681 f68778c 7d1c681 f68778c 7d1c681 f68778c 7d1c681 f68778c c60e0ef f68778c 7d1c681 f68778c 7d1c681 f68778c 515f3e9 83b860f 515f3e9 f68778c a0ab3de f68778c 8f9d15a f68778c 7d1c681 f68778c 7d1c681 e3fab5f f68778c 7d1c681 f68778c c60e0ef f68778c c60e0ef f68778c c60e0ef f68778c 7d1c681 f68778c 7d1c681 f68778c 7d1c681 f68778c 7d1c681 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 | # router_users_auth.py
# ==========================================
# 🔐 用户认证路由模块
# ==========================================
# 作用:处理用户登录、注册、密码重置、验证码发送等认证相关接口
# 关联文件:
# - 安全认证.py (密码哈希 + JWT 安全模块) 🔒 P0安全增强
# - verify_code_engine.py (验证码缓存与发送)
# - 数据库连接.py (JSON数据库读写)
# - models.py (Pydantic数据模型定义)
# ==========================================
from fastapi import APIRouter, HTTPException, BackgroundTasks, Request
import time
import re
import random
import json
import 数据库连接 as db
from models import UserRegister, UserLogin, SendCodeRequest
from verify_code_engine import VERIFY_CODES, send_email_code, send_sms_code, cleanup_expired_codes, check_send_cooldown
# 🔒 P0安全增强:导入密码哈希和 JWT 工具
from 安全认证 import hash_password, verify_password, create_token, require_password_match
# 🚀 P2优化:速率限制
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
# 创建子路由实例
router = APIRouter()
# ==========================================
# 📤 发送验证码接口(异步后台任务版)
# ==========================================
# 作用:接收验证码发送请求,后台异步发送邮件/短信
# 关联:verify_code_engine.py 的 send_email_code / send_sms_code
# 前端调用:注册表单组件.js、重置密码表单组件.js
@router.post("/api/users/send-code")
@limiter.limit("5/minute") # 🚀 P2优化:每分钟最多5次
async def send_verify_code(request: Request, req: SendCodeRequest, bg_tasks: BackgroundTasks):
"""
发送验证码接口(异步版本)
请求参数:
- contact: 邮箱或手机号
- contact_type: "email" 或 "phone"
- action_type: "register" 注册 / "reset" 重置密码
- account: (仅重置密码时必填) 用户账号
"""
# 如果是重置密码,需要先验证账号与联系方式匹配
if req.action_type == "reset":
if not req.account:
raise HTTPException(status_code=400, detail="找回密码需先填写当前账号")
users_db = db.load_data("users.json", default_data={})
user = users_db.get(req.account)
if not user:
raise HTTPException(status_code=404, detail="该账号不存在")
# 校验联系方式是否与账号绑定的一致
if req.contact_type == "email" and user.get("email") != req.contact:
raise HTTPException(status_code=400, detail="填写的邮箱与该账号绑定的邮箱不一致")
if req.contact_type == "phone" and user.get("phone") != req.contact:
raise HTTPException(status_code=400, detail="填写的手机号与该账号绑定的手机号不一致")
# 🔒 检查发送频率限制(同一联系方式60秒内只能发送1条)
check_send_cooldown(req.contact)
# 生成6位随机验证码
code = str(random.randint(100000, 999999))
# 🚀 P1性能优化:每次写入前触发清理过期验证码
cleanup_expired_codes()
# 构建缓存键(联系方式_动作类型)
cache_key = f"{req.contact}_{req.action_type}"
# 将验证码存入内存缓存,有效期5分钟
VERIFY_CODES[cache_key] = {
"code": code,
"expires_at": int(time.time()) + 300 # 当前时间 + 300秒(5分钟)
}
# 根据联系方式类型,添加后台发送任务
if req.contact_type == "email":
bg_tasks.add_task(send_email_code, req.contact, code, req.action_type)
elif req.contact_type == "phone":
bg_tasks.add_task(send_sms_code, req.contact, code, req.action_type)
else:
raise HTTPException(status_code=400, detail="不支持的验证方式")
return {"status": "success", "message": "验证码发送请求已提交"}
# ==========================================
# 📤 发送验证码接口(同步版本,备用)
# ==========================================
# 作用:同步方式发送验证码,部分场景需要立即知道发送结果
# 关联:verify_code_engine.py 的 send_email_code
@router.post("/api/users/send_code")
async def send_code_api(req: SendCodeRequest):
"""发送验证码接口(同步版本,直接等待发送结果)"""
# 🔒 检查发送频率限制(同一联系方式60秒内只能发送1条)
check_send_cooldown(req.contact)
# 生成6位随机验证码
code = str(random.randint(100000, 999999))
key = f"{req.contact}_{req.action_type}"
# 存入缓存,有效期5分钟
VERIFY_CODES[key] = {
"code": code,
"expires_at": time.time() + 300 # 300秒(5分钟)
}
# 同步发送(会阻塞等待结果)
if req.contact_type == "email":
try:
send_email_code(req.contact, code, req.action_type)
return {"status": "success", "message": "验证码已成功发送至邮箱"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"邮件发送失败: {str(e)}")
elif req.contact_type == "phone":
return {"status": "success", "message": "验证码已成功发送至手机"}
else:
raise HTTPException(status_code=400, detail="不支持的验证方式")
# ==========================================
# 📝 用户注册接口
# ==========================================
# 作用:新用户注册,需要验证邮箱/手机验证码
# 关联:
# - verify_code_engine.py 的 VERIFY_CODES (校验验证码)
# - 数据库连接.py (保存新用户到 users.json)
# - 前端 注册表单组件.js
@router.post("/api/users/register")
@limiter.limit("3/minute") # 🔒 P0安全优化:注册每分钟最多3次
async def register_user(request: Request, user: UserRegister):
"""
用户注册接口
请求参数:(UserRegister 模型)
- account: 账号(6-20位字母数字下划线)
- password: 密码(至少6位)
- name: 昵称
- email: 邮箱(与 phone 二选一)
- phone: 手机号(与 email 二选一)
- code: 验证码
- intro: 个人介绍(可选,最多100字)
"""
users_db = db.load_data("users.json", default_data={})
# ========== 第一步:查重检查 ==========
# 检查账号是否已存在
if user.account in users_db:
raise HTTPException(status_code=400, detail="该账号已被注册,请更换一个")
# 检查邮箱和手机号是否已被其他用户绑定
for existing_user in users_db.values():
if user.email and existing_user.get("email") == user.email:
raise HTTPException(status_code=400, detail="此邮箱已注册,请直接登录或找回密码")
if user.phone and existing_user.get("phone") == user.phone:
raise HTTPException(status_code=400, detail="该手机号已被绑定")
# ========== 第二步:验证码校验(原子性获取+删除) ==========
# 根据注册方式构建缓存键
cache_key = f"{user.email}_register" if user.email else f"{user.phone}_register"
# 🔒 P0安全修复:验证码一次性使用,原子性pop防止并发重用
cached = VERIFY_CODES.pop(cache_key, None)
# 兼容新老缓存格式(expires_at 或 expires)
expire_time = cached.get("expires_at", cached.get("expires", 0)) if cached else 0
# 校验验证码是否正确且未过期
if not cached or cached["code"] != user.code or time.time() > expire_time:
raise HTTPException(status_code=400, detail="验证码不正确或已过期")
# ========== 第三步:格式校验 ==========
if len(user.account) <= 5:
raise HTTPException(status_code=400, detail="账号必须大于5个字符")
if not re.match(r'^[a-zA-Z0-9_]{6,20}$', user.account):
raise HTTPException(status_code=400, detail="账号仅支持大小写英文字母、数字及下划线")
if len(user.password) < 6:
raise HTTPException(status_code=400, detail="密码必须大于等于6个字符")
if not re.match(r'^[a-zA-Z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,}$', user.password):
raise HTTPException(status_code=400, detail="密码包含不支持的特殊字符")
if user.intro and len(user.intro) > 100:
raise HTTPException(status_code=400, detail="个人介绍不能超过100个字符")
# ========== 第四步:保存新用户 ==========
# 构建用户数据对象
new_user = user.dict()
new_user.pop("code", None) # 移除验证码字段,不存入数据库
# 🔒 安全防护:拒绝 base64 头像数据
if (new_user.get("avatarDataUrl") or "").startswith("data:"):
new_user["avatarDataUrl"] = ""
# 🔒 P0安全增强:密码哈希化存储(不再存储明文密码)
new_user["password"] = hash_password(new_user["password"])
new_user.update({
"created_at": int(time.time()), # 注册时间戳
"followers": [], # 粉丝列表
"following": [], # 关注列表
"privacy": {"follows": False, "likes": False, "favorites": False, "downloads": False} # 隐私设置
})
# 保存到数据库
users_db[user.account] = new_user
db.save_data("users.json", users_db)
# 返回用户信息(排除密码)
return {"status": "success", "message": "注册成功", "data": {k: v for k, v in new_user.items() if k != "password"}}
# ==========================================
# 🔑 用户登录接口
# ==========================================
# 作用:验证账号密码,返回登录凭证
# 关联:
# - 数据库连接.py (读取 users.json 校验密码)
# - 前端 登录表单组件.js
@router.post("/api/users/login")
@limiter.limit("10/minute") # 🚀 P2优化:每分钟最多10次登录尝试
async def login_user(request: Request, user: UserLogin):
"""
用户登录接口
请求参数:(UserLogin 模型)
- account: 账号
- password: 密码
"""
users_db = db.load_data("users.json", default_data={})
# 检查账号是否存在
if user.account not in users_db:
raise HTTPException(status_code=404, detail="账号不存在")
user_data = users_db[user.account]
stored_password = user_data.get("password", "")
# 🔒 P0安全增强:密码哈希验证(使用统一验证函数)
if not require_password_match(stored_password, user.password):
raise HTTPException(status_code=401, detail="密码错误")
# 🔒 P0安全增强:登录成功后,检查是否需要迁移旧密码为bcrypt
if not user_data["password"].startswith('$2b$') and not user_data["password"].startswith('$2a$'):
# 旧版SHA256密码,自动迁移为bcrypt
user_data["password"] = hash_password(user.password)
db.save_data("users.json", users_db)
# 🔒 P0安全增强:生成 JWT Token(替代 mock_token)
# 获取password_version用于Token生成(如不存在则默认为0)
password_version = user_data.get("password_version", 0)
# 根据用户选择的"保持登录"选项设置Token有效期
token = create_token(user.account, extra_data={"pwd_ver": password_version}, remember=user.remember)
return {
"status": "success",
"token": token, # 🔒 JWT Token
"account": user.account,
"name": user_data["name"],
"avatar": user_data.get("avatarDataUrl", "")
}
# ==========================================
# 🔄 重置密码接口(万能兼容版)
# ==========================================
# 作用:通过邮箱/手机验证码重置用户密码
# 关联:
# - verify_code_engine.py 的 VERIFY_CODES (校验验证码)
# - 数据库连接.py (更新 users.json 中的密码)
# - 前端 重置密码表单组件.js
# 特点:万能解析器,兼容各种前端数据格式
@router.post("/api/users/reset_password")
@limiter.limit("3/minute") # 🔒 P0安全优化:重置密码每分钟最多3次
async def reset_password(request: Request):
"""
重置密码接口(万能兼容版)
支持的请求格式:
- 标准 JSON
- 双重字符串化 JSON
- FormData
"""
# ========== 第一步:万能数据解析器 ==========
# 作用:兼容各种前端可能发送的数据格式
try:
data = await request.json()
# 处理前端可能造成的"双重字符串化"问题
if isinstance(data, str):
data = json.loads(data)
except Exception:
# 降级尝试 FormData 格式
try:
form = await request.form()
data = dict(form)
except Exception:
raise HTTPException(status_code=400, detail="请求数据解析失败,请检查网络")
if not isinstance(data, dict):
raise HTTPException(status_code=400, detail=f"前端数据格式异常,收到的是: {type(data).__name__}")
# ========== 第二步:万能字段提取器 ==========
# 作用:兼容前端可能使用的各种字段命名
account = data.get("account")
new_password = data.get("new_password") or data.get("password")
verify_contact = data.get("verifyContact") or data.get("verify_contact") or data.get("email") or data.get("phone")
verify_type = data.get("verifyType") or data.get("verify_type") or data.get("contact_type")
code = data.get("code")
# 参数完整性校验
if not all([account, new_password, verify_contact, verify_type, code]):
raise HTTPException(status_code=400, detail="缺失必要参数 (账号/密码/验证码/联系方式),请检查表单")
# ========== 第三步:核心业务逻辑 ==========
users_db = db.load_data("users.json", default_data={})
# 检查用户是否存在
if account not in users_db:
raise HTTPException(status_code=404, detail="该用户不存在")
user = users_db[account]
# 校验联系方式是否与账号绑定的一致
if verify_type == "email" and user.get("email") != verify_contact:
raise HTTPException(status_code=400, detail="填写的邮箱与该账号绑定的邮箱不匹配")
if verify_type == "phone" and user.get("phone") != verify_contact:
raise HTTPException(status_code=400, detail="填写的手机号与该账号绑定的手机号不匹配")
# 校验验证码(原子性获取+删除)
cache_key = f"{verify_contact}_reset"
# 🔒 P0安全修复:验证码一次性使用,原子性pop防止并发重用
cached = VERIFY_CODES.pop(cache_key, None)
expire_time = cached.get("expires_at", cached.get("expires", 0)) if cached else 0
if not cached or cached["code"] != code or time.time() > expire_time:
raise HTTPException(status_code=400, detail="验证码不正确或已过期")
# 校验新密码格式
if len(new_password) < 6:
raise HTTPException(status_code=400, detail="新密码必须大于等于6个字符")
if not re.match(r'^[a-zA-Z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,}$', new_password):
raise HTTPException(status_code=400, detail="新密码包含不支持的特殊字符")
# ========== 第四步:更新密码并保存 ==========
# 🔒 P0安全增强:新密码哈希化存储
user["password"] = hash_password(new_password)
# 🔒 P0安全增强:更新password_version使旧Token失效
import time as time_module
user["password_version"] = int(time_module.time())
db.save_data("users.json", users_db)
# 🔒 P0安全增强:生成新Token返回给前端替换旧Token
new_token = create_token(account, extra_data={"pwd_ver": user["password_version"]})
return {"status": "success", "message": "密码修改成功", "token": new_token}
|