# ═══════════════════════════════════════════════════════════════════════════════ # 🧅 SODA-GEN ANONYMOUS RELAY V1.5 — HF SPACE CPU (BACKEND ONLY) # # V1.5: ULTRA-MINIMAL startup — ZERO blocking calls # - Root endpoint `/` added (HF Space health check) # - Tor runs as background process (not service) # - All Tor init deferred to background thread # - First generate request triggers lazy Tor check # ═══════════════════════════════════════════════════════════════════════════════ import json import os import random import re import shutil import subprocess import time import uuid from pathlib import Path from typing import Dict, Any, Optional import httpx from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks, Request from fastapi.responses import FileResponse, JSONResponse, StreamingResponse from fastapi.middleware.cors import CORSMiddleware from gradio_client import Client, handle_file from PIL import Image from stem import Signal from stem.control import Controller import threading # ═══════════════════════════════════════════════════════════════════════════════ # ⚙️ CONFIGURATION # ═══════════════════════════════════════════════════════════════════════════════ TARGET_URL = "https://r3gm-wan2-2-fp8da-aoti-preview-2.hf.space" TOR_PROXY = "socks5://127.0.0.1:9050" PREDICT_TIMEOUT = 300 MAX_TOR_RETRIES = 5 GPU_RETRY_DELAY = 15 WAN_NEG = ( "色调艳丽, 过曝, 静态, 细节模糊不清, 字幕, 风格, 作品, 画作, 画面, 静止, " "整体发灰, 最差质量, 低质量, JPEG压缩残留, 丑陋的, 残缺的, 多余手指, " "画不好手部, 畸形, 毁容" ) OUTPUT_DIR = Path("outputs") OUTPUT_DIR.mkdir(exist_ok=True) UPLOAD_DIR = Path("uploads") UPLOAD_DIR.mkdir(exist_ok=True) TASKS: Dict[str, Dict[str, Any]] = {} _last_tor_ip = {"ip": "unknown", "time": 0} _tor_ready = False # ═══════════════════════════════════════════════════════════════════════════════ # 🧅 TOR ENGINE (Lazy — only runs when needed) # ═══════════════════════════════════════════════════════════════════════════════ def check_tor_alive() -> bool: """Quick check if Tor SOCKS proxy is responding (non-blocking, 3s timeout)""" try: r = httpx.get("https://api.ipify.org", proxy=TOR_PROXY, timeout=3) return r.status_code == 200 except: return False def get_tor_ip() -> str: now = time.time() if now - _last_tor_ip["time"] < 10 and _last_tor_ip["ip"] != "unknown": return _last_tor_ip["ip"] try: r = httpx.get("https://api.ipify.org", proxy=TOR_PROXY, timeout=15) ip = r.text.strip() _last_tor_ip["ip"] = ip _last_tor_ip["time"] = now return ip except Exception as e: print(f"⚠️ get_tor_ip failed: {e}") return "unknown" def rotate_tor_circuit() -> str: try: with Controller.from_port(port=9051) as controller: controller.authenticate() controller.signal(Signal.NEWNYM) time.sleep(5) _last_tor_ip["ip"] = "unknown" return get_tor_ip() except Exception as e: print(f"⚠️ Tor rotation failed: {e}") return "rotation_failed" def _lazy_tor_init(): """ Background thread: wait for Tor to be ready, then get IP. Called once on first generate request or after startup. """ global _tor_ready for attempt in range(6): if check_tor_alive(): _tor_ready = True ip = get_tor_ip() print(f"🧅 Tor ready! IP: {ip} (attempt {attempt+1})") return print(f"⏳ Tor not ready, waiting 5s... (attempt {attempt+1}/6)") time.sleep(5) print("⚠️ Tor init incomplete — will retry on requests") # Start lazy init in background (non-blocking) threading.Thread(target=_lazy_tor_init, daemon=True).start() # ═══════════════════════════════════════════════════════════════════════════════ # 📡 GRADIO CLIENT WITH PROXY # ═══════════════════════════════════════════════════════════════════════════════ class GradioClientWithProxy: def __init__(self, base_url: str, proxy: str = None): self.base_url = base_url self.proxy = proxy self.client = None self._old_env = {} self._setup_client() def _setup_client(self): if self.proxy: for key in ["HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy", "ALL_PROXY", "all_proxy"]: self._old_env[key] = os.environ.get(key) os.environ["HTTP_PROXY"] = self.proxy os.environ["HTTPS_PROXY"] = self.proxy os.environ["http_proxy"] = self.proxy os.environ["https_proxy"] = self.proxy os.environ["ALL_PROXY"] = self.proxy os.environ["all_proxy"] = self.proxy self.client = Client(self.base_url, headers={"User-Agent": "SodaGen/1.5"}) def submit(self, **kwargs): return self.client.submit(**kwargs) def _restore_env(self): for key, val in self._old_env.items(): if val is None: os.environ.pop(key, None) else: os.environ[key] = val def __del__(self): self._restore_env() # ═══════════════════════════════════════════════════════════════════════════════ # 📊 ERROR DETECTION # ═══════════════════════════════════════════════════════════════════════════════ def is_quota_error(msg: str) -> bool: return any(kw in msg.lower() for kw in ["exceeded","quota","rate limit","too many requests","daily limit","usage limit"]) def is_tor_blocked(msg: str) -> bool: return any(kw in msg.lower() for kw in ["403","forbidden","blocked","banned","denied","access denied","ip blocked"]) def is_gpu_cold(msg: str) -> bool: return any(kw in msg.lower() for kw in ["no gpu","gpu was available","available after","not ready"]) def parse_retry_time(msg: str) -> Optional[int]: m = re.search(r'(?:try again in|retry in)\s+(.+?)(?:\.|$)', msg, re.IGNORECASE) if m: try: s = m.group(1).strip() days = 0 d = re.search(r'(\d+)\s+day', s) if d: days = int(d.group(1)) t = re.search(r'(\d+):(\d+):(\d+)', s) if t: return (days*86400)+(int(t.group(1))*3600)+(int(t.group(2))*60)+int(t.group(3)) except: pass return None # ═══════════════════════════════════════════════════════════════════════════════ # 🎬 POST-PROCESSING # ═══════════════════════════════════════════════════════════════════════════════ def get_video_info(path: str) -> dict: try: r = subprocess.run(['ffprobe','-v','quiet','-print_format','json','-show_format','-show_streams',path], capture_output=True, text=True, timeout=10) info = json.loads(r.stdout) d = float(info.get('format',{}).get('duration',0)) s = int(info.get('format',{}).get('size',0)) vs = next((x for x in info.get('streams',[]) if x.get('codec_type')=='video'), {}) return {'duration':d,'size':s,'width':int(vs.get('width',768)),'height':int(vs.get('height',512))} except: sz = os.path.getsize(path) if os.path.exists(path) else 0 return {'duration':0,'size':sz,'width':768,'height':512} def gen_thumb(vpath, opath): try: subprocess.run(['ffmpeg','-y','-i',vpath,'-vframes','1','-q:v','3','-vf','scale=640:-1',opath], capture_output=True, timeout=15) return os.path.exists(opath) except: return False def resize_img(path, w=768, h=512): try: if not path or not os.path.exists(path): return None Image.open(path).convert("RGB").resize((int(w),int(h)), Image.LANCZOS).save(path) return path except: return path # ═══════════════════════════════════════════════════════════════════════════════ # 🚀 GENERATION TASK # ═══════════════════════════════════════════════════════════════════════════════ def run_generation_task(task_id, prompt, img1_path, img2_path, duration, steps, frame_mult): task = TASKS[task_id] try: task["status"] = "processing" task["log"] += f"🎬 [ANONYMOUS] {prompt[:80]}...\n" task["log"] += f"⏱️ {duration}s | Steps: {steps} | FPS: {frame_mult}\n" ip = get_tor_ip() task["log"] += f"🧅 Tor IP: {ip}\n" if ip == "unknown": task["log"] += "⚠️ Tor not ready, waiting 15s...\n" time.sleep(15) ip = get_tor_ip() task["log"] += f"🧅 Tor IP retry: {ip}\n" i1 = resize_img(img1_path) if img1_path else None i2 = resize_img(img2_path) if img2_path else None f1 = handle_file(i1) if i1 else None f2 = handle_file(i2) if i2 else None ok = False vid = None retry = 0 while retry <= MAX_TOR_RETRIES: cip = get_tor_ip() task["log"] += f"🧅 IP: {cip} ({retry+1}/{MAX_TOR_RETRIES+1})\n" try: cw = GradioClientWithProxy(TARGET_URL, proxy=TOR_PROXY) task["log"] += "🚀 Submitting...\n" task["progress"] = 20 job = cw.submit( input_image=f1, last_image=f2, prompt=prompt, steps=float(steps), negative_prompt=WAN_NEG, duration_seconds=float(duration), guidance_scale=1.0, guidance_scale_2=1.0, seed=float(random.randint(0,999999)), randomize_seed=True, quality=6.0, scheduler="UniPCMultistep", flow_shift=3.0, frame_multiplier=int(frame_mult), safe_mode=False, video_component=True, api_name="/generate_video", ) task["log"] += f"⏳ Waiting ({PREDICT_TIMEOUT}s)...\n" task["progress"] = 50 result = job.result(timeout=PREDICT_TIMEOUT) task["progress"] = 80 vid = result[0] if isinstance(result,(list,tuple)) else result if vid: ok = True task["log"] += f"✅ SUCCESS via {cip}\n" break else: task["log"] += f"⚠️ Empty result\n" except Exception as e: em = str(e) if is_quota_error(em) or is_tor_blocked(em): task["log"] += f"⚠️ {'Quota' if is_quota_error(em) else 'Blocked'}. Rotating...\n" nip = rotate_tor_circuit() task["log"] += f"🧅 New: {nip}\n" elif is_gpu_cold(em): task["log"] += f"⏳ GPU cold, wait {GPU_RETRY_DELAY}s...\n" time.sleep(GPU_RETRY_DELAY) continue elif "timeout" in em.lower(): task["log"] += "⏰ Timeout, retry...\n" time.sleep(GPU_RETRY_DELAY) nip = rotate_tor_circuit() task["log"] += f"🧅 New: {nip}\n" else: task["log"] += f"⚠️ {em[:150]}\n" if retry < MAX_TOR_RETRIES: nip = rotate_tor_circuit() task["log"] += f"🧅 New: {nip}\n" retry += 1 if ok and vid: outf = f"{task_id}.mp4" outp = OUTPUT_DIR / outf src = vid["video"] if isinstance(vid,dict) and "video" in vid else str(vid) shutil.copy2(src, str(outp)) tf = f"{task_id}.jpg" tp = OUTPUT_DIR / tf ht = gen_thumb(str(outp), str(tp)) vi = get_video_info(str(outp)) task.update(video_path=outf, thumbnail_path=tf if ht else None, video_info=vi, status="complete", progress=100) task["log"] += f"✅ {vi['width']}x{vi['height']}, {vi['duration']:.1f}s, {vi['size']//1024}KB\n" else: task["status"] = "error" task["log"] += f"❌ Failed after {retry+1} rotations\n" except Exception as e: task["status"] = "error" task["log"] += f"❌ FATAL: {str(e)[:150]}\n" # ═══════════════════════════════════════════════════════════════════════════════ # ⚡ VIDEO STREAMING # ═══════════════════════════════════════════════════════════════════════════════ def stream_video(vpath, req): fsz = os.path.getsize(vpath) rh = req.headers.get('range') if rh: m = rh.replace('bytes=','').split('-') s = int(m[0]) if m[0] else 0 e = int(m[1]) if len(m)>1 and m[1] else fsz-1 s,e = min(s,fsz-1),min(e,fsz-1) cl = e-s+1 def it(): with open(vpath,'rb') as f: f.seek(s); r=cl while r>0: c=f.read(min(1048576,r)) if not c: break r-=len(c); yield c return StreamingResponse(it(),status_code=206,media_type='video/mp4',headers={'Content-Range':f'bytes {s}-{e}/{fsz}','Accept-Ranges':'bytes','Content-Length':str(cl),'Cache-Control':'public, max-age=86400'}) else: def it(): with open(vpath,'rb') as f: while True: c=f.read(1048576) if not c: break yield c return StreamingResponse(it(),status_code=200,media_type='video/mp4',headers={'Accept-Ranges':'bytes','Content-Length':str(fsz),'Cache-Control':'public, max-age=86400'}) # ═══════════════════════════════════════════════════════════════════════════════ # 🌐 FASTAPI — ZERO BLOCKING STARTUP # ═══════════════════════════════════════════════════════════════════════════════ app = FastAPI(title="SODA-GEN Relay V1.5") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ══════════════════════════════════════════════════════ # 🔑 ROOT ENDPOINT — HF Space health check butuh ini! # ══════════════════════════════════════════════════════ @app.get("/") async def root(): """HF Space checks this to determine if space is alive""" return {"status": "ok", "service": "SODA-GEN Relay V1.5", "tor_ready": _tor_ready} @app.get("/api/health") async def health(): return JSONResponse({ "status": "ok", "mode": "anonymous", "version": "1.5", "tor_ip": _last_tor_ip["ip"], "tor_ready": _tor_ready, }) @app.post("/api/generate") async def api_generate( background_tasks: BackgroundTasks, prompt: str = Form(...), duration: float = Form(5.0), steps: float = Form(6.0), frame_mult: int = Form(16), img_start: Optional[UploadFile] = File(None), img_end: Optional[UploadFile] = File(None), ): tid = uuid.uuid4().hex[:12] td = UPLOAD_DIR / tid td.mkdir(exist_ok=True) i1 = i2 = None if img_start and img_start.filename: i1 = str(td/"start.png") with open(i1,"wb") as f: f.write(await img_start.read()) if img_end and img_end.filename: i2 = str(td/"end.png") with open(i2,"wb") as f: f.write(await img_end.read()) TASKS[tid] = {"status":"queued","progress":0,"log":"","video_path":None,"thumbnail_path":None,"video_info":None} background_tasks.add_task(run_generation_task, task_id=tid, prompt=prompt, img1_path=i1, img2_path=i2, duration=duration, steps=steps, frame_mult=frame_mult) return JSONResponse({"task_id": tid}) @app.get("/api/task/{task_id}") async def api_task(task_id: str): t = TASKS.get(task_id) if not t: return JSONResponse({"error":"Not found"}, status_code=404) r = {"status":t["status"],"progress":t["progress"],"log":t["log"]} if t["status"]=="complete" and t.get("video_path"): r["video_url"] = f"/api/video/{t['video_path']}" if t.get("thumbnail_path"): r["thumbnail_url"] = f"/api/thumbnail/{t['thumbnail_path']}" if t.get("video_info"): r["video_info"] = t["video_info"] return JSONResponse(r) @app.get("/api/video/{fn}") async def vid(fn: str, request: Request): p = OUTPUT_DIR/fn if not p.exists(): return JSONResponse({"error":"Not found"}, status_code=404) return stream_video(str(p), request) @app.get("/api/thumbnail/{fn}") async def thumb(fn: str): p = OUTPUT_DIR/fn if not p.exists(): return JSONResponse({"error":"Not found"}, status_code=404) return FileResponse(str(p), media_type="image/jpeg", headers={"Cache-Control":"public, max-age=86400"}) @app.get("/api/stats") async def stats(): return JSONResponse({ "mode":"anonymous","tor_ip":_last_tor_ip["ip"],"tor_ready":_tor_ready, "quota_per_ip":120,"total_tokens":"infinity","active_tokens":"∞", "cooldown_tokens":0,"version":"1.5", }) @app.get("/api/tor-ip") async def torip(): _last_tor_ip["ip"] = "unknown" return JSONResponse({"ip": get_tor_ip()}) @app.post("/api/tor-rotate") async def torrot(): nip = rotate_tor_circuit() return JSONResponse({"new_ip": nip, "success": nip != "rotation_failed"})