Fred808 committed on
Commit
2a6cacc
·
verified ·
1 Parent(s): f42f03e

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +125 -88
app.py CHANGED
@@ -188,7 +188,7 @@ async def send_image_for_captioning(image_path: Path, course_name: str, progress
188
  print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
189
  return None
190
 
191
- async def download_and_extract_zip(course_name: str) -> Optional[tuple[Path, str]]:
192
  """Downloads the zip file for the course and extracts its contents."""
193
  print(f"[{FLOW_ID}] Looking for files starting with '{course_name}' in frames/ directory...")
194
 
@@ -211,12 +211,18 @@ async def download_and_extract_zip(course_name: str) -> Optional[tuple[Path, str
211
  print(f"[{FLOW_ID}] No zip files found starting with '{course_name}' in frames/ directory.")
212
  return None, None
213
 
214
- # Use the first matching file (you could modify this to process all matches)
215
- repo_file_full_path = matching_files[0] # e.g., frames/DAREEFSA_full_name.zip
 
 
 
 
 
 
216
 
217
  # Extract the full file name from the path (e.g., DAREEFSA_full_name.zip)
218
  zip_full_name = Path(repo_file_full_path).name
219
- print(f"[{FLOW_ID}] Found matching file: {repo_file_full_path}. Full name: {zip_full_name}")
220
 
221
  # Use hf_hub_download to get the file path
222
  zip_path = hf_hub_download(
@@ -237,12 +243,12 @@ async def download_and_extract_zip(course_name: str) -> Optional[tuple[Path, str
237
 
238
  print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")
239
 
240
- # Return the extraction directory and the full zip file name
241
- return extract_dir, zip_full_name
242
 
243
  except Exception as e:
244
  print(f"[{FLOW_ID}] Error downloading or extracting zip for {course_name}: {e}")
245
- return None, None
246
 
247
  async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
248
  """Uploads the final captions JSON file to the output dataset.
@@ -275,99 +281,130 @@ async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> boo
275
  return False
276
 
277
  async def process_course_task(course_name: str):
278
- """Main task to process a single course."""
279
- print(f"[{FLOW_ID}] Starting processing for course: {course_name}")
280
 
281
- extract_dir = None
282
- zip_full_name = None
283
- success = False
284
- error_message = None
285
- all_captions = []
286
 
287
- try:
288
- # download_and_extract_zip now returns a tuple: (extract_dir, zip_full_name)
289
- download_result = await download_and_extract_zip(course_name)
 
 
290
 
291
- if download_result is None or download_result[0] is None:
292
- raise Exception("Failed to download or extract zip file.")
 
293
 
294
- extract_dir, zip_full_name = download_result
 
 
 
 
 
 
 
 
 
295
 
296
- # FIX: Use recursive glob to find images in subdirectories
297
- image_paths = [p for p in extract_dir.glob("**/*") if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']]
298
- print(f"[{FLOW_ID}] Found {len(image_paths)} images to process.")
299
-
300
- if not image_paths:
301
- print(f"[{FLOW_ID}] No images found in {course_name}. Marking as complete.")
302
- success = True
303
- else:
304
- # Initialize progress tracker
305
- progress_tracker = {
306
- 'total': len(image_paths),
307
- 'completed': 0
308
- }
309
- print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images...")
310
 
311
- # Create a semaphore to limit concurrent tasks to the number of available servers
312
- # This ensures we only launch as many image processing tasks as we have servers.
313
- semaphore = asyncio.Semaphore(len(servers))
314
 
315
- async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
316
- async with semaphore:
317
- return await send_image_for_captioning(image_path, course_name, progress_tracker)
318
 
319
- # Create a list of tasks for parallel captioning
320
- caption_tasks = []
321
- for image_path in image_paths:
322
- caption_tasks.append(limited_send_image_for_captioning(image_path, course_name, progress_tracker))
323
-
324
- # Run all captioning tasks concurrently
325
- results = await asyncio.gather(*caption_tasks)
326
 
327
- # Filter out failed results
328
- all_captions = [r for r in results if r is not None]
329
-
330
- # Final progress report
331
- if len(all_captions) == len(image_paths):
332
- print(f"[{FLOW_ID}] FINAL PROGRESS: Successfully completed all {len(all_captions)} captions.")
333
- success = True
334
  else:
335
- print(f"[{FLOW_ID}] FINAL PROGRESS: Completed with partial result: {len(all_captions)}/{len(image_paths)} captions.")
336
- success = False
337
-
338
- # Upload results
339
- if all_captions and zip_full_name:
340
- # Use the full zip file name for the upload as requested
341
- print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...")
342
- if await upload_captions_to_hf(zip_full_name, all_captions):
343
- print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
344
- success = True # Mark as success if upload is successful
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
345
  else:
346
- print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
347
- success = False
348
- else:
349
- print(f"[{FLOW_ID}] No captions generated or zip_full_name is missing. Skipping upload.")
350
- success = False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
351
 
352
- except Exception as e:
353
- error_message = str(e)
354
- print(f"[{FLOW_ID}] Critical error in process_course_task for {course_name}: {error_message}")
355
- finally:
356
- # Cleanup temporary files
357
- if extract_dir and extract_dir.exists():
358
- print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")
359
- import shutil
360
- shutil.rmtree(extract_dir, ignore_errors=True)
361
-
362
- # Report completion to manager
363
- # The original code for this part is missing but is implied by the user's output
364
- # For now, we'll just print a completion message
365
- print(f"[{FLOW_ID}] Reporting completion for {course_name} (Success: {success})...")
366
-
367
- # In a real scenario, you would call the manager endpoint here
368
- # await report_completion_to_manager(course_name, success, error_message)
369
-
370
- return success
 
 
 
 
 
 
 
 
 
 
371
 
372
  async def report_completion(course_name: str, success: bool, error_message: Optional[str] = None):
373
  """Reports the task result back to the Manager Server."""
 
188
  print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {image_path.name}.")
189
  return None
190
 
191
+ async def download_and_extract_zip(course_name: str, processed_files: Set[str]) -> Optional[tuple[Path, str, str]]:
192
  """Downloads the zip file for the course and extracts its contents."""
193
  print(f"[{FLOW_ID}] Looking for files starting with '{course_name}' in frames/ directory...")
194
 
 
211
  print(f"[{FLOW_ID}] No zip files found starting with '{course_name}' in frames/ directory.")
212
  return None, None
213
 
214
+ # Filter out already processed files and select the first one
215
+ unprocessed_files = [f for f in matching_files if f not in processed_files]
216
+
217
+ if not unprocessed_files:
218
+ print(f"[{FLOW_ID}] No new zip files found for '{course_name}'.")
219
+ return None, None, None
220
+
221
+ repo_file_full_path = unprocessed_files[0] # e.g., frames/DAREEFSA_full_name.zip
222
 
223
  # Extract the full file name from the path (e.g., DAREEFSA_full_name.zip)
224
  zip_full_name = Path(repo_file_full_path).name
225
+ print(f"[{FLOW_ID}] Found new matching file: {repo_file_full_path}. Full name: {zip_full_name}")
226
 
227
  # Use hf_hub_download to get the file path
228
  zip_path = hf_hub_download(
 
243
 
244
  print(f"[{FLOW_ID}] Extraction complete to {extract_dir}.")
245
 
246
+ # Return the extraction directory, the full zip file name, and the repo path
247
+ return extract_dir, zip_full_name, repo_file_full_path
248
 
249
  except Exception as e:
250
  print(f"[{FLOW_ID}] Error downloading or extracting zip for {course_name}: {e}")
251
+ return None, None, None
252
 
253
  async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> bool:
254
  """Uploads the final captions JSON file to the output dataset.
 
281
  return False
282
 
283
  async def process_course_task(course_name: str):
284
+ """Main task to process a single course, looping until all files are processed."""
285
+ print(f"[{FLOW_ID}] Starting continuous processing for course: {course_name}")
286
 
287
+ processed_files = set()
288
+ all_processed_files_log = []
289
+ global_success = True
 
 
290
 
291
+ # Loop to continuously check for new files matching the course_name prefix
292
+ while True:
293
+ extract_dir = None
294
+ zip_full_name = None
295
+ repo_file_full_path = None
296
 
297
+ try:
298
+ # download_and_extract_zip now returns a tuple: (extract_dir, zip_full_name, repo_file_full_path)
299
+ download_result = await download_and_extract_zip(course_name, processed_files)
300
 
301
+ if download_result is None or download_result[0] is None:
302
+ # No new files found, or an error occurred during search/download
303
+ if download_result is not None and download_result[0] is None and download_result[1] is None:
304
+ print(f"[{FLOW_ID}] No new files found for {course_name}. Exiting loop.")
305
+ break
306
+ else:
307
+ # An error occurred during search/download
308
+ raise Exception("Failed to download or extract zip file.")
309
+
310
+ extract_dir, zip_full_name, repo_file_full_path = download_result
311
 
312
+ # Add the file to the processed set immediately to avoid re-processing in the next loop
313
+ processed_files.add(repo_file_full_path)
314
+ all_processed_files_log.append(repo_file_full_path)
 
 
 
 
 
 
 
 
 
 
 
315
 
316
+ # --- Start Processing the single file ---
 
 
317
 
318
+ # FIX: Use recursive glob to find images in subdirectories
319
+ image_paths = [p for p in extract_dir.glob("**/*") if p.is_file() and p.suffix.lower() in ['.jpg', '.jpeg', '.png']]
320
+ print(f"[{FLOW_ID}] Found {len(image_paths)} images to process in {zip_full_name}.")
321
 
322
+ current_file_success = False
 
 
 
 
 
 
323
 
324
+ if not image_paths:
325
+ print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
326
+ current_file_success = True
 
 
 
 
327
  else:
328
+ # Initialize progress tracker
329
+ progress_tracker = {
330
+ 'total': len(image_paths),
331
+ 'completed': 0
332
+ }
333
+ print(f"[{FLOW_ID}] Starting captioning for {progress_tracker['total']} images in {zip_full_name}...")
334
+
335
+ # Create a semaphore to limit concurrent tasks to the number of available servers
336
+ semaphore = asyncio.Semaphore(len(servers))
337
+
338
+ async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
339
+ async with semaphore:
340
+ return await send_image_for_captioning(image_path, course_name, progress_tracker)
341
+
342
+ # Create a list of tasks for parallel captioning
343
+ caption_tasks = []
344
+ for image_path in image_paths:
345
+ caption_tasks.append(limited_send_image_for_captioning(image_path, course_name, progress_tracker))
346
+
347
+ # Run all captioning tasks concurrently
348
+ results = await asyncio.gather(*caption_tasks)
349
+
350
+ # Filter out failed results
351
+ all_captions = [r for r in results if r is not None]
352
+
353
+ # Final progress report for the current file
354
+ if len(all_captions) == len(image_paths):
355
+ print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully completed all {len(all_captions)} captions.")
356
+ current_file_success = True
357
  else:
358
+ print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} captions.")
359
+ current_file_success = False
360
+
361
+ # Upload results
362
+ if all_captions and zip_full_name:
363
+ # Use the full zip file name for the upload as requested
364
+ print(f"[{FLOW_ID}] Uploading {len(all_captions)} captions for {zip_full_name}...")
365
+ if await upload_captions_to_hf(zip_full_name, all_captions):
366
+ print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
367
+ # If partial success, we still upload, but the overall task is marked as failure if any file failed
368
+ if not current_file_success:
369
+ global_success = False
370
+ else:
371
+ print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}.")
372
+ current_file_success = False
373
+ global_success = False
374
+ else:
375
+ print(f"[{FLOW_ID}] No captions generated or zip_full_name is missing. Skipping upload for {zip_full_name}.")
376
+ current_file_success = False
377
+ global_success = False
378
 
379
+ # --- End Processing the single file ---
380
+
381
+ except Exception as e:
382
+ error_message = str(e)
383
+ print(f"[{FLOW_ID}] Critical error in process_course_task for {course_name}: {error_message}")
384
+ global_success = False
385
+
386
+ finally:
387
+ # Cleanup temporary files for the current file
388
+ if extract_dir and extract_dir.exists():
389
+ print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")
390
+ import shutil
391
+ shutil.rmtree(extract_dir, ignore_errors=True)
392
+
393
+ # If an unrecoverable error occurred (e.g., during search/download), break the loop
394
+ if download_result is None and extract_dir is None:
395
+ break
396
+
397
+ # --- Final Report after the loop is complete ---
398
+ print(f"[{FLOW_ID}] All processing loops complete for {course_name}.")
399
+ print(f"[{FLOW_ID}] Total files processed: {len(all_processed_files_log)}")
400
+ print(f"[{FLOW_ID}] List of processed files: {all_processed_files_log}")
401
+
402
+ # Report completion to manager
403
+ final_error_message = error_message if not global_success else None
404
+ # Assuming report_completion exists and is an async function
405
+ # await report_completion(course_name, global_success, final_error_message)
406
+
407
+ return global_success
408
 
409
  async def report_completion(course_name: str, success: bool, error_message: Optional[str] = None):
410
  """Reports the task result back to the Manager Server."""