Samfredoly commited on
Commit
963cadd
·
verified ·
1 Parent(s): 1b81a5c

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +167 -332
app.py CHANGED
@@ -3,12 +3,10 @@ import json
3
  import time
4
  import asyncio
5
  import aiohttp
6
- # import zipfile # Removed as zipping is no longer required
7
  import shutil
8
- from collections import deque
9
- from collections import deque
10
  import threading
11
- from typing import Dict, List, Set, Optional, Tuple, Any, Deque
12
  from urllib.parse import quote
13
  from datetime import datetime
14
  from pathlib import Path
@@ -63,8 +61,7 @@ WHISPER_SERVERS = [
63
  ]
64
 
65
  MODEL_TYPE = "whisper-small"
66
- # ZIP_UPLOAD_THRESHOLD = 100 # Upload and zip after this many transcriptions - REMOVED per user request
67
- MAX_UPLOADS_PER_HOUR = 120 # User requested rate limit
68
 
69
  # Temporary storage for audio files
70
  TEMP_DIR = Path(f"temp_audio_{FLOW_ID}")
@@ -119,38 +116,6 @@ def save_progress(progress_data: Dict):
119
  except Exception as e:
120
  print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")
121
 
122
- # Rate Limiting State
123
- upload_timestamps: Deque[float] = deque()
124
- upload_lock = asyncio.Lock()
125
-
126
def check_rate_limit() -> bool:
    """Report whether another upload may proceed under the hourly cap.

    Prunes timestamps that have aged out of the 1-hour sliding window,
    then compares the remaining count against MAX_UPLOADS_PER_HOUR.

    Returns:
        True when fewer than MAX_UPLOADS_PER_HOUR uploads happened in
        the last hour, False otherwise.
    """
    global upload_timestamps

    # Drop every timestamp older than the 1-hour (3600 s) window.
    cutoff = time.time() - 3600
    while upload_timestamps and upload_timestamps[0] < cutoff:
        upload_timestamps.popleft()

    # NOTE(review): this reads/mutates the shared deque without taking
    # upload_lock — assumed safe because callers run on one event loop;
    # confirm if multiple tasks call this concurrently.
    return len(upload_timestamps) < MAX_UPLOADS_PER_HOUR
136
-
137
async def record_upload():
    """Record a successful upload and, if the cap was just reached, sleep
    until the oldest timestamp leaves the 1-hour window.

    Appends the current time to the shared ``upload_timestamps`` deque
    under ``upload_lock`` so concurrent uploaders serialize here.
    """
    global upload_timestamps
    async with upload_lock:
        upload_timestamps.append(time.time())
        # BUG FIX: was `>` — since check_rate_limit() only admits uploads
        # while len < MAX_UPLOADS_PER_HOUR, the count could reach but never
        # exceed the cap, making the wait branch dead code. `>=` makes the
        # "limit was just hit" case actually wait, as the comment intends.
        if len(upload_timestamps) >= MAX_UPLOADS_PER_HOUR:
            # Sleep until the oldest recorded upload falls out of the window.
            time_to_wait = upload_timestamps[0] + 3600 - time.time()
            if time_to_wait > 0:
                print(f"[{FLOW_ID}] Rate limit hit. Waiting for {time_to_wait:.2f} seconds.")
                await asyncio.sleep(time_to_wait)
            # After waiting, the oldest timestamp should be outside the window.
            one_hour_ago = time.time() - 3600
            while upload_timestamps and upload_timestamps[0] < one_hour_ago:
                upload_timestamps.popleft()
        print(f"[{FLOW_ID}] Upload recorded. Current count in last hour: {len(upload_timestamps)}/{MAX_UPLOADS_PER_HOUR}")
153
-
154
  def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
155
  """Load state from JSON file with migration logic for new structure."""
156
  if os.path.exists(file_path):
@@ -369,42 +334,43 @@ async def download_audio_file(file_index: int, repo_file_full_path: str) -> Opti
369
  print(f"[{FLOW_ID}] Error downloading audio file {repo_file_full_path}: {e}")
370
  return None
371
 
