Samfredoly committed on
Commit
424a01e
·
verified ·
1 Parent(s): 963cadd

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +132 -60
app.py CHANGED
@@ -8,7 +8,7 @@ import shutil
8
  import threading
9
  from typing import Dict, List, Set, Optional, Tuple, Any
10
  from urllib.parse import quote
11
- from datetime import datetime
12
  from pathlib import Path
13
  import io
14
 
@@ -17,26 +17,24 @@ from pydantic import BaseModel, Field
17
  from huggingface_hub import HfApi, hf_hub_download
18
 
19
  # --- Configuration ---
20
- AUTO_START_INDEX = 1290 # Hardcoded default start index if no progress is found
21
  FLOW_ID = os.getenv("FLOW_ID", "flow_default")
22
  FLOW_PORT = int(os.getenv("FLOW_PORT", 8001))
23
  HF_TOKEN = os.getenv("HF_TOKEN", "")
24
- HF_AUDIO_DATASET_ID = os.getenv("HF_AUDIO_DATASET_ID", "Samfredoly/BG_Vid") # Source dataset for audio files
25
- HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "samfred2/AT2") # Target dataset for transcriptions
26
 
27
  # Progress and State Tracking
28
  PROGRESS_FILE = Path("processing_progress.json")
29
- HF_STATE_FILE = "processing_state_transcriptions.json" # State file in output dataset
30
- LOCAL_STATE_FOLDER = Path(".state") # Local folder for state file
31
  LOCAL_STATE_FOLDER.mkdir(exist_ok=True)
32
 
33
- # Directory within the HF dataset where audio files are located
34
  AUDIO_FILE_PREFIX = "audio/"
35
 
36
  # Reference dataset for filename mapping
37
- REFERENCE_REPO_ID = os.getenv("REFERENCE_REPO_ID", "Fred808/BG3") # For matching audio to reference files
38
 
39
- # Whisper server endpoints
40
  WHISPER_SERVERS = [
41
  "https://fred1012-switch3.hf.space/transcribe",
42
  "https://Eliasishere-mint-2.hf.space/transcribe",
@@ -84,14 +82,60 @@ class WhisperServer:
84
  def fps(self):
85
  return self.total_processed / self.total_time if self.total_time > 0 else 0
86
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
87
  # Global state for whisper servers
88
  servers = [WhisperServer(url) for url in WHISPER_SERVERS]
89
  server_index = 0
90
 
91
- # --- Progress and State Management Functions ---
92
 
93
  def load_progress() -> Dict:
94
- """Loads the local processing progress from the JSON file."""
95
  if PROGRESS_FILE.exists():
96
  try:
97
  with PROGRESS_FILE.open('r') as f:
@@ -99,13 +143,13 @@ def load_progress() -> Dict:
99
  except json.JSONDecodeError:
100
  print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")
101
 
102
- # Default structure
103
  return {
104
  "last_processed_index": 0,
105
- "processed_files": {}, # {index: audio_file_path}
106
- "file_list": [], # Full list of all audio files found in the dataset
107
- "transcription_count": 0, # Count of transcriptions saved
108
- "reference_map": {} # Mapping from audio filename to reference filename
109
  }
110
 
111
  def save_progress(progress_data: Dict):
@@ -137,7 +181,7 @@ def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str,
137
  return data
138
  except json.JSONDecodeError:
139
  print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
140
- return default_value
141
 
142
  def save_json_state(file_path: str, data: Dict[str, Any]):
143
  """Save state to JSON file"""
@@ -334,6 +378,32 @@ async def download_audio_file(file_index: int, repo_file_full_path: str) -> Opti
334
  print(f"[{FLOW_ID}] Error downloading audio file {repo_file_full_path}: {e}")
335
  return None
336
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
337
  async def zip_and_upload_transcriptions(transcription_files: List[Path], batch_number: int) -> bool:
338
  """Zips transcription JSON files and uploads to dataset with batch numbering."""
339
  if not transcription_files:
@@ -504,7 +574,14 @@ async def process_audio_files(background_tasks: BackgroundTasks):
504
  }
505
 
