Fred808 committed on
Commit
38c1f9d
·
verified ·
1 Parent(s): 7725164

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +81 -91
app.py CHANGED
@@ -348,26 +348,37 @@ async def upload_captions_to_hf(zip_full_name: str, captions: List[Dict]) -> boo
348
  return False
349
 
350
  async def process_next_file_task():
351
- """Task to process the next file in the list based on the current index."""
352
  global state
353
 
354
- if not state.is_running:
355
- print(f"[{FLOW_ID}] Processing loop is not running. Exiting task.")
356
- return
357
-
358
- while state.is_running:
359
- repo_file_full_path = None
360
- current_index = -1
361
-
362
  async with state_lock:
 
363
  current_index = state.current_index
364
- if current_index >= state.total_files:
 
 
 
 
 
 
 
 
365
  state.status = "Finished processing all files."
366
  state.is_running = False
367
- print(f"[{FLOW_ID}] Reached end of file list. Stopping processing.")
368
- await save_state_to_hf()
369
- break
370
-
 
 
 
 
 
 
371
  repo_file_full_path = state.all_zip_files[current_index]
372
 
373
  if repo_file_full_path in state.processed_files:
@@ -378,18 +389,19 @@ async def process_next_file_task():
378
  await save_state_to_hf()
379
  continue
380
 
381
- # Mark the file as in-progress in the state
382
  state.status = f"Processing file {current_index + 1}/{state.total_files}"
383
  state.current_file = Path(repo_file_full_path).name
384
  state.current_file_progress = "0/0"
385
  await save_state_to_hf()
386
-
387
- # --- Start Processing ---
388
  extract_dir = None
389
  zip_full_name = None
390
- global_success = False
391
 
392
  try:
 
 
393
  download_result = await download_and_extract_zip(repo_file_full_path)
394
 
395
  if download_result is None:
@@ -404,7 +416,14 @@ async def process_next_file_task():
404
 
405
  if not image_paths:
406
  print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
407
- global_success = True
 
 
 
 
 
 
 
408
  else:
409
  # Initialize progress tracker
410
  progress_tracker = {
@@ -415,7 +434,7 @@ async def process_next_file_task():
415
  state.current_file_progress = f"0/{len(image_paths)}"
416
  await save_state_to_hf()
417
 
418
- # Create and run captioning tasks
419
  semaphore = asyncio.Semaphore(len(servers))
420
  async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
421
  async with semaphore:
@@ -426,25 +445,14 @@ async def process_next_file_task():
426
  all_captions = [r for r in results if r is not None]
427
 
428
  # Final progress report
429
- if len(all_captions) == len(image_paths):
430
- print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Successfully completed all {len(all_captions)} captions.")
431
- global_success = True
432
- else:
433
- print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: Completed with partial result: {len(all_captions)}/{len(image_paths)} captions.")
434
- global_success = False
435
 
436
- # Upload results
437
  if all_captions and zip_full_name:
438
  if await upload_captions_to_hf(zip_full_name, all_captions):
439
- print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}.")
440
- # If upload is successful, we mark the file as processed, regardless of partial success
441
- # The uploaded JSON will reflect the actual number of captions
442
- if global_success:
443
- print(f"[{FLOW_ID}] Fully processed and uploaded: {zip_full_name}")
444
- else:
445
- print(f"[{FLOW_ID}] Partially processed but uploaded: {zip_full_name}. Needs manual review.")
446
-
447
- # Mark as processed only if upload succeeded
448
  async with state_lock:
449
  state.processed_files.add(repo_file_full_path)
450
  state.current_index += 1
@@ -452,42 +460,37 @@ async def process_next_file_task():
452
  state.current_file_progress = "0/0"
453
  state.status = "Idle"
454
  await save_state_to_hf()
455
-
456
  else:
457
- print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}. Will retry this file later.")
458
- # Do NOT increment index or mark as processed, so it will be retried
459
  async with state_lock:
