File size: 9,222 Bytes
92e7e70
ce30669
6552434
 
 
 
 
 
 
 
 
 
 
 
 
 
244d068
92e7e70
1abefe7
fcb0523
c0f18dc
6552434
 
 
 
 
 
 
 
 
ade2276
6552434
 
 
 
 
 
 
 
fcb0523
 
 
92e7e70
fcb0523
 
 
 
ade2276
6552434
 
 
 
 
83fa137
6552434
 
 
6ba07d8
6552434
 
6ba07d8
6552434
 
 
83fa137
6552434
 
 
 
 
 
 
 
fcb0523
6552434
 
 
 
 
 
 
 
 
 
 
 
 
ce30669
6552434
 
 
 
 
 
 
 
 
 
 
 
ce30669
6552434
 
 
 
 
 
 
 
 
 
 
 
ade2276
 
83fa137
92e7e70
 
 
 
 
 
ade2276
83fa137
92e7e70
fcb0523
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ade2276
 
 
 
 
fcb0523
 
6552434
ade2276
fcb0523
 
 
92e7e70
6552434
 
 
fcb0523
6552434
 
 
 
92e7e70
6552434
 
 
 
 
 
 
6ba07d8
6552434
6ba07d8
6552434
 
 
 
 
 
 
 
 
6ba07d8
 
6552434
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# app.py (Worker Space) - نسخه نهایی با مقاومت در برابر NoneType

from fastapi import FastAPI, File, UploadFile, Form, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import requests
import json
import uuid
import threading
import os
import shutil
import time
from sseclient import SSEClient
from fastapi.staticfiles import StaticFiles

# --- تنظیمات اصلی ---
TARGET_SPACE_URL = "https://wan-ai-wan2-2-animate.hf.space/"
# با توجه به لاگ‌ها، تعداد تلاش را بیشتر می‌کنیم
MAX_RETRIES = 100
RETRY_DELAY = 20
GENERIC_ERROR_MESSAGE = "خطا در ارتباط با سرور اصلی پردازش. لطفاً تصویر و ویدیو با چهره اضافه کرده مجدداً تلاش کنید."

# --- ساختار داده و دایرکتوری موقت ---
jobs = {}
lock = threading.Lock()
TEMP_DIR = "/tmp/worker_files"
os.makedirs(TEMP_DIR, exist_ok=True)

app = FastAPI()

# --- فعال‌سازی CORS ---
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- تابع کمکی برای پاکسازی خطا و ثبت لاگ ---
def handle_error(job_id, exception, context=""):
    full_error = f"Context: {context} - Exception: {str(exception)}"
    print(full_error)
    with lock:
        jobs[job_id]["status"] = "error"
        jobs[job_id]["result"] = GENERIC_ERROR_MESSAGE

# --- تابع کمکی برای آپلود فایل ---
def upload_single_file_patiently(file_path: str, job_id: str) -> str:
    for attempt in range(MAX_RETRIES):
        file_to_upload = None
        try:
            with lock:
                jobs[job_id]["status"] = f"تلاش {attempt + 1}/{MAX_RETRIES} برای آپلود: {os.path.basename(file_path)}..."
            
            file_to_upload = open(file_path, 'rb')
            files_payload = {'files': file_to_upload}
            
            upload_res = requests.post(f"{TARGET_SPACE_URL}gradio_api/upload", files=files_payload, timeout=600)
            upload_res.raise_for_status()
            
            server_path = upload_res.json()[0]
            return server_path
        except Exception as e:
            print(f"خطا در آپلود (تلاش {attempt + 1}): {e}")
            if attempt < MAX_RETRIES - 1:
                time.sleep(RETRY_DELAY)
            else:
                raise e
        finally:
            if file_to_upload:
                file_to_upload.close()

