Spaces:
Paused
Paused
File size: 9,222 Bytes
92e7e70 ce30669 6552434 244d068 92e7e70 1abefe7 fcb0523 c0f18dc 6552434 ade2276 6552434 fcb0523 92e7e70 fcb0523 ade2276 6552434 83fa137 6552434 6ba07d8 6552434 6ba07d8 6552434 83fa137 6552434 fcb0523 6552434 ce30669 6552434 ce30669 6552434 ade2276 83fa137 92e7e70 ade2276 83fa137 92e7e70 fcb0523 ade2276 fcb0523 6552434 ade2276 fcb0523 92e7e70 6552434 fcb0523 6552434 92e7e70 6552434 6ba07d8 6552434 6ba07d8 6552434 6ba07d8 6552434 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 | # app.py (Worker Space) - نسخه نهایی با مقاومت در برابر NoneType
from fastapi import FastAPI, File, UploadFile, Form, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import requests
import json
import uuid
import threading
import os
import shutil
import time
from sseclient import SSEClient
from fastapi.staticfiles import StaticFiles
# --- تنظیمات اصلی ---
TARGET_SPACE_URL = "https://wan-ai-wan2-2-animate.hf.space/"
# با توجه به لاگها، تعداد تلاش را بیشتر میکنیم
MAX_RETRIES = 100
RETRY_DELAY = 20
GENERIC_ERROR_MESSAGE = "خطا در ارتباط با سرور اصلی پردازش. لطفاً تصویر و ویدیو با چهره اضافه کرده مجدداً تلاش کنید."
# --- ساختار داده و دایرکتوری موقت ---
jobs = {}
lock = threading.Lock()
TEMP_DIR = "/tmp/worker_files"
os.makedirs(TEMP_DIR, exist_ok=True)
app = FastAPI()
# --- فعالسازی CORS ---
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- تابع کمکی برای پاکسازی خطا و ثبت لاگ ---
def handle_error(job_id, exception, context=""):
full_error = f"Context: {context} - Exception: {str(exception)}"
print(full_error)
with lock:
jobs[job_id]["status"] = "error"
jobs[job_id]["result"] = GENERIC_ERROR_MESSAGE
# --- تابع کمکی برای آپلود فایل ---
def upload_single_file_patiently(file_path: str, job_id: str) -> str:
for attempt in range(MAX_RETRIES):
file_to_upload = None
try:
with lock:
jobs[job_id]["status"] = f"تلاش {attempt + 1}/{MAX_RETRIES} برای آپلود: {os.path.basename(file_path)}..."
file_to_upload = open(file_path, 'rb')
files_payload = {'files': file_to_upload}
upload_res = requests.post(f"{TARGET_SPACE_URL}gradio_api/upload", files=files_payload, timeout=600)
upload_res.raise_for_status()
server_path = upload_res.json()[0]
return server_path
except Exception as e:
print(f"خطا در آپلود (تلاش {attempt + 1}): {e}")
if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_DELAY)
else:
raise e
finally:
if file_to_upload:
file_to_upload.close()
# --- تابع اصلی در پسزمینه ---
def process_job_in_background(job_id, image_path, video_path, motion_type, style_type):
try:
image_server_path = upload_single_file_patiently(image_path, job_id)
video_server_path = upload_single_file_patiently(video_path, job_id)
with lock: jobs[job_id]["status"] = "در حال ارسال درخواست به صف..."
image_payload = {"path": image_server_path, "url": f"{TARGET_SPACE_URL}gradio_api/file={image_server_path}", "meta": {"_type": "gradio.FileData"}}
video_payload = {"video": {"path": video_server_path, "url": f"{TARGET_SPACE_URL}gradio_api/file={video_server_path}", "meta": {"_type": "gradio.FileData"}}, "subtitles": None}
session_hash = str(uuid.uuid4())
payload = {"data": [image_payload, video_payload, motion_type, style_type], "fn_index": 0, "session_hash": session_hash}
last_exception = None
for attempt in range(MAX_RETRIES):
try:
join_res = requests.post(f"{TARGET_SPACE_URL}gradio_api/queue/join", json=payload, timeout=180)
join_res.raise_for_status()
last_exception = None
break
except Exception as e:
last_exception = e
time.sleep(RETRY_DELAY)
if last_exception: raise last_exception
with lock: jobs[job_id]["status"] = "منتظر در صف..."
response = requests.get(f"{TARGET_SPACE_URL}gradio_api/queue/data?session_hash={session_hash}", stream=True, timeout=14400)
client = SSEClient(response)
for event in client.events():
data = json.loads(event.data)
msg = data.get("msg")
if msg == "estimation":
status_text = f"در صف انتظار... موقعیت: {data.get('rank', 0) + 1}/{data.get('queue_size', '?')}"
with lock: jobs[job_id]["status"] = status_text
elif msg == "process_starts":
with lock: jobs[job_id]["status"] = "پردازش شروع شد..."
elif msg == "process_completed":
if data.get("success"):
try:
output_data = data.get("output", {}).get("data", [])
if not output_data: raise ValueError("دادههای خروجی در پاسخ یافت نشد.")
# <<< شروع تغییر: مدیریت خطای NoneType >>>
video_info = output_data[0].get("video") # دیگر مقدار پیشفرض {} نمیدهیم
if video_info is None:
raise ValueError("ساختار ویدیو در پاسخ سرور اصلی یافت نشد (مقدار None دریافت شد).")
video_url = video_info.get("url")
if not video_url: raise ValueError("URL ویدیو در ساختار پاسخ یافت نشد.")
# <<< پایان تغییر >>>
video_content = None
download_error = None
for download_attempt in range(5):
try:
with lock: jobs[job_id]["status"] = f"در حال دانلود نتیجه نهایی (تلاش {download_attempt + 1}/5)..."
video_content_response = requests.get(video_url, timeout=900)
video_content_response.raise_for_status()
video_content = video_content_response.content
download_error = None
break
except Exception as e:
download_error = e
print(f"خطا در دانلود ویدیو (تلاش {download_attempt + 1}): {e}")
time.sleep(15)
if video_content is None:
raise Exception(f"دانلود ویدیو پس از 5 بار تلاش ناموفق بود. آخرین خطا: {download_error}")
final_video_path = os.path.join(TEMP_DIR, f"{job_id}.mp4")
with open(final_video_path, 'wb') as f: f.write(video_content)
with lock:
jobs[job_id]["status"] = "completed"
jobs[job_id]["result"] = f"/videos/{job_id}.mp4"
except Exception as e:
handle_error(job_id, e, "Processing successful response")
else:
error_msg = data.get("output", {}).get("error", "خطای نامشخص از سرور اصلی")
with lock:
jobs[job_id]["status"] = "error"
jobs[job_id]["result"] = GENERIC_ERROR_MESSAGE
print(f"Error from main server: {error_msg}")
break
except Exception as e:
handle_error(job_id, e, "Main processing block")
finally:
if os.path.exists(image_path): os.remove(image_path)
if video_path and os.path.exists(video_path): os.remove(video_path)
# --- API Endpoints ---
@app.post("/submit_new_job")
async def submit_new_job_api(background_tasks: BackgroundTasks, image_file: UploadFile = File(...), video_file: UploadFile = File(...), motion: str = Form(...), style: str = Form(...)):
job_id = str(uuid.uuid4())
image_path = os.path.join(TEMP_DIR, f"{job_id}_{image_file.filename}")
with open(image_path, "wb") as buffer: shutil.copyfileobj(image_file.file, buffer)
video_path = os.path.join(TEMP_DIR, f"{job_id}_{video_file.filename}")
with open(video_path, "wb") as buffer: shutil.copyfileobj(video_file.file, buffer)
with lock: jobs[job_id] = {"status": "در صف انتظار...", "result": None}
background_tasks.add_task(process_job_in_background, job_id, image_path, video_path, motion, style)
return {"job_id": job_id}
class JobCheckRequest(BaseModel):
job_id: str
@app.post("/check_job_status")
async def check_job_status_api(request: JobCheckRequest):
job_id = request.job_id
with lock:
job = jobs.get(job_id, {"status": "not_found", "result": None})
return {"status": job["status"], "result": job["result"]}
app.mount("/videos", StaticFiles(directory=TEMP_DIR), name="videos") |