372
async def upload_single_transcription(file_path: Path) -> bool:
    """Push one transcription JSON file to the output dataset.

    Honors the hourly upload rate limit; on success the local file is
    removed and the upload is recorded.

    Args:
        file_path: Local path of the JSON file to upload.

    Returns:
        True on a successful upload, False when the file is missing,
        the rate limit is reached, or the upload raises.
    """
    if not file_path.exists():
        print(f"[{FLOW_ID}] File not found for upload: {file_path.name}")
        return False

    # 1. Check Rate Limit — bail out early; the caller retries later.
    if not check_rate_limit():
        print(f"[{FLOW_ID}] 🛑 Rate limit of {MAX_UPLOADS_PER_HOUR} uploads/hour reached. Waiting for next hour slot.")
        # The main loop will handle the wait by checking the limit again
        return False

    try:
        print(f"[{FLOW_ID}] 📤 Uploading transcription file: {file_path.name} to {HF_OUTPUT_DATASET_ID}...")
        hub_client = HfApi(token=HF_TOKEN)
        hub_client.upload_file(
            path_or_fileobj=str(file_path),
            path_in_repo=file_path.name,
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=f"[{FLOW_ID}] Upload transcription: {file_path.name}"
        )
        print(f"[{FLOW_ID}] ✅ Successfully uploaded: {file_path.name}")

        # 2. Record successful upload (may sleep if the cap was just hit).
        await record_upload()

        # 3. Cleanup local file — only after a confirmed upload.
        os.remove(file_path)
        return True
    except Exception as e:
        print(f"[{FLOW_ID}] Error uploading transcription: {e}")
        return False
410
  # --- Core Processing Functions ---
@@ -538,9 +504,7 @@ async def process_audio_files(background_tasks: BackgroundTasks):
538
  }
539
 
540
  async def process_audio_files_background():
541
- """Background task that processes audio files with reference mapping using parallel batch distribution and atomic state updates."""
542
-
543
- # 1. Load initial state and data
544
  progress_data = load_progress()
545
  reference_map = progress_data.get('reference_map', {})
546
 
@@ -555,104 +519,49 @@ async def process_audio_files_background():
555
  if not audio_files:
556
  print(f"[{FLOW_ID}] No audio files found. Exiting.")
557
  return
558
-
559
  # Dynamic batch size: one file per server
560
  BATCH_SIZE = len(servers)
561
  print(f"[{FLOW_ID}] 📊 Configuration: {len(servers)} Whisper server(s) → Batch size: {BATCH_SIZE} (1 file per server)")
562
 
563
- # --- Core Processing Loop ---
 
 
564
 
565
- while True:
566
- # Download the latest state from Hugging Face at the start of each batch attempt
567
- hf_state = await download_hf_state()
568
- current_index = hf_state['next_download_index']
569
-
570
- if current_index >= len(audio_files):
571
- print(f"[{FLOW_ID}] All {len(audio_files)} files processed. Waiting for new files...")
572
- await asyncio.sleep(300) # Wait 5 minutes before checking again
573
- # Re-fetch file list in case new files were added
574
- audio_files = await get_audio_file_list(progress_data)
575
- continue
576
-
577
- batch_end = min(current_index + BATCH_SIZE, len(audio_files))
578
- batch_files_full_path = audio_files[current_index:batch_end]
579
-
580
- if not batch_files_full_path:
581
- await asyncio.sleep(10)
582
- continue
583
-
584
- print(f"\n[{FLOW_ID}] 📦 BATCH: Attempting to process files #{current_index}-#{batch_end-1} ({len(batch_files_full_path)} files)")
585
-
586
- # 1. ATOMIC LOCK: Attempt to lock all files in the batch
587
- # We will lock the files sequentially, and if any fail (already locked by another server),
588
- # we will unlock all files locked in this attempt and restart the loop.
589
 
590
- locked_files = []
591
- lock_succeeded = True
592
 
593
- # Re-download state to ensure we have the latest before locking
594
- hf_state = await download_hf_state()
 
 
 
 
595
 