460
- state.status = f"Error uploading captions for {zip_full_name}. Retrying later."
461
  await save_state_to_hf()
462
- # Wait before retrying to avoid immediate re-attempt on a transient error
463
- await asyncio.sleep(60)
464
-
465
  else:
466
- print(f"[{FLOW_ID}] No captions generated or zip_full_name is missing. Skipping upload for {zip_full_name}. Will retry later.")
467
- # Do NOT increment index or mark as processed
468
  async with state_lock:
469
- state.status = f"No captions generated for {zip_full_name}. Retrying later."
470
  await save_state_to_hf()
471
- await asyncio.sleep(60)
472
 
473
  except Exception as e:
474
  error_message = str(e)
475
- print(f"[{FLOW_ID}] Critical error in process_next_file_task for {repo_file_full_path}: {error_message}")
476
  async with state_lock:
477
- state.status = f"CRITICAL ERROR for {Path(repo_file_full_path).name}. Retrying later. Error: {error_message[:50]}..."
478
  await save_state_to_hf()
479
- # Wait before retrying
480
- await asyncio.sleep(60)
481
 
482
  finally:
483
  # Cleanup temporary files
484
  if extract_dir and extract_dir.exists():
485
- print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")
486
- shutil.rmtree(extract_dir, ignore_errors=True)
487
-
488
- # If the loop is still running, wait a short time before checking for the next file
489
- if state.is_running:
490
- await asyncio.sleep(5)
491
 
492
  # --- FastAPI App and Endpoints ---
493
 
@@ -509,47 +512,24 @@ async def startup_event():
509
 
510
  async def initialize_after_startup():
511
  """Initialize app components after server has started"""
512
- global state
513
-
514
  try:
515
- print(f"[{FLOW_ID}] Starting initialization...")
 
516
 
517
- # Load state with timeout
518
- try:
519
- print(f"[{FLOW_ID}] Loading state from HF...")
520
- await asyncio.wait_for(load_state_from_hf(), timeout=30.0)
521
- print(f"[{FLOW_ID}] State loaded successfully")
522
- except asyncio.TimeoutError:
523
- print("WARNING: Timeout loading state from HF, using default state")
524
- except Exception as e:
525
- print(f"WARNING: Error loading state from HF: {e}, using default state")
526
-
527
- # Update file list with timeout
528
- try:
529
- print(f"[{FLOW_ID}] Updating file list...")
530
- await asyncio.wait_for(update_file_list(), timeout=60.0)
531
- print(f"[{FLOW_ID}] File list updated successfully")
532
- except asyncio.TimeoutError:
533
- print("WARNING: Timeout updating file list, using existing state")
534
- except Exception as e:
535
- print(f"WARNING: Error updating file list: {e}, using existing state")
536
 
537
- # Start processing if needed
538
  async with state_lock:
539
  if state.current_index < state.total_files:
540
  state.is_running = True
541
- asyncio.create_task(process_next_file_task())
542
- print(f"[{FLOW_ID}] Started processing from index {state.current_index}/{state.total_files}")
543
- else:
544
- state.is_running = False
545
- print(f"[{FLOW_ID}] All files processed. Starting in Idle mode.")
546
 
547
  except Exception as e:
548
- print(f"[{FLOW_ID}] Critical error during initialization: {e}")
549
  import traceback
550
  traceback.print_exc()
551
- async with state_lock:
552
- state.is_running = False
553
 
554
 
555
  @app.get("/", response_class=HTMLResponse)
