Spaces:
Sleeping
Sleeping
File size: 1,879 Bytes
20e3763 7e732e6 20e3763 7e732e6 20e3763 7e732e6 20e3763 7e732e6 20e3763 7e732e6 20e3763 7e732e6 20e3763 7e732e6 20e3763 7e732e6 20e3763 7e732e6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# utils.py
import time
import jwt
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
# Placeholders para imports de otros módulos
RATE_LIMIT_CACHE: Dict[str, List[float]] = {}
class AuthenticationError(Exception): pass
class RateLimitError(Exception): pass
class UserRole:
INICIADO = "INICIADO"
ADEPT = "ADEPT"
ARCHITECTO = "ARCHITECTO"
class SecurityUtils:
"""Utilidades críticas para la seguridad y performance (Rate Limiting y JWT)."""
@staticmethod
def check_rate_limit_and_log(user_identifier: str, window: int, requests: int):
"""Verifica el límite de peticiones (Throttling)."""
now = time.time()
RATE_LIMIT_CACHE[user_identifier] = [
t for t in RATE_LIMIT_CACHE.get(user_identifier, [])
if now - t < window
]
if len(RATE_LIMIT_CACHE[user_identifier]) >= requests:
raise RateLimitError("¡Eh, pará la mano! Demasiadas peticiones.")
RATE_LIMIT_CACHE[user_identifier].append(now)
@staticmethod
def create_jwt_token(soul_uuid: str, username: str, role: str, expire_minutes: int, secret_key: str) -> str:
"""Genera el token JWT. El pasaporte digital."""
expire = datetime.utcnow() + timedelta(minutes=expire_minutes)
payload = {"sub": soul_uuid, "name": username, "role": role, "exp": expire}
return jwt.encode(payload, secret_key, algorithm="HS256")
@staticmethod
def decode_jwt_token(token: str, secret_key: str) -> Optional[Dict[str, Any]]:
"""Decodifica y valida el token."""
try:
return jwt.decode(token, secret_key, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
raise AuthenticationError("El token se venció. Volvé a entrar, che.")
except Exception:
return None |