Ezmary commited on
Commit
79b0e4a
·
verified ·
1 Parent(s): 0f0d6e9

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +108 -45
app.py CHANGED
@@ -1,9 +1,14 @@
1
  import asyncio
2
  import json
3
  import httpx
 
4
  from fastapi import FastAPI, BackgroundTasks
5
  from pydantic import BaseModel
6
 
 
 
 
 
7
  app = FastAPI()
8
 
9
  # آدرس اصلی API مدل Wan2.2
@@ -16,39 +21,73 @@ class TaskPayload(BaseModel):
16
  resolution: str
17
  callback_url: str
18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
  async def upload_file_from_url(client: httpx.AsyncClient, file_url: str, file_type: str):
20
  """دانلود فایل از مدیر و آپلود به Wan2.2"""
21
- print(f"Downloading {file_type} from {file_url}...")
22
- # 1. دانلود فایل از اسپیس مدیر
23
- file_resp = await client.get(file_url)
 
24
  if file_resp.status_code != 200:
25
  raise Exception(f"Failed to download {file_type}: {file_resp.status_code}")
26
 
27
  file_bytes = file_resp.content
28
  filename = file_url.split("/")[-1]
29
 
30
- # 2. آپلود به Wan2.2
31
- print(f"Uploading {filename} to Wan2.2...")
32
  files = {'files': (filename, file_bytes)}
33
  upload_resp = await client.post(f"{WAN_API_BASE}/upload", files=files)
34
 
35
  if upload_resp.status_code != 200:
36
- raise Exception(f"Failed to upload {file_type} to Wan2.2")
37
 
 
38
  remote_path = upload_resp.json()[0]
39
- print(f"Uploaded: {remote_path}")
40
  return remote_path
41
 
42
  async def process_task(payload: TaskPayload):
43
- print(f"Processing Job {payload.job_id}")
44
 
45
- async with httpx.AsyncClient(timeout=120.0) as client: # تایم‌اوت بالا
 
 
 
46
  try:
47
  # الف) آپلود فایل‌ها
48
  img_remote = await upload_file_from_url(client, payload.image_url, "image")
49
  aud_remote = await upload_file_from_url(client, payload.audio_url, "audio")
50
 
51
- # ب) ارسال درخواست Predict
52
  req_data = {
53
  "data": [
54
  {"path": img_remote, "meta": {"_type": "gradio.FileData"}},
@@ -57,65 +96,89 @@ async def process_task(payload: TaskPayload):
57
  ]
58
  }
59
 
 
60
  predict_resp = await client.post(f"{WAN_API_BASE}/call/predict", json=req_data)
 
61
  if predict_resp.status_code != 200:
62
  raise Exception(f"Predict call failed: {predict_resp.text}")
63
 
64
  event_id = predict_resp.json().get("event_id")
65
- print(f"Event ID: {event_id}")
66
 
67
- # ج) گوش دادن به استریم (Polling the stream)
68
- # در پایتون، به جای EventSource، می‌توانیم با یک لوپ ساده GET بزنیم یا stream کنیم
69
  final_video_url = None
 
70
 
71
- async with client.stream("GET", f"{WAN_API_BASE}/call/predict/{event_id}", headers={"Accept": "text/event-stream"}) as response:
 
72
  async for line in response.aiter_lines():
 
 
 
 
 
 
 
 
 
 
73
  if line.startswith("data: "):
74
  try:
75
- data = json.loads(line[6:])
76
- # بررسی ساختار خروجی
77
- if isinstance(data, list) and len(data) > 0:
78
- result_item = data[0]
79
- # پیدا کردن URL در ساختارهای مختلف
80
- if isinstance(result_item, dict):
81
- if "video" in result_item and "url" in result_item["video"]:
82
- final_video_url = result_item["video"]["url"]
83
- elif "url" in result_item:
84
- final_video_url = result_item["url"]
85
- elif "name" in result_item:
86
- final_video_url = f"https://wan-ai-wan2-2-s2v.ms.show/file={result_item['name']}"
87
- elif isinstance(result_item, str) and (result_item.endswith(".mp4") or "/file=" in result_item):
88
- final_video_url = result_item
89
 
90
- if final_video_url:
91
- # اصلاح لینک نسبی
92
- if final_video_url.startswith("/"):
93
- final_video_url = f"https://wan-ai-wan2-2-s2v.ms.show{final_video_url}"
94
- break
95
- except:
 
 
 
