Avatarworker9 / app.py
Ezmary's picture
Update app.py
0262a28 verified
import asyncio
import json
import httpx
import logging
import time
import uuid
import os
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
# --- تنظیمات لاگ حرفه‌ای ---
logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(message)s',
level=logging.INFO,
datefmt='%H:%M:%S'
)
logger = logging.getLogger(__name__)
app = FastAPI()
# آدرس سرور هوش مصنوعی
WAN_API_BASE = "https://wan-ai-wan2-2-s2v.ms.show/gradio_api"
# تنظیمات کلاینت برای عملکرد بالا (بدون محدودیت اتصال)
HighPerformanceLimits = httpx.Limits(max_keepalive_connections=None, max_connections=None)
class TaskPayload(BaseModel):
job_id: str
image_url: str
audio_url: str
resolution: str
callback_url: str
async def upload_to_wan(client: httpx.AsyncClient, file_url: str, job_id: str):
"""
دانلود فایل از سرور مدیر و آپلود آن به سرور هوش مصنوعی
با تایم‌اوت نامحدود و سیستم تلاش مجدد
"""
filename = f"{job_id}_{uuid.uuid4().hex[:6]}_{file_url.split('/')[-1]}"
file_bytes = None
# 1. مرحله دانلود از سرور خودمان (Manager)
# تایم‌اوت نامحدود (timeout=None)
for attempt in range(3):
try:
resp = await client.get(file_url, timeout=None)
if resp.status_code == 200:
file_bytes = resp.content
break
else:
logger.warning(f"⚠️ [{job_id}] Download attempt {attempt+1} failed: {resp.status_code}")
except Exception as e:
logger.warning(f"⚠️ [{job_id}] Download error {attempt+1}: {e}")
await asyncio.sleep(2)
if not file_bytes:
raise Exception("Fatal Error: Could not download file from Manager.")
# 2. مرحله آپلود به سرور هوش مصنوعی (Wan)
# تایم‌اوت نامحدود (timeout=None)
for attempt in range(5):
try:
files = {'files': (filename, file_bytes)}
# این خط مهمترین تغییر است: timeout=None برای آپلود فایل‌های سنگین
wan_resp = await client.post(f"{WAN_API_BASE}/upload", files=files, timeout=None)
if wan_resp.status_code == 200:
return wan_resp.json()[0]
logger.warning(f"⚠️ [{job_id}] Upload attempt {attempt+1} failed. Status: {wan_resp.status_code}")
except Exception as e:
logger.warning(f"⚠️ [{job_id}] Upload connection error {attempt+1}: {e}")
await asyncio.sleep(3)
raise Exception("Fatal Error: Failed to upload file to AI Server after 5 attempts.")
async def process_single_job(payload: TaskPayload):
"""
تابع اصلی پردازش با زمان انتظار ۲۴ ساعته
"""
logger.info(f"🟢 [START] Job: {payload.job_id}")
# تنظیم تایم‌اوت کلی کلاینت روی None (نامحدود)
async with httpx.AsyncClient(timeout=None, limits=HighPerformanceLimits) as client:
try:
# 1. انتقال فایل‌ها
logger.info(f"📥 [{payload.job_id}] Transferring files...")
img_remote = await upload_to_wan(client, payload.image_url, payload.job_id)
aud_remote = await upload_to_wan(client, payload.audio_url, payload.job_id)
# 2. ثبت درخواست (Predict)
req_data = {
"data": [
{"path": img_remote, "meta": {"_type": "gradio.FileData"}},
{"path": aud_remote, "meta": {"_type": "gradio.FileData"}},
payload.resolution
]
}
logger.info(f"🚀 [{payload.job_id}] Sending prediction request...")
# درخواست ساخت با تایم‌اوت نامحدود
predict_resp = await client.post(f"{WAN_API_BASE}/call/predict", json=req_data, timeout=None)
if predict_resp.status_code != 200:
raise Exception(f"Prediction Request Failed: {predict_resp.text}")
event_id = predict_resp.json().get("event_id")
logger.info(f"⏳ [{payload.job_id}] Queued. Event ID: {event_id}")
# 3. حلقه انتظار هوشمند (Polling/Streaming Loop)
start_time = time.time()
final_video_url = None
# انتظار تا ۲۴ ساعت (86400 ثانیه)
while time.time() - start_time < 86400:
try:
stream_url = f"{WAN_API_BASE}/call/predict/{event_id}"
# اتصال به استریم
# اینجا timeout=60 تنظیم شده تا اگر اتصال قطع شد، برنامه متوجه شود و دوباره وصل شود
# این تایم‌اوت باعث کنسل شدن کار نمی‌شود، فقط اتصال را رفرش می‌کند
async with client.stream("GET", stream_url, headers={"Accept": "text/event-stream"}, timeout=60.0) as response:
async for line in response.aiter_lines():
if not line.strip(): continue
if line.startswith("data: "):
try:
data = json.loads(line[6:])
if isinstance(data, list) and len(data) > 0:
res = data[0]
found_url = None
# جستجوی لینک ویدیو در فرمت‌های مختلف
if isinstance(res, dict):
found_url = res.get("video", {}).get("url")
if not found_url: found_url = res.get("url")
if not found_url and "name" in res: found_url = f"/file={res['name']}"
elif isinstance(res, str) and ("/file=" in res or ".mp4" in res):
found_url = res
if found_url:
final_video_url = found_url
break
except Exception:
pass
if final_video_url:
break
await asyncio.sleep(5)
except Exception as e:
# اگر اینترنت قطع شد یا خطایی رخ داد، ۵ ثانیه صبر کن و دوباره تلاش کن
# چون حلقه اصلی ۸۶۴۰۰ ثانیه است، این خطاها باعث توقف نمی‌شوند
await asyncio.sleep(5)
# 4. پایان و ارسال نتیجه
if final_video_url:
if final_video_url.startswith("/"):
final_video_url = f"https://wan-ai-wan2-2-s2v.ms.show{final_video_url}"
final_video_url = final_video_url.replace("//file=", "/file=")
logger.info(f"✅ [DONE] Job: {payload.job_id} -> Video Ready")
# ارسال نتیجه به سرور مدیر
await client.post(payload.callback_url, json={
"job_id": payload.job_id,
"status": "COMPLETED",
"video_url": final_video_url
})
else:
raise Exception("Timeout: Video processing took too long (>24 hours).")
except Exception as e:
logger.error(f"❌ [FAIL] Job: {payload.job_id} -> {e}")
# تلاش برای ارسال خطا به مدیر
try:
await client.post(payload.callback_url, json={
"job_id": payload.job_id,
"status": "FAILED",
"message": str(e)
})
except:
pass
@app.post("/process")
async def accept_task(payload: TaskPayload):
"""
دریافت درخواست و شروع پردازش در پس‌زمینه
"""
asyncio.create_task(process_single_job(payload))
return {"status": "accepted", "message": "Task started in background"}
@app.get("/")
async def root():
return {"status": "Worker Pro Ready", "mode": "Unlimited Timeout (24h)"}