596
- for audio_file_full_path in batch_files_full_path:
597
- audio_filename = Path(audio_file_full_path).name
598
-
599
- # Check if already processed or processing
600
- file_state = hf_state['file_states'].get(audio_filename)
601
- if file_state == "processed":
602
- print(f"[{FLOW_ID}] ⏭️ File already processed: {audio_filename}. Aborting batch and moving index.")
603
- # Abort batch, unlock any files we locked, and move index past this file
604
- lock_succeeded = False
605
- break
606
- elif file_state == "processing":
607
- print(f"[{FLOW_ID}] ⏭️ File currently processing: {audio_filename}. Aborting batch and moving index.")
608
- # Abort batch, unlock any files we locked, and move index past this file
609
- lock_succeeded = False
610
- break
611
-
612
- # Attempt to lock
613
- if await lock_file_for_processing(audio_filename, hf_state):
614
- locked_files.append(audio_filename)
615
- else:
616
- print(f"[{FLOW_ID}] ❌ Failed to lock file: {audio_filename}. Aborting batch.")
617
- lock_succeeded = False
618
- break
619
-
620
- # If lock failed for any reason (already processed/processing or lock upload failed)
621
- if not lock_succeeded:
622
- # Unlock all files we successfully locked in this attempt
623
- for filename in locked_files:
624
- # We don't need to upload state for each unlock, we'll do it once at the end
625
- if filename in hf_state['file_states']:
626
- del hf_state['file_states'][filename]
627
-
628
- # If the failure was due to a file being processed/processed, we need to advance the index
629
- if current_index < len(audio_files) and hf_state['file_states'].get(Path(audio_files[current_index]).name) in ["processed", "processing"]:
630
- hf_state['next_download_index'] += 1
631
- await upload_hf_state(hf_state)
632
-
633
- # Wait and restart the loop
634
- print(f"[{FLOW_ID}] Batch aborted. Waiting 10s before retrying...")
635
- await asyncio.sleep(10)
636
- continue
637
-
638
- # 2. Download all files in batch in parallel
639
- print(f"[{FLOW_ID}] ⬇️ Downloading batch ({len(batch_files_full_path)} files)...")
640
- download_tasks = [download_audio_file(current_index + idx, path) for idx, path in enumerate(batch_files_full_path)]
641
  downloaded_paths = await asyncio.gather(*download_tasks, return_exceptions=True)
642
 
643
- # 3. Send all downloaded files to Whisper servers in parallel
644
- print(f"[{FLOW_ID}] 🎤 Distributing to {len(servers)} Whisper server(s) ({len(batch_files_full_path)} files)...")
645
 
646
  transcription_tasks = []
647
- file_metadata = []
648
 
649
- for idx, (repo_file_path, audio_path) in enumerate(zip(batch_files_full_path, downloaded_paths)):
 
650
  audio_filename = Path(repo_file_path).name
651
 
652
- if isinstance(audio_path, Exception) or not audio_path or not audio_path.exists():
653
- print(f"[{FLOW_ID}] ❌ Skipping {audio_filename} (download failed or path invalid)")
654
- # Mark as processed immediately so we don't get stuck on a bad file
655
- hf_state['file_states'][audio_filename] = "processed"
 
 
656
  continue
657
 
658
  reference_filename = find_matching_filename(audio_filename, reference_map)
@@ -660,211 +569,137 @@ async def process_audio_files_background():
660
  'audio_filename': audio_filename,
661
  'audio_path': audio_path,
662
  'reference_filename': reference_filename,
663
- 'file_index': current_index + idx
664
  })
665
 
666
- # Create transcription task
667
- # We need a wrapper function for parallel execution that uses the single-file logic
668
- # The original code had a send_audio_for_transcription_task, but we removed it.
669
- # We will use a lambda or a simple wrapper to call the existing send_audio_for_transcription
670
- # The original send_audio_for_transcription is already async and handles server selection.
671
- transcription_tasks.append(send_audio_for_transcription(audio_path, {'completed': 0, 'total': 1}))
672
-
673
  if transcription_tasks:
674
  print(f"[{FLOW_ID}] ⏳ Waiting for {len(transcription_tasks)} transcriptions (parallel)...")
675
  transcription_results = await asyncio.gather(*transcription_tasks, return_exceptions=True)
676
 
677
- # 4. Process results, save locally, and upload individually
678
- successful_uploads = 0
679
- next_index_to_set = current_index
680
 
681
  for metadata, result in zip(file_metadata, transcription_results):
682
- audio_filename = metadata['audio_filename']
683
- audio_path = metadata['audio_path']
684
-
685
- # Cleanup downloaded audio file
686
- if audio_path.exists():
687
- os.remove(audio_path)
688
-
689
- if isinstance(result, Exception) or not result:
690
- print(f"[{FLOW_ID}] ❌ Transcription failed for {audio_filename}. Marking as processed (failed).")
691
- hf_state['file_states'][audio_filename] = "processed"
692
- next_index_to_set = metadata['file_index'] + 1
693
  continue