506
  async def process_audio_files_background():
507
- """Background task that processes audio files with reference mapping using batch distribution."""
 
 
 
 
 
 
 
508
  progress_data = load_progress()
509
  reference_map = progress_data.get('reference_map', {})
510
 
@@ -525,8 +602,6 @@ async def process_audio_files_background():
525
  print(f"[{FLOW_ID}] 📊 Configuration: {len(servers)} Whisper server(s) → Batch size: {BATCH_SIZE} (1 file per server)")
526
 
527
  start_index = progress_data['last_processed_index']
528
- transcription_files = []
529
- batch_number = 1
530
 
531
  print(f"[{FLOW_ID}] Starting batch processing from file #{start_index} (out of {len(audio_files)})...")
532
 
@@ -535,7 +610,7 @@ async def process_audio_files_background():
535
  batch_end = min(batch_start + BATCH_SIZE, len(audio_files))
536
  batch_files = audio_files[batch_start:batch_end]
537
 
538
- print(f"\n[{FLOW_ID}] 📦 BATCH: Processing files #{batch_start}-#{batch_end-1} ({len(batch_files)} files)")
539
 
540
  # Step 1: Download all files in batch in parallel
541
  print(f"[{FLOW_ID}] ⬇️ Downloading batch ({len(batch_files)} files)...")
@@ -579,9 +654,12 @@ async def process_audio_files_background():
579
  print(f"[{FLOW_ID}] ⏳ Waiting for {len(transcription_tasks)} transcriptions (parallel)...")
580
  transcription_results = await asyncio.gather(*transcription_tasks, return_exceptions=True)
581
 
582
- # Step 3: Save transcriptions locally (don't upload individually)
583
- successful = len([r for r in transcription_results if not isinstance(r, Exception) and r])
584
- print(f"[{FLOW_ID}] 💾 Saving {successful}/{len(transcription_results)} transcriptions locally...")
 
 
 
585
 
586
  for metadata, result in zip(file_metadata, transcription_results):
587
  if isinstance(result, Exception):
@@ -589,7 +667,7 @@ async def process_audio_files_background():
589
  continue
590
 
591
  if result:
592
- # Save JSON locally
593
  json_filename = Path(metadata['reference_filename']).stem if metadata['reference_filename'] else Path(metadata['audio_filename']).stem
594
  json_file_path = Path(RESULTS_DIR) / f"{json_filename}.json"
595
 
@@ -597,49 +675,43 @@ async def process_audio_files_background():
597
  with open(json_file_path, 'w', encoding='utf-8') as f:
598
  json.dump(result, f, indent=2, ensure_ascii=False)
599
 
600
- transcription_files.append(json_file_path)
601
- progress_data['transcription_count'] += 1
 
 
 
602
 
603
- # Mark as processed
604
- state = await download_hf_state()
605
- await unlock_file_as_processed(
606
- metadata['audio_filename'],
607
- state,
608
- metadata['file_index'] + 1
609
- )
610
 
611
  # Step 4: Cleanup downloaded audio files
612
  for metadata in file_metadata:
613
  if metadata['audio_path'].exists():
614
  os.remove(metadata['audio_path'])
615
-
616
- # Save progress after batch
617
- progress_data['last_processed_index'] = batch_end
618
- save_progress(progress_data)
619
-
620
- # Step 5: Check if we've reached the batch threshold for zipping (100 files)
621
- if len(transcription_files) >= ZIP_UPLOAD_THRESHOLD:
622
- print(f"\n[{FLOW_ID}] 📦 Reached ZIP threshold ({ZIP_UPLOAD_THRESHOLD}). Creating and uploading batch {batch_number}...")
623
- files_to_zip = transcription_files[:ZIP_UPLOAD_THRESHOLD]
624
- await zip_and_upload_transcriptions(files_to_zip, batch_number)
625
 
626
- # Remove zipped files locally and update list
627
- for file_path in files_to_zip:
628
- if file_path.exists():
629
- os.remove(file_path)
630
 
