"""Caching stream-through HTTP proxy for Hugging Face dataset files.

GET endpoints:  /status, /health, /list, /stream/{dataset|file_path}
POST endpoints: /cache, /preload, /flush

Cached payloads live under CACHE_DIR; each payload ``<path>`` has a JSON
sidecar ``<path>.meta`` holding its timestamp, expiry, hit count, size and
content type.
"""
import http.client
import http.server
import json
import os
import re
import shutil
import socketserver
import threading
import time
import urllib.request
from datetime import datetime
from pathlib import Path
from urllib.parse import parse_qs, quote, unquote, urlparse  # noqa: F401 (parse_qs kept for compatibility)

HF_TOKEN = os.environ.get("HF_TOKEN", "")
CACHE_DIR = Path("/tmp/proxy_cache")
CACHE_DIR.mkdir(parents=True, exist_ok=True)
CACHE_TTL = 5 * 3600
# NOTE(review): the last factor is 1023, not 1024, so this is slightly under
# 2 GiB. Left unchanged because it may be deliberate - confirm.
MAX_CACHE_BYTES = 2 * 1024 * 1024 * 1023

_CHUNK_SIZE = 1048576  # bytes moved per read/write while streaming
_RANGE_RE = re.compile(r"bytes=(\d+)-(\d*)")
_RATE_LIMIT_WINDOW = 60  # seconds
_RATE_LIMIT_MAX_REQUESTS = 100  # requests per window per client IP
_RATE_LIMIT_MAX_IPS = 10000  # table size that triggers a sweep of idle IPs

_active_connections = 0
_active_lock = threading.Lock()
_total_requests = 0
_start_time = time.time()
_rate_limit = {}
_rate_limit_lock = threading.Lock()
LOCK = threading.Lock()


def hf_resolve_url(dataset, file_path):
    """Return the Hugging Face ``resolve/main`` URL for a dataset file.

    Both components are percent-encoded (``/`` is kept) so names containing
    spaces or non-ASCII characters produce a valid URL.
    """
    return (
        "https://huggingface.co/datasets/"
        f"{quote(dataset, safe='/')}/resolve/main/{quote(file_path, safe='/')}"
    )


def _auth_headers(url):
    """Return request headers for *url*.

    The bearer token is attached only for Hugging Face hosts so that it is
    never sent to an arbitrary caller-supplied URL.
    """
    host = (urlparse(url).hostname or "").lower()
    is_hf = host in ("huggingface.co", "hf.co") or host.endswith(".huggingface.co")
    if HF_TOKEN and is_hf:
        return {"Authorization": f"Bearer {HF_TOKEN}"}
    return {}


def _new_temp_path():
    """Return a fresh path inside CACHE_DIR for an in-progress download."""
    return CACHE_DIR / f"dl_{int(time.time()*1000)}_{os.urandom(2).hex()}"


def _num(value, default):
    """Return *value* if it is a real number (not bool), otherwise *default*."""
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return value
    return default


