Ezmary commited on
Commit
67a8851
·
verified ·
1 Parent(s): 9648246

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +193 -138
app.py CHANGED
@@ -1,18 +1,25 @@
1
- import asyncio
2
- import json
3
- import httpx
4
  import logging
 
 
5
  from fastapi import FastAPI, BackgroundTasks
6
  from pydantic import BaseModel
 
 
7
 
8
- # تنظیمات لاگینگ برای دیدن خطاها در کنسول
9
  logging.basicConfig(level=logging.INFO)
10
  logger = logging.getLogger(__name__)
11
 
12
  app = FastAPI()
13
 
14
- # آدرس اصلی API مدل Wan2.2
15
- WAN_API_BASE = "https://wan-ai-wan2-2-s2v.ms.show/gradio_api"
 
 
 
 
16
 
17
  class TaskPayload(BaseModel):
18
  job_id: str
@@ -21,164 +28,212 @@ class TaskPayload(BaseModel):
21
  resolution: str
22
  callback_url: str
23
 
24
- def extract_video_url(data):
25
- """تابع هوشمند برای بیرون کشیدن لینک ویدیو از هر ساختار جیسونی"""
26
- try:
27
- # اگر دیتا لیست است، اولین آیتم را چک کن
28
- if isinstance(data, list) and len(data) > 0:
29
- item = data[0]
30
- else:
31
- item = data
32
-
33
- # 1. چک کردن ساختار استاندارد Gradio Video
34
- if isinstance(item, dict):
35
- if "video" in item and isinstance(item["video"], dict):
36
- return item["video"].get("url") or item["video"].get("path")
37
- if "url" in item:
38
- return item["url"]
39
- if "path" in item:
40
- return item["path"]
41
- if "name" in item:
42
- return f"/file={item['name']}"
43
-
44
- # 2. چک کردن رشته ساده
45
- if isinstance(item, str):
46
- if item.endswith(".mp4") or "/file=" in item:
47
- return item
48
-
49
- except Exception as e:
50
- logger.error(f"Error extracting URL: {e}")
51
- return None
52
-
53
- async def upload_file_from_url(client: httpx.AsyncClient, file_url: str, file_type: str):
54
- """دانلود فایل از مدیر و آپلود به Wan2.2"""
55
- logger.info(f"Downloading {file_type} from {file_url}...")
56
 
57
- # دانلود با ریدایرکت و تایم‌اوت مناسب
58
- file_resp = await client.get(file_url, follow_redirects=True)
59
- if file_resp.status_code != 200:
60
- raise Exception(f"Failed to download {file_type}: {file_resp.status_code}")
61
 
62
- file_bytes = file_resp.content
63
- filename = file_url.split("/")[-1]
 
 
 
 
 
 
 
 
 
 
 
 
64
 
65
- # آپلود به Wan2.2
66
- logger.info(f"Uploading {filename} to Wan2.2...")
67
- files = {'files': (filename, file_bytes)}
68
- upload_resp = await client.post(f"{WAN_API_BASE}/upload", files=files)
69
 
70
- if upload_resp.status_code != 200:
71
- raise Exception(f"Failed to upload {file_type} to Wan2.2: {upload_resp.text}")
 
 
 
 
 
72
 
73
- # سرور لیست برمی‌گرداند: ["/tmp/..."]
74
- remote_path = upload_resp.json()[0]
75
- logger.info(f"Uploaded {file_type}: {remote_path}")
76
- return remote_path
77
 
78
  async def process_task(payload: TaskPayload):
79
  logger.info(f"--- Processing Job {payload.job_id} ---")
 
 
80
 
81
- # تایم‌اوت کلی برای کل عملیات (۳۰۰ ثانیه = ۵ دقیقه)
82
- timeout = httpx.Timeout(300.0, connect=60.0)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
83
 
84
- async with httpx.AsyncClient(timeout=timeout) as client:
85
- try:
86
- # الف) آپلود فایل‌ها
87
- img_remote = await upload_file_from_url(client, payload.image_url, "image")
88
- aud_remote = await upload_file_from_url(client, payload.audio_url, "audio")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89
 
