# utils.py
import time
import jwt
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta, timezone

# Placeholders for imports from other modules
# Maps a user identifier to the timestamps (time.time() values) of its
# recent requests. Entries are pruned per identifier on each check, but a
# key is never removed once created.
RATE_LIMIT_CACHE: Dict[str, List[float]] = {}


class AuthenticationError(Exception):
    """Raised by SecurityUtils.decode_jwt_token when the token has expired."""


class RateLimitError(Exception):
    """Raised by SecurityUtils.check_rate_limit_and_log when the limit is hit."""


class UserRole:
    """String constants naming the user roles."""

    INICIADO = "INICIADO"
    ADEPT = "ADEPT"
    ARCHITECTO = "ARCHITECTO"


class SecurityUtils:
    """Security and performance helpers: request throttling and JWT handling."""

    @staticmethod
    def check_rate_limit_and_log(user_identifier: str, window: int, requests: int) -> None:
        """Enforce a per-identifier request limit and record this request.

        Timestamps older than ``window`` seconds are dropped from the
        identifier's entry in ``RATE_LIMIT_CACHE``; if ``requests`` or more
        remain, the call is rejected, otherwise the current time is appended.

        Args:
            user_identifier: Key used to group requests in the cache.
            window: Length of the counting window, in seconds.
            requests: Maximum number of requests allowed inside the window.

        Raises:
            RateLimitError: If the identifier already has ``requests`` or more
                timestamps inside the window. The rejected request itself is
                not recorded.

        NOTE(review): despite the name, nothing is written to a log here; the
        only side effect is the update of ``RATE_LIMIT_CACHE``.
        """
        now = time.time()
        # Keep only the timestamps that still fall inside the window.
        recent = [
            stamp
            for stamp in RATE_LIMIT_CACHE.get(user_identifier, [])
            if now - stamp < window
        ]
        # Store the pruned list before checking, so the cache is trimmed even
        # when the request ends up being rejected.
        RATE_LIMIT_CACHE[user_identifier] = recent

        if len(recent) >= requests:
            raise RateLimitError("¡Eh, pará la mano! Demasiadas peticiones.")

        recent.append(now)

    @staticmethod
    def create_jwt_token(
        soul_uuid: str,
        username: str,
        role: str,
        expire_minutes: int,
        secret_key: str,
    ) -> str:
        """Build a signed HS256 JWT for the given user.

        Args:
            soul_uuid: Stored in the ``sub`` claim.
            username: Stored in the ``name`` claim.
            role: Stored in the ``role`` claim.
            expire_minutes: Minutes from now until the token expires.
            secret_key: Key used to sign the token.

        Returns:
            The value returned by ``jwt.encode`` for the payload.
        """
        # Timezone-aware UTC "now": datetime.utcnow() is deprecated and
        # returns a naive datetime that is easy to misinterpret as local time.
        expire = datetime.now(timezone.utc) + timedelta(minutes=expire_minutes)
        payload = {
            "sub": soul_uuid,
            "name": username,
            "role": role,
            "exp": expire,
        }
        return jwt.encode(payload, secret_key, algorithm="HS256")

    @staticmethod
    def decode_jwt_token(token: str, secret_key: str) -> Optional[Dict[str, Any]]:
        """Decode and validate an HS256 JWT.

        Args:
            token: The encoded token.
            secret_key: Key the token is expected to be signed with.

        Returns:
            The decoded payload, or ``None`` if decoding fails for any reason
            other than expiry.

        Raises:
            AuthenticationError: If the token's signature has expired.
        """
        try:
            return jwt.decode(token, secret_key, algorithms=["HS256"])
        except jwt.ExpiredSignatureError as exc:
            # Chain the original error so the root cause stays in tracebacks.
            raise AuthenticationError("El token se venció. Volvé a entrar, che.") from exc
        except Exception:
            # Deliberate best-effort: any other failure is reported to the
            # caller as "no valid payload" instead of propagating.
            return None