class SmartCache:
    """File cache under CACHE_DIR with TTL expiry and size-based eviction."""

    def __init__(self):
        self._start_cleaner()

    def _start_cleaner(self):
        """Start a daemon thread that runs ``cleanup`` every 300 seconds."""
        def loop():
            while True:
                time.sleep(300)
                try:
                    self.cleanup()
                except OSError:
                    # A transient filesystem error must not kill the cleaner
                    # thread; the next cycle simply retries.
                    pass

        threading.Thread(target=loop, daemon=True).start()

    @staticmethod
    def _entry_path(key):
        """Map a cache key to its payload path, or None if the key is unsafe.

        Rejected: empty keys, keys containing NUL, keys that normalise to a
        location outside CACHE_DIR (``..`` segments, absolute paths), and keys
        ending in ``.meta`` (they would collide with sidecar files).
        """
        if not key or "\x00" in key:
            return None
        root = os.path.normpath(str(CACHE_DIR))
        full = os.path.normpath(os.path.join(root, key))
        if not full.startswith(root + os.sep):
            return None
        if full.endswith(".meta"):
            return None
        return Path(full)

    @staticmethod
    def _meta_path(fpath):
        """Return the sidecar path that belongs to payload *fpath*."""
        return fpath.with_name(f"{fpath.name}.meta")

    @staticmethod
    def _read_meta(meta_path):
        """Load a sidecar as a dict; return {} if missing, unreadable or malformed."""
        try:
            with open(meta_path, encoding="utf-8") as fh:
                data = json.load(fh)
        except (OSError, ValueError):
            return {}
        return data if isinstance(data, dict) else {}

    def content_type(self, fpath):
        """Return the content type stored for *fpath* (default ``video/mp4``)."""
        stored = self._read_meta(self._meta_path(fpath)).get("content_type")
        return stored if isinstance(stored, str) and stored else "video/mp4"

    def get(self, key):
        """Return the cached payload path for *key*, or None on a miss.

        A hit increments the stored hit counter. An entry older than CACHE_TTL
        is deleted and reported as a miss. An unreadable sidecar is a miss but
        the entry is left in place (it may be mid-write in another thread).
        """
        fpath = self._entry_path(key)
        if fpath is None:
            return None
        meta = self._meta_path(fpath)
        if not (fpath.is_file() and meta.is_file()):
            return None
        info = self._read_meta(meta)
        if not info:
            return None
        try:
            if time.time() - _num(info.get("ts"), 0) < CACHE_TTL:
                info["hits"] = _num(info.get("hits"), 0) + 1
                with open(meta, "w", encoding="utf-8") as fh:
                    json.dump(info, fh)
                return fpath
            fpath.unlink(missing_ok=True)
            meta.unlink(missing_ok=True)
        except OSError:
            pass  # best effort: a failed bookkeeping write/delete is a miss
        return None

    def put(self, key, src_path, content_type="video/mp4"):
        """Copy *src_path* into the cache under *key* and write its sidecar.

        Raises:
            ValueError: if *key* does not map to a path inside CACHE_DIR.
            OSError: if the copy or the sidecar write fails.
        """
        fpath = self._entry_path(key)
        if fpath is None:
            raise ValueError(f"Invalid cache key: {key!r}")
        with LOCK:
            fpath.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(src_path, fpath)
            now = time.time()
            with open(self._meta_path(fpath), "w", encoding="utf-8") as fh:
                json.dump({
                    "ts": now,
                    "expires": now + CACHE_TTL,
                    "hits": 1,
                    "size": os.path.getsize(fpath),
                    "content_type": content_type,
                }, fh)

    @staticmethod
    def _payload_files():
        """Yield ``(path, stat_result)`` for every non-sidecar file in CACHE_DIR."""
        for f in CACHE_DIR.rglob("*"):
            if f.name.endswith(".meta"):
                continue
            try:
                if f.is_file():
                    yield f, f.stat()
            except OSError:
                continue  # removed concurrently (get() deletes without LOCK)

    def stats(self):
        """Return ``(file_count, total_bytes)`` over all non-sidecar files."""
        files = 0
        size = 0
        with LOCK:
            for _, st in self._payload_files():
                files += 1
                size += st.st_size
        return files, size

    def cleanup(self):
        """Evict entries once the cache has reached MAX_CACHE_BYTES.

        Entries are removed in order of fewest hits, then oldest timestamp,
        until the total drops to half of MAX_CACHE_BYTES.
        """
        with LOCK:
            payloads = list(self._payload_files())
            total = sum(st.st_size for _, st in payloads)
            if total < MAX_CACHE_BYTES:
                return
            entries = []
            for f, st in payloads:
                # The sidecar sits next to the payload, also for nested keys.
                info = self._read_meta(self._meta_path(f))
                hits = _num(info.get("hits"), 0)
                stamp = _num(info.get("ts"), st.st_mtime)
                entries.append((hits, stamp, st.st_size, f))
            entries.sort(key=lambda entry: (entry[0], entry[1]))
            target = int(MAX_CACHE_BYTES * 0.5)
            for _, _, sz, f in entries:
                if total <= target:
                    break
                f.unlink(missing_ok=True)
                self._meta_path(f).unlink(missing_ok=True)
                total -= sz


cache = SmartCache()


