Spaces:
Running
Running
| # app.py (Worker Space) - نسخه نهایی با مقاومت در برابر NoneType | |
| from fastapi import FastAPI, File, UploadFile, Form, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| import requests | |
| import json | |
| import uuid | |
| import threading | |
| import os | |
| import shutil | |
| import time | |
| from sseclient import SSEClient | |
| from fastapi.staticfiles import StaticFiles | |
| # --- تنظیمات اصلی --- | |
| TARGET_SPACE_URL = "https://wan-ai-wan2-2-animate.hf.space/" | |
| # با توجه به لاگها، تعداد تلاش را بیشتر میکنیم | |
| MAX_RETRIES = 100 | |
| RETRY_DELAY = 20 | |
| GENERIC_ERROR_MESSAGE = "خطا در ارتباط با سرور اصلی پردازش. لطفاً تصویر و ویدیو با چهره اضافه کرده مجدداً تلاش کنید." | |
| # --- ساختار داده و دایرکتوری موقت --- | |
| jobs = {} | |
| lock = threading.Lock() | |
| TEMP_DIR = "/tmp/worker_files" | |
| os.makedirs(TEMP_DIR, exist_ok=True) | |
| app = FastAPI() | |
| # --- فعالسازی CORS --- | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # --- تابع کمکی برای پاکسازی خطا و ثبت لاگ --- | |
| def handle_error(job_id, exception, context=""): | |
| full_error = f"Context: {context} - Exception: {str(exception)}" | |
| print(full_error) | |
| with lock: | |
| jobs[job_id]["status"] = "error" | |
| jobs[job_id]["result"] = GENERIC_ERROR_MESSAGE | |
| # --- تابع کمکی برای آپلود فایل --- | |
| def upload_single_file_patiently(file_path: str, job_id: str) -> str: | |
| for attempt in range(MAX_RETRIES): | |
| file_to_upload = None | |
| try: | |
| with lock: | |
| jobs[job_id]["status"] = f"تلاش {attempt + 1}/{MAX_RETRIES} برای آپلود: {os.path.basename(file_path)}..." | |
| file_to_upload = open(file_path, 'rb') | |
| files_payload = {'files': file_to_upload} | |
| upload_res = requests.post(f"{TARGET_SPACE_URL}gradio_api/upload", files=files_payload, timeout=600) | |
| upload_res.raise_for_status() | |
| server_path = upload_res.json()[0] | |
| return server_path | |
| except Exception as e: | |
| print(f"خطا در آپلود (تلاش {attempt + 1}): {e}") | |
| if attempt < MAX_RETRIES - 1: | |
| time.sleep(RETRY_DELAY) | |
| else: | |
| raise e | |
| finally: | |
| if file_to_upload: | |
| file_to_upload.close() | |
| # --- تابع اصلی در پسزمینه --- | |
| def process_job_in_background(job_id, image_path, video_path, motion_type, style_type): | |
| try: | |
| image_server_path = upload_single_file_patiently(image_path, job_id) | |
| video_server_path = upload_single_file_patiently(video_path, job_id) | |
| with lock: jobs[job_id]["status"] = "در حال ارسال درخواست به صف..." | |
| image_payload = {"path": image_server_path, "url": f"{TARGET_SPACE_URL}gradio_api/file={image_server_path}", "meta": {"_type": "gradio.FileData"}} | |
| video_payload = {"video": {"path": video_server_path, "url": f"{TARGET_SPACE_URL}gradio_api/file={video_server_path}", "meta": {"_type": "gradio.FileData"}}, "subtitles": None} | |
| session_hash = str(uuid.uuid4()) | |
| payload = {"data": [image_payload, video_payload, motion_type, style_type], "fn_index": 0, "session_hash": session_hash} | |
| last_exception = None | |
| for attempt in range(MAX_RETRIES): | |
| try: | |
| join_res = requests.post(f"{TARGET_SPACE_URL}gradio_api/queue/join", json=payload, timeout=180) | |
| join_res.raise_for_status() | |
| last_exception = None | |
| break | |
| except Exception as e: | |
| last_exception = e | |
| time.sleep(RETRY_DELAY) | |
| if last_exception: raise last_exception | |
| with lock: jobs[job_id]["status"] = "منتظر در صف..." | |
| response = requests.get(f"{TARGET_SPACE_URL}gradio_api/queue/data?session_hash={session_hash}", stream=True, timeout=14400) | |
| client = SSEClient(response) | |
| for event in client.events(): | |
| data = json.loads(event.data) | |
| msg = data.get("msg") | |
| if msg == "estimation": | |
| status_text = f"در صف انتظار... موقعیت: {data.get('rank', 0) + 1}/{data.get('queue_size', '?')}" | |
| with lock: jobs[job_id]["status"] = status_text | |
| elif msg == "process_starts": | |
| with lock: jobs[job_id]["status"] = "پردازش شروع شد..." | |
| elif msg == "process_completed": | |
| if data.get("success"): | |
| try: | |
| output_data = data.get("output", {}).get("data", []) | |
| if not output_data: raise ValueError("دادههای خروجی در پاسخ یافت نشد.") | |
| # <<< شروع تغییر: مدیریت خطای NoneType >>> | |
| video_info = output_data[0].get("video") # دیگر مقدار پیشفرض {} نمیدهیم | |
| if video_info is None: | |
| raise ValueError("ساختار ویدیو در پاسخ سرور اصلی یافت نشد (مقدار None دریافت شد).") | |
| video_url = video_info.get("url") | |
| if not video_url: raise ValueError("URL ویدیو در ساختار پاسخ یافت نشد.") | |
| # <<< پایان تغییر >>> | |
| video_content = None | |
| download_error = None | |
| for download_attempt in range(5): | |
| try: | |
| with lock: jobs[job_id]["status"] = f"در حال دانلود نتیجه نهایی (تلاش {download_attempt + 1}/5)..." | |
| video_content_response = requests.get(video_url, timeout=900) | |
| video_content_response.raise_for_status() | |
| video_content = video_content_response.content | |
| download_error = None | |
| break | |
| except Exception as e: | |
| download_error = e | |
| print(f"خطا در دانلود ویدیو (تلاش {download_attempt + 1}): {e}") | |
| time.sleep(15) | |
| if video_content is None: | |
| raise Exception(f"دانلود ویدیو پس از 5 بار تلاش ناموفق بود. آخرین خطا: {download_error}") | |
| final_video_path = os.path.join(TEMP_DIR, f"{job_id}.mp4") | |
| with open(final_video_path, 'wb') as f: f.write(video_content) | |
| with lock: | |
| jobs[job_id]["status"] = "completed" | |
| jobs[job_id]["result"] = f"/videos/{job_id}.mp4" | |
| except Exception as e: | |
| handle_error(job_id, e, "Processing successful response") | |
| else: | |
| error_msg = data.get("output", {}).get("error", "خطای نامشخص از سرور اصلی") | |
| with lock: | |
| jobs[job_id]["status"] = "error" | |
| jobs[job_id]["result"] = GENERIC_ERROR_MESSAGE | |
| print(f"Error from main server: {error_msg}") | |
| break | |
| except Exception as e: | |
| handle_error(job_id, e, "Main processing block") | |
| finally: | |
| if os.path.exists(image_path): os.remove(image_path) | |
| if video_path and os.path.exists(video_path): os.remove(video_path) | |
| # --- API Endpoints --- | |
| async def submit_new_job_api(background_tasks: BackgroundTasks, image_file: UploadFile = File(...), video_file: UploadFile = File(...), motion: str = Form(...), style: str = Form(...)): | |
| job_id = str(uuid.uuid4()) | |
| image_path = os.path.join(TEMP_DIR, f"{job_id}_{image_file.filename}") | |
| with open(image_path, "wb") as buffer: shutil.copyfileobj(image_file.file, buffer) | |
| video_path = os.path.join(TEMP_DIR, f"{job_id}_{video_file.filename}") | |
| with open(video_path, "wb") as buffer: shutil.copyfileobj(video_file.file, buffer) | |
| with lock: jobs[job_id] = {"status": "در صف انتظار...", "result": None} | |
| background_tasks.add_task(process_job_in_background, job_id, image_path, video_path, motion, style) | |
| return {"job_id": job_id} | |
| class JobCheckRequest(BaseModel): | |
| job_id: str | |
| async def check_job_status_api(request: JobCheckRequest): | |
| job_id = request.job_id | |
| with lock: | |
| job = jobs.get(job_id, {"status": "not_found", "result": None}) | |
| return {"status": job["status"], "result": job["result"]} | |
| app.mount("/videos", StaticFiles(directory=TEMP_DIR), name="videos") |