Spaces:
Sleeping
Sleeping
| import asyncio | |
| import json | |
| import httpx | |
| import logging | |
| import time | |
| import uuid | |
| import os | |
| from fastapi import FastAPI, BackgroundTasks | |
| from pydantic import BaseModel | |
| # --- تنظیمات لاگ حرفهای --- | |
| logging.basicConfig( | |
| format='%(asctime)s [%(levelname)s] %(message)s', | |
| level=logging.INFO, | |
| datefmt='%H:%M:%S' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| app = FastAPI() | |
| # آدرس سرور هوش مصنوعی | |
| WAN_API_BASE = "https://wan-ai-wan2-2-s2v.ms.show/gradio_api" | |
| # تنظیمات کلاینت برای عملکرد بالا (بدون محدودیت اتصال) | |
| HighPerformanceLimits = httpx.Limits(max_keepalive_connections=None, max_connections=None) | |
| class TaskPayload(BaseModel): | |
| job_id: str | |
| image_url: str | |
| audio_url: str | |
| resolution: str | |
| callback_url: str | |
| async def upload_to_wan(client: httpx.AsyncClient, file_url: str, job_id: str): | |
| """ | |
| دانلود فایل از سرور مدیر و آپلود آن به سرور هوش مصنوعی | |
| با تایماوت نامحدود و سیستم تلاش مجدد | |
| """ | |
| filename = f"{job_id}_{uuid.uuid4().hex[:6]}_{file_url.split('/')[-1]}" | |
| file_bytes = None | |
| # 1. مرحله دانلود از سرور خودمان (Manager) | |
| # تایماوت نامحدود (timeout=None) | |
| for attempt in range(3): | |
| try: | |
| resp = await client.get(file_url, timeout=None) | |
| if resp.status_code == 200: | |
| file_bytes = resp.content | |
| break | |
| else: | |
| logger.warning(f"⚠️ [{job_id}] Download attempt {attempt+1} failed: {resp.status_code}") | |
| except Exception as e: | |
| logger.warning(f"⚠️ [{job_id}] Download error {attempt+1}: {e}") | |
| await asyncio.sleep(2) | |
| if not file_bytes: | |
| raise Exception("Fatal Error: Could not download file from Manager.") | |
| # 2. مرحله آپلود به سرور هوش مصنوعی (Wan) | |
| # تایماوت نامحدود (timeout=None) | |
| for attempt in range(5): | |
| try: | |
| files = {'files': (filename, file_bytes)} | |
| # این خط مهمترین تغییر است: timeout=None برای آپلود فایلهای سنگین | |
| wan_resp = await client.post(f"{WAN_API_BASE}/upload", files=files, timeout=None) | |
| if wan_resp.status_code == 200: | |
| return wan_resp.json()[0] | |
| logger.warning(f"⚠️ [{job_id}] Upload attempt {attempt+1} failed. Status: {wan_resp.status_code}") | |
| except Exception as e: | |
| logger.warning(f"⚠️ [{job_id}] Upload connection error {attempt+1}: {e}") | |
| await asyncio.sleep(3) | |
| raise Exception("Fatal Error: Failed to upload file to AI Server after 5 attempts.") | |
| async def process_single_job(payload: TaskPayload): | |
| """ | |
| تابع اصلی پردازش با زمان انتظار ۲۴ ساعته | |
| """ | |
| logger.info(f"🟢 [START] Job: {payload.job_id}") | |
| # تنظیم تایماوت کلی کلاینت روی None (نامحدود) | |
| async with httpx.AsyncClient(timeout=None, limits=HighPerformanceLimits) as client: | |
| try: | |
| # 1. انتقال فایلها | |
| logger.info(f"📥 [{payload.job_id}] Transferring files...") | |
| img_remote = await upload_to_wan(client, payload.image_url, payload.job_id) | |
| aud_remote = await upload_to_wan(client, payload.audio_url, payload.job_id) | |
| # 2. ثبت درخواست (Predict) | |
| req_data = { | |
| "data": [ | |
| {"path": img_remote, "meta": {"_type": "gradio.FileData"}}, | |
| {"path": aud_remote, "meta": {"_type": "gradio.FileData"}}, | |
| payload.resolution | |
| ] | |
| } | |
| logger.info(f"🚀 [{payload.job_id}] Sending prediction request...") | |
| # درخواست ساخت با تایماوت نامحدود | |
| predict_resp = await client.post(f"{WAN_API_BASE}/call/predict", json=req_data, timeout=None) | |
| if predict_resp.status_code != 200: | |
| raise Exception(f"Prediction Request Failed: {predict_resp.text}") | |
| event_id = predict_resp.json().get("event_id") | |
| logger.info(f"⏳ [{payload.job_id}] Queued. Event ID: {event_id}") | |
| # 3. حلقه انتظار هوشمند (Polling/Streaming Loop) | |
| start_time = time.time() | |
| final_video_url = None | |
| # انتظار تا ۲۴ ساعت (86400 ثانیه) | |
| while time.time() - start_time < 86400: | |
| try: | |
| stream_url = f"{WAN_API_BASE}/call/predict/{event_id}" | |
| # اتصال به استریم | |
| # اینجا timeout=60 تنظیم شده تا اگر اتصال قطع شد، برنامه متوجه شود و دوباره وصل شود | |
| # این تایماوت باعث کنسل شدن کار نمیشود، فقط اتصال را رفرش میکند | |
| async with client.stream("GET", stream_url, headers={"Accept": "text/event-stream"}, timeout=60.0) as response: | |
| async for line in response.aiter_lines(): | |
| if not line.strip(): continue | |
| if line.startswith("data: "): | |
| try: | |
| data = json.loads(line[6:]) | |
| if isinstance(data, list) and len(data) > 0: | |
| res = data[0] | |
| found_url = None | |
| # جستجوی لینک ویدیو در فرمتهای مختلف | |
| if isinstance(res, dict): | |
| found_url = res.get("video", {}).get("url") | |
| if not found_url: found_url = res.get("url") | |
| if not found_url and "name" in res: found_url = f"/file={res['name']}" | |
| elif isinstance(res, str) and ("/file=" in res or ".mp4" in res): | |
| found_url = res | |
| if found_url: | |
| final_video_url = found_url | |
| break | |
| except Exception: | |
| pass | |
| if final_video_url: | |
| break | |
| await asyncio.sleep(5) | |
| except Exception as e: | |
| # اگر اینترنت قطع شد یا خطایی رخ داد، ۵ ثانیه صبر کن و دوباره تلاش کن | |
| # چون حلقه اصلی ۸۶۴۰۰ ثانیه است، این خطاها باعث توقف نمیشوند | |
| await asyncio.sleep(5) | |
| # 4. پایان و ارسال نتیجه | |
| if final_video_url: | |
| if final_video_url.startswith("/"): | |
| final_video_url = f"https://wan-ai-wan2-2-s2v.ms.show{final_video_url}" | |
| final_video_url = final_video_url.replace("//file=", "/file=") | |
| logger.info(f"✅ [DONE] Job: {payload.job_id} -> Video Ready") | |
| # ارسال نتیجه به سرور مدیر | |
| await client.post(payload.callback_url, json={ | |
| "job_id": payload.job_id, | |
| "status": "COMPLETED", | |
| "video_url": final_video_url | |
| }) | |
| else: | |
| raise Exception("Timeout: Video processing took too long (>24 hours).") | |
| except Exception as e: | |
| logger.error(f"❌ [FAIL] Job: {payload.job_id} -> {e}") | |
| # تلاش برای ارسال خطا به مدیر | |
| try: | |
| await client.post(payload.callback_url, json={ | |
| "job_id": payload.job_id, | |
| "status": "FAILED", | |
| "message": str(e) | |
| }) | |
| except: | |
| pass | |
| async def accept_task(payload: TaskPayload): | |
| """ | |
| دریافت درخواست و شروع پردازش در پسزمینه | |
| """ | |
| asyncio.create_task(process_single_job(payload)) | |
| return {"status": "accepted", "message": "Task started in background"} | |
| async def root(): | |
| return {"status": "Worker Pro Ready", "mode": "Unlimited Timeout (24h)"} |