631
- transcription_files = transcription_files[ZIP_UPLOAD_THRESHOLD:]
632
- batch_number += 1
633
-
634
- # Upload remaining transcriptions as final batch
635
- if transcription_files:
636
- print(f"\n[{FLOW_ID}] 📦 Uploading final batch {batch_number} with {len(transcription_files)} transcriptions...")
637
- await zip_and_upload_transcriptions(transcription_files, batch_number)
638
-
639
- # Cleanup
640
- for file_path in transcription_files:
641
- if file_path.exists():
642
- os.remove(file_path)
 
 
 
 
 
643
 
644
  print(f"\n[{FLOW_ID}] ✅ ALL DONE! Total transcriptions: {progress_data['transcription_count']}")
645
 
 
8
  import threading
9
  from typing import Dict, List, Set, Optional, Tuple, Any
10
  from urllib.parse import quote
11
+ from datetime import datetime, timedelta
12
  from pathlib import Path
13
  import io
14
 
 
17
  from huggingface_hub import HfApi, hf_hub_download
18
 
19
  # --- Configuration ---
20
+ AUTO_START_INDEX = 1290
21
  FLOW_ID = os.getenv("FLOW_ID", "flow_default")
22
  FLOW_PORT = int(os.getenv("FLOW_PORT", 8001))
23
  HF_TOKEN = os.getenv("HF_TOKEN", "")
24
+ HF_AUDIO_DATASET_ID = os.getenv("HF_AUDIO_DATASET_ID", "Samfredoly/BG_Vid")
25
+ HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "samfred2/AT2")
26
 
27
  # Progress and State Tracking
28
  PROGRESS_FILE = Path("processing_progress.json")
29
+ HF_STATE_FILE = "processing_state_transcriptions.json"
30
+ LOCAL_STATE_FOLDER = Path(".state")
31
  LOCAL_STATE_FOLDER.mkdir(exist_ok=True)
32
 
 
33
  AUDIO_FILE_PREFIX = "audio/"
34
 
35
  # Reference dataset for filename mapping
