# rate_limiter.py # ========================================== # 🔒 P0安全优化:API 限流配置 # ========================================== # 作用:防止恶意刷接口、暴力破解、DDoS 攻击 # 关联文件: # - app.py (全局限流器初始化) # - 各 router_*.py (应用限流装饰器) # ========================================== from slowapi import Limiter from slowapi.util import get_remote_address from functools import wraps import time import logging logger = logging.getLogger("ComfyUI-Ranking.RateLimiter") # ========================================== # 📊 限流配置 # ========================================== # 全局限流器实例(在 app.py 中初始化) limiter = Limiter(key_func=get_remote_address) # 限流规则定义 RATE_LIMITS = { # 🔐 认证相关(严格限制) "login": "5/minute", # 登录:每分钟5次 "register": "3/minute", # 注册:每分钟3次 "reset_password": "3/minute", # 重置密码:每分钟3次 "send_code": "1/minute", # 发送验证码:每分钟1次 # 💰 金融操作(严格限制) "purchase": "10/minute", # 购买:每分钟10次 "refund": "3/minute", # 退款:每分钟3次 "withdraw": "3/minute", # 提现:每分钟3次 "tip": "20/minute", # 打赏:每分钟20次 # 📝 内容操作(中等限制) "create_item": "10/minute", # 发布内容:每分钟10次 "update_item": "30/minute", # 更新内容:每分钟30次 "create_task": "5/minute", # 创建任务:每分钟5次 "create_post": "10/minute", # 发帖:每分钟10次 "comment": "30/minute", # 评论:每分钟30次 # 🔄 互动操作(宽松限制) "like": "60/minute", # 点赞:每分钟60次 "follow": "30/minute", # 关注:每分钟30次 "message": "30/minute", # 私信:每分钟30次 # 📖 查询操作(最宽松) "read": "100/minute", # 读取:每分钟100次 "search": "30/minute", # 搜索:每分钟30次 # 📤 上传操作(严格限制) "upload": "20/minute", # 上传:每分钟20次 # 🌐 默认限制 "default": "60/minute", # 默认:每分钟60次 } # ========================================== # 🔧 限流工具函数 # ========================================== def get_limit(action: str) -> str: """获取指定操作的限流规则""" return RATE_LIMITS.get(action, RATE_LIMITS["default"]) # ========================================== # 🛡️ 用户级限流(基于账号) # ========================================== # 用户请求计数器 user_request_counts = {} USER_LIMIT_WINDOW = 60 # 时间窗口(秒) USER_LIMIT_MAX = 100 # 每用户每分钟最大请求数 def check_user_rate_limit(account: str) -> bool: """ 检查用户级限流 返回 True 表示允许请求,False 表示超限 """ if not account: return True now = time.time() key = f"user:{account}" # 清理过期记录 if key in user_request_counts: user_request_counts[key] = [ t for t in user_request_counts[key] if now - t < USER_LIMIT_WINDOW ] else: user_request_counts[key] = [] # 检查是否超限 if len(user_request_counts[key]) >= USER_LIMIT_MAX: logger.warning(f"RATE_LIMIT | user={account} | requests={len(user_request_counts[key])}") return False # 记录请求 user_request_counts[key].append(now) return True def user_rate_limit_decorator(func): """用户级限流装饰器""" @wraps(func) async def wrapper(*args, **kwargs): # 尝试从参数中获取用户账号 account = kwargs.get('current_user') or kwargs.get('account') if account and not check_user_rate_limit(account): from fastapi import HTTPException raise HTTPException( status_code=429, detail="请求过于频繁,请稍后再试" ) return await func(*args, **kwargs) return wrapper # ========================================== # 📊 限流统计 # ========================================== # IP 请求统计(用于监控) ip_stats = {} def record_request(ip: str, endpoint: str): """记录请求用于统计""" now = time.time() key = f"{ip}:{endpoint}" if key not in ip_stats: ip_stats[key] = {"count": 0, "first_request": now} ip_stats[key]["count"] += 1 ip_stats[key]["last_request"] = now def get_ip_stats(): """获取 IP 请求统计""" return ip_stats def clear_old_stats(max_age: int = 3600): """清理超过指定时间的统计数据""" now = time.time() keys_to_remove = [ k for k, v in ip_stats.items() if now - v.get("last_request", 0) > max_age ] for k in keys_to_remove: del ip_stats[k]