Ezmary commited on
Commit
0262a28
·
verified ·
1 Parent(s): 52e0082

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +32 -35
app.py CHANGED
@@ -34,54 +34,55 @@ class TaskPayload(BaseModel):
34
  async def upload_to_wan(client: httpx.AsyncClient, file_url: str, job_id: str):
35
  """
36
  دانلود فایل از سرور مدیر و آپلود آن به سرور هوش مصنوعی
37
- با سیستم تلاش مجدد (Retry) قدرتمند برای جلوگیری از خطا
38
  """
39
  filename = f"{job_id}_{uuid.uuid4().hex[:6]}_{file_url.split('/')[-1]}"
40
  file_bytes = None
41
 
42
  # 1. مرحله دانلود از سرور خودمان (Manager)
43
- # سه بار تلاش می‌کند اگر دانلود نشد
44
  for attempt in range(3):
45
  try:
46
- resp = await client.get(file_url, timeout=60.0)
47
  if resp.status_code == 200:
48
  file_bytes = resp.content
49
- break # دانلود موفق بود، خروج از حلقه
50
  else:
51
  logger.warning(f"⚠️ [{job_id}] Download attempt {attempt+1} failed: {resp.status_code}")
52
  except Exception as e:
53
  logger.warning(f"⚠️ [{job_id}] Download error {attempt+1}: {e}")
54
- await asyncio.sleep(2) # مکث کوتاه
55
 
56
  if not file_bytes:
57
  raise Exception("Fatal Error: Could not download file from Manager.")
58
 
59
  # 2. مرحله آپلود به سرور هوش مصنوعی (Wan)
60
- # پنج بار تلاش می‌کند چون سرور چینی نوسان دارد
61
  for attempt in range(5):
62
  try:
63
  files = {'files': (filename, file_bytes)}
64
- # تایم‌اوت بالا برای آپلود (120 ثانیه)
65
- wan_resp = await client.post(f"{WAN_API_BASE}/upload", files=files, timeout=120.0)
 
66
 
67
  if wan_resp.status_code == 200:
68
- return wan_resp.json()[0] # موفقیت! مسیر سرور را برگردان
69
 
70
  logger.warning(f"⚠️ [{job_id}] Upload attempt {attempt+1} failed. Status: {wan_resp.status_code}")
71
  except Exception as e:
72
  logger.warning(f"⚠️ [{job_id}] Upload connection error {attempt+1}: {e}")
73
 
74
- await asyncio.sleep(3) # 3 ثانیه استراحت قبل از تلاش بعدی
75
 
76
  raise Exception("Fatal Error: Failed to upload file to AI Server after 5 attempts.")
77
 
78
  async def process_single_job(payload: TaskPayload):
79
  """
80
- تابع اصلی پردازش که به صورت کاملاً مستقل و ایزوله اجرا می‌شود.
81
  """
82
  logger.info(f"🟢 [START] Job: {payload.job_id}")
83
 
84
- # استفاده از کلاینت اختصاصی برای هر درخواست (بدون تداخل)
85
  async with httpx.AsyncClient(timeout=None, limits=HighPerformanceLimits) as client:
86
  try:
87
  # 1. انتقال فایل‌ها
@@ -99,7 +100,9 @@ async def process_single_job(payload: TaskPayload):
99
  }
100
 
101
  logger.info(f"🚀 [{payload.job_id}] Sending prediction request...")
102
- predict_resp = await client.post(f"{WAN_API_BASE}/call/predict", json=req_data, timeout=60.0)
 
 
103
 
104
  if predict_resp.status_code != 200:
105
  raise Exception(f"Prediction Request Failed: {predict_resp.text}")
@@ -108,16 +111,18 @@ async def process_single_job(payload: TaskPayload):
108
  logger.info(f"⏳ [{payload.job_id}] Queued. Event ID: {event_id}")
109
 
110
  # 3. حلقه انتظار هوشمند (Polling/Streaming Loop)
111
- # تا 600 ثانیه (10 دقیقه) منتظر می‌ماند
112
  start_time = time.time()
113
  final_video_url = None
114
 
115
- while time.time() - start_time < 600:
 
116
  try:
117
  stream_url = f"{WAN_API_BASE}/call/predict/{event_id}"
118
 
119
- # اتصال به استریم برای دریافت وضعیت زنده
120
- async with client.stream("GET", stream_url, headers={"Accept": "text/event-stream"}, timeout=40.0) as response:
 
 
121
  async for line in response.aiter_lines():
122
  if not line.strip(): continue
123
 
@@ -125,60 +130,53 @@ async def process_single_job(payload: TaskPayload):
125
  try:
126
  data = json.loads(line[6:])
127
 
128
- # بررسی اینکه آیا ویدیو در پاسخ وجود دارد یا خیر
129
  if isinstance(data, list) and len(data) > 0:
130
  res = data[0]
131
  found_url = None
132
 
