|
|
|
|
|
import json |
|
|
import time |
|
|
from pathlib import Path |
|
|
from typing import Dict |
|
|
|
|
|
|
|
|
UPLOAD_ROOT = Path("/tmp/triflix_uploader") |
|
|
|
|
|
|
|
|
def get_upload_dir(upload_id: str) -> Path: |
|
|
"""Returns the directory for a given upload.""" |
|
|
return UPLOAD_ROOT / upload_id |
|
|
|
|
|
def load_meta(upload_id: str) -> dict: |
|
|
"""Loads metadata for an upload from its meta.json file.""" |
|
|
meta_path = get_upload_dir(upload_id) / "meta.json" |
|
|
if not meta_path.exists(): |
|
|
raise FileNotFoundError("Upload metadata not found.") |
|
|
return json.loads(meta_path.read_text()) |
|
|
|
|
|
def save_meta(upload_id: str, meta: dict): |
|
|
"""Saves metadata for an upload to its meta.json file.""" |
|
|
meta_path = get_upload_dir(upload_id) / "meta.json" |
|
|
meta_path.parent.mkdir(parents=True, exist_ok=True) |
|
|
meta_path.write_text(json.dumps(meta, indent=2)) |
|
|
|
|
|
|
|
|
rate_limit_store: Dict[str, list] = {} |
|
|
|
|
|
def enforce_rate_limit(ip: str, limit: int = 10, per_seconds: int = 60) -> bool: |
|
|
"""Simple in-memory rate limiter. Returns True if allowed, False if denied.""" |
|
|
now = time.time() |
|
|
timestamps = rate_limit_store.setdefault(ip, []) |
|
|
|
|
|
timestamps[:] = [t for t in timestamps if t > now - per_seconds] |
|
|
if len(timestamps) >= limit: |
|
|
return False |
|
|
timestamps.append(now) |
|
|
return True |