694
 
695
- # Save JSON locally
696
- json_filename = Path(metadata['reference_filename']).stem if metadata['reference_filename'] else Path(audio_filename).stem
697
- json_path = RESULTS_DIR / f"{json_filename}.json"
698
-
699
- # Add reference file mapping to the result
700
- if metadata['reference_filename']:
701
- result['reference_file'] = metadata['reference_filename']
702
-
703
- save_json_state(str(json_path), result)
704
-
705
- # Upload the single JSON file
706
- if await upload_single_transcription(json_path):
707
- successful_uploads += 1
708
- hf_state['file_states'][audio_filename] = "processed"
709
- next_index_to_set = metadata['file_index'] + 1
710
- else:
711
- # Upload failed (likely rate limit). Keep the file locked and local for next attempt.
712
- # We must break the inner loop and restart the outer loop to re-check the rate limit.
713
- print(f"[{FLOW_ID}] 🛑 Upload failed (likely rate limit). Aborting batch processing.")
714
- # Keep the file locked (status is still 'processing') and local (json_path not removed)
715
- # We will break and restart the main loop.
716
- break
717
-
718
- # 5. ATOMIC UNLOCK/STATE UPDATE: Update state on HF
719
- # If the inner loop was broken due to rate limit, we skip the state update and restart the main loop.
720
- if successful_uploads == len(transcription_tasks):
721
- # All files in the batch were successfully processed and uploaded.
722
- hf_state['next_download_index'] = next_index_to_set
723
- await upload_hf_state(hf_state)
724
- print(f"[{FLOW_ID}] ✅ Batch completed. Next index set to {next_index_to_set}.")
725
- elif successful_uploads > 0:
726
- # Some files failed transcription or download, but were marked as 'processed' (failed).
727
- # The index was advanced past them.
728
- hf_state['next_download_index'] = next_index_to_set
729
- await upload_hf_state(hf_state)
730
- print(f"[{FLOW_ID}] ✅ Batch partially completed. Next index set to {next_index_to_set}.")
731
- elif successful_uploads == 0 and len(transcription_tasks) > 0:
732
- # If the break was due to rate limit, the outer loop will handle the wait/retry.
733
- # If all transcriptions failed, the index was advanced past them.
734
- if next_index_to_set > current_index:
735
- hf_state['next_download_index'] = next_index_to_set
736
- await upload_hf_state(hf_state)
737
- print(f"[{FLOW_ID}] ⚠️ Batch failed. Next index set to {next_index_to_set}.")
738
-
739
- # Wait a short period to avoid hammering the HF API
740
- await asyncio.sleep(1)
741
- await upload_hf_state(hf_state)
742
- print(f"[{FLOW_ID}] ✅ Batch partially completed. Next index set to {next_index_to_set}.")
743
- elif successful_uploads == 0 and len(transcription_tasks) > 0:
744
- # If the break was due to rate limit, the outer loop will handle the wait/retry.
745
- # If all transcriptions failed, the index was advanced past them.
746
- if next_index_to_set > current_index:
747
- hf_state['next_download_index'] = next_index_to_set
748
- await upload_hf_state(hf_state)
749
- print(f"[{FLOW_ID}] ⚠️ Batch failed. Next index set to {next_index_to_set}.")
750
-
751
- # Wait a short period to avoid hammering the HF API
752
- await asyncio.sleep(1)
753
- continue
754
 
755
- # 2. Download all files in batch in parallel
756
- print(f"[{FLOW_ID}] ⬇️ Downloading batch ({len(batch_files_full_path)} files)...")
757
- download_tasks = [download_audio_file(current_index + idx, path) for idx, path in enumerate(batch_files_full_path)]
758
- downloaded_paths = await asyncio.gather(*download_tasks, return_exceptions=True)
759
-
760
- # 3. Send all downloaded files to Whisper servers in parallel
761
- print(f"[{FLOW_ID}] 🎤 Distributing to {len(servers)} Whisper server(s) ({len(batch_files_full_path)} files)...")
762
 
763
- transcription_tasks = []
764
- file_metadata = []
 
765
 
