ComfyUI-Ranking-API / rate_limiter.py
ZHIWEI666's picture
版本更新
c60e0ef verified
# rate_limiter.py
# ==========================================
# 🔒 P0安全优化:API 限流配置
# ==========================================
# 作用:防止恶意刷接口、暴力破解、DDoS 攻击
# 关联文件:
# - app.py (全局限流器初始化)
# - 各 router_*.py (应用限流装饰器)
# ==========================================
from slowapi import Limiter
from slowapi.util import get_remote_address
from functools import wraps
import time
import logging
logger = logging.getLogger("ComfyUI-Ranking.RateLimiter")
# ==========================================
# 📊 限流配置
# ==========================================
# 全局限流器实例(在 app.py 中初始化)
limiter = Limiter(key_func=get_remote_address)
# 限流规则定义
RATE_LIMITS = {
# 🔐 认证相关(严格限制)
"login": "5/minute", # 登录:每分钟5次
"register": "3/minute", # 注册:每分钟3次
"reset_password": "3/minute", # 重置密码:每分钟3次
"send_code": "1/minute", # 发送验证码:每分钟1次
# 💰 金融操作(严格限制)
"purchase": "10/minute", # 购买:每分钟10次
"refund": "3/minute", # 退款:每分钟3次
"withdraw": "3/minute", # 提现:每分钟3次
"tip": "20/minute", # 打赏:每分钟20次
# 📝 内容操作(中等限制)
"create_item": "10/minute", # 发布内容:每分钟10次
"update_item": "30/minute", # 更新内容:每分钟30次
"create_task": "5/minute", # 创建任务:每分钟5次
"create_post": "10/minute", # 发帖:每分钟10次
"comment": "30/minute", # 评论:每分钟30次
# 🔄 互动操作(宽松限制)
"like": "60/minute", # 点赞:每分钟60次
"follow": "30/minute", # 关注:每分钟30次
"message": "30/minute", # 私信:每分钟30次
# 📖 查询操作(最宽松)
"read": "100/minute", # 读取:每分钟100次
"search": "30/minute", # 搜索:每分钟30次
# 📤 上传操作(严格限制)
"upload": "20/minute", # 上传:每分钟20次
# 🌐 默认限制
"default": "60/minute", # 默认:每分钟60次
}
# ==========================================
# 🔧 限流工具函数
# ==========================================
def get_limit(action: str) -> str:
"""获取指定操作的限流规则"""
return RATE_LIMITS.get(action, RATE_LIMITS["default"])
# ==========================================
# 🛡️ 用户级限流(基于账号)
# ==========================================
# 用户请求计数器
user_request_counts = {}
USER_LIMIT_WINDOW = 60 # 时间窗口(秒)
USER_LIMIT_MAX = 100 # 每用户每分钟最大请求数
def check_user_rate_limit(account: str) -> bool:
"""
检查用户级限流
返回 True 表示允许请求,False 表示超限
"""
if not account:
return True
now = time.time()
key = f"user:{account}"
# 清理过期记录
if key in user_request_counts:
user_request_counts[key] = [
t for t in user_request_counts[key]
if now - t < USER_LIMIT_WINDOW
]
else:
user_request_counts[key] = []
# 检查是否超限
if len(user_request_counts[key]) >= USER_LIMIT_MAX:
logger.warning(f"RATE_LIMIT | user={account} | requests={len(user_request_counts[key])}")
return False
# 记录请求
user_request_counts[key].append(now)
return True
def user_rate_limit_decorator(func):
"""用户级限流装饰器"""
@wraps(func)
async def wrapper(*args, **kwargs):
# 尝试从参数中获取用户账号
account = kwargs.get('current_user') or kwargs.get('account')
if account and not check_user_rate_limit(account):
from fastapi import HTTPException
raise HTTPException(
status_code=429,
detail="请求过于频繁,请稍后再试"
)
return await func(*args, **kwargs)
return wrapper
# ==========================================
# 📊 限流统计
# ==========================================
# IP 请求统计(用于监控)
ip_stats = {}
def record_request(ip: str, endpoint: str):
"""记录请求用于统计"""
now = time.time()
key = f"{ip}:{endpoint}"
if key not in ip_stats:
ip_stats[key] = {"count": 0, "first_request": now}
ip_stats[key]["count"] += 1
ip_stats[key]["last_request"] = now
def get_ip_stats():
"""获取 IP 请求统计"""
return ip_stats
def clear_old_stats(max_age: int = 3600):
"""清理超过指定时间的统计数据"""
now = time.time()
keys_to_remove = [
k for k, v in ip_stats.items()
if now - v.get("last_request", 0) > max_age
]
for k in keys_to_remove:
del ip_stats[k]