Spaces:
Sleeping
Sleeping
File size: 5,125 Bytes
3c4d71f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 | """
Security Module
Provides HTTP security headers, input sanitization, and rate limiting.
"""
import html
import time
import functools
from collections import defaultdict
from flask import request, jsonify
# -- Security Headers ---------------------------------------------
def apply_security_headers(response):
    """
    Attach a fixed set of security-related HTTP headers to a response.

    Intended to be used as a Flask ``after_request`` hook (the registration
    itself is not part of this function).

    Args:
        response: Response object exposing a mutable ``headers`` mapping.

    Returns:
        The same response object, with the headers set.
    """
    # Content Security Policy: same-origin by default, plus the Google
    # services (Translate, Maps, Fonts) and cdnjs listed below.
    content_security_policy = '; '.join((
        "default-src 'self'",
        "script-src 'self' 'unsafe-inline' https://translate.google.com https://translate.googleapis.com https://translate-pa.googleapis.com https://cdnjs.cloudflare.com https://maps.googleapis.com",
        "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com https://translate.googleapis.com https://*.gstatic.com",
        "font-src 'self' https://fonts.gstatic.com",
        "img-src 'self' data: https://*.googleapis.com https://*.gstatic.com https://translate.google.com https://www.google.com",
        "frame-src https://www.google.com https://maps.google.com https://translate.google.com",
        "connect-src 'self' ws: wss: https://translate.googleapis.com https://translate-pa.googleapis.com",
    ))

    # Applied in this order; each assignment overwrites any existing value.
    header_values = (
        ('Content-Security-Policy', content_security_policy),
        # Clickjacking protection: only same-origin framing.
        ('X-Frame-Options', 'SAMEORIGIN'),
        # Stop browsers from MIME-sniffing the content type.
        ('X-Content-Type-Options', 'nosniff'),
        # Legacy XSS filter switch for older browsers.
        ('X-XSS-Protection', '1; mode=block'),
        ('Referrer-Policy', 'strict-origin-when-cross-origin'),
        # Camera and microphone disabled; geolocation for this origin only.
        ('Permissions-Policy', 'camera=(), microphone=(), geolocation=(self)'),
    )
    for header_name, header_value in header_values:
        response.headers[header_name] = header_value
    return response
# -- Input Sanitization -------------------------------------------
def sanitize_string(value: str, max_length: int = 500) -> str:
    """
    Sanitize user-provided string input.

    - Strips leading/trailing whitespace
    - HTML-escapes ``&``, ``<``, ``>``, ``"`` and ``'``
    - Enforces a maximum length on the *escaped* result without ever
      leaving a half-written HTML entity at the end

    Args:
        value: Raw input; anything that is not a ``str`` yields ``''``.
        max_length: Upper bound on the length of the returned string.

    Returns:
        The escaped string, at most ``max_length`` characters long.
    """
    if not isinstance(value, str):
        return ''

    escaped = html.escape(value.strip(), quote=True)
    if len(escaped) <= max_length:
        return escaped

    clipped = escaped[:max_length]
    # After escaping, every '&' starts an entity that ends with ';'. If the
    # last '&' has no ';' after it, the cut landed inside that entity
    # (e.g. "&am"), so drop the fragment rather than emit broken markup.
    last_amp = clipped.rfind('&')
    if last_amp != -1 and ';' not in clipped[last_amp:]:
        clipped = clipped[:last_amp]
    return clipped
def sanitize_json_input(data: dict, allowed_keys: set) -> dict:
    """
    Keep only whitelisted keys of a JSON object and sanitize string values.

    Only top-level string values are passed through ``sanitize_string``;
    values of any other type (numbers, lists, nested dicts, ...) are copied
    into the result unchanged.

    Args:
        data: Raw JSON dict from the request.
        allowed_keys: Names of the keys that may appear in the result.

    Returns:
        A new dict holding the allowed keys that were present in ``data``;
        an empty dict when ``data`` is not a dict.
    """
    if not isinstance(data, dict):
        return {}

    def _clean(raw):
        # Strings are escaped/trimmed; everything else passes through as-is.
        return sanitize_string(raw) if isinstance(raw, str) else raw

    return {name: _clean(data[name]) for name in allowed_keys if name in data}
# -- Rate Limiting ------------------------------------------------
class RateLimiter:
    """
    Simple in-memory sliding-window rate limiter.

    No Redis dependency - state lives in this process only, so it is
    suitable for single-process deployment. The class performs no locking.

    Attributes:
        max_requests: Requests allowed per client within one window.
        window: Window length in seconds.
    """

    def __init__(self, max_requests: int = 30, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window = window_seconds
        # client key -> timestamps (time.monotonic()) of accepted requests,
        # oldest first.
        self._store: dict[str, list[float]] = defaultdict(list)
        # Earliest moment at which stale clients are purged again.
        self._next_sweep = time.monotonic() + window_seconds

    def _recent(self, client_ip: str, now: float) -> list:
        """Return the client's timestamps still inside the window."""
        window_start = now - self.window
        # .get() instead of indexing: indexing a defaultdict would create an
        # entry for every client that is merely looked up.
        return [t for t in self._store.get(client_ip, ()) if t > window_start]

    def _sweep(self, now: float) -> None:
        """Drop clients whose newest request has left the window."""
        window_start = now - self.window
        stale = [
            ip for ip, hits in self._store.items()
            if not hits or hits[-1] <= window_start
        ]
        for ip in stale:
            del self._store[ip]
        self._next_sweep = now + self.window

    def is_allowed(self, client_ip: str) -> bool:
        """
        Record a request for the client if it is within the rate limit.

        Returns:
            True when the request was accepted (and counted), False when
            the client already used ``max_requests`` in the current window.
        """
        # Monotonic clock: wall-clock adjustments must not stretch or
        # shrink the window.
        now = time.monotonic()
        # At most once per window, forget clients that never came back so
        # the store does not grow without bound.
        if now >= self._next_sweep:
            self._sweep(now)

        recent = self._recent(client_ip, now)
        if len(recent) >= self.max_requests:
            self._store[client_ip] = recent
            return False
        recent.append(now)
        self._store[client_ip] = recent
        return True

    def get_remaining(self, client_ip: str) -> int:
        """Return remaining requests in current window (never negative)."""
        recent = self._recent(client_ip, time.monotonic())
        return max(0, self.max_requests - len(recent))
# Module-level limiter instances, each counting over a 60-second window.
# Reads: 60 requests per minute.
api_limiter = RateLimiter(window_seconds=60, max_requests=60)
# Writes: 10 requests per minute.
write_limiter = RateLimiter(window_seconds=60, max_requests=10)
def rate_limit(limiter: RateLimiter = api_limiter):
    """
    Build a route decorator that enforces ``limiter`` per client address.

    The client is identified by ``request.remote_addr``; when that is
    missing, all such requests share the ``'0.0.0.0'`` bucket.

    Args:
        limiter: Limiter consulted on every call (defaults to ``api_limiter``).

    Returns:
        A decorator. The wrapped view runs normally while the client is
        within the limit; otherwise a JSON error body with status 429 is
        returned and the view is not called.
    """
    def decorator(view):
        @functools.wraps(view)
        def guarded(*args, **kwargs):
            caller_ip = request.remote_addr or '0.0.0.0'
            if limiter.is_allowed(caller_ip):
                return view(*args, **kwargs)
            # Over the limit: report the window length as the retry hint.
            payload = {
                'error': 'Rate limit exceeded. Please try again later.',
                'retry_after': limiter.window,
            }
            return jsonify(payload), 429
        return guarded
    return decorator
|