96
  pass
 
 
97
 
98
  if final_video_url:
99
- print(f"Success! Video: {final_video_url}")
100
- # د) بازگرداندن نتیجه به مدیر
 
 
 
 
 
 
 
 
 
101
  await client.post(payload.callback_url, json={
102
  "job_id": payload.job_id,
103
  "status": "COMPLETED",
104
  "video_url": final_video_url
105
  })
106
  else:
107
- raise Exception("Stream ended without video URL")
108
 
109
  except Exception as e:
110
- print(f"Task Failed: {e}")
111
- await client.post(payload.callback_url, json={
112
- "job_id": payload.job_id,
113
- "status": "FAILED",
114
- "message": str(e)
115
- })
 
 
 
116
 
117
  @app.post("/process")
118
  async def accept_task(payload: TaskPayload, background_tasks: BackgroundTasks):
119
- """تسک را می‌پذیرد و بلافاصله OK برمی‌گرداند تا تایم‌اوت نشود"""
120
  background_tasks.add_task(process_task, payload)
121
- return {"status": "accepted"}
 
 
 
 
 
1
  import asyncio
2
  import json
3
  import httpx
4
+ import logging
5
  from fastapi import FastAPI, BackgroundTasks
6
  from pydantic import BaseModel
7
 
8
+ # تنظیمات لاگینگ برای دیدن خطاها در کنسول
9
+ logging.basicConfig(level=logging.INFO)
10
+ logger = logging.getLogger(__name__)
11
+
12
  app = FastAPI()
13
 
14
  # آدرس اصلی API مدل Wan2.2
 
21
  resolution: str
22
  callback_url: str
23
 
24
+ def extract_video_url(data):
25
+ """تابع هوشمند برای بیرون کشیدن لینک ویدیو از هر ساختار جیسونی"""
26
+ try:
27
+ # اگر دیتا لیست است، اولین آیتم را چک کن
28
+ if isinstance(data, list) and len(data) > 0:
29
+ item = data[0]
30
+ else:
31
+ item = data
32
+
33
+ # 1. چک کردن ساختار استاندارد Gradio Video
34
+ if isinstance(item, dict):
35
+ if "video" in item and isinstance(item["video"], dict):
36
+ return item["video"].get("url") or item["video"].get("path")
37
+ if "url" in item:
38
+ return item["url"]
39
+ if "path" in item:
40
+ return item["path"]
41
+ if "name" in item:
42
+ return f"/file={item['name']}"
43
+
44
+ # 2. چک کردن رشته ساده
45
+ if isinstance(item, str):
46
+ if item.endswith(".mp4") or "/file=" in item:
47
+ return item
48
+
49
+ except Exception as e:
50
+ logger.error(f"Error extracting URL: {e}")
51
+ return None
52
+
53
  async def upload_file_from_url(client: httpx.AsyncClient, file_url: str, file_type: str):
54
  """دانلود فایل از مدیر و آپلود به Wan2.2"""
55
+ logger.info(f"Downloading {file_type} from {file_url}...")
56
+
57
+ # دانلود با ریدایرکت و تایم‌اوت مناسب
58
+ file_resp = await client.get(file_url, follow_redirects=True)
59
  if file_resp.status_code != 200:
60
  raise Exception(f"Failed to download {file_type}: {file_resp.status_code}")
61
 
62
  file_bytes = file_resp.content
63
  filename = file_url.split("/")[-1]
64
 
65
+ # آپلود به Wan2.2
66
+ logger.info(f"Uploading {filename} to Wan2.2...")
67
  files = {'files': (filename, file_bytes)}
68
  upload_resp = await client.post(f"{WAN_API_BASE}/upload", files=files)
69
 
70
  if upload_resp.status_code != 200:
71
+ raise Exception(f"Failed to upload {file_type} to Wan2.2: {upload_resp.text}")
72
 
73
+ # سرور لیست برمی‌گرداند: ["/tmp/..."]
74
  remote_path = upload_resp.json()[0]
75
+ logger.info(f"Uploaded {file_type}: {remote_path}")
76
  return remote_path
77
 
78
  async def process_task(payload: TaskPayload):
79
+ logger.info(f"--- Processing Job {payload.job_id} ---")
80
 
