ServerRelay2 / app_relay2.py
Bl4ckSpaces's picture
Update app_relay2.py
7051a38 verified
Raw
History Blame Contribute Delete
20.2 kB
# ═══════════════════════════════════════════════════════════════════════════════
# 🧅 SODA-GEN ANONYMOUS RELAY V1.5 — HF SPACE CPU (BACKEND ONLY)
#
# V1.5: ULTRA-MINIMAL startup — ZERO blocking calls
# - Root endpoint `/` added (HF Space health check)
# - Tor runs as background process (not service)
# - All Tor init deferred to background thread
# - First generate request triggers lazy Tor check
# ═══════════════════════════════════════════════════════════════════════════════
import json
import os
import random
import re
import shutil
import subprocess
import time
import uuid
from pathlib import Path
from typing import Dict, Any, Optional
import httpx
from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks, Request
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from gradio_client import Client, handle_file
from PIL import Image
from stem import Signal
from stem.control import Controller
import threading
# ═══════════════════════════════════════════════════════════════════════════════
# ⚙️ CONFIGURATION
# ═══════════════════════════════════════════════════════════════════════════════
TARGET_URL = "https://r3gm-wan2-2-fp8da-aoti-preview-2.hf.space"
TOR_PROXY = "socks5://127.0.0.1:9050"
PREDICT_TIMEOUT = 300
MAX_TOR_RETRIES = 5
GPU_RETRY_DELAY = 15
WAN_NEG = (
"色调艳丽, 过曝, 静态, 细节模糊不清, 字幕, 风格, 作品, 画作, 画面, 静止, "
"整体发灰, 最差质量, 低质量, JPEG压缩残留, 丑陋的, 残缺的, 多余手指, "
"画不好手部, 畸形, 毁容"
)
OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(exist_ok=True)
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
TASKS: Dict[str, Dict[str, Any]] = {}
_last_tor_ip = {"ip": "unknown", "time": 0}
_tor_ready = False
# ═══════════════════════════════════════════════════════════════════════════════
# 🧅 TOR ENGINE (Lazy — only runs when needed)
# ═══════════════════════════════════════════════════════════════════════════════
def check_tor_alive() -> bool:
"""Quick check if Tor SOCKS proxy is responding (non-blocking, 3s timeout)"""
try:
r = httpx.get("https://api.ipify.org", proxy=TOR_PROXY, timeout=3)
return r.status_code == 200
except:
return False
def get_tor_ip() -> str:
now = time.time()
if now - _last_tor_ip["time"] < 10 and _last_tor_ip["ip"] != "unknown":
return _last_tor_ip["ip"]
try:
r = httpx.get("https://api.ipify.org", proxy=TOR_PROXY, timeout=15)
ip = r.text.strip()
_last_tor_ip["ip"] = ip
_last_tor_ip["time"] = now
return ip
except Exception as e:
print(f"⚠️ get_tor_ip failed: {e}")
return "unknown"
def rotate_tor_circuit() -> str:
try:
with Controller.from_port(port=9051) as controller:
controller.authenticate()
controller.signal(Signal.NEWNYM)
time.sleep(5)
_last_tor_ip["ip"] = "unknown"
return get_tor_ip()
except Exception as e:
print(f"⚠️ Tor rotation failed: {e}")
return "rotation_failed"
def _lazy_tor_init():
"""
Background thread: wait for Tor to be ready, then get IP.
Called once on first generate request or after startup.
"""
global _tor_ready
for attempt in range(6):
if check_tor_alive():
_tor_ready = True
ip = get_tor_ip()
print(f"🧅 Tor ready! IP: {ip} (attempt {attempt+1})")
return
print(f"⏳ Tor not ready, waiting 5s... (attempt {attempt+1}/6)")
time.sleep(5)
print("⚠️ Tor init incomplete — will retry on requests")
# Start lazy init in background (non-blocking)
threading.Thread(target=_lazy_tor_init, daemon=True).start()
# ═══════════════════════════════════════════════════════════════════════════════
# 📡 GRADIO CLIENT WITH PROXY
# ═══════════════════════════════════════════════════════════════════════════════
class GradioClientWithProxy:
def __init__(self, base_url: str, proxy: str = None):
self.base_url = base_url
self.proxy = proxy
self.client = None
self._old_env = {}
self._setup_client()
def _setup_client(self):
if self.proxy:
for key in ["HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy",
"ALL_PROXY", "all_proxy"]:
self._old_env[key] = os.environ.get(key)
os.environ["HTTP_PROXY"] = self.proxy
os.environ["HTTPS_PROXY"] = self.proxy
os.environ["http_proxy"] = self.proxy
os.environ["https_proxy"] = self.proxy
os.environ["ALL_PROXY"] = self.proxy
os.environ["all_proxy"] = self.proxy
self.client = Client(self.base_url, headers={"User-Agent": "SodaGen/1.5"})
def submit(self, **kwargs):
return self.client.submit(**kwargs)
def _restore_env(self):
for key, val in self._old_env.items():
if val is None: os.environ.pop(key, None)
else: os.environ[key] = val
def __del__(self):
self._restore_env()
# ═══════════════════════════════════════════════════════════════════════════════
# 📊 ERROR DETECTION
# ═══════════════════════════════════════════════════════════════════════════════
def is_quota_error(msg: str) -> bool:
return any(kw in msg.lower() for kw in ["exceeded","quota","rate limit","too many requests","daily limit","usage limit"])
def is_tor_blocked(msg: str) -> bool:
return any(kw in msg.lower() for kw in ["403","forbidden","blocked","banned","denied","access denied","ip blocked"])
def is_gpu_cold(msg: str) -> bool:
return any(kw in msg.lower() for kw in ["no gpu","gpu was available","available after","not ready"])
def parse_retry_time(msg: str) -> Optional[int]:
m = re.search(r'(?:try again in|retry in)\s+(.+?)(?:\.|$)', msg, re.IGNORECASE)
if m:
try:
s = m.group(1).strip()
days = 0
d = re.search(r'(\d+)\s+day', s)
if d: days = int(d.group(1))
t = re.search(r'(\d+):(\d+):(\d+)', s)
if t: return (days*86400)+(int(t.group(1))*3600)+(int(t.group(2))*60)+int(t.group(3))
except: pass
return None
# ═══════════════════════════════════════════════════════════════════════════════
# 🎬 POST-PROCESSING
# ═══════════════════════════════════════════════════════════════════════════════
def get_video_info(path: str) -> dict:
try:
r = subprocess.run(['ffprobe','-v','quiet','-print_format','json','-show_format','-show_streams',path], capture_output=True, text=True, timeout=10)
info = json.loads(r.stdout)
d = float(info.get('format',{}).get('duration',0))
s = int(info.get('format',{}).get('size',0))
vs = next((x for x in info.get('streams',[]) if x.get('codec_type')=='video'), {})
return {'duration':d,'size':s,'width':int(vs.get('width',768)),'height':int(vs.get('height',512))}
except:
sz = os.path.getsize(path) if os.path.exists(path) else 0
return {'duration':0,'size':sz,'width':768,'height':512}
def gen_thumb(vpath, opath):
try:
subprocess.run(['ffmpeg','-y','-i',vpath,'-vframes','1','-q:v','3','-vf','scale=640:-1',opath], capture_output=True, timeout=15)
return os.path.exists(opath)
except: return False
def resize_img(path, w=768, h=512):
try:
if not path or not os.path.exists(path): return None
Image.open(path).convert("RGB").resize((int(w),int(h)), Image.LANCZOS).save(path)
return path
except: return path
# ═══════════════════════════════════════════════════════════════════════════════
# 🚀 GENERATION TASK
# ═══════════════════════════════════════════════════════════════════════════════
def run_generation_task(task_id, prompt, img1_path, img2_path, duration, steps, frame_mult):
task = TASKS[task_id]
try:
task["status"] = "processing"
task["log"] += f"🎬 [ANONYMOUS] {prompt[:80]}...\n"
task["log"] += f"⏱️ {duration}s | Steps: {steps} | FPS: {frame_mult}\n"
ip = get_tor_ip()
task["log"] += f"🧅 Tor IP: {ip}\n"
if ip == "unknown":
task["log"] += "⚠️ Tor not ready, waiting 15s...\n"
time.sleep(15)
ip = get_tor_ip()
task["log"] += f"🧅 Tor IP retry: {ip}\n"
i1 = resize_img(img1_path) if img1_path else None
i2 = resize_img(img2_path) if img2_path else None
f1 = handle_file(i1) if i1 else None
f2 = handle_file(i2) if i2 else None
ok = False
vid = None
retry = 0
while retry <= MAX_TOR_RETRIES:
cip = get_tor_ip()
task["log"] += f"🧅 IP: {cip} ({retry+1}/{MAX_TOR_RETRIES+1})\n"
try:
cw = GradioClientWithProxy(TARGET_URL, proxy=TOR_PROXY)
task["log"] += "🚀 Submitting...\n"
task["progress"] = 20
job = cw.submit(
input_image=f1, last_image=f2, prompt=prompt,
steps=float(steps), negative_prompt=WAN_NEG,
duration_seconds=float(duration),
guidance_scale=1.0, guidance_scale_2=1.0,
seed=float(random.randint(0,999999)),
randomize_seed=True, quality=6.0,
scheduler="UniPCMultistep", flow_shift=3.0,
frame_multiplier=int(frame_mult),
safe_mode=False, video_component=True,
api_name="/generate_video",
)
task["log"] += f"⏳ Waiting ({PREDICT_TIMEOUT}s)...\n"
task["progress"] = 50
result = job.result(timeout=PREDICT_TIMEOUT)
task["progress"] = 80
vid = result[0] if isinstance(result,(list,tuple)) else result
if vid:
ok = True
task["log"] += f"✅ SUCCESS via {cip}\n"
break
else:
task["log"] += f"⚠️ Empty result\n"
except Exception as e:
em = str(e)
if is_quota_error(em) or is_tor_blocked(em):
task["log"] += f"⚠️ {'Quota' if is_quota_error(em) else 'Blocked'}. Rotating...\n"
nip = rotate_tor_circuit()
task["log"] += f"🧅 New: {nip}\n"
elif is_gpu_cold(em):
task["log"] += f"⏳ GPU cold, wait {GPU_RETRY_DELAY}s...\n"
time.sleep(GPU_RETRY_DELAY)
continue
elif "timeout" in em.lower():
task["log"] += "⏰ Timeout, retry...\n"
time.sleep(GPU_RETRY_DELAY)
nip = rotate_tor_circuit()
task["log"] += f"🧅 New: {nip}\n"
else:
task["log"] += f"⚠️ {em[:150]}\n"
if retry < MAX_TOR_RETRIES:
nip = rotate_tor_circuit()
task["log"] += f"🧅 New: {nip}\n"
retry += 1
if ok and vid:
outf = f"{task_id}.mp4"
outp = OUTPUT_DIR / outf
src = vid["video"] if isinstance(vid,dict) and "video" in vid else str(vid)
shutil.copy2(src, str(outp))
tf = f"{task_id}.jpg"
tp = OUTPUT_DIR / tf
ht = gen_thumb(str(outp), str(tp))
vi = get_video_info(str(outp))
task.update(video_path=outf, thumbnail_path=tf if ht else None, video_info=vi, status="complete", progress=100)
task["log"] += f"✅ {vi['width']}x{vi['height']}, {vi['duration']:.1f}s, {vi['size']//1024}KB\n"
else:
task["status"] = "error"
task["log"] += f"❌ Failed after {retry+1} rotations\n"
except Exception as e:
task["status"] = "error"
task["log"] += f"❌ FATAL: {str(e)[:150]}\n"
# ═══════════════════════════════════════════════════════════════════════════════
# ⚡ VIDEO STREAMING
# ═══════════════════════════════════════════════════════════════════════════════
def stream_video(vpath, req):
fsz = os.path.getsize(vpath)
rh = req.headers.get('range')
if rh:
m = rh.replace('bytes=','').split('-')
s = int(m[0]) if m[0] else 0
e = int(m[1]) if len(m)>1 and m[1] else fsz-1
s,e = min(s,fsz-1),min(e,fsz-1)
cl = e-s+1
def it():
with open(vpath,'rb') as f:
f.seek(s); r=cl
while r>0:
c=f.read(min(1048576,r))
if not c: break
r-=len(c); yield c
return StreamingResponse(it(),status_code=206,media_type='video/mp4',headers={'Content-Range':f'bytes {s}-{e}/{fsz}','Accept-Ranges':'bytes','Content-Length':str(cl),'Cache-Control':'public, max-age=86400'})
else:
def it():
with open(vpath,'rb') as f:
while True:
c=f.read(1048576)
if not c: break
yield c
return StreamingResponse(it(),status_code=200,media_type='video/mp4',headers={'Accept-Ranges':'bytes','Content-Length':str(fsz),'Cache-Control':'public, max-age=86400'})
# ═══════════════════════════════════════════════════════════════════════════════
# 🌐 FASTAPI — ZERO BLOCKING STARTUP
# ═══════════════════════════════════════════════════════════════════════════════
app = FastAPI(title="SODA-GEN Relay V1.5")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ══════════════════════════════════════════════════════
# 🔑 ROOT ENDPOINT — HF Space health check butuh ini!
# ══════════════════════════════════════════════════════
@app.get("/")
async def root():
"""HF Space checks this to determine if space is alive"""
return {"status": "ok", "service": "SODA-GEN Relay V1.5", "tor_ready": _tor_ready}
@app.get("/api/health")
async def health():
return JSONResponse({
"status": "ok", "mode": "anonymous", "version": "1.5",
"tor_ip": _last_tor_ip["ip"], "tor_ready": _tor_ready,
})
@app.post("/api/generate")
async def api_generate(
background_tasks: BackgroundTasks,
prompt: str = Form(...),
duration: float = Form(5.0),
steps: float = Form(6.0),
frame_mult: int = Form(16),
img_start: Optional[UploadFile] = File(None),
img_end: Optional[UploadFile] = File(None),
):
tid = uuid.uuid4().hex[:12]
td = UPLOAD_DIR / tid
td.mkdir(exist_ok=True)
i1 = i2 = None
if img_start and img_start.filename:
i1 = str(td/"start.png")
with open(i1,"wb") as f: f.write(await img_start.read())
if img_end and img_end.filename:
i2 = str(td/"end.png")
with open(i2,"wb") as f: f.write(await img_end.read())
TASKS[tid] = {"status":"queued","progress":0,"log":"","video_path":None,"thumbnail_path":None,"video_info":None}
background_tasks.add_task(run_generation_task, task_id=tid, prompt=prompt, img1_path=i1, img2_path=i2, duration=duration, steps=steps, frame_mult=frame_mult)
return JSONResponse({"task_id": tid})
@app.get("/api/task/{task_id}")
async def api_task(task_id: str):
t = TASKS.get(task_id)
if not t: return JSONResponse({"error":"Not found"}, status_code=404)
r = {"status":t["status"],"progress":t["progress"],"log":t["log"]}
if t["status"]=="complete" and t.get("video_path"):
r["video_url"] = f"/api/video/{t['video_path']}"
if t.get("thumbnail_path"): r["thumbnail_url"] = f"/api/thumbnail/{t['thumbnail_path']}"
if t.get("video_info"): r["video_info"] = t["video_info"]
return JSONResponse(r)
@app.get("/api/video/{fn}")
async def vid(fn: str, request: Request):
p = OUTPUT_DIR/fn
if not p.exists(): return JSONResponse({"error":"Not found"}, status_code=404)
return stream_video(str(p), request)
@app.get("/api/thumbnail/{fn}")
async def thumb(fn: str):
p = OUTPUT_DIR/fn
if not p.exists(): return JSONResponse({"error":"Not found"}, status_code=404)
return FileResponse(str(p), media_type="image/jpeg", headers={"Cache-Control":"public, max-age=86400"})
@app.get("/api/stats")
async def stats():
return JSONResponse({
"mode":"anonymous","tor_ip":_last_tor_ip["ip"],"tor_ready":_tor_ready,
"quota_per_ip":120,"total_tokens":"infinity","active_tokens":"∞",
"cooldown_tokens":0,"version":"1.5",
})
@app.get("/api/tor-ip")
async def torip():
_last_tor_ip["ip"] = "unknown"
return JSONResponse({"ip": get_tor_ip()})
@app.post("/api/tor-rotate")
async def torrot():
nip = rotate_tor_circuit()
return JSONResponse({"new_ip": nip, "success": nip != "rotation_failed"})