766
- for idx, (repo_file_path, audio_path) in enumerate(zip(batch_files_full_path, downloaded_paths)):
767
- audio_filename = Path(repo_file_path).name
768
-
769
- if isinstance(audio_path, Exception) or not audio_path or not audio_path.exists():
770
- print(f"[{FLOW_ID}] ❌ Skipping {audio_filename} (download failed or path invalid)")
771
- # Mark as processed immediately so we don't get stuck on a bad file
772
- hf_state['file_states'][audio_filename] = "processed"
773
- continue
774
-
775
- reference_filename = find_matching_filename(audio_filename, reference_map)
776
- file_metadata.append({
777
- 'audio_filename': audio_filename,
778
- 'audio_path': audio_path,
779
- 'reference_filename': reference_filename,
780
- 'file_index': current_index + idx
781
- })
782
 
783
- # Create transcription task
784
- transcription_tasks.append(send_audio_for_transcription(audio_path, {'completed': 0, 'total': 1}))
 
 
785
 
786
- if transcription_tasks:
787
- print(f"[{FLOW_ID}] Waiting for {len(transcription_tasks)} transcriptions (parallel)...")
788
- transcription_results = await asyncio.gather(*transcription_tasks, return_exceptions=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
789
 
790
- # 4. Process results, save locally, and upload individually
791
- successful_uploads = 0
792
- next_index_to_set = current_index
793
 
794
- for metadata, result in zip(file_metadata, transcription_results):
795
- audio_filename = metadata['audio_filename']
796
- audio_path = metadata['audio_path']
797
-
798
- # Cleanup downloaded audio file
799
- if audio_path.exists():
800
- os.remove(audio_path)
801
-
802
- if isinstance(result, Exception) or not result:
803
- print(f"[{FLOW_ID}] ❌ Transcription failed for {audio_filename}. Marking as processed (failed).")
804
- hf_state['file_states'][audio_filename] = "processed"
805
- next_index_to_set = metadata['file_index'] + 1
806
- continue
807
-
808
- # Save JSON locally
809
- json_filename = Path(metadata['reference_filename']).stem if metadata['reference_filename'] else Path(audio_filename).stem
810
- json_path = RESULTS_DIR / f"{json_filename}.json"
811
-
812
- # Add reference file mapping to the result
813
- if metadata['reference_filename']:
814
- result['reference_file'] = metadata['reference_filename']
815
-
816
- save_json_state(str(json_path), result)
817
-
818
- # Upload the single JSON file
819
- if await upload_single_transcription(json_path):
820
- successful_uploads += 1
821
- hf_state['file_states'][audio_filename] = "processed"
822
- next_index_to_set = metadata['file_index'] + 1
823
- else:
824
- # Upload failed (likely rate limit). Keep the file locked and local for next attempt.
825
- # We must break the inner loop and restart the outer loop to re-check the rate limit.
826
- print(f"[{FLOW_ID}] 🛑 Upload failed (likely rate limit). Aborting batch processing.")
827
- # Keep the file locked (status is still 'processing') and local (json_path not removed)
828
- # We will break and restart the main loop.
829
- break
830
 
831
- # 5. ATOMIC UNLOCK/STATE UPDATE: Update state on HF
832
- # If the inner loop was broken due to rate limit, we skip the state update and restart the main loop.
833
- if successful_uploads == len(transcription_tasks):
834
- # All files in the batch were successfully processed and uploaded.
835
- hf_state['next_download_index'] = next_index_to_set
836
- await upload_hf_state(hf_state)
837
- print(f"[{FLOW_ID}] ✅ Batch completed. Next index set to {next_index_to_set}.")
838
- elif successful_uploads > 0:
839
- # Some files failed transcription or download, but were marked as 'processed' (failed).
840
- # The index was advanced past them.
841
- hf_state['next_download_index'] = next_index_to_set
842
- await upload_hf_state(hf_state)
843
- print(f"[{FLOW_ID}] ✅ Batch partially completed. Next index set to {next_index_to_set}.")
844
- elif successful_uploads == 0 and len(transcription_tasks) > 0:
845
- # If the break was due to rate limit, the outer loop will handle the wait/retry.
846
- # If all transcriptions failed, the index was advanced past them.
847
- if next_index_to_set > current_index:
848
- hf_state['next_download_index'] = next_index_to_set
849
- await upload_hf_state(hf_state)
850
- print(f"[{FLOW_ID}] ⚠️ Batch failed. Next index set to {next_index_to_set}.")
851
-
852
- # Wait a short period to avoid hammering the HF API
853
- await asyncio.sleep(1)
854
- await upload_hf_state(hf_state)
855
- print(f"[{FLOW_ID}] Batch partially completed. Next index set to {next_index_to_set}.")
856
- elif successful_uploads == 0 and len(transcription_tasks) > 0:
857
- # If the break was due to rate limit, the outer loop will handle the wait/retry.
858
- # If all transcriptions failed, the index was advanced past them.
859
- if next_index_to_set > current_index:
860
- hf_state['next_download_index'] = next_index_to_set
861
- await upload_hf_state(hf_state)
862
- print(f"[{FLOW_ID}] ⚠️ Batch failed. Next index set to {next_index_to_set}.")
863
-
864
- # Wait a short period to avoid hammering the HF API
865
- await asyncio.sleep(1)
866
-
867
-
868
 
869
  @app.get("/")
870
  async def root():
 
3
  import time
4
  import asyncio
5
  import aiohttp
6
+ import zipfile
7
  import shutil
 
 
8
  import threading
9
+ from typing import Dict, List, Set, Optional, Tuple, Any
10
  from urllib.parse import quote
11
  from datetime import datetime
12
  from pathlib import Path
 
61
  ]