133
- # استراتژی‌های مختلف یافتن لینک ویدیو در جیسون
134
  if isinstance(res, dict):
135
- # حالت ۱: داخل آبجکت video
136
  found_url = res.get("video", {}).get("url")
137
- # حالت ۲: مستقیم در url
138
  if not found_url: found_url = res.get("url")
139
- # حالت ۳: فقط نام فایل
140
  if not found_url and "name" in res: found_url = f"/file={res['name']}"
141
 
142
- # حالت ۴: رشته مستقیم
143
  elif isinstance(res, str) and ("/file=" in res or ".mp4" in res):
144
  found_url = res
145
 
146
  if found_url:
147
  final_video_url = found_url
148
- break # یافت شد! خروج از خواندن خطوط
149
 
150
  except Exception:
151
- pass # خطاهای ریز جیسون را نادیده بگیر
152
 
153
  if final_video_url:
154
- break # خروج از حلقه زمانی اصلی
155
 
156
- # اگر استریم قطع شد ولی ویدیو نیامد، ۵ ثانیه صبر کن و دوباره وصل شو
157
  await asyncio.sleep(5)
158
 
159
  except Exception as e:
160
- # قطع شدن اینترنت کارگر مشکلی نیست، دوباره تلاش می‌کنیم
 
161
  await asyncio.sleep(5)
162
 
163
  # 4. پایان و ارسال نتیجه
164
  if final_video_url:
165
- # اصلاح لینک‌های نسبی به مطلق
166
  if final_video_url.startswith("/"):
167
  final_video_url = f"https://wan-ai-wan2-2-s2v.ms.show{final_video_url}"
168
 
169
- # اصلاح باگ دو اسلش در آدرس
170
  final_video_url = final_video_url.replace("//file=", "/file=")
171
 
172
  logger.info(f"✅ [DONE] Job: {payload.job_id} -> Video Ready")
173
 
174
- # ارسال به مدیر
175
  await client.post(payload.callback_url, json={
176
  "job_id": payload.job_id,
177
  "status": "COMPLETED",
178
  "video_url": final_video_url
179
  })
180
  else:
181
- raise Exception("Timeout: Video processing took too long (>10 mins).")
182
 
183
  except Exception as e:
184
  logger.error(f"❌ [FAIL] Job: {payload.job_id} -> {e}")
@@ -195,12 +193,11 @@ async def process_single_job(payload: TaskPayload):
195
  @app.post("/process")
196
  async def accept_task(payload: TaskPayload):
197
  """
198
- این اندپوینت درخواست را می‌پذیرد و بلافاصله پاسخ OK می‌دهد.
199
- پردازش اصلی در یک Task جداگانه و موازی شروع می‌شود.
200
  """
201
  asyncio.create_task(process_single_job(payload))
202
  return {"status": "accepted", "message": "Task started in background"}
203
 
204
  @app.get("/")
205
  async def root():
206
- return {"status": "Worker Pro Ready", "mode": "High Concurrency"}
 
34
  async def upload_to_wan(client: httpx.AsyncClient, file_url: str, job_id: str):
35
  """
36
  دانلود فایل از سرور مدیر و آپلود آن به سرور هوش مصنوعی
37
+ با تایماوت نامحدود و سیستم تلاش مجدد
38
  """
39
  filename = f"{job_id}_{uuid.uuid4().hex[:6]}_{file_url.split('/')[-1]}"
40
  file_bytes = None
41
 
42
  # 1. مرحله دانلود از سرور خودمان (Manager)
43
+ # تایم‌اوت نامحدود (timeout=None)
44
  for attempt in range(3):
45
  try:
46
+ resp = await client.get(file_url, timeout=None)
47
  if resp.status_code == 200:
48
  file_bytes = resp.content
49
+ break
50
  else:
51
  logger.warning(f"⚠️ [{job_id}] Download attempt {attempt+1} failed: {resp.status_code}")
52
  except Exception as e:
53
  logger.warning(f"⚠️ [{job_id}] Download error {attempt+1}: {e}")
54
+ await asyncio.sleep(2)
55
 
56
  if not file_bytes:
57
  raise Exception("Fatal Error: Could not download file from Manager.")
58
 
59
  # 2. مرحله آپلود به سرور هوش مصنوعی (Wan)
60
+ # تایماوت نامحدود (timeout=None)
61
  for attempt in range(5):
62
  try:
63
  files = {'files': (filename, file_bytes)}
64
+
65
+ # این خط مهمترین تغییر است: timeout=None برای آپلود فایل‌های سنگین
66
+ wan_resp = await client.post(f"{WAN_API_BASE}/upload", files=files, timeout=None)
67
 
68
  if wan_resp.status_code == 200:
69
+ return wan_resp.json()[0]
70
 
71
  logger.warning(f"⚠️ [{job_id}] Upload attempt {attempt+1} failed. Status: {wan_resp.status_code}")
