Spaces:
Sleeping
Sleeping
| # utils.py | |
| import time | |
| import jwt | |
| from typing import List, Dict, Any, Optional | |
| from datetime import datetime, timedelta | |
| # Placeholders para imports de otros módulos | |
| RATE_LIMIT_CACHE: Dict[str, List[float]] = {} | |
| class AuthenticationError(Exception): pass | |
| class RateLimitError(Exception): pass | |
| class UserRole: | |
| INICIADO = "INICIADO" | |
| ADEPT = "ADEPT" | |
| ARCHITECTO = "ARCHITECTO" | |
| class SecurityUtils: | |
| """Utilidades críticas para la seguridad y performance (Rate Limiting y JWT).""" | |
| def check_rate_limit_and_log(user_identifier: str, window: int, requests: int): | |
| """Verifica el límite de peticiones (Throttling).""" | |
| now = time.time() | |
| RATE_LIMIT_CACHE[user_identifier] = [ | |
| t for t in RATE_LIMIT_CACHE.get(user_identifier, []) | |
| if now - t < window | |
| ] | |
| if len(RATE_LIMIT_CACHE[user_identifier]) >= requests: | |
| raise RateLimitError("¡Eh, pará la mano! Demasiadas peticiones.") | |
| RATE_LIMIT_CACHE[user_identifier].append(now) | |
| def create_jwt_token(soul_uuid: str, username: str, role: str, expire_minutes: int, secret_key: str) -> str: | |
| """Genera el token JWT. El pasaporte digital.""" | |
| expire = datetime.utcnow() + timedelta(minutes=expire_minutes) | |
| payload = {"sub": soul_uuid, "name": username, "role": role, "exp": expire} | |
| return jwt.encode(payload, secret_key, algorithm="HS256") | |
| def decode_jwt_token(token: str, secret_key: str) -> Optional[Dict[str, Any]]: | |
| """Decodifica y valida el token.""" | |
| try: | |
| return jwt.decode(token, secret_key, algorithms=["HS256"]) | |
| except jwt.ExpiredSignatureError: | |
| raise AuthenticationError("El token se venció. Volvé a entrar, che.") | |
| except Exception: | |
| return None |