Spaces:
Sleeping
Sleeping
| """Security utilities - validation, rate limiting.""" | |
| from __future__ import annotations | |
| import logging | |
| import os | |
| import time | |
| from collections import defaultdict | |
| from fastapi import HTTPException, Request | |
| logger = logging.getLogger(__name__) | |
| ALLOWED_CONTENT_TYPES = {"image/jpeg", "image/png", "image/webp", "image/jpg"} | |
| ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp"} | |
| MAX_FILE_SIZE_BYTES = 20 * 1024 * 1024 | |
| MAX_IMAGE_PIXELS = 16_000_000 | |
| RATE_LIMIT_PER_MINUTE = int(os.environ.get("RATE_LIMIT_PER_MINUTE", "30")) | |
| def validate_file_type(content_type, filename): | |
| if content_type and content_type.lower() not in ALLOWED_CONTENT_TYPES: | |
| raise HTTPException(415, f"Unsupported type: {content_type}") | |
| if filename: | |
| ext = "." + filename.rsplit(".", 1)[-1].lower() if "." in filename else "" | |
| if ext and ext not in ALLOWED_EXTENSIONS: | |
| raise HTTPException(415, f"Unsupported extension: {ext}") | |
| def validate_file_size(size): | |
| if size > MAX_FILE_SIZE_BYTES: | |
| raise HTTPException(413, f"File too large: {size / 1024 / 1024:.1f}MB. Max: 20MB") | |
| def validate_image_dimensions(width, height): | |
| if width * height > MAX_IMAGE_PIXELS: | |
| raise HTTPException(413, f"Image too large: {width}x{height}") | |
| class RateLimiter: | |
| def __init__(self, max_requests=RATE_LIMIT_PER_MINUTE, window_seconds=60): | |
| self.max_requests = max_requests | |
| self.window_seconds = window_seconds | |
| self._requests = defaultdict(list) | |
| def check(self, request: Request): | |
| ip = request.headers.get("X-Forwarded-For", "").split(",")[0].strip() or ( | |
| request.client.host if request.client else "unknown" | |
| ) | |
| now = time.time() | |
| cutoff = now - self.window_seconds | |
| self._requests[ip] = [t for t in self._requests[ip] if t > cutoff] | |
| self._requests[ip].append(now) | |
| if len(self._requests[ip]) > self.max_requests: | |
| raise HTTPException( | |
| 429, | |
| f"Rate limit exceeded. Max {self.max_requests} requests per {self.window_seconds}s", | |
| ) | |