File size: 9,109 Bytes
8a7d386
e256185
 
 
 
b838241
 
ac4201c
 
 
6d5e5ff
b838241
 
 
 
 
79b0e4a
 
ac4201c
 
b838241
e256185
ac4201c
b838241
 
 
ac4201c
 
 
 
 
 
 
b838241
6d5e5ff
 
0262a28
6d5e5ff
b838241
6d5e5ff
 
 
0262a28
6d5e5ff
 
0262a28
6d5e5ff
 
0262a28
6d5e5ff
 
 
 
0262a28
ac4201c
6d5e5ff
 
ac4201c
6d5e5ff
0262a28
6d5e5ff
 
 
0262a28
 
 
6d5e5ff
 
0262a28
6d5e5ff
 
 
 
 
0262a28
6d5e5ff
 
8a7d386
b838241
6d5e5ff
0262a28
6d5e5ff
b838241
79b0e4a
0262a28
b838241
e256185
6d5e5ff
b838241
 
 
8a7d386
e256185
ac4201c
 
e256185
 
ac4201c
 
 
 
6d5e5ff
0262a28
 
 
b838241
e256185
6d5e5ff
79b0e4a
e256185
6d5e5ff
e256185
6d5e5ff
e256185
 
 
0262a28
 
8a7d386
e256185
6d5e5ff
0262a28
 
 
 
e256185
 
 
8a7d386
 
 
 
 
b838241
e256185
b838241
0262a28
b838241
6d5e5ff
 
 
 
b838241
 
e256185
 
 
0262a28
6d5e5ff
 
0262a28
e256185
 
0262a28
b838241
 
e256185
 
0262a28
 
b838241
e256185
6d5e5ff
e256185
 
 
6d5e5ff
e256185
79b0e4a
b838241
79b0e4a
0262a28
ac4201c
e256185
 
 
ac4201c
 
0262a28
ac4201c
e256185
b838241
6d5e5ff
e256185
 
 
 
 
 
 
 
ac4201c
 
b838241
 
0262a28
b838241
 
6d5e5ff
e256185
 
 
0262a28
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import asyncio
import json
import httpx
import logging
import time
import uuid
import os
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

# --- تنظیمات لاگ حرفه‌ای ---
logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s',
    level=logging.INFO,
    datefmt='%H:%M:%S'
)
logger = logging.getLogger(__name__)

app = FastAPI()

# آدرس سرور هوش مصنوعی
WAN_API_BASE = "https://wan-ai-wan2-2-s2v.ms.show/gradio_api"

# تنظیمات کلاینت برای عملکرد بالا (بدون محدودیت اتصال)
HighPerformanceLimits = httpx.Limits(max_keepalive_connections=None, max_connections=None)

class TaskPayload(BaseModel):
    job_id: str
    image_url: str
    audio_url: str
    resolution: str
    callback_url: str

async def upload_to_wan(client: httpx.AsyncClient, file_url: str, job_id: str):
    """
    دانلود فایل از سرور مدیر و آپلود آن به سرور هوش مصنوعی
    با تایم‌اوت نامحدود و سیستم تلاش مجدد
    """
    filename = f"{job_id}_{uuid.uuid4().hex[:6]}_{file_url.split('/')[-1]}"
    file_bytes = None

    # 1. مرحله دانلود از سرور خودمان (Manager)
    # تایم‌اوت نامحدود (timeout=None)
    for attempt in range(3):
        try:
            resp = await client.get(file_url, timeout=None)
            if resp.status_code == 200:
                file_bytes = resp.content
                break 
            else:
                logger.warning(f"⚠️ [{job_id}] Download attempt {attempt+1} failed: {resp.status_code}")
        except Exception as e:
            logger.warning(f"⚠️ [{job_id}] Download error {attempt+1}: {e}")
        await asyncio.sleep(2)
    
    if not file_bytes:
        raise Exception("Fatal Error: Could not download file from Manager.")
    
    # 2. مرحله آپلود به سرور هوش مصنوعی (Wan)
    # تایم‌اوت نامحدود (timeout=None)
    for attempt in range(5):
        try:
            files = {'files': (filename, file_bytes)}
            
            # این خط مهمترین تغییر است: timeout=None برای آپلود فایل‌های سنگین
            wan_resp = await client.post(f"{WAN_API_BASE}/upload", files=files, timeout=None)
            
            if wan_resp.status_code == 200:
                return wan_resp.json()[0] 
            
            logger.warning(f"⚠️ [{job_id}] Upload attempt {attempt+1} failed. Status: {wan_resp.status_code}")
        except Exception as e:
            logger.warning(f"⚠️ [{job_id}] Upload connection error {attempt+1}: {e}")
        
        await asyncio.sleep(3)

    raise Exception("Fatal Error: Failed to upload file to AI Server after 5 attempts.")