def parse_stream_key(key):
    """Split a percent-encoded stream key ``dataset|file_path``.

    Returns ``(dataset, file_path)``; when no ``|`` is present the dataset is
    the empty string and the whole decoded key is the file path.
    """
    decoded = unquote(key)
    if "|" in decoded:
        dataset, file_path = decoded.split("|", 1)
        return dataset, file_path
    return "", decoded


def stream_from_hf(dataset, file_path, wfile, range_start=0, range_end=None):
    """Copy a dataset file from Hugging Face into *wfile*.

    A ``Range`` header is sent when *range_end* is given or *range_start* > 0.
    Returns the upstream ``Content-Length`` as an int (0 when absent).
    Errors from ``urlopen`` propagate to the caller.
    """
    dl_url = hf_resolve_url(dataset, file_path)
    headers = _auth_headers(dl_url)
    if range_end is not None:
        headers["Range"] = f"bytes={range_start}-{range_end}"
    elif range_start > 0:
        headers["Range"] = f"bytes={range_start}-"
    req = urllib.request.Request(dl_url, headers=headers)
    # The context manager closes the upstream connection even if a write fails.
    with urllib.request.urlopen(req, timeout=300) as resp:
        total = int(resp.headers.get("Content-Length", 0) or 0)
        while chunk := resp.read(_CHUNK_SIZE):
            wfile.write(chunk)
            wfile.flush()
    return total