62
 
63
  MODEL_TYPE = "whisper-small"
64
+ ZIP_UPLOAD_THRESHOLD = 100 # Upload and zip after this many transcriptions
 
65
 
66
  # Temporary storage for audio files
67
  TEMP_DIR = Path(f"temp_audio_{FLOW_ID}")
 
116
  except Exception as e:
117
  print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")
118
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
119
  def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
120
  """Load state from JSON file with migration logic for new structure."""
121
  if os.path.exists(file_path):
 
334
  print(f"[{FLOW_ID}] Error downloading audio file {repo_file_full_path}: {e}")
335
  return None
336
 
337
async def zip_and_upload_transcriptions(transcription_files: List[Path], batch_number: int) -> bool:
    """Zips transcription JSON files and uploads to dataset with batch numbering.

    Args:
        transcription_files: Local JSON result files to bundle into one archive.
        batch_number: Sequential batch index; used in the archive file name.

    Returns:
        True when the archive was built and uploaded, False otherwise.
    """
    if not transcription_files:
        print(f"[{FLOW_ID}] No transcription files to zip.")
        return False

    zip_filename = f"audio_json_batch_{batch_number}.zip"
    zip_path = RESULTS_DIR / zip_filename

    try:
        print(f"[{FLOW_ID}] 📦 Creating zip file: {zip_filename} with {len(transcription_files)} files...")
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file_path in transcription_files:
                # A file may have been cleaned up already; skip missing entries.
                if file_path.exists():
                    zipf.write(file_path, arcname=file_path.name)

        print(f"[{FLOW_ID}] 📤 Uploading zip file to {HF_OUTPUT_DATASET_ID}...")
        api = HfApi(token=HF_TOKEN)
        api.upload_file(
            path_or_fileobj=str(zip_path),
            path_in_repo=zip_filename,
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=f"[{FLOW_ID}] Batch {batch_number}: {len(transcription_files)} transcriptions"
        )
        print(f"[{FLOW_ID}] ✅ Successfully uploaded: {zip_filename}")
        return True
    except Exception as e:
        print(f"[{FLOW_ID}] Error zipping and uploading transcriptions: {e}")
        return False
    finally:
        # BUG FIX: cleanup used to run only on the success path, so a failed
        # upload leaked the local archive in RESULTS_DIR. Always remove it.
        if zip_path.exists():
            os.remove(zip_path)
375
 
376
  # --- Core Processing Functions ---
 
504
  }
505
 
506
  async def process_audio_files_background():
507
+ """Background task that processes audio files with reference mapping using batch distribution."""
 
 
508
  progress_data = load_progress()
509
  reference_map = progress_data.get('reference_map', {})
510
 
 
519
  if not audio_files:
520
  print(f"[{FLOW_ID}] No audio files found. Exiting.")
521
  return
522
+
523
  # Dynamic batch size: one file per server
524
  BATCH_SIZE = len(servers)
525
  print(f"[{FLOW_ID}] 📊 Configuration: {len(servers)} Whisper server(s) → Batch size: {BATCH_SIZE} (1 file per server)")
526
 
527
+ start_index = progress_data['last_processed_index']
528
+ transcription_files = []
529
+ batch_number = 1
530
 
