Spaces:
Sleeping
Sleeping
File size: 9,109 Bytes
8a7d386 e256185 b838241 ac4201c 6d5e5ff b838241 79b0e4a ac4201c b838241 e256185 ac4201c b838241 ac4201c b838241 6d5e5ff 0262a28 6d5e5ff b838241 6d5e5ff 0262a28 6d5e5ff 0262a28 6d5e5ff 0262a28 6d5e5ff 0262a28 ac4201c 6d5e5ff ac4201c 6d5e5ff 0262a28 6d5e5ff 0262a28 6d5e5ff 0262a28 6d5e5ff 0262a28 6d5e5ff 8a7d386 b838241 6d5e5ff 0262a28 6d5e5ff b838241 79b0e4a 0262a28 b838241 e256185 6d5e5ff b838241 8a7d386 e256185 ac4201c e256185 ac4201c 6d5e5ff 0262a28 b838241 e256185 6d5e5ff 79b0e4a e256185 6d5e5ff e256185 6d5e5ff e256185 0262a28 8a7d386 e256185 6d5e5ff 0262a28 e256185 8a7d386 b838241 e256185 b838241 0262a28 b838241 6d5e5ff b838241 e256185 0262a28 6d5e5ff 0262a28 e256185 0262a28 b838241 e256185 0262a28 b838241 e256185 6d5e5ff e256185 6d5e5ff e256185 79b0e4a b838241 79b0e4a 0262a28 ac4201c e256185 ac4201c 0262a28 ac4201c e256185 b838241 6d5e5ff e256185 ac4201c b838241 0262a28 b838241 6d5e5ff e256185 0262a28 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
import asyncio
import json
import httpx
import logging
import time
import uuid
import os
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
# --- تنظیمات لاگ حرفهای ---
logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(message)s',
level=logging.INFO,
datefmt='%H:%M:%S'
)
logger = logging.getLogger(__name__)
app = FastAPI()
# آدرس سرور هوش مصنوعی
WAN_API_BASE = "https://wan-ai-wan2-2-s2v.ms.show/gradio_api"
# تنظیمات کلاینت برای عملکرد بالا (بدون محدودیت اتصال)
HighPerformanceLimits = httpx.Limits(max_keepalive_connections=None, max_connections=None)
class TaskPayload(BaseModel):
job_id: str
image_url: str
audio_url: str
resolution: str
callback_url: str
async def upload_to_wan(client: httpx.AsyncClient, file_url: str, job_id: str):
"""
دانلود فایل از سرور مدیر و آپلود آن به سرور هوش مصنوعی
با تایماوت نامحدود و سیستم تلاش مجدد
"""
filename = f"{job_id}_{uuid.uuid4().hex[:6]}_{file_url.split('/')[-1]}"
file_bytes = None
# 1. مرحله دانلود از سرور خودمان (Manager)
# تایماوت نامحدود (timeout=None)
for attempt in range(3):
try:
resp = await client.get(file_url, timeout=None)
if resp.status_code == 200:
file_bytes = resp.content
break
else:
logger.warning(f"⚠️ [{job_id}] Download attempt {attempt+1} failed: {resp.status_code}")
except Exception as e:
logger.warning(f"⚠️ [{job_id}] Download error {attempt+1}: {e}")
await asyncio.sleep(2)
if not file_bytes:
raise Exception("Fatal Error: Could not download file from Manager.")
# 2. مرحله آپلود به سرور هوش مصنوعی (Wan)
# تایماوت نامحدود (timeout=None)
for attempt in range(5):
try:
files = {'files': (filename, file_bytes)}
# این خط مهمترین تغییر است: timeout=None برای آپلود فایلهای سنگین
wan_resp = await client.post(f"{WAN_API_BASE}/upload", files=files, timeout=None)
if wan_resp.status_code == 200:
return wan_resp.json()[0]
logger.warning(f"⚠️ [{job_id}] Upload attempt {attempt+1} failed. Status: {wan_resp.status_code}")
except Exception as e:
logger.warning(f"⚠️ [{job_id}] Upload connection error {attempt+1}: {e}")
await asyncio.sleep(3)
raise Exception("Fatal Error: Failed to upload file to AI Server after 5 attempts.")
async def process_single_job(payload: TaskPayload):
"""
تابع اصلی پردازش با زمان انتظار ۲۴ ساعته
"""
logger.info(f"🟢 [START] Job: {payload.job_id}")
# تنظیم تایماوت کلی کلاینت روی None (نامحدود)
async with httpx.AsyncClient(timeout=None, limits=HighPerformanceLimits) as client:
try:
# 1. انتقال فایلها
logger.info(f"📥 [{payload.job_id}] Transferring files...")
img_remote = await upload_to_wan(client, payload.image_url, payload.job_id)
aud_remote = await upload_to_wan(client, payload.audio_url, payload.job_id)
# 2. ثبت درخواست (Predict)
req_data = {
"data": [
{"path": img_remote, "meta": {"_type": "gradio.FileData"}},
{"path": aud_remote, "meta": {"_type": "gradio.FileData"}},
payload.resolution
]
}
logger.info(f"🚀 [{payload.job_id}] Sending prediction request...")
# درخواست ساخت با تایماوت نامحدود
predict_resp = await client.post(f"{WAN_API_BASE}/call/predict", json=req_data, timeout=None)
if predict_resp.status_code != 200:
raise Exception(f"Prediction Request Failed: {predict_resp.text}")
event_id = predict_resp.json().get("event_id")
logger.info(f"⏳ [{payload.job_id}] Queued. Event ID: {event_id}")
# 3. حلقه انتظار هوشمند (Polling/Streaming Loop)
start_time = time.time()
final_video_url = None
# انتظار تا ۲۴ ساعت (86400 ثانیه)
while time.time() - start_time < 86400:
try:
stream_url = f"{WAN_API_BASE}/call/predict/{event_id}"
# اتصال به استریم
# اینجا timeout=60 تنظیم شده تا اگر اتصال قطع شد، برنامه متوجه شود و دوباره وصل شود
# این تایماوت باعث کنسل شدن کار نمیشود، فقط اتصال را رفرش میکند
async with client.stream("GET", stream_url, headers={"Accept": "text/event-stream"}, timeout=60.0) as response:
async for line in response.aiter_lines():
if not line.strip(): continue
if line.startswith("data: "):
try:
data = json.loads(line[6:])
if isinstance(data, list) and len(data) > 0:
res = data[0]
found_url = None
# جستجوی لینک ویدیو در فرمتهای مختلف
if isinstance(res, dict):
found_url = res.get("video", {}).get("url")
if not found_url: found_url = res.get("url")
if not found_url and "name" in res: found_url = f"/file={res['name']}"
elif isinstance(res, str) and ("/file=" in res or ".mp4" in res):
found_url = res
if found_url:
final_video_url = found_url
break
except Exception:
pass
if final_video_url:
break
await asyncio.sleep(5)
except Exception as e:
# اگر اینترنت قطع شد یا خطایی رخ داد، ۵ ثانیه صبر کن و دوباره تلاش کن
# چون حلقه اصلی ۸۶۴۰۰ ثانیه است، این خطاها باعث توقف نمیشوند
await asyncio.sleep(5)
# 4. پایان و ارسال نتیجه
if final_video_url:
if final_video_url.startswith("/"):
final_video_url = f"https://wan-ai-wan2-2-s2v.ms.show{final_video_url}"
final_video_url = final_video_url.replace("//file=", "/file=")
logger.info(f"✅ [DONE] Job: {payload.job_id} -> Video Ready")
# ارسال نتیجه به سرور مدیر
await client.post(payload.callback_url, json={
"job_id": payload.job_id,
"status": "COMPLETED",
"video_url": final_video_url
})
else:
raise Exception("Timeout: Video processing took too long (>24 hours).")
except Exception as e:
logger.error(f"❌ [FAIL] Job: {payload.job_id} -> {e}")
# تلاش برای ارسال خطا به مدیر
try:
await client.post(payload.callback_url, json={
"job_id": payload.job_id,
"status": "FAILED",
"message": str(e)
})
except:
pass
@app.post("/process")
async def accept_task(payload: TaskPayload):
"""
دریافت درخواست و شروع پردازش در پسزمینه
"""
asyncio.create_task(process_single_job(payload))
return {"status": "accepted", "message": "Task started in background"}
@app.get("/")
async def root():
return {"status": "Worker Pro Ready", "mode": "Unlimited Timeout (24h)"} |