File size: 5,125 Bytes
3c4d71f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""
Security Module
Provides HTTP security headers, input sanitization, and rate limiting.
"""

import html
import time
import functools
from collections import defaultdict
from flask import request, jsonify


# ── Security Headers ─────────────────────────────────────────

def apply_security_headers(response):
    """Add security headers to every response via @after_request."""

    # Content Security Policy β€” allows Google services
    csp_directives = [
        "default-src 'self'",
        "script-src 'self' 'unsafe-inline' https://translate.google.com https://translate.googleapis.com https://translate-pa.googleapis.com https://cdnjs.cloudflare.com https://maps.googleapis.com",
        "style-src 'self' 'unsafe-inline' https://fonts.googleapis.com https://translate.googleapis.com https://*.gstatic.com",
        "font-src 'self' https://fonts.gstatic.com",
        "img-src 'self' data: https://*.googleapis.com https://*.gstatic.com https://translate.google.com https://www.google.com",
        "frame-src https://www.google.com https://maps.google.com https://translate.google.com",
        "connect-src 'self' ws: wss: https://translate.googleapis.com https://translate-pa.googleapis.com",
    ]
    response.headers['Content-Security-Policy'] = '; '.join(csp_directives)

    # Prevent clickjacking
    response.headers['X-Frame-Options'] = 'SAMEORIGIN'

    # Prevent MIME-type sniffing
    response.headers['X-Content-Type-Options'] = 'nosniff'

    # XSS Protection (legacy browsers)
    response.headers['X-XSS-Protection'] = '1; mode=block'

    # Referrer Policy
    response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'

    # Permissions Policy β€” disable unnecessary features
    response.headers['Permissions-Policy'] = 'camera=(), microphone=(), geolocation=(self)'

    return response


# ── Input Sanitization ───────────────────────────────────────

def sanitize_string(value: str, max_length: int = 500) -> str:
    """
    Sanitize user-provided string input.

    - HTML-escapes dangerous characters
    - Strips leading/trailing whitespace
    - Enforces maximum length
    """
    if not isinstance(value, str):
        return ''
    value = value.strip()
    value = html.escape(value, quote=True)
    return value[:max_length]


def sanitize_json_input(data: dict, allowed_keys: set) -> dict:
    """
    Filter JSON input to only allowed keys and sanitize string values.

    Args:
        data: Raw JSON dict from request
        allowed_keys: Set of permitted key names

    Returns:
        Sanitized dict with only allowed keys
    """
    if not isinstance(data, dict):
        return {}
    result = {}
    for key in allowed_keys:
        if key in data:
            val = data[key]
            result[key] = sanitize_string(val) if isinstance(val, str) else val
    return result


# ── Rate Limiting ────────────────────────────────────────────

class RateLimiter:
    """
    Simple in-memory rate limiter.
    No Redis dependency β€” suitable for single-process deployment.
    """

    def __init__(self, max_requests: int = 30, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window = window_seconds
        self._store: dict[str, list[float]] = defaultdict(list)

    def is_allowed(self, client_ip: str) -> bool:
        """Check if the client is within the rate limit."""
        now = time.time()
        window_start = now - self.window

        # Clean old entries
        self._store[client_ip] = [
            t for t in self._store[client_ip] if t > window_start
        ]

        if len(self._store[client_ip]) >= self.max_requests:
            return False

        self._store[client_ip].append(now)
        return True

    def get_remaining(self, client_ip: str) -> int:
        """Return remaining requests in current window."""
        now = time.time()
        window_start = now - self.window
        recent = [t for t in self._store[client_ip] if t > window_start]
        return max(0, self.max_requests - len(recent))


# Global rate limiter instances
api_limiter = RateLimiter(max_requests=60, window_seconds=60)     # 60 req/min for reads
write_limiter = RateLimiter(max_requests=10, window_seconds=60)   # 10 req/min for writes


def rate_limit(limiter: RateLimiter = api_limiter):
    """Decorator to apply rate limiting to a route."""
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            client_ip = request.remote_addr or '0.0.0.0'
            if not limiter.is_allowed(client_ip):
                return jsonify({
                    'error': 'Rate limit exceeded. Please try again later.',
                    'retry_after': limiter.window,
                }), 429
            response = f(*args, **kwargs)
            return response
        return wrapper
    return decorator