import asyncio import json import httpx import logging import time import uuid import os from fastapi import FastAPI, BackgroundTasks from pydantic import BaseModel # --- تنظیمات لاگ حرفه‌ای --- logging.basicConfig( format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO, datefmt='%H:%M:%S' ) logger = logging.getLogger(__name__) app = FastAPI() # آدرس سرور هوش مصنوعی WAN_API_BASE = "https://wan-ai-wan2-2-s2v.ms.show/gradio_api" # تنظیمات کلاینت برای عملکرد بالا (بدون محدودیت اتصال) HighPerformanceLimits = httpx.Limits(max_keepalive_connections=None, max_connections=None) class TaskPayload(BaseModel): job_id: str image_url: str audio_url: str resolution: str callback_url: str async def upload_to_wan(client: httpx.AsyncClient, file_url: str, job_id: str): """ دانلود فایل از سرور مدیر و آپلود آن به سرور هوش مصنوعی با تایم‌اوت نامحدود و سیستم تلاش مجدد """ filename = f"{job_id}_{uuid.uuid4().hex[:6]}_{file_url.split('/')[-1]}" file_bytes = None # 1. مرحله دانلود از سرور خودمان (Manager) # تایم‌اوت نامحدود (timeout=None) for attempt in range(3): try: resp = await client.get(file_url, timeout=None) if resp.status_code == 200: file_bytes = resp.content break else: logger.warning(f"⚠️ [{job_id}] Download attempt {attempt+1} failed: {resp.status_code}") except Exception as e: logger.warning(f"⚠️ [{job_id}] Download error {attempt+1}: {e}") await asyncio.sleep(2) if not file_bytes: raise Exception("Fatal Error: Could not download file from Manager.") # 2. مرحله آپلود به سرور هوش مصنوعی (Wan) # تایم‌اوت نامحدود (timeout=None) for attempt in range(5): try: files = {'files': (filename, file_bytes)} # این خط مهمترین تغییر است: timeout=None برای آپلود فایل‌های سنگین wan_resp = await client.post(f"{WAN_API_BASE}/upload", files=files, timeout=None) if wan_resp.status_code == 200: return wan_resp.json()[0] logger.warning(f"⚠️ [{job_id}] Upload attempt {attempt+1} failed. Status: {wan_resp.status_code}") except Exception as e: logger.warning(f"⚠️ [{job_id}] Upload connection error {attempt+1}: {e}") await asyncio.sleep(3) raise Exception("Fatal Error: Failed to upload file to AI Server after 5 attempts.") async def process_single_job(payload: TaskPayload): """ تابع اصلی پردازش با زمان انتظار ۲۴ ساعته """ logger.info(f"🟢 [START] Job: {payload.job_id}") # تنظیم تایم‌اوت کلی کلاینت روی None (نامحدود) async with httpx.AsyncClient(timeout=None, limits=HighPerformanceLimits) as client: try: # 1. انتقال فایل‌ها logger.info(f"📥 [{payload.job_id}] Transferring files...") img_remote = await upload_to_wan(client, payload.image_url, payload.job_id) aud_remote = await upload_to_wan(client, payload.audio_url, payload.job_id) # 2. ثبت درخواست (Predict) req_data = { "data": [ {"path": img_remote, "meta": {"_type": "gradio.FileData"}}, {"path": aud_remote, "meta": {"_type": "gradio.FileData"}}, payload.resolution ] } logger.info(f"🚀 [{payload.job_id}] Sending prediction request...") # درخواست ساخت با تایم‌اوت نامحدود predict_resp = await client.post(f"{WAN_API_BASE}/call/predict", json=req_data, timeout=None) if predict_resp.status_code != 200: raise Exception(f"Prediction Request Failed: {predict_resp.text}") event_id = predict_resp.json().get("event_id") logger.info(f"⏳ [{payload.job_id}] Queued. Event ID: {event_id}") # 3. حلقه انتظار هوشمند (Polling/Streaming Loop) start_time = time.time() final_video_url = None # انتظار تا ۲۴ ساعت (86400 ثانیه) while time.time() - start_time < 86400: try: stream_url = f"{WAN_API_BASE}/call/predict/{event_id}" # اتصال به استریم # اینجا timeout=60 تنظیم شده تا اگر اتصال قطع شد، برنامه متوجه شود و دوباره وصل شود # این تایم‌اوت باعث کنسل شدن کار نمی‌شود، فقط اتصال را رفرش می‌کند async with client.stream("GET", stream_url, headers={"Accept": "text/event-stream"}, timeout=60.0) as response: async for line in response.aiter_lines(): if not line.strip(): continue if line.startswith("data: "): try: data = json.loads(line[6:]) if isinstance(data, list) and len(data) > 0: res = data[0] found_url = None # جستجوی لینک ویدیو در فرمت‌های مختلف if isinstance(res, dict): found_url = res.get("video", {}).get("url") if not found_url: found_url = res.get("url") if not found_url and "name" in res: found_url = f"/file={res['name']}" elif isinstance(res, str) and ("/file=" in res or ".mp4" in res): found_url = res if found_url: final_video_url = found_url break except Exception: pass if final_video_url: break await asyncio.sleep(5) except Exception as e: # اگر اینترنت قطع شد یا خطایی رخ داد، ۵ ثانیه صبر کن و دوباره تلاش کن # چون حلقه اصلی ۸۶۴۰۰ ثانیه است، این خطاها باعث توقف نمی‌شوند await asyncio.sleep(5) # 4. پایان و ارسال نتیجه if final_video_url: if final_video_url.startswith("/"): final_video_url = f"https://wan-ai-wan2-2-s2v.ms.show{final_video_url}" final_video_url = final_video_url.replace("//file=", "/file=") logger.info(f"✅ [DONE] Job: {payload.job_id} -> Video Ready") # ارسال نتیجه به سرور مدیر await client.post(payload.callback_url, json={ "job_id": payload.job_id, "status": "COMPLETED", "video_url": final_video_url }) else: raise Exception("Timeout: Video processing took too long (>24 hours).") except Exception as e: logger.error(f"❌ [FAIL] Job: {payload.job_id} -> {e}") # تلاش برای ارسال خطا به مدیر try: await client.post(payload.callback_url, json={ "job_id": payload.job_id, "status": "FAILED", "message": str(e) }) except: pass @app.post("/process") async def accept_task(payload: TaskPayload): """ دریافت درخواست و شروع پردازش در پس‌زمینه """ asyncio.create_task(process_single_job(payload)) return {"status": "accepted", "message": "Task started in background"} @app.get("/") async def root(): return {"status": "Worker Pro Ready", "mode": "Unlimited Timeout (24h)"}