""" Security Module Provides HTTP security headers, input sanitization, and rate limiting. """ import html import time import functools from collections import defaultdict from flask import request, jsonify # ── Security Headers ───────────────────────────────────────── def apply_security_headers(response): """Add security headers to every response via @after_request.""" # Content Security Policy — allows Google services csp_directives = [ "default-src 'self'", "script-src 'self' 'unsafe-inline' https://translate.google.com https://translate.googleapis.com https://translate-pa.googleapis.com https://cdnjs.cloudflare.com https://maps.googleapis.com", "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com https://translate.googleapis.com https://*.gstatic.com", "font-src 'self' https://fonts.gstatic.com", "img-src 'self' data: https://*.googleapis.com https://*.gstatic.com https://translate.google.com https://www.google.com", "frame-src https://www.google.com https://maps.google.com https://translate.google.com", "connect-src 'self' ws: wss: https://translate.googleapis.com https://translate-pa.googleapis.com", ] response.headers['Content-Security-Policy'] = '; '.join(csp_directives) # Prevent clickjacking response.headers['X-Frame-Options'] = 'SAMEORIGIN' # Prevent MIME-type sniffing response.headers['X-Content-Type-Options'] = 'nosniff' # XSS Protection (legacy browsers) response.headers['X-XSS-Protection'] = '1; mode=block' # Referrer Policy response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin' # Permissions Policy — disable unnecessary features response.headers['Permissions-Policy'] = 'camera=(), microphone=(), geolocation=(self)' return response # ── Input Sanitization ─────────────────────────────────────── def sanitize_string(value: str, max_length: int = 500) -> str: """ Sanitize user-provided string input. - HTML-escapes dangerous characters - Strips leading/trailing whitespace - Enforces maximum length """ if not isinstance(value, str): return '' value = value.strip() value = html.escape(value, quote=True) return value[:max_length] def sanitize_json_input(data: dict, allowed_keys: set) -> dict: """ Filter JSON input to only allowed keys and sanitize string values. Args: data: Raw JSON dict from request allowed_keys: Set of permitted key names Returns: Sanitized dict with only allowed keys """ if not isinstance(data, dict): return {} result = {} for key in allowed_keys: if key in data: val = data[key] result[key] = sanitize_string(val) if isinstance(val, str) else val return result # ── Rate Limiting ──────────────────────────────────────────── class RateLimiter: """ Simple in-memory rate limiter. No Redis dependency — suitable for single-process deployment. """ def __init__(self, max_requests: int = 30, window_seconds: int = 60): self.max_requests = max_requests self.window = window_seconds self._store: dict[str, list[float]] = defaultdict(list) def is_allowed(self, client_ip: str) -> bool: """Check if the client is within the rate limit.""" now = time.time() window_start = now - self.window # Clean old entries self._store[client_ip] = [ t for t in self._store[client_ip] if t > window_start ] if len(self._store[client_ip]) >= self.max_requests: return False self._store[client_ip].append(now) return True def get_remaining(self, client_ip: str) -> int: """Return remaining requests in current window.""" now = time.time() window_start = now - self.window recent = [t for t in self._store[client_ip] if t > window_start] return max(0, self.max_requests - len(recent)) # Global rate limiter instances api_limiter = RateLimiter(max_requests=60, window_seconds=60) # 60 req/min for reads write_limiter = RateLimiter(max_requests=10, window_seconds=60) # 10 req/min for writes def rate_limit(limiter: RateLimiter = api_limiter): """Decorator to apply rate limiting to a route.""" def decorator(f): @functools.wraps(f) def wrapper(*args, **kwargs): client_ip = request.remote_addr or '0.0.0.0' if not limiter.is_allowed(client_ip): return jsonify({ 'error': 'Rate limit exceeded. Please try again later.', 'retry_after': limiter.window, }), 429 response = f(*args, **kwargs) return response return wrapper return decorator