90
- # ب) ارسال درخواست ساخت
91
  req_data = {
92
  "data": [
93
- {"path": img_remote, "meta": {"_type": "gradio.FileData"}},
94
- {"path": aud_remote, "meta": {"_type": "gradio.FileData"}},
95
  payload.resolution
96
  ]
97
  }
98
 
99
- logger.info("Sending predict request...")
100
- predict_resp = await client.post(f"{WAN_API_BASE}/call/predict", json=req_data)
 
101
 
102
- if predict_resp.status_code != 200:
103
- raise Exception(f"Predict call failed: {predict_resp.text}")
104
-
105
- event_id = predict_resp.json().get("event_id")
106
- logger.info(f"Task submitted. Event ID: {event_id}")
107
-
108
- # ج) گوش دادن به استریم
109
- final_video_url = None
110
- stream_url = f"{WAN_API_BASE}/call/predict/{event_id}"
111
-
112
- # تایم‌اوت برای خواندن استریم باید خیلی زیاد باشد
113
- async with client.stream("GET", stream_url, headers={"Accept": "text/event-stream"}, timeout=300.0) as response:
114
- async for line in response.aiter_lines():
115
- # خط‌های خالی را رد کن
116
- if not line.strip():
117
- continue
118
-
119
- # لاگ کردن خطوط مهم برای دیباگ (اختیاری)
120
- # logger.info(f"Stream: {line[:100]}")
121
-
122
- if line.startswith("event: error"):
123
- raise Exception("Server sent an error event.")
124
-
125
- if line.startswith("data: "):
126
- try:
127
- data_str = line[6:].strip()
128
- # داده‌های خالی یا null را رد کن
129
- if not data_str or data_str == "null":
130
- continue
131
-
132
- data = json.loads(data_str)
133
-
134
- # تلاش برای یافتن ویدیو
135
- found = extract_video_url(data)
136
- if found:
137
- final_video_url = found
138
- logger.info(f"Video URL found: {final_video_url}")
139
- # اینجا break نمیکنیم، شاید پیام complete بعدش بیاد که مطمئن بشیم
140
- except json.JSONDecodeError:
141
- pass
142
- except Exception as e:
143
- logger.error(f"Parsing error: {e}")
144
-
145
- if final_video_url:
146
- # اصلاح لینک اگر نسبی باشد
147
- if final_video_url.startswith("/"):
148
- final_video_url = f"https://wan-ai-wan2-2-s2v.ms.show{final_video_url}"
149
 
150
- # اصلاح لینک‌های عجیب //file=
151
- if "//file=" in final_video_url:
152
- final_video_url = final_video_url.replace("//file=", "/file=")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
153
 
154
- logger.info(f"SUCCESS! Sending callback: {final_video_url}")
 
 
155
 
156
- # د) ارسال به مدیر
157
  await client.post(payload.callback_url, json={
158
- "job_id": payload.job_id,
159
- "status": "COMPLETED",
160
- "video_url": final_video_url
161
  })
162
  else:
163
- raise Exception("Stream ended but no video URL was found in data.")
164
 
165
- except Exception as e:
166
- logger.error(f"Task Failed: {e}")
167
- try:
168
- await client.post(payload.callback_url, json={
169
- "job_id": payload.job_id,
170
- "status": "FAILED",
171
- "message": str(e)
172
- })
173
- except Exception as cb_e:
174
- logger.error(f"Callback failed: {cb_e}")
175
 
176
  @app.post("/process")
177
  async def accept_task(payload: TaskPayload, background_tasks: BackgroundTasks):
178
- logger.info(f"Received task {payload.job_id}")
179
  background_tasks.add_task(process_task, payload)
180
- return {"status": "accepted"}
181
-
182
- @app.get("/")
183
- async def root():
184
- return {"status": "Worker is running", "target": WAN_API_BASE}
 
1
+ import os
2
+ import shutil
 
3
  import logging