@@ -634,21 +614,31 @@ async def control_processing(request: Request, background_tasks: BackgroundTasks
634
 
635
  async with state_lock:
636
  if action == "start":
637
- if not state.is_running and state.current_index < state.total_files:
 
 
 
 
 
638
  state.is_running = True
639
  state.status = "Processing started."
 
 
640
  background_tasks.add_task(process_next_file_task)
641
  await save_state_to_hf()
 
 
642
  return {"status": "success", "message": "Processing loop started."}
643
- elif state.current_index >= state.total_files:
644
- return {"status": "error", "message": "Cannot start. All files have been processed."}
645
  else:
646
  return {"status": "info", "message": "Processing is already running."}
 
647
  elif action == "stop":
648
  if state.is_running:
649
  state.is_running = False
650
  state.status = "Processing stopped by user."
651
  await save_state_to_hf()
 
 
652
  return {"status": "success", "message": "Processing loop stopped."}
653
  else:
654
  return {"status": "info", "message": "Processing is already stopped."}
 
348
  return False
349
 
350
  async def process_next_file_task():
351
+ """Continuous task to process files based on the current index."""
352
  global state
353
 
354
+ print(f"[{FLOW_ID}] Processing task started. Running: {state.is_running}")
355
+
356
+ while True:
357
+ # Check if we should be running
 
 
 
 
358
  async with state_lock:
359
+ should_run = state.is_running
360
  current_index = state.current_index
361
+
362
+ if not should_run:
363
+ # Wait a bit and check again
364
+ await asyncio.sleep(2)
365
+ continue
366
+
367
+ # Check if we have files to process
368
+ if current_index >= state.total_files:
369
+ async with state_lock:
370
  state.status = "Finished processing all files."
371
  state.is_running = False
372
+ state.current_file = None
373
+ state.current_file_progress = "0/0"
374
+ print(f"[{FLOW_ID}] Reached end of file list. Stopping processing.")
375
+ await save_state_to_hf()
376
+ await asyncio.sleep(2)
377
+ continue
378
+
379
+ # Process the current file
380
+ repo_file_full_path = None
381
+ async with state_lock:
382
  repo_file_full_path = state.all_zip_files[current_index]
383
 
384
  if repo_file_full_path in state.processed_files:
 
389
  await save_state_to_hf()
390
  continue
391
 
392
+ # Mark the file as in-progress
393
  state.status = f"Processing file {current_index + 1}/{state.total_files}"
394
  state.current_file = Path(repo_file_full_path).name
395
  state.current_file_progress = "0/0"
396
  await save_state_to_hf()
397
+
398
+ # --- Process the file ---
399
  extract_dir = None
400
  zip_full_name = None
 
401
 
402
  try:
403
+ print(f"[{FLOW_ID}] Processing: {repo_file_full_path}")
404
+
405
  download_result = await download_and_extract_zip(repo_file_full_path)
406
 
407
  if download_result is None:
 
416
 
417
  if not image_paths:
418
  print(f"[{FLOW_ID}] No images found in {zip_full_name}. Marking as complete.")
419
+ # Mark as processed and move to next
420
+ async with state_lock:
421
+ state.processed_files.add(repo_file_full_path)
422
+ state.current_index += 1
423
+ state.current_file = None
424
+ state.current_file_progress = "0/0"
425
+ state.status = "Idle"
426
+ await save_state_to_hf()
427
  else:
428
  # Initialize progress tracker
429
  progress_tracker = {
 
434
  state.current_file_progress = f"0/{len(image_paths)}"
435
  await save_state_to_hf()
436
 
437
+ # Process images
438
  semaphore = asyncio.Semaphore(len(servers))
439
  async def limited_send_image_for_captioning(image_path, course_name, progress_tracker):
440
  async with semaphore:
 
445
  all_captions = [r for r in results if r is not None]
446
 
447
  # Final progress report
448
+ success_rate = len(all_captions) / len(image_paths)
449
+ print(f"[{FLOW_ID}] FINAL PROGRESS for {zip_full_name}: {len(all_captions)}/{len(image_paths)} captions ({success_rate:.1%})")
 
 
 
 
450
 
451
+ # Upload results if we have any captions
452
  if all_captions and zip_full_name:
453
  if await upload_captions_to_hf(zip_full_name, all_captions):
454
+ print(f"[{FLOW_ID}] Successfully uploaded captions for {zip_full_name}")
455
+ # Mark as processed regardless of partial success
 
 
 
 
 
 
 
456
  async with state_lock:
457
  state.processed_files.add(repo_file_full_path)
458
  state.current_index += 1
 
460
  state.current_file_progress = "0/0"
461
  state.status = "Idle"
462
  await save_state_to_hf()
 
463
  else:
464
+ print(f"[{FLOW_ID}] Failed to upload captions for {zip_full_name}. Will retry.")
465
+ # Don't increment index, will retry this file
466
  async with state_lock:
467
+ state.status = f"Upload failed for {zip_full_name}. Retrying later."
468
  await save_state_to_hf()
469
+ await asyncio.sleep(30) # Wait before retry
 
 
470
  else:
471
+ print(f"[{FLOW_ID}] No captions generated for {zip_full_name}. Will retry.")
472
+ # Don't increment index, will retry this file
473
  async with state_lock:
474
+ state.status = f"No captions for {zip_full_name}. Retrying later."
475
  await save_state_to_hf()
476
+ await asyncio.sleep(30) # Wait before retry
477
 
478
  except Exception as e:
479
  error_message = str(e)
480
+ print(f"[{FLOW_ID}] Error processing {repo_file_full_path}: {error_message}")
481
  async with state_lock:
482
+ state.status = f"Error processing {Path(repo_file_full_path).name}: {error_message[:100]}..."
483
  await save_state_to_hf()
484
+ await asyncio.sleep(30) # Wait before retry
 
485
 
486
  finally:
487
  # Cleanup temporary files
488
  if extract_dir and extract_dir.exists():
489
+ try:
490
+ shutil.rmtree(extract_dir, ignore_errors=True)
491
+ print(f"[{FLOW_ID}] Cleaned up temporary directory {extract_dir}.")
492
+ except Exception as e:
493
+ print(f"[{FLOW_ID}] Error cleaning up {extract_dir}: {e}")
 
494
 
495
  # --- FastAPI App and Endpoints ---
496
 
 
512
 
513
  async def initialize_after_startup():
514
  """Initialize app components after server has started"""
 
 
515
  try:
516
+ await load_state_from_hf()
517
+ await update_file_list()
518
 
519
+ # Always start the processing task, but it will check is_running flag
520
+ asyncio.create_task(process_next_file_task())
521
+ print(f"[{FLOW_ID}] Processing task created. Current running state: {state.is_running}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
522
 
523
+ # Auto-start if we have files to process
524
  async with state_lock:
525
  if state.current_index < state.total_files:
526
  state.is_running = True
527
+ print(f"[{FLOW_ID}] Auto-starting processing from index {state.current_index}")
 
 
 
 
528
 
529
  except Exception as e:
530
+ print(f"[{FLOW_ID}] Error during initialization: {e}")
531
  import traceback
532
  traceback.print_exc()
 
 
533
 
534
 
535
  @app.get("/", response_class=HTMLResponse)
 
614
 
615
  async with state_lock:
616
  if action == "start":
617
+ if not state.is_running:
618
+ # Reset state if we're at the end
619
+ if state.current_index >= state.total_files:
620
+ state.current_index = 0
621
+ state.status = "Reset to start and processing..."
622
+
623
  state.is_running = True
624
  state.status = "Processing started."
625
+
626
+ # Start the processing task
627
  background_tasks.add_task(process_next_file_task)
628
  await save_state_to_hf()
629
+
630
+ print(f"[{FLOW_ID}] Processing manually started from index {state.current_index}")
631
  return {"status": "success", "message": "Processing loop started."}
 
 
632
  else:
633
  return {"status": "info", "message": "Processing is already running."}
634
+
635
  elif action == "stop":
636
  if state.is_running:
637
  state.is_running = False
638
  state.status = "Processing stopped by user."
639
  await save_state_to_hf()
640
+
641
+ print(f"[{FLOW_ID}] Processing manually stopped")
642
  return {"status": "success", "message": "Processing loop stopped."}
643
  else:
644
  return {"status": "info", "message": "Processing is already stopped."}