async def process_single_job(payload: TaskPayload):
    """
    تابع اصلی پردازش با زمان انتظار ۲۴ ساعته
    """
    logger.info(f"🟢 [START] Job: {payload.job_id}")
    
    # تنظیم تایم‌اوت کلی کلاینت روی None (نامحدود)
    async with httpx.AsyncClient(timeout=None, limits=HighPerformanceLimits) as client:
        try:
            # 1. انتقال فایل‌ها
            logger.info(f"📥 [{payload.job_id}] Transferring files...")
            img_remote = await upload_to_wan(client, payload.image_url, payload.job_id)
            aud_remote = await upload_to_wan(client, payload.audio_url, payload.job_id)
            
            # 2. ثبت درخواست (Predict)
            req_data = {
                "data": [
                    {"path": img_remote, "meta": {"_type": "gradio.FileData"}},
                    {"path": aud_remote, "meta": {"_type": "gradio.FileData"}},
                    payload.resolution
                ]
            }
            
            logger.info(f"🚀 [{payload.job_id}] Sending prediction request...")
            
            # درخواست ساخت با تایم‌اوت نامحدود
            predict_resp = await client.post(f"{WAN_API_BASE}/call/predict", json=req_data, timeout=None)
            
            if predict_resp.status_code != 200:
                raise Exception(f"Prediction Request Failed: {predict_resp.text}")
            
            event_id = predict_resp.json().get("event_id")
            logger.info(f"⏳ [{payload.job_id}] Queued. Event ID: {event_id}")
            
            # 3. حلقه انتظار هوشمند (Polling/Streaming Loop)
            start_time = time.time()
            final_video_url = None
            
            # انتظار تا ۲۴ ساعت (86400 ثانیه)
            while time.time() - start_time < 86400:
                try:
                    stream_url = f"{WAN_API_BASE}/call/predict/{event_id}"
                    
                    # اتصال به استریم
                    # اینجا timeout=60 تنظیم شده تا اگر اتصال قطع شد، برنامه متوجه شود و دوباره وصل شود
                    # این تایم‌اوت باعث کنسل شدن کار نمی‌شود، فقط اتصال را رفرش می‌کند
                    async with client.stream("GET", stream_url, headers={"Accept": "text/event-stream"}, timeout=60.0) as response:
                        async for line in response.aiter_lines():
                            if not line.strip(): continue
                            
                            if line.startswith("data: "):
                                try:
                                    data = json.loads(line[6:])
                                    
                                    if isinstance(data, list) and len(data) > 0:
                                        res = data[0]
                                        found_url = None
                                        
                                        # جستجوی لینک ویدیو در فرمت‌های مختلف
                                        if isinstance(res, dict):
                                            found_url = res.get("video", {}).get("url")
                                            if not found_url: found_url = res.get("url")
                                            if not found_url and "name" in res: found_url = f"/file={res['name']}"
                                        
                                        elif isinstance(res, str) and ("/file=" in res or ".mp4" in res):
                                            found_url = res
                                        
                                        if found_url:
                                            final_video_url = found_url
                                            break 
                                            
                                except Exception:
                                    pass 
                        
                    if final_video_url:
                        break 
                    
                    await asyncio.sleep(5)

                except Exception as e:
                    # اگر اینترنت قطع شد یا خطایی رخ داد، ۵ ثانیه صبر کن و دوباره تلاش کن
                    # چون حلقه اصلی ۸۶۴۰۰ ثانیه است، این خطاها باعث توقف نمی‌شوند
                    await asyncio.sleep(5)

            # 4. پایان و ارسال نتیجه
            if final_video_url:
                if final_video_url.startswith("/"):
                    final_video_url = f"https://wan-ai-wan2-2-s2v.ms.show{final_video_url}"
                
                final_video_url = final_video_url.replace("//file=", "/file=")

                logger.info(f"✅ [DONE] Job: {payload.job_id} -> Video Ready")
                
                # ارسال نتیجه به سرور مدیر
                await client.post(payload.callback_url, json={
                    "job_id": payload.job_id,
                    "status": "COMPLETED",
                    "video_url": final_video_url
                })
            else:
                raise Exception("Timeout: Video processing took too long (>24 hours).")

        except Exception as e:
            logger.error(f"❌ [FAIL] Job: {payload.job_id} -> {e}")
            # تلاش برای ارسال خطا به مدیر
            try:
                await client.post(payload.callback_url, json={
                    "job_id": payload.job_id,
                    "status": "FAILED",
                    "message": str(e)
                })
            except:
                pass

@app.post("/process")
async def accept_task(payload: TaskPayload):
    """
    دریافت درخواست و شروع پردازش در پس‌زمینه
    """
    asyncio.create_task(process_single_job(payload))
    return {"status": "accepted", "message": "Task started in background"}

@app.get("/")
async def root():
    return {"status": "Worker Pro Ready", "mode": "Unlimited Timeout (24h)"}