# app.py (Worker Space) - نسخه نهایی با مقاومت در برابر NoneType from fastapi import FastAPI, File, UploadFile, Form, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import requests import json import uuid import threading import os import shutil import time from sseclient import SSEClient from fastapi.staticfiles import StaticFiles # --- تنظیمات اصلی --- TARGET_SPACE_URL = "https://wan-ai-wan2-2-animate.hf.space/" # با توجه به لاگ‌ها، تعداد تلاش را بیشتر می‌کنیم MAX_RETRIES = 100 RETRY_DELAY = 20 GENERIC_ERROR_MESSAGE = "خطا در ارتباط با سرور اصلی پردازش. لطفاً تصویر و ویدیو با چهره اضافه کرده مجدداً تلاش کنید." # --- ساختار داده و دایرکتوری موقت --- jobs = {} lock = threading.Lock() TEMP_DIR = "/tmp/worker_files" os.makedirs(TEMP_DIR, exist_ok=True) app = FastAPI() # --- فعال‌سازی CORS --- app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # --- تابع کمکی برای پاکسازی خطا و ثبت لاگ --- def handle_error(job_id, exception, context=""): full_error = f"Context: {context} - Exception: {str(exception)}" print(full_error) with lock: jobs[job_id]["status"] = "error" jobs[job_id]["result"] = GENERIC_ERROR_MESSAGE # --- تابع کمکی برای آپلود فایل --- def upload_single_file_patiently(file_path: str, job_id: str) -> str: for attempt in range(MAX_RETRIES): file_to_upload = None try: with lock: jobs[job_id]["status"] = f"تلاش {attempt + 1}/{MAX_RETRIES} برای آپلود: {os.path.basename(file_path)}..." file_to_upload = open(file_path, 'rb') files_payload = {'files': file_to_upload} upload_res = requests.post(f"{TARGET_SPACE_URL}gradio_api/upload", files=files_payload, timeout=600) upload_res.raise_for_status() server_path = upload_res.json()[0] return server_path except Exception as e: print(f"خطا در آپلود (تلاش {attempt + 1}): {e}") if attempt < MAX_RETRIES - 1: time.sleep(RETRY_DELAY) else: raise e finally: if file_to_upload: file_to_upload.close() # --- تابع اصلی در پس‌زمینه --- def process_job_in_background(job_id, image_path, video_path, motion_type, style_type): try: image_server_path = upload_single_file_patiently(image_path, job_id) video_server_path = upload_single_file_patiently(video_path, job_id) with lock: jobs[job_id]["status"] = "در حال ارسال درخواست به صف..." image_payload = {"path": image_server_path, "url": f"{TARGET_SPACE_URL}gradio_api/file={image_server_path}", "meta": {"_type": "gradio.FileData"}} video_payload = {"video": {"path": video_server_path, "url": f"{TARGET_SPACE_URL}gradio_api/file={video_server_path}", "meta": {"_type": "gradio.FileData"}}, "subtitles": None} session_hash = str(uuid.uuid4()) payload = {"data": [image_payload, video_payload, motion_type, style_type], "fn_index": 0, "session_hash": session_hash} last_exception = None for attempt in range(MAX_RETRIES): try: join_res = requests.post(f"{TARGET_SPACE_URL}gradio_api/queue/join", json=payload, timeout=180) join_res.raise_for_status() last_exception = None break except Exception as e: last_exception = e time.sleep(RETRY_DELAY) if last_exception: raise last_exception with lock: jobs[job_id]["status"] = "منتظر در صف..." response = requests.get(f"{TARGET_SPACE_URL}gradio_api/queue/data?session_hash={session_hash}", stream=True, timeout=14400) client = SSEClient(response) for event in client.events(): data = json.loads(event.data) msg = data.get("msg") if msg == "estimation": status_text = f"در صف انتظار... موقعیت: {data.get('rank', 0) + 1}/{data.get('queue_size', '?')}" with lock: jobs[job_id]["status"] = status_text elif msg == "process_starts": with lock: jobs[job_id]["status"] = "پردازش شروع شد..." elif msg == "process_completed": if data.get("success"): try: output_data = data.get("output", {}).get("data", []) if not output_data: raise ValueError("داده‌های خروجی در پاسخ یافت نشد.") # <<< شروع تغییر: مدیریت خطای NoneType >>> video_info = output_data[0].get("video") # دیگر مقدار پیش‌فرض {} نمی‌دهیم if video_info is None: raise ValueError("ساختار ویدیو در پاسخ سرور اصلی یافت نشد (مقدار None دریافت شد).") video_url = video_info.get("url") if not video_url: raise ValueError("URL ویدیو در ساختار پاسخ یافت نشد.") # <<< پایان تغییر >>> video_content = None download_error = None for download_attempt in range(5): try: with lock: jobs[job_id]["status"] = f"در حال دانلود نتیجه نهایی (تلاش {download_attempt + 1}/5)..." video_content_response = requests.get(video_url, timeout=900) video_content_response.raise_for_status() video_content = video_content_response.content download_error = None break except Exception as e: download_error = e print(f"خطا در دانلود ویدیو (تلاش {download_attempt + 1}): {e}") time.sleep(15) if video_content is None: raise Exception(f"دانلود ویدیو پس از 5 بار تلاش ناموفق بود. آخرین خطا: {download_error}") final_video_path = os.path.join(TEMP_DIR, f"{job_id}.mp4") with open(final_video_path, 'wb') as f: f.write(video_content) with lock: jobs[job_id]["status"] = "completed" jobs[job_id]["result"] = f"/videos/{job_id}.mp4" except Exception as e: handle_error(job_id, e, "Processing successful response") else: error_msg = data.get("output", {}).get("error", "خطای نامشخص از سرور اصلی") with lock: jobs[job_id]["status"] = "error" jobs[job_id]["result"] = GENERIC_ERROR_MESSAGE print(f"Error from main server: {error_msg}") break except Exception as e: handle_error(job_id, e, "Main processing block") finally: if os.path.exists(image_path): os.remove(image_path) if video_path and os.path.exists(video_path): os.remove(video_path) # --- API Endpoints --- @app.post("/submit_new_job") async def submit_new_job_api(background_tasks: BackgroundTasks, image_file: UploadFile = File(...), video_file: UploadFile = File(...), motion: str = Form(...), style: str = Form(...)): job_id = str(uuid.uuid4()) image_path = os.path.join(TEMP_DIR, f"{job_id}_{image_file.filename}") with open(image_path, "wb") as buffer: shutil.copyfileobj(image_file.file, buffer) video_path = os.path.join(TEMP_DIR, f"{job_id}_{video_file.filename}") with open(video_path, "wb") as buffer: shutil.copyfileobj(video_file.file, buffer) with lock: jobs[job_id] = {"status": "در صف انتظار...", "result": None} background_tasks.add_task(process_job_in_background, job_id, image_path, video_path, motion, style) return {"job_id": job_id} class JobCheckRequest(BaseModel): job_id: str @app.post("/check_job_status") async def check_job_status_api(request: JobCheckRequest): job_id = request.job_id with lock: job = jobs.get(job_id, {"status": "not_found", "result": None}) return {"status": job["status"], "result": job["result"]} app.mount("/videos", StaticFiles(directory=TEMP_DIR), name="videos")