81
+ # تایم‌اوت کلی برای کل عملیات (۳۰۰ ثانیه = ۵ دقیقه)
82
+ timeout = httpx.Timeout(300.0, connect=60.0)
83
+
84
+ async with httpx.AsyncClient(timeout=timeout) as client:
85
  try:
86
  # الف) آپلود فایل‌ها
87
  img_remote = await upload_file_from_url(client, payload.image_url, "image")
88
  aud_remote = await upload_file_from_url(client, payload.audio_url, "audio")
89
 
90
+ # ب) ارسال درخواست ساخت
91
  req_data = {
92
  "data": [
93
  {"path": img_remote, "meta": {"_type": "gradio.FileData"}},
 
96
  ]
97
  }
98
 
99
+ logger.info("Sending predict request...")
100
  predict_resp = await client.post(f"{WAN_API_BASE}/call/predict", json=req_data)
101
+
102
  if predict_resp.status_code != 200:
103
  raise Exception(f"Predict call failed: {predict_resp.text}")
104
 
105
  event_id = predict_resp.json().get("event_id")
106
+ logger.info(f"Task submitted. Event ID: {event_id}")
107
 
108
+ # ج) گوش دادن به استریم
 
109
  final_video_url = None
110
+ stream_url = f"{WAN_API_BASE}/call/predict/{event_id}"
111
 
112
+ # تایم‌اوت برای خواندن استریم باید خیلی زیاد باشد
113
+ async with client.stream("GET", stream_url, headers={"Accept": "text/event-stream"}, timeout=300.0) as response:
114
  async for line in response.aiter_lines():
115
+ # خط‌های خالی را رد کن
116
+ if not line.strip():
117
+ continue
118
+
119
+ # لاگ کردن خطوط مهم برای دیباگ (اختیاری)
120
+ # logger.info(f"Stream: {line[:100]}")
121
+
122
+ if line.startswith("event: error"):
123
+ raise Exception("Server sent an error event.")
124
+
125
  if line.startswith("data: "):
126
  try:
127
+ data_str = line[6:].strip()
128
+ # داده‌های خالی یا null را رد کن
129
+ if not data_str or data_str == "null":
130
+ continue
 
 
 
 
 
 
 
 
 
 
131
 
132
+ data = json.loads(data_str)
133
+
134
+ # تلاش برای یافتن ویدیو
135
+ found = extract_video_url(data)
136
+ if found:
137
+ final_video_url = found
138
+ logger.info(f"Video URL found: {final_video_url}")
139
+ # اینجا break نمیکنیم، شاید پیام complete بعدش بیاد که مطمئن بشیم
140
+ except json.JSONDecodeError:
141
  pass
142
+ except Exception as e:
143
+ logger.error(f"Parsing error: {e}")
144
 
145
  if final_video_url:
146
+ # اصلاح لینک اگر نسبی باشد
147
+ if final_video_url.startswith("/"):
148
+ final_video_url = f"https://wan-ai-wan2-2-s2v.ms.show{final_video_url}"
149
+
150
+ # اصلاح لینک‌های عجیب //file=
151
+ if "//file=" in final_video_url:
152
+ final_video_url = final_video_url.replace("//file=", "/file=")
153
+
154
+ logger.info(f"SUCCESS! Sending callback: {final_video_url}")
155
+
156
+ # د) ارسال به مدیر
157
  await client.post(payload.callback_url, json={
158
  "job_id": payload.job_id,
159
  "status": "COMPLETED",
160
  "video_url": final_video_url
161
  })
162
  else:
163
+ raise Exception("Stream ended but no video URL was found in data.")
164
 
165
  except Exception as e:
166
+ logger.error(f"Task Failed: {e}")
167
+ try:
168
+ await client.post(payload.callback_url, json={
169
+ "job_id": payload.job_id,
170
+ "status": "FAILED",
171
+ "message": str(e)
172
+ })
173
+ except Exception as cb_e:
174
+ logger.error(f"Callback failed: {cb_e}")
175
 
176
  @app.post("/process")
177
  async def accept_task(payload: TaskPayload, background_tasks: BackgroundTasks):
178
+ logger.info(f"Received task {payload.job_id}")
179
  background_tasks.add_task(process_task, payload)
180
+ return {"status": "accepted"}
181
+
182
+ @app.get("/")
183
+ async def root():
184
+ return {"status": "Worker is running", "target": WAN_API_BASE}