# app/utils.py
import json
import time
from pathlib import Path
from typing import Dict

# --- Constants ---
UPLOAD_ROOT = Path("/tmp/triflix_uploader")


# --- Upload Management ---
def get_upload_dir(upload_id: str) -> Path:
    """Return the directory for a given upload.

    The id must be a single, plain path component. Joining an absolute path
    or one containing separators / ``..`` onto ``UPLOAD_ROOT`` would resolve
    outside the upload root, so such ids are rejected.

    Args:
        upload_id: Identifier of the upload; used verbatim as the directory
            name under ``UPLOAD_ROOT``.

    Returns:
        ``UPLOAD_ROOT / upload_id``. The directory is not created here.

    Raises:
        ValueError: If ``upload_id`` is empty, contains a NUL byte, is ``.``
            or ``..``, or is not a single path component.
    """
    if (
        not upload_id
        or "\x00" in upload_id
        or upload_id in (".", "..")
        # Path(...).name differs from the input whenever the input has a
        # separator, a trailing slash, or is absolute.
        or Path(upload_id).name != upload_id
    ):
        raise ValueError(f"Invalid upload id: {upload_id!r}")
    return UPLOAD_ROOT / upload_id


def load_meta(upload_id: str) -> dict:
    """Load metadata for an upload from its ``meta.json`` file.

    Args:
        upload_id: Identifier of the upload whose metadata is read.

    Returns:
        The value parsed from the JSON document stored in ``meta.json``.

    Raises:
        FileNotFoundError: If the metadata file does not exist, or if
            ``upload_id`` is not a valid id (an invalid id can never have
            stored metadata).
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    try:
        meta_path = get_upload_dir(upload_id) / "meta.json"
    except ValueError as err:
        # Keep the exception type callers already handle for "no such upload".
        raise FileNotFoundError("Upload metadata not found.") from err

    # Read directly instead of checking exists() first: the file could vanish
    # between the check and the read.
    try:
        raw = meta_path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError) as err:
        raise FileNotFoundError("Upload metadata not found.") from err
    return json.loads(raw)


def save_meta(upload_id: str, meta: dict) -> None:
    """Save metadata for an upload to its ``meta.json`` file.

    The upload directory (and any missing parents) is created if needed and
    an existing ``meta.json`` is overwritten.

    Args:
        upload_id: Identifier of the upload whose metadata is written.
        meta: JSON-serializable metadata.

    Raises:
        ValueError: If ``upload_id`` is not a valid id.
        TypeError: If ``meta`` contains values ``json.dumps`` cannot encode.
    """
    meta_path = get_upload_dir(upload_id) / "meta.json"
    # Serialize first so a non-serializable ``meta`` fails before anything is
    # created on disk.
    payload = json.dumps(meta, indent=2)
    meta_path.parent.mkdir(parents=True, exist_ok=True)
    meta_path.write_text(payload, encoding="utf-8")


# --- Rate Limiting ---
# Maps a client IP to the timestamps (time.time() values) of its allowed
# requests that were still inside the window at that IP's last check.
# NOTE(review): a key is only pruned when that same IP calls again, so keys
# for IPs that never return are kept for the life of the process.
rate_limit_store: Dict[str, list] = {}


def enforce_rate_limit(ip: str, limit: int = 10, per_seconds: int = 60) -> bool:
    """Simple in-memory sliding-window rate limiter.

    Args:
        ip: Key identifying the client.
        limit: Maximum number of allowed requests inside the window.
        per_seconds: Length of the window in seconds.

    Returns:
        True if the request is allowed (and recorded), False if denied.
        Denied requests are not recorded.
    """
    now = time.time()
    timestamps = rate_limit_store.setdefault(ip, [])

    # Drop timestamps older than the window; slice assignment mutates the
    # list held in the store rather than rebinding a local.
    cutoff = now - per_seconds
    timestamps[:] = [t for t in timestamps if t > cutoff]

    if len(timestamps) >= limit:
        return False

    timestamps.append(now)
    return True