# --- تابع اصلی در پس‌زمینه ---
def process_job_in_background(job_id, image_path, video_path, motion_type, style_type):
    try:
        image_server_path = upload_single_file_patiently(image_path, job_id)
        video_server_path = upload_single_file_patiently(video_path, job_id)

        with lock: jobs[job_id]["status"] = "در حال ارسال درخواست به صف..."
        
        image_payload = {"path": image_server_path, "url": f"{TARGET_SPACE_URL}gradio_api/file={image_server_path}", "meta": {"_type": "gradio.FileData"}}
        video_payload = {"video": {"path": video_server_path, "url": f"{TARGET_SPACE_URL}gradio_api/file={video_server_path}", "meta": {"_type": "gradio.FileData"}}, "subtitles": None}
        session_hash = str(uuid.uuid4())
        payload = {"data": [image_payload, video_payload, motion_type, style_type], "fn_index": 0, "session_hash": session_hash}

        last_exception = None
        for attempt in range(MAX_RETRIES): 
            try:
                join_res = requests.post(f"{TARGET_SPACE_URL}gradio_api/queue/join", json=payload, timeout=180)
                join_res.raise_for_status()
                last_exception = None
                break
            except Exception as e:
                last_exception = e
                time.sleep(RETRY_DELAY)
        if last_exception: raise last_exception

        with lock: jobs[job_id]["status"] = "منتظر در صف..."
        
        response = requests.get(f"{TARGET_SPACE_URL}gradio_api/queue/data?session_hash={session_hash}", stream=True, timeout=14400) 
        client = SSEClient(response)
        
        for event in client.events():
            data = json.loads(event.data)
            msg = data.get("msg")
            if msg == "estimation":
                status_text = f"در صف انتظار... موقعیت: {data.get('rank', 0) + 1}/{data.get('queue_size', '?')}"
                with lock: jobs[job_id]["status"] = status_text
            elif msg == "process_starts":
                with lock: jobs[job_id]["status"] = "پردازش شروع شد..."
            elif msg == "process_completed":
                if data.get("success"):
                    try:
                        output_data = data.get("output", {}).get("data", [])
                        if not output_data: raise ValueError("داده‌های خروجی در پاسخ یافت نشد.")
                        
                        # <<< شروع تغییر: مدیریت خطای NoneType >>>
                        video_info = output_data[0].get("video") # دیگر مقدار پیش‌فرض {} نمی‌دهیم
                        if video_info is None:
                            raise ValueError("ساختار ویدیو در پاسخ سرور اصلی یافت نشد (مقدار None دریافت شد).")
                        
                        video_url = video_info.get("url")
                        if not video_url: raise ValueError("URL ویدیو در ساختار پاسخ یافت نشد.")
                        # <<< پایان تغییر >>>
                        
                        video_content = None
                        download_error = None
                        for download_attempt in range(5):
                            try:
                                with lock: jobs[job_id]["status"] = f"در حال دانلود نتیجه نهایی (تلاش {download_attempt + 1}/5)..."
                                video_content_response = requests.get(video_url, timeout=900)
                                video_content_response.raise_for_status()
                                video_content = video_content_response.content
                                download_error = None
                                break 
                            except Exception as e:
                                download_error = e
                                print(f"خطا در دانلود ویدیو (تلاش {download_attempt + 1}): {e}")
                                time.sleep(15)
                        
                        if video_content is None:
                            raise Exception(f"دانلود ویدیو پس از 5 بار تلاش ناموفق بود. آخرین خطا: {download_error}")

                        final_video_path = os.path.join(TEMP_DIR, f"{job_id}.mp4")
                        with open(final_video_path, 'wb') as f: f.write(video_content)
                        with lock:
                            jobs[job_id]["status"] = "completed"
                            jobs[job_id]["result"] = f"/videos/{job_id}.mp4"
                    except Exception as e:
                        handle_error(job_id, e, "Processing successful response")
                else:
                    error_msg = data.get("output", {}).get("error", "خطای نامشخص از سرور اصلی")
                    with lock: 
                        jobs[job_id]["status"] = "error"
                        jobs[job_id]["result"] = GENERIC_ERROR_MESSAGE
                        print(f"Error from main server: {error_msg}")
                break
            
    except Exception as e:
        handle_error(job_id, e, "Main processing block")
    finally:
        if os.path.exists(image_path): os.remove(image_path)
        if video_path and os.path.exists(video_path): os.remove(video_path)

# --- API Endpoints ---
@app.post("/submit_new_job")
async def submit_new_job_api(background_tasks: BackgroundTasks, image_file: UploadFile = File(...), video_file: UploadFile = File(...), motion: str = Form(...), style: str = Form(...)):
    job_id = str(uuid.uuid4())
    image_path = os.path.join(TEMP_DIR, f"{job_id}_{image_file.filename}")
    with open(image_path, "wb") as buffer: shutil.copyfileobj(image_file.file, buffer)
    video_path = os.path.join(TEMP_DIR, f"{job_id}_{video_file.filename}")
    with open(video_path, "wb") as buffer: shutil.copyfileobj(video_file.file, buffer)
    
    with lock: jobs[job_id] = {"status": "در صف انتظار...", "result": None}
    
    background_tasks.add_task(process_job_in_background, job_id, image_path, video_path, motion, style)
    return {"job_id": job_id}

class JobCheckRequest(BaseModel):
    job_id: str

@app.post("/check_job_status")
async def check_job_status_api(request: JobCheckRequest):
    job_id = request.job_id
    with lock:
        job = jobs.get(job_id, {"status": "not_found", "result": None})
    return {"status": job["status"], "result": job["result"]}

app.mount("/videos", StaticFiles(directory=TEMP_DIR), name="videos")