Samuel_4.0 / utils.py
Lukeetah's picture
Update utils.py
7e732e6 verified
# utils.py
import time
import jwt
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
# Placeholders para imports de otros módulos
RATE_LIMIT_CACHE: Dict[str, List[float]] = {}
class AuthenticationError(Exception): pass
class RateLimitError(Exception): pass
class UserRole:
INICIADO = "INICIADO"
ADEPT = "ADEPT"
ARCHITECTO = "ARCHITECTO"
class SecurityUtils:
"""Utilidades críticas para la seguridad y performance (Rate Limiting y JWT)."""
@staticmethod
def check_rate_limit_and_log(user_identifier: str, window: int, requests: int):
"""Verifica el límite de peticiones (Throttling)."""
now = time.time()
RATE_LIMIT_CACHE[user_identifier] = [
t for t in RATE_LIMIT_CACHE.get(user_identifier, [])
if now - t < window
]
if len(RATE_LIMIT_CACHE[user_identifier]) >= requests:
raise RateLimitError("¡Eh, pará la mano! Demasiadas peticiones.")
RATE_LIMIT_CACHE[user_identifier].append(now)
@staticmethod
def create_jwt_token(soul_uuid: str, username: str, role: str, expire_minutes: int, secret_key: str) -> str:
"""Genera el token JWT. El pasaporte digital."""
expire = datetime.utcnow() + timedelta(minutes=expire_minutes)
payload = {"sub": soul_uuid, "name": username, "role": role, "exp": expire}
return jwt.encode(payload, secret_key, algorithm="HS256")
@staticmethod
def decode_jwt_token(token: str, secret_key: str) -> Optional[Dict[str, Any]]:
"""Decodifica y valida el token."""
try:
return jwt.decode(token, secret_key, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
raise AuthenticationError("El token se venció. Volvé a entrar, che.")
except Exception:
return None