Samfredoly committed on
Commit
0735339
·
verified ·
1 Parent(s): a7dc608

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +72 -71
app.py CHANGED
@@ -492,88 +492,89 @@ async def process_batch_dynamic(wav_files: List[str], start_batch_index: int, ba
492
  wav_path.unlink()
493
  except Exception as e:
494
  print(f"[{FLOW_ID}] Error in process_batch_dynamic: {e}")
495
-
496
- return batch_end, uploaded_count
497
 
498
- async def process_dataset_task(start_index: int):
499
- """Main task to process the dataset using dynamic server assignment."""
 
 
500
 
501
- # Load both local progress and HF state
502
- progress = load_progress()
503
- current_state = await download_hf_state()
504
- file_list = await get_audio_file_list(progress)
505
 
506
- if not file_list:
507
- print(f"[{FLOW_ID}] ERROR: Cannot proceed. File list is empty.")
508
- return False
509
 
510
- # Ensure start_index is within bounds
511
- if start_index > len(file_list):
512
- print(f"[{FLOW_ID}] WARNING: Start index {start_index} is greater than the total number of files ({len(file_list)}). Exiting.")
513
- return True
514
 
515
- # Determine the actual starting index in the 0-indexed list
516
- start_list_index = start_index - 1
517
 
518
- print(f"[{FLOW_ID}] Starting audio transcription from file index: {start_index} out of {len(file_list)}.")
519
- print(f"[{FLOW_ID}] Using {len(servers)} Whisper servers for dynamic processing.")
520
- print(f"[{FLOW_ID}] Upload pause enabled: {UPLOAD_PAUSE_ENABLED}, Max uploads before pause: {MAX_UPLOADS_BEFORE_PAUSE}")
521
 
522
- # Initialize progress tracking
523
- if 'uploaded_count' not in progress:
524
- progress['uploaded_count'] = 0
525
 
526
- # If there was no HF state in the repo, upload a fresh initial state file
527
- try:
528
- if not current_state.get("file_states") and current_state.get("next_download_index", 0) == 0:
529
- print(f"[{FLOW_ID}] No HF state detected; uploading initial state file to {HF_OUTPUT_DATASET_ID}...")
530
- # Ensure structure
531
- current_state.setdefault("file_states", {})
532
- current_state.setdefault("next_download_index", 0)
533
- if await upload_hf_state(current_state):
534
- print(f"[{FLOW_ID}] ✅ Initial HF state uploaded.")
535
- else:
536
- print(f"[{FLOW_ID}] ❌ Failed to upload initial HF state.")
537
- except Exception as e:
538
- print(f"[{FLOW_ID}] Error while uploading initial HF state: {e}")
539
- global_success = True
540
- current_batch_index = start_list_index
541
- batch_size = len(servers) # Batch size = number of servers (20 files per batch)
542
 
543
- try:
544
- while current_batch_index < len(file_list):
545
- # Process a batch dynamically
546
- next_index, uploaded_count = await process_batch_dynamic(
547
- file_list,
548
- current_batch_index,
549
- batch_size,
550
- current_state,
551
- progress
552
- )
553
-
554
- # Update progress
555
- progress['last_processed_index'] = next_index
556
- progress['uploaded_count'] = uploaded_count
557
- save_progress(progress)
558
-
559
- # Update current batch index
560
- current_batch_index = next_index
561
-
562
- # Log statistics
563
- print(f"[{FLOW_ID}] Batch complete. Progress: {current_batch_index}/{len(file_list)}, Uploaded: {uploaded_count}")
564
-
565
- # Print server statistics
566
- print(f"[{FLOW_ID}] Server Statistics:")
567
- for i, server in enumerate(servers):
568
- print(f" Server {i+1}: {server.total_processed} files, {server.total_time:.2f}s total, {server.fps:.2f} files/sec")
569
 
570
- print(f"[{FLOW_ID}] All files processed successfully!")
571
- return True
572
 
573
- except Exception as e:
574
- print(f"[{FLOW_ID}] Critical error in process_dataset_task: {e}")
575
- global_success = False
576
- return global_success
 
 
 
 
 
 
 
 
 
 
 
577
 
578
  # --- FastAPI App and Endpoints ---
579
 
 
492
  wav_path.unlink()
493
  except Exception as e:
494
  print(f"[{FLOW_ID}] Error in process_batch_dynamic: {e}")
 
 
495
 
496
+ return batch_end, uploaded_count
497
+
498
+ async def process_dataset_task(start_index: int):
499
+ """Main task to process the dataset using dynamic server assignment."""
500
 
501
+ # Load both local progress and HF state
502
+ progress = load_progress()
503
+ current_state = await download_hf_state()
504
+ file_list = await get_audio_file_list(progress)
505
 
506
+ if not file_list:
507
+ print(f"[{FLOW_ID}] ERROR: Cannot proceed. File list is empty.")
508
+ return False
509
 
510
+ # Ensure start_index is within bounds
511
+ if start_index > len(file_list):
512
+ print(f"[{FLOW_ID}] WARNING: Start index {start_index} is greater than the total number of files ({len(file_list)}). Exiting.")
513
+ return True
514
 
515
+ # Determine the actual starting index in the 0-indexed list
516
+ start_list_index = start_index - 1
517
 
518
+ print(f"[{FLOW_ID}] Starting audio transcription from file index: {start_index} out of {len(file_list)}.")
519
+ print(f"[{FLOW_ID}] Using {len(servers)} Whisper servers for dynamic processing.")
520
+ print(f"[{FLOW_ID}] Upload pause enabled: {UPLOAD_PAUSE_ENABLED}, Max uploads before pause: {MAX_UPLOADS_BEFORE_PAUSE}")
521
 
522
+ # Initialize progress tracking
523
+ if 'uploaded_count' not in progress:
524
+ progress['uploaded_count'] = 0
525
 
526
+ # If there was no HF state in the repo, upload a fresh initial state file
527
+ try:
528
+ if not current_state.get("file_states") and current_state.get("next_download_index", 0) == 0:
529
+ print(f"[{FLOW_ID}] No HF state detected; uploading initial state file to {HF_OUTPUT_DATASET_ID}...")
530
+ # Ensure structure
531
+ current_state.setdefault("file_states", {})
532
+ current_state.setdefault("next_download_index", 0)
533
+ if await upload_hf_state(current_state):
534
+ print(f"[{FLOW_ID}] ✅ Initial HF state uploaded.")
535
+ else:
536
+ print(f"[{FLOW_ID}] ❌ Failed to upload initial HF state.")
537
+ except Exception as e:
538
+ print(f"[{FLOW_ID}] Error while uploading initial HF state: {e}")
 
 
 
539
 
540
+ global_success = True
541
+ current_batch_index = start_list_index
542
+ batch_size = len(servers) # Batch size = number of servers (20 files per batch)
543
+
544
+ try:
545
+ while current_batch_index < len(file_list):
546
+ # Process a batch dynamically
547
+ next_index, uploaded_count = await process_batch_dynamic(
548
+ file_list,
549
+ current_batch_index,
550
+ batch_size,
551
+ current_state,
552
+ progress
553
+ )
554
+
555
+ # Update progress
556
+ progress['last_processed_index'] = next_index
557
+ progress['uploaded_count'] = uploaded_count
558
+ save_progress(progress)
 
 
 
 
 
 
 
559
 
560
+ # Update current batch index
561
+ current_batch_index = next_index
562
 
563
+ # Log statistics
564
+ print(f"[{FLOW_ID}] Batch complete. Progress: {current_batch_index}/{len(file_list)}, Uploaded: {uploaded_count}")
565
+
566
+ # Print server statistics
567
+ print(f"[{FLOW_ID}] Server Statistics:")
568
+ for i, server in enumerate(servers):
569
+ print(f" Server {i+1}: {server.total_processed} files, {server.total_time:.2f}s total, {server.fps:.2f} files/sec")
570
+
571
+ print(f"[{FLOW_ID}] All files processed successfully!")
572
+ return True
573
+
574
+ except Exception as e:
575
+ print(f"[{FLOW_ID}] Critical error in process_dataset_task: {e}")
576
+ global_success = False
577
+ return global_success
578
 
579
  # --- FastAPI App and Endpoints ---
580