4
+ import uuid
5
+ import asyncio
6
  from fastapi import FastAPI, BackgroundTasks
7
  from pydantic import BaseModel
8
+ from gradio_client import Client, handle_file
9
+ import httpx
10
 
11
+ # تنظیمات لاگ
12
  logging.basicConfig(level=logging.INFO)
13
  logger = logging.getLogger(__name__)
14
 
15
  app = FastAPI()
16
 
17
+ # آدرس سرور هوش مصنوعی (بدون /gradio_api)
18
+ HF_MODEL_URL = "https://wan-ai-wan2-2-s2v.ms.show/"
19
+
20
+ # پوشه موقت برای دانلود فایل‌ها
21
+ TEMP_DIR = "/tmp/worker_files"
22
+ os.makedirs(TEMP_DIR, exist_ok=True)
23
 
24
  class TaskPayload(BaseModel):
25
  job_id: str
 
28
  resolution: str
29
  callback_url: str
30
 
31
+ async def download_file(url: str, suffix: str) -> str:
32
+ """فایل را از مدیر دانلود و در پوشه موقت ذخیره می‌کند"""
33
+ local_filename = f"{uuid.uuid4()}.{suffix}"
34
+ local_path = os.path.join(TEMP_DIR, local_filename)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
 
36
+ logger.info(f"Downloading {url} to {local_path}...")
 
 
 
37
 
38
+ async with httpx.AsyncClient() as client:
39
+ resp = await client.get(url, follow_redirects=True, timeout=60.0)
40
+ if resp.status_code != 200:
41
+ raise Exception(f"Download failed: {resp.status_code}")
42
+
43
+ with open(local_path, "wb") as f:
44
+ f.write(resp.content)
45
+
46
+ return local_path
47
+
48
+ def run_gradio_client(img_path, aud_path, resolution):
49
+ """این تابع به صورت همگام (Sync) اجرا می‌شود چون کتابخانه Gradio همگام است"""
50
+ logger.info("Connecting to Gradio Client...")
51
+ client = Client(HF_MODEL_URL)
52
 
53
+ logger.info("Sending request to Wan2.2 (This may take minutes)...")
 
 
 
54
 
55
+ # ارسال درخواست و انتظار برای نتیجه (Blocking)
56
+ result = client.predict(
57
+ ref_img=handle_file(img_path),
58
+ audio=handle_file(aud_path),
59
+ resolution=resolution,
60
+ api_name="/predict"
61
+ )
62
 
63
+ # نتیجه معمولاً یک لیست است: [مسیر_ویدیو, پیام_وضعیت]
64
+ # یا گاهی یک دیکشنری بسته به نسخه
65
+ logger.info(f"Raw Result received: {result}")
66
+ return result
67
 
68
  async def process_task(payload: TaskPayload):
69
  logger.info(f"--- Processing Job {payload.job_id} ---")
70
+ img_path = None
71
+ aud_path = None
72
 
