Spaces:
Sleeping
Sleeping
| """ | |
| Security Module | |
| Provides HTTP security headers, input sanitization, and rate limiting. | |
| """ | |
| import html | |
| import time | |
| import functools | |
| from collections import defaultdict | |
| from flask import request, jsonify | |
| # ββ Security Headers βββββββββββββββββββββββββββββββββββββββββ | |
| def apply_security_headers(response): | |
| """Add security headers to every response via @after_request.""" | |
| # Content Security Policy β allows Google services | |
| csp_directives = [ | |
| "default-src 'self'", | |
| "script-src 'self' 'unsafe-inline' https://translate.google.com https://translate.googleapis.com https://translate-pa.googleapis.com https://cdnjs.cloudflare.com https://maps.googleapis.com", | |
| "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com https://translate.googleapis.com https://*.gstatic.com", | |
| "font-src 'self' https://fonts.gstatic.com", | |
| "img-src 'self' data: https://*.googleapis.com https://*.gstatic.com https://translate.google.com https://www.google.com", | |
| "frame-src https://www.google.com https://maps.google.com https://translate.google.com", | |
| "connect-src 'self' ws: wss: https://translate.googleapis.com https://translate-pa.googleapis.com", | |
| ] | |
| response.headers['Content-Security-Policy'] = '; '.join(csp_directives) | |
| # Prevent clickjacking | |
| response.headers['X-Frame-Options'] = 'SAMEORIGIN' | |
| # Prevent MIME-type sniffing | |
| response.headers['X-Content-Type-Options'] = 'nosniff' | |
| # XSS Protection (legacy browsers) | |
| response.headers['X-XSS-Protection'] = '1; mode=block' | |
| # Referrer Policy | |
| response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin' | |
| # Permissions Policy β disable unnecessary features | |
| response.headers['Permissions-Policy'] = 'camera=(), microphone=(), geolocation=(self)' | |
| return response | |
| # ββ Input Sanitization βββββββββββββββββββββββββββββββββββββββ | |
| def sanitize_string(value: str, max_length: int = 500) -> str: | |
| """ | |
| Sanitize user-provided string input. | |
| - HTML-escapes dangerous characters | |
| - Strips leading/trailing whitespace | |
| - Enforces maximum length | |
| """ | |
| if not isinstance(value, str): | |
| return '' | |
| value = value.strip() | |
| value = html.escape(value, quote=True) | |
| return value[:max_length] | |
| def sanitize_json_input(data: dict, allowed_keys: set) -> dict: | |
| """ | |
| Filter JSON input to only allowed keys and sanitize string values. | |
| Args: | |
| data: Raw JSON dict from request | |
| allowed_keys: Set of permitted key names | |
| Returns: | |
| Sanitized dict with only allowed keys | |
| """ | |
| if not isinstance(data, dict): | |
| return {} | |
| result = {} | |
| for key in allowed_keys: | |
| if key in data: | |
| val = data[key] | |
| result[key] = sanitize_string(val) if isinstance(val, str) else val | |
| return result | |
| # ββ Rate Limiting ββββββββββββββββββββββββββββββββββββββββββββ | |
| class RateLimiter: | |
| """ | |
| Simple in-memory rate limiter. | |
| No Redis dependency β suitable for single-process deployment. | |
| """ | |
| def __init__(self, max_requests: int = 30, window_seconds: int = 60): | |
| self.max_requests = max_requests | |
| self.window = window_seconds | |
| self._store: dict[str, list[float]] = defaultdict(list) | |
| def is_allowed(self, client_ip: str) -> bool: | |
| """Check if the client is within the rate limit.""" | |
| now = time.time() | |
| window_start = now - self.window | |
| # Clean old entries | |
| self._store[client_ip] = [ | |
| t for t in self._store[client_ip] if t > window_start | |
| ] | |
| if len(self._store[client_ip]) >= self.max_requests: | |
| return False | |
| self._store[client_ip].append(now) | |
| return True | |
| def get_remaining(self, client_ip: str) -> int: | |
| """Return remaining requests in current window.""" | |
| now = time.time() | |
| window_start = now - self.window | |
| recent = [t for t in self._store[client_ip] if t > window_start] | |
| return max(0, self.max_requests - len(recent)) | |
| # Global rate limiter instances | |
| api_limiter = RateLimiter(max_requests=60, window_seconds=60) # 60 req/min for reads | |
| write_limiter = RateLimiter(max_requests=10, window_seconds=60) # 10 req/min for writes | |
| def rate_limit(limiter: RateLimiter = api_limiter): | |
| """Decorator to apply rate limiting to a route.""" | |
| def decorator(f): | |
| def wrapper(*args, **kwargs): | |
| client_ip = request.remote_addr or '0.0.0.0' | |
| if not limiter.is_allowed(client_ip): | |
| return jsonify({ | |
| 'error': 'Rate limit exceeded. Please try again later.', | |
| 'retry_after': limiter.window, | |
| }), 429 | |
| response = f(*args, **kwargs) | |
| return response | |
| return wrapper | |
| return decorator | |