531
+ print(f"[{FLOW_ID}] Starting batch processing from file #{start_index} (out of {len(audio_files)})...")
532
+
533
+ # Process in batches
534
+ for batch_start in range(start_index, len(audio_files), BATCH_SIZE):
535
+ batch_end = min(batch_start + BATCH_SIZE, len(audio_files))
536
+ batch_files = audio_files[batch_start:batch_end]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
537
 
538
+ print(f"\n[{FLOW_ID}] 📦 BATCH: Processing files #{batch_start}-#{batch_end-1} ({len(batch_files)} files)")
 
539
 
540
+ # Step 1: Download all files in batch in parallel
541
+ print(f"[{FLOW_ID}] ⬇️ Downloading batch ({len(batch_files)} files)...")
542
+ download_tasks = []
543
+ for idx, repo_file_path in enumerate(batch_files):
544
+ file_index = batch_start + idx
545
+ download_tasks.append(download_audio_file(file_index, repo_file_path))
546
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
547
  downloaded_paths = await asyncio.gather(*download_tasks, return_exceptions=True)
548
 
549
+ # Step 2: Send all downloaded files to Whisper servers in parallel
550
+ print(f"[{FLOW_ID}] 🎤 Distributing to {len(servers)} Whisper server(s) ({len(batch_files)} files)...")
551
 
552
  transcription_tasks = []
553
+ file_metadata = [] # Track file info for results
554
 
555
+ for idx, (repo_file_path, audio_path) in enumerate(zip(batch_files, downloaded_paths)):
556
+ file_index = batch_start + idx
557
  audio_filename = Path(repo_file_path).name
558
 
559
+ # Skip if download failed
560
+ if isinstance(audio_path, Exception):
561
+ print(f"[{FLOW_ID}] ⏭️ Skipping {audio_filename} (download failed)")
562
+ continue
563
+
564
+ if not audio_path or not audio_path.exists():
565
  continue
566
 
567
  reference_filename = find_matching_filename(audio_filename, reference_map)
 
569
  'audio_filename': audio_filename,
570
  'audio_path': audio_path,
571
  'reference_filename': reference_filename,
572
+ 'file_index': file_index
573
  })
574
 
575
+ # Create transcription task (will be awaited in parallel)
576
+ transcription_tasks.append(send_audio_for_transcription_task(audio_path, audio_filename))
577
+
 
 
 
 
578
  if transcription_tasks:
579
  print(f"[{FLOW_ID}] ⏳ Waiting for {len(transcription_tasks)} transcriptions (parallel)...")
580
  transcription_results = await asyncio.gather(*transcription_tasks, return_exceptions=True)
581
 
582
+ # Step 3: Save transcriptions locally (don't upload individually)
583
+ successful = len([r for r in transcription_results if not isinstance(r, Exception) and r])
584
+ print(f"[{FLOW_ID}] 💾 Saving {successful}/{len(transcription_results)} transcriptions locally...")
585
 
586
  for metadata, result in zip(file_metadata, transcription_results):
587
+ if isinstance(result, Exception):
588
+ print(f"[{FLOW_ID}] Transcription failed for {metadata['audio_filename']}: {result}")
 
 
 
 
 
 
 
 
 
589
  continue
590
 