72
  except Exception as e:
73
  logger.warning(f"⚠️ [{job_id}] Upload connection error {attempt+1}: {e}")
74
 
75
+ await asyncio.sleep(3)
76
 
77
  raise Exception("Fatal Error: Failed to upload file to AI Server after 5 attempts.")
78
 
79
  async def process_single_job(payload: TaskPayload):
80
  """
81
+ تابع اصلی پردازش با زمان انتظار ۲۴ ساعته
82
  """
83
  logger.info(f"🟢 [START] Job: {payload.job_id}")
84
 
85
+ # تنظیم تایم‌اوت کلی کلاینت روی None (نامحدود)
86
  async with httpx.AsyncClient(timeout=None, limits=HighPerformanceLimits) as client:
87
  try:
88
  # 1. انتقال فایل‌ها
 
100
  }
101
 
102
  logger.info(f"🚀 [{payload.job_id}] Sending prediction request...")
103
+
104
+ # درخواست ساخت با تایم‌اوت نامحدود
105
+ predict_resp = await client.post(f"{WAN_API_BASE}/call/predict", json=req_data, timeout=None)
106
 
107
  if predict_resp.status_code != 200:
108
  raise Exception(f"Prediction Request Failed: {predict_resp.text}")
 
111
  logger.info(f"⏳ [{payload.job_id}] Queued. Event ID: {event_id}")
112
 
113
  # 3. حلقه انتظار هوشمند (Polling/Streaming Loop)
 
114
  start_time = time.time()
115
  final_video_url = None
116
 
117
+ # انتظار تا ۲۴ ساعت (86400 ثانیه)
118
+ while time.time() - start_time < 86400:
119
  try:
120
  stream_url = f"{WAN_API_BASE}/call/predict/{event_id}"
121
 
122
+ # اتصال به استریم
123
+ # اینجا timeout=60 تنظیم شده تا اگر اتصال قطع شد، برنامه متوجه شود و دوباره وصل شود
124
+ # این تایم‌اوت باعث کنسل شدن کار نمی‌شود، فقط اتصال را رفرش می‌کند
125
+ async with client.stream("GET", stream_url, headers={"Accept": "text/event-stream"}, timeout=60.0) as response:
126
  async for line in response.aiter_lines():
127
  if not line.strip(): continue
128
 
 
130
  try:
131
  data = json.loads(line[6:])
132
 
 
133
  if isinstance(data, list) and len(data) > 0:
134
  res = data[0]
135
  found_url = None
136
 
137
+ # جستجوی لینک ویدیو در فرمت‌های مختلف
138
  if isinstance(res, dict):
 
139
  found_url = res.get("video", {}).get("url")
 
140
  if not found_url: found_url = res.get("url")
 
141
  if not found_url and "name" in res: found_url = f"/file={res['name']}"
142
 
 
143
  elif isinstance(res, str) and ("/file=" in res or ".mp4" in res):
144
  found_url = res
145
 
146
  if found_url:
147
  final_video_url = found_url
148
+ break
149
 
150
  except Exception:
151
+ pass
152
 
153
  if final_video_url:
154
+ break
155
 
 
156
  await asyncio.sleep(5)
157
 
158
  except Exception as e:
159
+ # اگر اینترنت قطع شد یا خطایی رخ داد، ۵ ثانیه صبر کن و دوباره تلاش کن
160
+ # چون حلقه اصلی ۸۶۴۰۰ ثانیه است، این خطاها باعث توقف نمی‌شوند
161
  await asyncio.sleep(5)
162
 
163
  # 4. پایان و ارسال نتیجه
164
  if final_video_url:
 
165
  if final_video_url.startswith("/"):
166
  final_video_url = f"https://wan-ai-wan2-2-s2v.ms.show{final_video_url}"
167
 
 
168
  final_video_url = final_video_url.replace("//file=", "/file=")
169
 
170
  logger.info(f"✅ [DONE] Job: {payload.job_id} -> Video Ready")
171
 
172
+ # ارسال نتیجه به سرور مدیر
173
  await client.post(payload.callback_url, json={
174
  "job_id": payload.job_id,
175
  "status": "COMPLETED",
176
  "video_url": final_video_url
177
  })
178
  else:
179
+ raise Exception("Timeout: Video processing took too long (>24 hours).")
180
 
181
  except Exception as e:
182
  logger.error(f"❌ [FAIL] Job: {payload.job_id} -> {e}")
 
193
  @app.post("/process")
194
  async def accept_task(payload: TaskPayload):
195
  """
196
+ دریافت درخواست و شروع پردازش در پس‌زمینه
 
197
  """
198
  asyncio.create_task(process_single_job(payload))
199
  return {"status": "accepted", "message": "Task started in background"}
200
 
201
  @app.get("/")
202
  async def root():
203
+ return {"status": "Worker Pro Ready", "mode": "Unlimited Timeout (24h)"}