73
+ try:
74
+ # 1. دانلود فایل‌ها از سرور مدیر به ��ضای لوکال کارگر
75
+ img_path = await download_file(payload.image_url, "png") # پسوند حدسی
76
+ aud_path = await download_file(payload.audio_url, "mp3")
77
+
78
+ # 2. اجرای کلاینت Gradio در یک Thread جداگانه (تا Event Loop اصلی بلاک نشود)
79
+ # این خط معجزه می‌کند: صبر می‌کند تا ویدیو واقعاً ساخته شود
80
+ result = await asyncio.to_thread(
81
+ run_gradio_client,
82
+ img_path,
83
+ aud_path,
84
+ payload.resolution
85
+ )
86
+
87
+ # 3. استخراج لینک ویدیو
88
+ final_video_url = None
89
+
90
+ # بررسی فرمت خروجی Gradio Client
91
+ if isinstance(result, list) and len(result) > 0:
92
+ # در نسخه کلاینت، فایل خروجی دانلود شده و در تمپ لوکال است
93
+ # اما ما نیاز به لینک عمومی داریم.
94
+ # نکته: کلاینت فایل را دانلود میکند. ما باید لینک اصلی را پیدا کنیم
95
+ # یا فایل دانلود شده را جایی آپلود کنیم.
96
+ # راه ساده‌تر برای این سناریو: استفاده از لینک Gradio API
97
+
98
+ video_data = result[0]
99
+ if isinstance(video_data, dict) and "url" in video_data:
100
+ final_video_url = video_data["url"]
101
+ elif isinstance(video_data, str) and os.path.exists(video_data):
102
+ # اگر فایل دانلود شده است، یعنی موفقیت آمیز بوده.
103
+ # اما ما نمیتوانیم فایل لوکال کارگر را به مدیر بدهیم (چون سرور جداست)
104
+ # مگر اینکه کارگر آن را آپلود کند.
105
+ # خوشبختانه Gradio Client معمولا آبجکت اصلی را هم برمیگرداند.
106
+ pass
107
+
108
+ # از آنجا که ما از طریق API کار میکنیم، فایل روی سرور ModelScope ساخته شده.
109
+ # کلاینت پایتون آن را دانلود میکند.
110
+ # برای اینکه لینک قابل دانلود به کاربر بدهیم، باید کمی کلک بزنیم:
111
+ # چون فایل به Worker دانلود شده، ما نمیتوانیم لینک لوکال بدهیم.
112
+ # راه حل: فایل تولید شده در Worker را به Manager آپلود کنیم؟ نه پیچیده است.
113
+ # راه حل بهتر: استفاده از خروجی خام بدون دانلود خودکار (که سخت است).
114
+
115
+ # بیایید فرض کنیم کلاینت فایل را دانلود کرده است `video_path`
116
+ local_video_result = result[0] # مسیر فایل ویدیوی تولید شده روی دیسک کارگر
117
+
118
+ if local_video_result and os.path.exists(local_video_result):
119
+ logger.info(f"Video generated locally at: {local_video_result}")
120
+
121
+ # حالا باید این ویدیو را به سرور مدیر برگردانیم
122
+ # چون لینک مستقیم نداریم (فایل پرایوت است)، آن را به مدیر آپلود میکنیم
123
+
124
+ async with httpx.AsyncClient(timeout=60.0) as client:
125
+ with open(local_video_result, "rb") as f:
126
+ files = {"file": ("generated_video.mp4", f, "video/mp4")}
127
+ # فرض میکنیم مدیر یک اندپوینت برای دریافت فایل نهایی دارد
128
+ # اگر ندارد، فعلا لینک تسک را میفرستیم (که با این روش کار نمیکند)
129
+ pass
130
+
131
+ # --- اصلاح استراتژی برای سادگی ---
132
+ # چون انتقال فایل بین سرورها سخت است، ما لینک Gradio را میخواهیم.
133
+ # اما gradio_client فایل را دانلود میکند.
134
+ # بیایید دوباره به روش httpx برگردیم اما با استراتژی Poll (نه Stream)
135
+ raise Exception("Switching logic") # Jump to except block to retry logic? No.
136
+
137
+ except Exception as e:
138
+ pass
139
+
140
+ # ==================================================================
141
+ # روش دوم و نهایی (ترکیبی):
142
+ # استفاده از gradio_client فقط برای آپلود و predict، اما گرفتن لینک وب
143
+ # ==================================================================
144
 
145
+ try:
146
+ # تلاش مجدد با کلاینت اما استخراج اطلاعات متا
147
+ # متاسفانه gradio_client فایل را دانلود میکند و لینک اصلی را مخفی میکند.
148
+ # پس برمیگردیم به httpx اما بدون stream.
149
+ # به جای stream از polling استفاده میکنیم که قطع نمیشود.
150
+
151
+ async with httpx.AsyncClient(timeout=300.0) as client:
152
+ # 1. آپلود (همان کد قبلی که کار میکرد)
153
+ logger.info("Uploading files manually to ensure we get URL...")
154
+
155
+ # دانلود فایل از مدیر
156
+ f_img = await client.get(payload.image_url)
157
+ f_aud = await client.get(payload.audio_url)
158
+
159
+ # آپلود به Gradio
160
+ up_img = await client.post(f"{WAN_API_BASE}/upload", files={'files': ('img.png', f_img.content)})
161
+ up_aud = await client.post(f"{WAN_API_BASE}/upload", files={'files': ('aud.mp3', f_aud.content)})
162
+
163
+ remote_img = up_img.json()[0]
164
+ remote_aud = up_aud.json()[0]
165
 