class Handler(http.server.BaseHTTPRequestHandler):
    """Request handler implementing the proxy's GET/POST/OPTIONS endpoints."""

    def _check_rate_limit(self, ip):
        """Return True if *ip* is under 100 requests in the last 60 seconds."""
        now = time.time()
        with _rate_limit_lock:
            recent = [t for t in _rate_limit.get(ip, ()) if now - t < _RATE_LIMIT_WINDOW]
            allowed = len(recent) < _RATE_LIMIT_MAX_REQUESTS
            if allowed:
                recent.append(now)
            _rate_limit[ip] = recent
            if len(_rate_limit) > _RATE_LIMIT_MAX_IPS:
                # Drop clients idle for a full window so the table cannot
                # grow without bound.
                idle = [
                    addr for addr, stamps in _rate_limit.items()
                    if not stamps or now - stamps[-1] >= _RATE_LIMIT_WINDOW
                ]
                for addr in idle:
                    del _rate_limit[addr]
            return allowed

    def _admit(self):
        """Apply rate limiting and bump counters; send 429 and return False if rejected."""
        global _active_connections, _total_requests
        if not self._check_rate_limit(self.client_address[0]):
            self._json(429, {"error": "Too many requests"})
            return False
        with _active_lock:
            _active_connections += 1
            _total_requests += 1
        return True

    def _release(self):
        """Undo the active-connection increment made by ``_admit``."""
        global _active_connections
        with _active_lock:
            _active_connections -= 1

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        self.send_response(204)
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "*")
        self.send_header("Access-Control-Max-Age", "86400")
        self.end_headers()

    def do_GET(self):
        """Dispatch /status, /health, /stream/{key} and /list."""
        if not self._admit():
            return
        try:
            path = urlparse(self.path).path
            if path == "/status":
                self._handle_status()
            elif path == "/health":
                self._handle_health()
            elif path.startswith("/stream/"):
                self._handle_stream(path)
            elif path == "/list":
                self._handle_list()
            else:
                self._json(404, {"error": "Not found. Try /health, /stream/{key}, /list"})
        finally:
            self._release()

    def _handle_status(self):
        """Report request counters, uptime and cache usage."""
        files, size = cache.stats()
        with _active_lock:
            active = _active_connections
            total_requests = _total_requests
        self._json(200, {
            "status": "ok",
            "version": "v4",
            "active_connections": active,
            "total_requests": total_requests,
            "uptime_seconds": int(time.time() - _start_time),
            "cached_files": files,
            "cache_size_bytes": size,
            "disk_free_gb": round(shutil.disk_usage("/tmp").free / (1024**3), 1),
        })

    def _handle_health(self):
        """Report cache usage, free disk space and the configured TTL."""
        files, size = cache.stats()
        free = shutil.disk_usage("/tmp").free
        self._json(200, {
            "status": "ok",
            "cached_files": files,
            "cache_size_bytes": size,
            "cache_size_gb": round(size / (1024**3), 2),
            "disk_free_bytes": free,
            "disk_free_gb": round(free / (1024**3), 1),
            "ttl_hours": CACHE_TTL / 3600,
        })

    def _handle_stream(self, path):
        """Serve /stream/{key} from the cache, or stream it through from HF."""
        raw_key = path.split("/stream/", 1)[-1]
        key = unquote(raw_key)
        if not key:
            self._json(400, {"error": "Missing file key"})
            return
        cached = cache.get(key)
        if cached:
            self._stream_file(cached, key, self.headers.get("Range", ""))
            return
        # parse_stream_key decodes its argument itself, so it gets the raw
        # key; passing the decoded key would percent-decode it twice.
        dataset, file_path = parse_stream_key(raw_key)
        if not dataset or not file_path:
            self._json(400, {"error": "Invalid key format. Expected: dataset|file_path"})
            return
        self._stream_from_hf_with_cache(key, dataset, file_path)

    def _handle_list(self):
        """List cached payloads with size, expiry and hit count."""
        files = []
        for f in sorted(CACHE_DIR.rglob("*")):
            if f.name.endswith(".meta"):
                continue
            try:
                if not f.is_file():
                    continue
                size = f.stat().st_size
            except OSError:
                continue  # removed while listing
            info = SmartCache._read_meta(SmartCache._meta_path(f))
            expires = _num(info.get("expires"), 0)
            files.append({
                "key": str(f.relative_to(CACHE_DIR)),
                "size": size,
                "expires_at": datetime.fromtimestamp(expires).isoformat() if expires else "N/A",
                "hits": _num(info.get("hits"), 0),
            })
        self._json(200, {"files": files, "count": len(files)})

    def _read_json_body(self):
        """Read and decode the request body as a JSON object.

        Returns {} for an empty body. Raises ValueError for a bad
        Content-Length, invalid JSON, or a non-object JSON value.
        """
        length = int(self.headers.get("Content-Length", 0) or 0)
        if length < 0:
            raise ValueError("negative Content-Length")
        if not length:
            return {}
        body = json.loads(self.rfile.read(length))
        if not isinstance(body, dict):
            raise ValueError("JSON body must be an object")
        return body

    def _download_to_cache(self, key, url, content_type="video/mp4"):
        """Download *url* to a temp file and store it under *key*.

        The temp file is removed whether or not the download succeeds.
        """
        temp = _new_temp_path()
        try:
            req = urllib.request.Request(url, headers=_auth_headers(url))
            with urllib.request.urlopen(req, timeout=300) as src, open(temp, "wb") as dst:
                shutil.copyfileobj(src, dst)
            cache.put(key, temp, content_type)
        finally:
            temp.unlink(missing_ok=True)

    def do_POST(self):
        """Dispatch /cache, /preload and /flush."""
        if not self._admit():
            return
        try:
            try:
                body = self._read_json_body()
            except ValueError:
                self._json(400, {"error": "Invalid JSON body"})
                return
            path = urlparse(self.path).path
            if path == "/cache":
                self._handle_cache(body)
            elif path == "/preload":
                self._handle_preload(body)
            elif path == "/flush":
                self._handle_flush()
            else:
                self._json(404, {"error": "Not found"})
        finally:
            self._release()

    def _handle_cache(self, body):
        """POST /cache: download ``url`` and store it under ``key``."""
        key = body.get("key", "")
        url = body.get("url", "")
        if not key or not url or not isinstance(key, str) or not isinstance(url, str):
            self._json(400, {"error": "Missing key or url"})
            return
        if SmartCache._entry_path(key) is None:
            self._json(400, {"error": "Invalid key"})
            return
        # urlopen also understands file:// and similar schemes; only allow
        # real HTTP(S) fetches for caller-supplied URLs.
        if urlparse(url).scheme.lower() not in ("http", "https"):
            self._json(400, {"error": "Unsupported url scheme"})
            return
        try:
            self._download_to_cache(key, url)
        except Exception as e:  # request boundary: report instead of crashing
            self._json(500, {"error": str(e)[:200]})
            return
        self._json(200, {"status": "cached", "key": key})

    def _handle_preload(self, body):
        """POST /preload: fetch ``dataset``/``file_name`` from HF into the cache."""
        dataset = body.get("dataset", "")
        file_name = body.get("file_name", "")
        if (not dataset or not file_name
                or not isinstance(dataset, str) or not isinstance(file_name, str)):
            self._json(400, {"error": "Missing dataset or file_name"})
            return
        key = f"{dataset}|{file_name}"
        if SmartCache._entry_path(key) is None:
            self._json(400, {"error": "Invalid key"})
            return
        try:
            self._download_to_cache(key, hf_resolve_url(dataset, file_name), "video/mp4")
        except Exception as e:  # request boundary: report instead of crashing
            self._json(500, {"error": str(e)[:200]})
            return
        self._json(200, {"status": "preloaded", "key": key})

    def _handle_flush(self):
        """POST /flush: delete every file under CACHE_DIR."""
        count = 0
        with LOCK:
            for f in CACHE_DIR.rglob("*"):
                try:
                    if f.is_file():
                        f.unlink(missing_ok=True)
                        count += 1
                except OSError:
                    continue  # skip files that cannot be removed
        self._json(200, {"status": "flushed", "removed": count})

    @staticmethod
    def _resolve_range(range_header, file_size):
        """Translate a Range header into ``(status, start, end)``.

        * ``(200, 0, file_size - 1)`` - no header, unparseable header, or an
          end position before the start (such a header is ignored).
        * ``(206, start, end)`` - satisfiable range; *end* is clamped to EOF.
        * ``(416, 0, 0)`` - start is at or beyond the end of the file.
        """
        last = file_size - 1
        match = _RANGE_RE.match(range_header) if range_header else None
        if not match:
            return 200, 0, last
        start = int(match.group(1))
        end = int(match.group(2)) if match.group(2) else last
        if end < start and match.group(2):
            return 200, 0, last
        if start >= file_size:
            return 416, 0, 0
        return 206, start, min(end, last)

    @staticmethod
    def _content_disposition(name):
        """Build an ``inline`` Content-Disposition value for *name*.

        Uses the last path segment of the part after ``|``. The quoted
        ``filename`` is an ASCII fallback with quotes, backslashes, control
        and non-ASCII characters replaced by ``_``; ``filename*`` carries the
        exact name percent-encoded as UTF-8. The result never contains CR/LF.
        """
        base = name.split("|", 1)[-1].replace("\\", "/").rsplit("/", 1)[-1] or "file"
        fallback = "".join(
            ch if " " <= ch <= "~" and ch not in '"\\' else "_" for ch in base
        )
        return f"inline; filename=\"{fallback}\"; filename*=UTF-8''{quote(base, safe='')}"

    def _stream_file(self, fpath, filename, range_header):
        """Send cached file *fpath* to the client, honouring a byte Range."""
        try:
            src = open(fpath, "rb")
        except OSError:
            # The entry was evicted between the cache lookup and now.
            self._json(404, {"error": "Not found"})
            return
        with src:
            file_size = os.fstat(src.fileno()).st_size
            status, start, end = self._resolve_range(range_header, file_size)
            if status == 416:
                self.send_response(416)
                self.send_header("Content-Range", f"bytes */{file_size}")
                self.send_header("Content-Length", "0")
                self.send_header("Access-Control-Allow-Origin", "*")
                self.end_headers()
                return
            length = end - start + 1
            self.send_response(status)
            if status == 206:
                self.send_header("Content-Range", f"bytes {start}-{end}/{file_size}")
            self.send_header("Content-Type", cache.content_type(fpath))
            self.send_header("Content-Length", str(length))
            self.send_header("Accept-Ranges", "bytes")
            self.send_header("Cache-Control", f"public, max-age={CACHE_TTL}")
            self.send_header("Content-Disposition", self._content_disposition(filename))
            self.send_header("Access-Control-Allow-Origin", "*")
            self.send_header("Access-Control-Expose-Headers", "*")
            self.end_headers()
            src.seek(start)
            remaining = length
            try:
                while remaining > 0:
                    chunk = src.read(min(_CHUNK_SIZE, remaining))
                    if not chunk:
                        break
                    self.wfile.write(chunk)
                    remaining -= len(chunk)
            except (BrokenPipeError, ConnectionResetError):
                pass  # client disconnected mid-transfer

    def _stream_from_hf_with_cache(self, key, dataset, file_path):
        """Stream a file from HF to the client, caching complete downloads.

        The client's Range header is forwarded upstream and the upstream
        status (200/206) is mirrored. The body is cached only when no Range
        was requested, the upstream length is known, and every byte arrived.
        """
        dl_url = hf_resolve_url(dataset, file_path)
        client_range = self.headers.get("Range", "")
        req_headers = _auth_headers(dl_url)
        if client_range:
            req_headers["Range"] = client_range
        try:
            hf_req = urllib.request.Request(dl_url, headers=req_headers)
            hf_resp = urllib.request.urlopen(hf_req, timeout=300)
        except Exception as e:  # request boundary: report as bad gateway
            err_msg = str(e)[:200]
            self._json(502, {"error": f"HF fetch failed: {err_msg}"})
            return

        with hf_resp:
            try:
                total = int(hf_resp.getheader("Content-Length", 0) or 0)
            except ValueError:
                total = 0
            hf_ct = hf_resp.getheader("Content-Type", "video/mp4")
            hf_cr = hf_resp.getheader("Content-Range", "")
            partial = bool(hf_cr) or getattr(hf_resp, "status", 200) == 206

            self.send_response(206 if partial else 200)
            if hf_cr:
                self.send_header("Content-Range", hf_cr)
            self.send_header("Content-Type", hf_ct)
            if total:
                # Unknown length: omit the header and let connection close
                # delimit the body rather than claim zero bytes.
                self.send_header("Content-Length", str(total))
            self.send_header("Accept-Ranges", "bytes")
            self.send_header("Cache-Control", f"public, max-age={CACHE_TTL}")
            self.send_header("Content-Disposition", self._content_disposition(key))
            self.send_header("Access-Control-Allow-Origin", "*")
            self.send_header("Access-Control-Expose-Headers", "*")
            self.end_headers()

            do_cache = not client_range and not partial and total > 0
            temp = _new_temp_path() if do_cache else None
            tmpf = None
            if temp is not None:
                try:
                    tmpf = open(temp, "wb")
                except OSError:
                    temp = None  # cannot cache; still serve the client
            written = 0
            try:
                try:
                    while chunk := hf_resp.read(_CHUNK_SIZE):
                        if tmpf is not None:
                            try:
                                tmpf.write(chunk)
                            except OSError:
                                # e.g. disk full: give up caching, keep serving.
                                tmpf.close()
                                tmpf = None
                                do_cache = False
                        self.wfile.write(chunk)
                        self.wfile.flush()
                        written += len(chunk)
                except (OSError, http.client.HTTPException):
                    pass  # client or upstream went away; nothing more to send
                finally:
                    if tmpf is not None:
                        tmpf.close()
                if temp is not None and do_cache and written == total:
                    try:
                        cache.put(key, temp, hf_ct)
                    except (OSError, ValueError):
                        pass  # response already delivered; caching is best effort
            finally:
                if temp is not None:
                    temp.unlink(missing_ok=True)

    def _json(self, code, data):
        """Send *data* as a JSON response with status *code*."""
        payload = json.dumps(data, ensure_ascii=False, default=str).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, fmt, *args):
        """Suppress the default per-request logging to stderr."""
        pass


class _ProxyServer(socketserver.ThreadingTCPServer):
    """Threaded server that can rebind quickly and exits with live streams."""

    allow_reuse_address = True
    daemon_threads = True


if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    print(f"Proxy v4 (stream-through) on :{port}")
    print(f"Cache: {CACHE_DIR} | TTL: {CACHE_TTL}s | Max: {MAX_CACHE_BYTES/(1024**3):.0f}GB")
    httpd = _ProxyServer(("0.0.0.0", port), Handler)
    httpd.serve_forever()