36
+ REFERENCE_REPO_ID = os.getenv("REFERENCE_REPO_ID", "Fred808/BG3")
37
 
 
38
  WHISPER_SERVERS = [
39
  "https://fred1012-switch3.hf.space/transcribe",
40
  "https://Eliasishere-mint-2.hf.space/transcribe",
 
82
  def fps(self):
83
  return self.total_processed / self.total_time if self.total_time > 0 else 0
84
 
85
class RateLimiter:
    """Sliding-window upload throttle for the HF dataset uploads.

    Keeps the timestamps of uploads made in the trailing hour.  Uploads are
    allowed freely up to ``max_per_hour`` (soft cap), allowed with a warning
    between the soft cap and ``stop_at`` (hard cap), and refused once the
    hard cap is reached.

    NOTE(review): despite its name, ``wait_if_needed`` never sleeps — when
    the hard cap is hit it returns False immediately and the caller decides
    what to do.
    """

    def __init__(self, max_per_hour: int = 120, stop_at: int = 128):
        self.max_per_hour = max_per_hour          # soft limit: warn above this
        self.stop_at = stop_at                    # hard limit: refuse at/above this
        self.uploads: List[datetime] = []         # timestamps within the 1-hour window
        self.lock = asyncio.Lock()                # serializes pruning + appends

    def _prune(self, now: datetime) -> None:
        """Drop timestamps older than one hour. Caller must hold the lock."""
        one_hour_ago = now - timedelta(hours=1)
        self.uploads = [ts for ts in self.uploads if ts > one_hour_ago]

    async def wait_if_needed(self) -> bool:
        """Record an upload attempt and return True if it may proceed.

        Returns False (without recording a timestamp) once ``stop_at``
        uploads have already happened in the past hour.  Does NOT sleep.
        """
        async with self.lock:
            now = datetime.now()
            self._prune(now)

            # Hard stop reached: refuse outright.
            if len(self.uploads) >= self.stop_at:
                print(f"[{FLOW_ID}] ⏸️ Upload limit ({self.stop_at}) reached. Waiting for next hour...")
                return False

            self.uploads.append(now)
            count = len(self.uploads)
            if count <= self.max_per_hour:
                # Within the soft cap: report remaining budget.
                remaining = self.max_per_hour - count
                print(f"[{FLOW_ID}] 📤 Upload #{count}/{self.max_per_hour} this hour ({remaining} remaining)")
            else:
                # Between soft cap and hard stop: allow, but warn.
                print(f"[{FLOW_ID}] ⚠️ Upload #{count}/{self.max_per_hour} this hour (approaching limit)")
            return True

    async def can_upload(self) -> bool:
        """Check (without recording an attempt) whether an upload is allowed."""
        async with self.lock:
            self._prune(datetime.now())
            return len(self.uploads) < self.stop_at

# Global rate limiter
rate_limiter = RateLimiter(max_per_hour=120, stop_at=128)
132
+
133
  # Global state for whisper servers
134
  servers = [WhisperServer(url) for url in WHISPER_SERVERS]
135
  server_index = 0
136
 
 
137
 
138
  def load_progress() -> Dict:
 
139
  if PROGRESS_FILE.exists():
140
  try:
141
  with PROGRESS_FILE.open('r') as f:
 
143
  except json.JSONDecodeError:
144
  print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")
145
 
146
+
147
  return {
148
  "last_processed_index": 0,
149
+ "processed_files": {},
150
+ "file_list": [],
151
+ "transcription_count": 0,
152
+ "reference_map": {},
153
  }
154
 
155
  def save_progress(progress_data: Dict):
 
181
  return data
182
  except json.JSONDecodeError:
183
  print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
184
+ return default_value
185
 
186
  def save_json_state(file_path: str, data: Dict[str, Any]):
187
  """Save state to JSON file"""
 
378
  print(f"[{FLOW_ID}] Error downloading audio file {repo_file_full_path}: {e}")
379
  return None
380
 
381
async def upload_json_to_dataset(json_file_path: Path, json_filename: str) -> bool:
    """Upload a single transcription JSON file directly to the HF output dataset.

    Args:
        json_file_path: Local path of the JSON file to upload.
        json_filename: File name to use under ``transcriptions/`` in the repo.

    Returns:
        True on success; False when the hourly rate limit blocks the upload
        or the upload itself fails (failure is logged, never raised, so the
        caller can keep the local file and retry later).
    """
    try:
        # Respect the hourly rate limit before touching the network.
        if not await rate_limiter.wait_if_needed():
            print(f"[{FLOW_ID}] ⏸️ Upload rate limit reached for {json_filename}. Waiting...")
            return False

        print(f"[{FLOW_ID}] 📤 Uploading JSON file: {json_filename}...")

        api = HfApi(token=HF_TOKEN)
        # upload_file is a blocking HTTP call; run it in a worker thread so
        # the event loop (and the concurrent transcription tasks) keep going.
        await asyncio.to_thread(
            api.upload_file,
            path_or_fileobj=str(json_file_path),
            path_in_repo=f"transcriptions/{json_filename}",
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset",
            commit_message=f"[{FLOW_ID}] Transcription: {json_filename}",
        )

        print(f"[{FLOW_ID}] ✅ Successfully uploaded: {json_filename}")
        return True

    except Exception as e:
        # Best-effort: log and report failure instead of crashing the batch.
        print(f"[{FLOW_ID}] ❌ Error uploading {json_filename}: {e}")
        return False
406
+
407
  async def zip_and_upload_transcriptions(transcription_files: List[Path], batch_number: int) -> bool:
408
  """Zips transcription JSON files and uploads to dataset with batch numbering."""
409
  if not transcription_files:
 
574
  }
575
 
576
  async def process_audio_files_background():
577
+ """
578
+ Background task that processes audio files with reference mapping.
579
+ - Downloads batch of files (1 per server)
580
+ - Distributes to Whisper servers in parallel
581
+ - Uploads JSON results directly to HF dataset
582
+ - Updates processing state after each batch round (dynamically based on actual processed count)
583
+ - Respects rate limit: max 120 uploads/hour, stops at 128
584
+ """
585
  progress_data = load_progress()
586
  reference_map = progress_data.get('reference_map', {})