591
+ if result:
592
+ # Save JSON locally
593
+ json_filename = Path(metadata['reference_filename']).stem if metadata['reference_filename'] else Path(metadata['audio_filename']).stem
594
+ json_file_path = Path(RESULTS_DIR) / f"{json_filename}.json"
595
+
596
+ # Write JSON to file
597
+ with open(json_file_path, 'w', encoding='utf-8') as f:
598
+ json.dump(result, f, indent=2, ensure_ascii=False)
599
+
600
+ transcription_files.append(json_file_path)
601
+ progress_data['transcription_count'] += 1
602
+
603
+ # Mark as processed
604
+ state = await download_hf_state()
605
+ await unlock_file_as_processed(
606
+ metadata['audio_filename'],
607
+ state,
608
+ metadata['file_index'] + 1
609
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
610
 
611
+ # Step 4: Cleanup downloaded audio files
612
+ for metadata in file_metadata:
613
+ if metadata['audio_path'].exists():
614
+ os.remove(metadata['audio_path'])
 
 
 
615
 
616
+ # Save progress after batch
617
+ progress_data['last_processed_index'] = batch_end
618
+ save_progress(progress_data)
619
 
620
+ # Step 5: Check if we've reached the batch threshold for zipping (100 files)
621
+ if len(transcription_files) >= ZIP_UPLOAD_THRESHOLD:
622
+ print(f"\n[{FLOW_ID}] 📦 Reached ZIP threshold ({ZIP_UPLOAD_THRESHOLD}). Creating and uploading batch {batch_number}...")
623
+ files_to_zip = transcription_files[:ZIP_UPLOAD_THRESHOLD]
624
+ await zip_and_upload_transcriptions(files_to_zip, batch_number)
 
 
 
 
 
 
 
 
 
 
 
625
 
626
+ # Remove zipped files locally and update list
627
+ for file_path in files_to_zip:
628
+ if file_path.exists():
629
+ os.remove(file_path)
630
 
631
+ transcription_files = transcription_files[ZIP_UPLOAD_THRESHOLD:]
632
+ batch_number += 1
633
+
634
+ # Upload remaining transcriptions as final batch
635
+ if transcription_files:
636
+ print(f"\n[{FLOW_ID}] 📦 Uploading final batch {batch_number} with {len(transcription_files)} transcriptions...")
637
+ await zip_and_upload_transcriptions(transcription_files, batch_number)
638
+
639
+ # Cleanup
640
+ for file_path in transcription_files:
641
+ if file_path.exists():
642
+ os.remove(file_path)
643
+
644
+ print(f"\n[{FLOW_ID}] ✅ ALL DONE! Total transcriptions: {progress_data['transcription_count']}")
645
+
646
async def send_audio_for_transcription_task(audio_path: Path, audio_filename: str) -> Optional[Dict]:
    """Wrapper for transcription that can be used in asyncio.gather.

    Acquires an available Whisper server, POSTs the audio file, and retries
    up to MAX_RETRIES times on server errors, invalid responses, or
    connection/timeout failures.

    Args:
        audio_path: Local path of the downloaded audio file.
        audio_filename: Name sent to the server and echoed in the result.

    Returns:
        A dict with keys audio_file / text / language / confidence / duration
        on success, or None after all retries are exhausted.
    """
    MAX_RETRIES = 3
    for attempt in range(MAX_RETRIES):
        server = None
        # BUG FIX: start_time was previously assigned inside the try block
        # *after* get_available_server(); an exception between acquiring the
        # server and that assignment made the finally clause raise NameError.
        start_time = time.time()
        try:
            server = await get_available_server()
            server.busy = True
            start_time = time.time()  # restart the clock once a server is held

            # Read file content once
            with audio_path.open('rb') as f:
                file_content = f.read()

            form_data = aiohttp.FormData()
            form_data.add_field('file',
                                io.BytesIO(file_content),
                                filename=audio_filename,
                                content_type='audio/mpeg')

            async with aiohttp.ClientSession() as session:
                async with session.post(server.url, data=form_data, timeout=aiohttp.ClientTimeout(total=600)) as resp:
                    if resp.status == 200:
                        result = await resp.json()

                        if result.get('text') or result.get('transcription'):
                            print(f"[{FLOW_ID}] ✅ {audio_filename}")
                            return {
                                "audio_file": audio_filename,
                                "text": result.get('text', result.get('transcription', '')),
                                "language": result.get('language', 'unknown'),
                                "confidence": result.get('confidence'),
                                "duration": result.get('duration'),
                            }
                        else:
                            print(f"[{FLOW_ID}] ⚠️ Invalid response for {audio_filename}")
                            continue
                    else:
                        # BUG FIX: error_text was fetched but never used; surface
                        # a truncated body to make server failures diagnosable.
                        error_text = await resp.text()
                        print(f"[{FLOW_ID}] ❌ Server error {resp.status}: {audio_filename} — {error_text[:100]}")
                        continue

        except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError):
            print(f"[{FLOW_ID}] ⏱️ Timeout/Connection error for {audio_filename}")
            continue
        except Exception as e:
            print(f"[{FLOW_ID}] Error for {audio_filename}: {str(e)[:50]}")
            continue
        finally:
            # Release the server and fold this attempt into its timing stats.
            # NOTE(review): stats are updated even for failed attempts —
            # assumed intentional (measures server occupancy); confirm.
            if server:
                end_time = time.time()
                server.busy = False
                server.total_processed += 1
                server.total_time += (end_time - start_time)

    return None
703
 
704
  @app.get("/")
705
  async def root():