Animat-worker3 / app.py
Ezmary's picture
Update app.py
244d068 verified
# app.py (Worker Space) - نسخه نهایی با مقاومت در برابر NoneType
from fastapi import FastAPI, File, UploadFile, Form, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import requests
import json
import uuid
import threading
import os
import shutil
import time
from sseclient import SSEClient
from fastapi.staticfiles import StaticFiles
# --- تنظیمات اصلی ---
TARGET_SPACE_URL = "https://wan-ai-wan2-2-animate.hf.space/"
# با توجه به لاگ‌ها، تعداد تلاش را بیشتر می‌کنیم
MAX_RETRIES = 100
RETRY_DELAY = 20
GENERIC_ERROR_MESSAGE = "خطا در ارتباط با سرور اصلی پردازش. لطفاً تصویر و ویدیو با چهره اضافه کرده مجدداً تلاش کنید."
# --- ساختار داده و دایرکتوری موقت ---
jobs = {}
lock = threading.Lock()
TEMP_DIR = "/tmp/worker_files"
os.makedirs(TEMP_DIR, exist_ok=True)
app = FastAPI()
# --- فعال‌سازی CORS ---
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- تابع کمکی برای پاکسازی خطا و ثبت لاگ ---
def handle_error(job_id, exception, context=""):
full_error = f"Context: {context} - Exception: {str(exception)}"
print(full_error)
with lock:
jobs[job_id]["status"] = "error"
jobs[job_id]["result"] = GENERIC_ERROR_MESSAGE
# --- تابع کمکی برای آپلود فایل ---
def upload_single_file_patiently(file_path: str, job_id: str) -> str:
for attempt in range(MAX_RETRIES):
file_to_upload = None
try:
with lock:
jobs[job_id]["status"] = f"تلاش {attempt + 1}/{MAX_RETRIES} برای آپلود: {os.path.basename(file_path)}..."
file_to_upload = open(file_path, 'rb')
files_payload = {'files': file_to_upload}
upload_res = requests.post(f"{TARGET_SPACE_URL}gradio_api/upload", files=files_payload, timeout=600)
upload_res.raise_for_status()
server_path = upload_res.json()[0]
return server_path
except Exception as e:
print(f"خطا در آپلود (تلاش {attempt + 1}): {e}")
if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_DELAY)
else:
raise e
finally:
if file_to_upload:
file_to_upload.close()
# --- تابع اصلی در پس‌زمینه ---
def process_job_in_background(job_id, image_path, video_path, motion_type, style_type):
try:
image_server_path = upload_single_file_patiently(image_path, job_id)
video_server_path = upload_single_file_patiently(video_path, job_id)
with lock: jobs[job_id]["status"] = "در حال ارسال درخواست به صف..."
image_payload = {"path": image_server_path, "url": f"{TARGET_SPACE_URL}gradio_api/file={image_server_path}", "meta": {"_type": "gradio.FileData"}}
video_payload = {"video": {"path": video_server_path, "url": f"{TARGET_SPACE_URL}gradio_api/file={video_server_path}", "meta": {"_type": "gradio.FileData"}}, "subtitles": None}
session_hash = str(uuid.uuid4())
payload = {"data": [image_payload, video_payload, motion_type, style_type], "fn_index": 0, "session_hash": session_hash}
last_exception = None
for attempt in range(MAX_RETRIES):
try:
join_res = requests.post(f"{TARGET_SPACE_URL}gradio_api/queue/join", json=payload, timeout=180)
join_res.raise_for_status()
last_exception = None
break
except Exception as e:
last_exception = e
time.sleep(RETRY_DELAY)
if last_exception: raise last_exception
with lock: jobs[job_id]["status"] = "منتظر در صف..."
response = requests.get(f"{TARGET_SPACE_URL}gradio_api/queue/data?session_hash={session_hash}", stream=True, timeout=14400)
client = SSEClient(response)
for event in client.events():
data = json.loads(event.data)
msg = data.get("msg")
if msg == "estimation":
status_text = f"در صف انتظار... موقعیت: {data.get('rank', 0) + 1}/{data.get('queue_size', '?')}"
with lock: jobs[job_id]["status"] = status_text
elif msg == "process_starts":
with lock: jobs[job_id]["status"] = "پردازش شروع شد..."
elif msg == "process_completed":
if data.get("success"):
try:
output_data = data.get("output", {}).get("data", [])
if not output_data: raise ValueError("داده‌های خروجی در پاسخ یافت نشد.")
# <<< شروع تغییر: مدیریت خطای NoneType >>>
video_info = output_data[0].get("video") # دیگر مقدار پیش‌فرض {} نمی‌دهیم
if video_info is None:
raise ValueError("ساختار ویدیو در پاسخ سرور اصلی یافت نشد (مقدار None دریافت شد).")
video_url = video_info.get("url")
if not video_url: raise ValueError("URL ویدیو در ساختار پاسخ یافت نشد.")
# <<< پایان تغییر >>>
video_content = None
download_error = None
for download_attempt in range(5):
try:
with lock: jobs[job_id]["status"] = f"در حال دانلود نتیجه نهایی (تلاش {download_attempt + 1}/5)..."
video_content_response = requests.get(video_url, timeout=900)
video_content_response.raise_for_status()
video_content = video_content_response.content
download_error = None
break
except Exception as e:
download_error = e
print(f"خطا در دانلود ویدیو (تلاش {download_attempt + 1}): {e}")
time.sleep(15)
if video_content is None:
raise Exception(f"دانلود ویدیو پس از 5 بار تلاش ناموفق بود. آخرین خطا: {download_error}")
final_video_path = os.path.join(TEMP_DIR, f"{job_id}.mp4")
with open(final_video_path, 'wb') as f: f.write(video_content)
with lock:
jobs[job_id]["status"] = "completed"
jobs[job_id]["result"] = f"/videos/{job_id}.mp4"
except Exception as e:
handle_error(job_id, e, "Processing successful response")
else:
error_msg = data.get("output", {}).get("error", "خطای نامشخص از سرور اصلی")
with lock:
jobs[job_id]["status"] = "error"
jobs[job_id]["result"] = GENERIC_ERROR_MESSAGE
print(f"Error from main server: {error_msg}")
break
except Exception as e:
handle_error(job_id, e, "Main processing block")
finally:
if os.path.exists(image_path): os.remove(image_path)
if video_path and os.path.exists(video_path): os.remove(video_path)
# --- API Endpoints ---
@app.post("/submit_new_job")
async def submit_new_job_api(background_tasks: BackgroundTasks, image_file: UploadFile = File(...), video_file: UploadFile = File(...), motion: str = Form(...), style: str = Form(...)):
job_id = str(uuid.uuid4())
image_path = os.path.join(TEMP_DIR, f"{job_id}_{image_file.filename}")
with open(image_path, "wb") as buffer: shutil.copyfileobj(image_file.file, buffer)
video_path = os.path.join(TEMP_DIR, f"{job_id}_{video_file.filename}")
with open(video_path, "wb") as buffer: shutil.copyfileobj(video_file.file, buffer)
with lock: jobs[job_id] = {"status": "در صف انتظار...", "result": None}
background_tasks.add_task(process_job_in_background, job_id, image_path, video_path, motion, style)
return {"job_id": job_id}
class JobCheckRequest(BaseModel):
job_id: str
@app.post("/check_job_status")
async def check_job_status_api(request: JobCheckRequest):
job_id = request.job_id
with lock:
job = jobs.get(job_id, {"status": "not_found", "result": None})
return {"status": job["status"], "result": job["result"]}
app.mount("/videos", StaticFiles(directory=TEMP_DIR), name="videos")