PromptWarElection / app /security.py
Mr-TD's picture
feat: initialize 2026 Election Assistant web application with real-time tracking and comprehensive event documentation
3c4d71f
"""
Security Module
Provides HTTP security headers, input sanitization, and rate limiting.
"""
import html
import time
import functools
from collections import defaultdict
from flask import request, jsonify
# ── Security Headers ─────────────────────────────────────────
def apply_security_headers(response):
"""Add security headers to every response via @after_request."""
# Content Security Policy β€” allows Google services
csp_directives = [
"default-src 'self'",
"script-src 'self' 'unsafe-inline' https://translate.google.com https://translate.googleapis.com https://translate-pa.googleapis.com https://cdnjs.cloudflare.com https://maps.googleapis.com",
"style-src 'self' 'unsafe-inline' https://fonts.googleapis.com https://translate.googleapis.com https://*.gstatic.com",
"font-src 'self' https://fonts.gstatic.com",
"img-src 'self' data: https://*.googleapis.com https://*.gstatic.com https://translate.google.com https://www.google.com",
"frame-src https://www.google.com https://maps.google.com https://translate.google.com",
"connect-src 'self' ws: wss: https://translate.googleapis.com https://translate-pa.googleapis.com",
]
response.headers['Content-Security-Policy'] = '; '.join(csp_directives)
# Prevent clickjacking
response.headers['X-Frame-Options'] = 'SAMEORIGIN'
# Prevent MIME-type sniffing
response.headers['X-Content-Type-Options'] = 'nosniff'
# XSS Protection (legacy browsers)
response.headers['X-XSS-Protection'] = '1; mode=block'
# Referrer Policy
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
# Permissions Policy β€” disable unnecessary features
response.headers['Permissions-Policy'] = 'camera=(), microphone=(), geolocation=(self)'
return response
# ── Input Sanitization ───────────────────────────────────────
def sanitize_string(value: str, max_length: int = 500) -> str:
"""
Sanitize user-provided string input.
- HTML-escapes dangerous characters
- Strips leading/trailing whitespace
- Enforces maximum length
"""
if not isinstance(value, str):
return ''
value = value.strip()
value = html.escape(value, quote=True)
return value[:max_length]
def sanitize_json_input(data: dict, allowed_keys: set) -> dict:
"""
Filter JSON input to only allowed keys and sanitize string values.
Args:
data: Raw JSON dict from request
allowed_keys: Set of permitted key names
Returns:
Sanitized dict with only allowed keys
"""
if not isinstance(data, dict):
return {}
result = {}
for key in allowed_keys:
if key in data:
val = data[key]
result[key] = sanitize_string(val) if isinstance(val, str) else val
return result
# ── Rate Limiting ────────────────────────────────────────────
class RateLimiter:
"""
Simple in-memory rate limiter.
No Redis dependency β€” suitable for single-process deployment.
"""
def __init__(self, max_requests: int = 30, window_seconds: int = 60):
self.max_requests = max_requests
self.window = window_seconds
self._store: dict[str, list[float]] = defaultdict(list)
def is_allowed(self, client_ip: str) -> bool:
"""Check if the client is within the rate limit."""
now = time.time()
window_start = now - self.window
# Clean old entries
self._store[client_ip] = [
t for t in self._store[client_ip] if t > window_start
]
if len(self._store[client_ip]) >= self.max_requests:
return False
self._store[client_ip].append(now)
return True
def get_remaining(self, client_ip: str) -> int:
"""Return remaining requests in current window."""
now = time.time()
window_start = now - self.window
recent = [t for t in self._store[client_ip] if t > window_start]
return max(0, self.max_requests - len(recent))
# Global rate limiter instances
api_limiter = RateLimiter(max_requests=60, window_seconds=60) # 60 req/min for reads
write_limiter = RateLimiter(max_requests=10, window_seconds=60) # 10 req/min for writes
def rate_limit(limiter: RateLimiter = api_limiter):
"""Decorator to apply rate limiting to a route."""
def decorator(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
client_ip = request.remote_addr or '0.0.0.0'
if not limiter.is_allowed(client_ip):
return jsonify({
'error': 'Rate limit exceeded. Please try again later.',
'retry_after': limiter.window,
}), 429
response = f(*args, **kwargs)
return response
return wrapper
return decorator