166
+ # 2. Submit Request
167
  req_data = {
168
  "data": [
169
+ {"path": remote_img, "meta": {"_type": "gradio.FileData"}},
170
+ {"path": remote_aud, "meta": {"_type": "gradio.FileData"}},
171
  payload.resolution
172
  ]
173
  }
174
 
175
+ resp = await client.post(f"{WAN_API_BASE}/call/predict", json=req_data)
176
+ event_id = resp.json()['event_id']
177
+ logger.info(f"Event ID: {event_id} - Polling started")
178
 
179
+ # 3. POLLING (به جای Streaming)
180
+ # هر 3 ثانیه وضعیت را چک میکنیم
181
+ final_url = None
182
+ for i in range(100): # تا 300 ثانیه (5 دقیقه) تلاش کن
183
+ await asyncio.sleep(3)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
184
 
185
+ # در Gradio 5 برای گ��فتن وضعیت باید به استریم وصل شد، اما اگر قطع شد مهم نیست
186
+ # راه حل جایگزین: استفاده از requests معمولی (نه stream) به صورت تک‌ضرب
187
+ # متاسفانه API استاندارد polling ندارد.
188
+
189
+ # بیایید دوباره Stream را تست کنیم اما با تنظیمات بسیار خاص که قطع نشود
190
+ try:
191
+ async with client.stream("GET", f"{WAN_API_BASE}/call/predict/{event_id}", headers={"Accept": "text/event-stream"}, timeout=10.0) as stream_resp:
192
+ async for line in stream_resp.aiter_lines():
193
+ if line.startswith("data: "):
194
+ try:
195
+ data = json.loads(line[6:])
196
+ # لاگ کردن دیتا برای فهمیدن وضعیت
197
+ logger.info(f"Poll Status: {str(data)[:50]}...")
198
+
199
+ if isinstance(data, list) and len(data) > 0:
200
+ # چک کردن موفقیت
201
+ if data[0] and "video" in str(data[0]):
202
+ # استخراج لینک
203
+ res = data[0]
204
+ if isinstance(res, dict) and "video" in res:
205
+ final_url = res["video"]["url"]
206
+ elif isinstance(res, dict) and "url" in res:
207
+ final_url = res["url"]
208
+
209
+ if final_url:
210
+ break # Loop line
211
+ except:
212
+ pass
213
+ if final_url: break # Loop poll
214
+ except Exception as stream_err:
215
+ logger.warning(f"Stream checking interrupted, retrying... {stream_err}")
216
+ continue
217
 
218
+ if final_url:
219
+ if final_url.startswith("/"):
220
+ final_url = f"https://wan-ai-wan2-2-s2v.ms.show{final_url}"
221
 
222
+ logger.info(f"✅ SUCCESS! Video: {final_url}")
223
  await client.post(payload.callback_url, json={
224
+ "job_id": payload.job_id, "status": "COMPLETED", "video_url": final_url
 
 
225
  })
226
  else:
227
+ raise Exception("Timed out waiting for video.")
228
 
229
+ except Exception as e:
230
+ logger.error(f"FAILED: {e}")
231
+ async with httpx.AsyncClient() as client:
232
+ await client.post(payload.callback_url, json={
233
+ "job_id": payload.job_id, "status": "FAILED", "message": str(e)
234
+ })
 
 
 
 
235
 
236
  @app.post("/process")
237
  async def accept_task(payload: TaskPayload, background_tasks: BackgroundTasks):
 
238
  background_tasks.add_task(process_task, payload)
239
+ return {"status": "accepted"}