587
 
 
602
  print(f"[{FLOW_ID}] 📊 Configuration: {len(servers)} Whisper server(s) → Batch size: {BATCH_SIZE} (1 file per server)")
603
 
604
  start_index = progress_data['last_processed_index']
 
 
605
 
606
  print(f"[{FLOW_ID}] Starting batch processing from file #{start_index} (out of {len(audio_files)})...")
607
 
 
610
  batch_end = min(batch_start + BATCH_SIZE, len(audio_files))
611
  batch_files = audio_files[batch_start:batch_end]
612
 
613
+ print(f"\n[{FLOW_ID}] 📦 BATCH ROUND: Processing files #{batch_start}-#{batch_end-1} ({len(batch_files)} files)")
614
 
615
  # Step 1: Download all files in batch in parallel
616
  print(f"[{FLOW_ID}] ⬇️ Downloading batch ({len(batch_files)} files)...")
 
654
  print(f"[{FLOW_ID}] ⏳ Waiting for {len(transcription_tasks)} transcriptions (parallel)...")
655
  transcription_results = await asyncio.gather(*transcription_tasks, return_exceptions=True)
656
 
657
+ # Step 3: Upload transcriptions directly to HF dataset
658
+ successful_uploads = 0
659
+ uploaded_files = []
660
+ state = await download_hf_state()
661
+
662
+ print(f"[{FLOW_ID}] 📤 Uploading {len([r for r in transcription_results if r and not isinstance(r, Exception)])}/{len(transcription_results)} transcriptions directly to dataset...")
663
 
664
  for metadata, result in zip(file_metadata, transcription_results):
665
  if isinstance(result, Exception):
 
667
  continue
668
 
669
  if result:
670
+ # Save JSON locally first
671
  json_filename = Path(metadata['reference_filename']).stem if metadata['reference_filename'] else Path(metadata['audio_filename']).stem
672
  json_file_path = Path(RESULTS_DIR) / f"{json_filename}.json"
673
 
 
675
  with open(json_file_path, 'w', encoding='utf-8') as f:
676
  json.dump(result, f, indent=2, ensure_ascii=False)
677
 
678
+ # Upload directly to HF dataset
679
+ if await upload_json_to_dataset(json_file_path, f"{json_filename}.json"):
680
+ successful_uploads += 1
681
+ uploaded_files.append(json_file_path)
682
+ progress_data['transcription_count'] += 1
683
 
684
+ # Cleanup local JSON file after upload
685
+ if json_file_path.exists():
686
+ os.remove(json_file_path)
 
 
 
 
687
 
688
  # Step 4: Cleanup downloaded audio files
689
  for metadata in file_metadata:
690
  if metadata['audio_path'].exists():
691
  os.remove(metadata['audio_path'])
 
 
 
 
 
 
 
 
 
 
692
 
693
+ # Step 5: Update processing state after this batch round
694
+ # Update next_download_index based on actual files processed this round
695
+ files_processed_this_round = len([m for m in file_metadata if m]) # Count of files actually processed
696
+ new_download_index = batch_start + files_processed_this_round
697
 
698
+ print(f"[{FLOW_ID}] 🔄 Batch round complete: {files_processed_this_round} files distributed and processed")
699
+ print(f"[{FLOW_ID}] 📊 Updating state: next_download_index {state['next_download_index']} → {new_download_index}")
700
+
701
+ state['next_download_index'] = new_download_index
702
+
703
+ # Mark all files in this round as processed in the state
704
+ for metadata in file_metadata:
705
+ state['file_states'][metadata['audio_filename']] = "processed"
706
+
707
+ # Upload updated state
708
+ await upload_hf_state(state)
709
+
710
+ # Save local progress
711
+ progress_data['last_processed_index'] = batch_end
712
+ save_progress(progress_data)
713
+
714
+ print(f"[{FLOW_ID}] ✅ State updated. Successful uploads this round: {successful_uploads}/{len(file_metadata)}")
715
 
716
  print(f"\n[{FLOW_ID}] ✅ ALL DONE! Total transcriptions: {progress_data['transcription_count']}")
717