Samfredoly committed on
Commit
675f2ca
·
verified ·
1 Parent(s): 6b99594

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +520 -566
app.py CHANGED
@@ -1,108 +1,100 @@
1
  import os
2
  import json
3
- import requests
4
- import subprocess
5
- import shutil
6
  import time
7
- import sys
8
- import threading
9
- from typing import Dict, List, Optional, Any
10
- from huggingface_hub import HfApi, hf_hub_url
11
- from fastapi import FastAPI, HTTPException
12
- from fastapi.responses import JSONResponse
13
- import uvicorn
14
-
15
- # Fix Unicode encoding for Windows
16
- if sys.platform == 'win32':
17
- import io
18
- sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
19
-
20
- # Initialize FastAPI app
21
- app = FastAPI(title="Audio Transcriber", description="Audio transcription and upload service")
22
-
23
- # ==== CONFIGURATION ====
24
- HF_TOKEN = os.environ.get("HF_TOKEN", "")
25
- SOURCE_REPO_ID = "Samfredoly/BG_Vid" # Fetch audio files from here
26
- TARGET_REPO_ID = "samfred2/A_Text" # Upload transcriptions here
27
- REFERENCE_REPO_ID = "Fred808/BG3" # Reference repo to match audio filenames
28
-
29
- # Path Configuration
30
- DOWNLOAD_FOLDER = "downloads_audio"
31
- TRANSCRIPTIONS_FOLDER = "transcriptions"
32
- LOCAL_STATE_FOLDER = ".state_audio"
33
-
34
- os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
35
- os.makedirs(TRANSCRIPTIONS_FOLDER, exist_ok=True)
36
- os.makedirs(LOCAL_STATE_FOLDER, exist_ok=True)
37
-
38
- # State Files
39
- FAILED_FILES_LOG = "failed_audio_files.log"
40
- HF_STATE_FILE = "processing_audio_state.json"
41
-
42
- # Processing Parameters
43
- PROCESSING_DELAY = 2
44
- MAX_RETRIES = 3
45
- MIN_FREE_SPACE_GB = 1
46
- WHISPER_MODEL = "small" # Whisper model size
47
-
48
- # Initialize HF API
49
- hf_api = HfApi(token=HF_TOKEN)
50
-
51
- # Global State
52
- processing_status = {
53
- "is_running": False,
54
- "current_file": None,
55
- "total_files": 0,
56
- "processed_files": 0,
57
- "failed_files": 0,
58
- "transcribed_files": 0,
59
- "last_update": None,
60
- "logs": []
61
- }
62
-
63
- def log_message(message: str, level: str = "INFO"):
64
- """Log messages with timestamp"""
65
- timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
66
- log_entry = f"[{timestamp}] {level}: {message}"
67
- print(log_entry)
68
- processing_status["logs"].append(log_entry)
69
- processing_status["last_update"] = timestamp
70
- if len(processing_status["logs"]) > 100:
71
- processing_status["logs"] = processing_status["logs"][-100:]
72
-
73
- def log_failed_file(filename: str, error: str):
74
- """Log failed files to persistent file"""
75
- with open(FAILED_FILES_LOG, "a") as f:
76
- f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {filename}: {error}\n")
77
-
78
- def get_disk_usage(path: str) -> Dict[str, float]:
79
- """Get disk usage statistics in GB"""
80
- statvfs = os.statvfs(path)
81
- total = statvfs.f_frsize * statvfs.f_blocks / (1024**3)
82
- free = statvfs.f_frsize * statvfs.f_bavail / (1024**3)
83
- used = total - free
84
- return {"total": total, "free": free, "used": used}
85
-
86
- def check_disk_space(path: str = ".") -> bool:
87
- """Check if there's enough disk space"""
88
- disk_info = get_disk_usage(path)
89
- if disk_info["free"] < MIN_FREE_SPACE_GB:
90
- log_message(f'⚠️ Low disk space: {disk_info["free"]:.2f}GB free, {disk_info["used"]:.2f}GB used')
91
- return False
92
- return True
93
-
94
- def cleanup_temp_files():
95
- """Clean up temporary files to free space"""
96
- log_message("🧹 Cleaning up temporary files...", "INFO")
97
 
98
- current_file = processing_status.get("current_file")
99
- for file in os.listdir(DOWNLOAD_FOLDER):
100
- if file != current_file and file.endswith((".wav", ".mp3")):
101
- try:
102
- os.remove(os.path.join(DOWNLOAD_FOLDER, file))
103
- log_message(f"🗑️ Removed old download: {file}", "INFO")
104
- except:
105
- pass
 
 
 
 
 
 
 
 
106
 
107
  def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
108
  """Load state from JSON file with migration logic for new structure."""
@@ -111,16 +103,20 @@ def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str,
111
  with open(file_path, "r") as f:
112
  data = json.load(f)
113
 
 
114
  if "file_states" not in data or not isinstance(data["file_states"], dict):
115
- log_message("ℹ️ Initializing 'file_states' dictionary.", "INFO")
116
  data["file_states"] = {}
117
 
118
  if "next_download_index" not in data:
119
  data["next_download_index"] = 0
120
 
 
 
 
121
  return data
122
  except json.JSONDecodeError:
123
- log_message(f"⚠️ Corrupted state file: {file_path}", "WARNING")
124
  return default_value
125
 
126
  def save_json_state(file_path: str, data: Dict[str, Any]):
@@ -128,576 +124,534 @@ def save_json_state(file_path: str, data: Dict[str, Any]):
128
  with open(file_path, "w") as f:
129
  json.dump(data, f, indent=2)
130
 
131
- def download_hf_state(repo_id: str, filename: str) -> Dict[str, Any]:
132
  """Downloads the state file from Hugging Face or returns a default state."""
133
- local_path = os.path.join(LOCAL_STATE_FOLDER, filename)
134
- default_state = {"next_download_index": 130, "file_states": {}}
135
 
136
  try:
137
- files = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")
138
- if filename not in files:
139
- log_message(f"ℹ️ State file {filename} not found in {repo_id}. Starting from default state.", "INFO")
 
 
 
 
 
140
  return default_state
141
 
142
- from huggingface_hub import hf_hub_download
143
  hf_hub_download(
144
- repo_id=repo_id,
145
- filename=filename,
146
  repo_type="dataset",
147
  local_dir=LOCAL_STATE_FOLDER,
148
- local_dir_use_symlinks=False
 
149
  )
150
 
151
- log_message(f" Successfully downloaded state file from {repo_id}.", "INFO")
152
- return load_json_state(local_path, default_state)
153
 
154
  except Exception as e:
155
- log_message(f"⚠️ Failed to download state file from Hugging Face: {str(e)}. Starting from default state.", "WARNING")
156
  return default_state
157
 
158
- def upload_hf_state(repo_id: str, filename: str, state: Dict[str, Any]) -> bool:
159
  """Uploads the state file to Hugging Face."""
160
- local_path = os.path.join(LOCAL_STATE_FOLDER, filename)
161
 
162
  try:
163
- save_json_state(local_path, state)
164
-
165
- hf_api.upload_file(
166
- path_or_fileobj=local_path,
167
- path_in_repo=filename,
168
- repo_id=repo_id,
 
 
169
  repo_type="dataset",
170
- commit_message=f"Update audio processing state: next_index={state['next_download_index']}"
171
  )
172
- log_message(f" Successfully uploaded updated state file to {repo_id}", "INFO")
173
  return True
174
  except Exception as e:
175
- log_message(f" Failed to upload state file to Hugging Face: {str(e)}", "ERROR")
176
  return False
177
 
178
- def lock_file_for_processing(wav_filename: str, state: Dict[str, Any]) -> bool:
179
  """Marks a file as 'processing' in the state file and uploads the lock."""
180
- log_message(f"🔒 Attempting to lock file: {wav_filename} (Marking as 'processing')", "INFO")
181
 
182
- state["file_states"][wav_filename] = "processing"
 
183
 
184
- if upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
185
- log_message(f"✅ Successfully locked file: {wav_filename}", "INFO")
 
186
  return True
187
  else:
188
- log_message(f"❌ Failed to upload lock for file: {wav_filename}. Aborting processing.", "ERROR")
189
- if wav_filename in state["file_states"]:
190
- del state["file_states"][wav_filename]
 
191
  return False
192
 
193
- def unlock_file_as_processed(wav_filename: str, state: Dict[str, Any], next_index: int) -> bool:
194
  """Marks a file as 'processed', updates the index, and uploads the state."""
195
- log_message(f"🔓 Attempting to unlock file: {wav_filename} (Marking as 'processed')", "INFO")
196
 
197
- state["file_states"][wav_filename] = "processed"
 
198
  state["next_download_index"] = next_index
199
 
200
- if upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state):
201
- log_message(f"✅ Successfully unlocked and marked as processed: {wav_filename}", "INFO")
 
202
  return True
203
  else:
204
- log_message(f"❌ Failed to upload final state for file: {wav_filename}.", "ERROR")
205
  return False
206
 
207
- def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
208
- """Download file with retry logic and disk space checking"""
209
- if not check_disk_space():
210
- cleanup_temp_files()
211
- if not check_disk_space():
212
- log_message("❌ Insufficient disk space even after cleanup", "ERROR")
213
- return False
214
-
215
- try:
216
- os.makedirs(os.path.dirname(dest_path), exist_ok=True)
217
- except Exception as e:
218
- log_message(f"❌ Failed to create directory for download path {os.path.dirname(dest_path)}: {str(e)}", "ERROR")
219
- return False
220
-
221
- headers = {"Authorization": f"Bearer {HF_TOKEN}"}
222
- for attempt in range(max_retries):
223
- try:
224
- with requests.get(url, headers=headers, stream=True) as r:
225
- r.raise_for_status()
226
-
227
- with open(dest_path, "wb") as f:
228
- for chunk in r.iter_content(chunk_size=8192):
229
- if chunk:
230
- f.write(chunk)
231
-
232
- log_message(f"✅ Download successful: {dest_path}", "INFO")
233
- return True
234
-
235
- except requests.exceptions.RequestException as e:
236
- log_message(f"❌ Download attempt {attempt + 1} failed for {url}: {str(e)}", "WARNING")
237
- time.sleep(PROCESSING_DELAY)
238
- except Exception as e:
239
- log_message(f"❌ An unexpected error occurred during download: {str(e)}", "ERROR")
240
- return False
241
-
242
- log_message(f"❌ Failed to download {url} after {max_retries} attempts.", "ERROR")
243
- return False
244
 
245
- def fetch_reference_files(repo_id: str) -> Dict[str, str]:
246
- """Fetch all files from Fred808/BG3 repo to match with audio filenames."""
247
- log_message(f"📋 Fetching file list from {repo_id}...", "INFO")
 
 
 
248
 
249
  try:
250
- files_list = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")
251
-
252
- # Include all file types (zip, rar, wav, mp3, etc.)
253
- all_files = [f for f in files_list]
254
 
255
- # Create a mapping of base filename (without extension) to full path
256
- filename_map = {}
257
- for file_path in all_files:
258
- base_name = os.path.splitext(os.path.basename(file_path))[0]
259
- filename_map[base_name] = file_path
260
 
261
- log_message(f"✅ Found {len(filename_map)} files in reference repo", "INFO")
262
- return filename_map
263
 
264
  except Exception as e:
265
- log_message(f" Failed to fetch reference files: {str(e)}", "ERROR")
266
  return {}
267
 
268
- def find_matching_filename(transcribed_filename: str, reference_map: Dict[str, str]) -> Optional[str]:
269
- """Find matching filename in reference map from Fred808/BG3."""
270
- base_name = os.path.splitext(transcribed_filename)[0]
271
-
272
- # Exact match first
273
- if base_name in reference_map:
274
- full_path = reference_map[base_name]
275
- print(f"\n✅ EXACT MATCH FOUND:")
276
- print(f" Audio: {transcribed_filename}")
277
- print(f" File: {full_path}")
278
- log_message(f"✅ Found exact match: {transcribed_filename} -> {full_path}", "INFO")
279
- return full_path
280
-
281
- # Partial/fuzzy match (check if reference contains transcribed as substring)
282
- matches = []
283
- for ref_base, ref_full_path in reference_map.items():
284
- if base_name.lower() in ref_base.lower() or ref_base.lower() in base_name.lower():
285
- matches.append((ref_base, ref_full_path))
286
-
287
- # Return first partial match if found
288
- if matches:
289
- ref_base, ref_full_path = matches[0]
290
- print(f"\n✅ PARTIAL MATCH FOUND:")
291
- print(f" Audio: {transcribed_filename}")
292
- print(f" File: {ref_full_path}")
293
- log_message(f"✅ Found partial match: {transcribed_filename} -> {ref_full_path}", "INFO")
294
- return ref_full_path
295
-
296
- print(f"\n⚠️ NO EXACT/PARTIAL MATCH FOUND (will still process):")
297
- print(f" Audio: {transcribed_filename}")
298
- log_message(f"⚠️ No matching filename found for: {transcribed_filename}. Will use original filename.", "WARNING")
299
- return None
300
 
301
- def transcribe_audio(wav_path: str) -> Optional[Dict[str, Any]]:
302
- """Transcribe audio file using Whisper from Transformers."""
303
- log_message(f"🎤 Transcribing audio file: {wav_path}", "INFO")
304
-
 
 
 
 
 
 
305
  try:
306
- from transformers import pipeline
307
- import librosa
308
-
309
- # Load audio with librosa
310
- log_message(f"Loading audio file: {wav_path}", "INFO")
311
- audio, sr = librosa.load(wav_path, sr=16000)
312
-
313
- # Initialize Whisper pipeline
314
- log_message(f"Loading Whisper {WHISPER_MODEL} model from Transformers...", "INFO")
315
- pipe = pipeline(
316
- "automatic-speech-recognition",
317
- model=f"openai/whisper-{WHISPER_MODEL}",
318
- device=0 if __import__('torch').cuda.is_available() else -1 # GPU if available, else CPU
319
  )
320
 
321
- # Transcribe
322
- log_message("Transcribing audio...", "INFO")
323
- result = pipe(audio)
 
 
 
 
 
 
324
 
325
- # Format result to match openai-whisper format
326
- formatted_result = {
327
- "text": result["text"],
328
- "segments": [{"text": result["text"]}]
329
- }
330
 
331
- log_message(f"✅ Successfully transcribed: {wav_path}", "INFO")
332
- return formatted_result
 
 
 
333
 
334
- except ImportError as e:
335
- missing_lib = str(e)
336
- log_message(f"❌ Missing library. Install with: pip install transformers librosa torch torchaudio", "ERROR")
337
- log_message(f" Error: {missing_lib}", "ERROR")
338
- return None
339
  except Exception as e:
340
- log_message(f" Failed to transcribe {wav_path}: {str(e)}", "ERROR")
341
- return None
342
 
343
- def process_audio_file(wav_path: str, reference_map: Dict[str, str], matched_filename: str) -> bool:
344
- """
345
- Main processing logic for a single audio file:
346
- 1. Transcribe using Whisper
347
- 2. Save transcription as JSON
348
- 3. Upload to HF dataset
349
- 4. Clean up local files
350
- """
351
- wav_filename = os.path.basename(wav_path)
352
 
353
- # 1. Transcribe audio
354
- transcription = transcribe_audio(wav_path)
355
- if transcription is None:
356
- log_failed_file(wav_filename, "Transcription failed")
357
- return False
358
 
359
- # 2. Save transcription as JSON
360
- json_filename = os.path.splitext(matched_filename)[0] + "_transcription.json"
361
- json_output_path = os.path.join(TRANSCRIPTIONS_FOLDER, json_filename)
362
 
363
  try:
364
- os.makedirs(os.path.dirname(json_output_path), exist_ok=True)
 
 
 
 
 
 
365
 
366
- with open(json_output_path, "w", encoding="utf-8") as f:
367
- json.dump(transcription, f, indent=2, ensure_ascii=False)
368
 
369
- log_message(f"✅ Saved transcription: {json_output_path}", "INFO")
 
 
 
 
370
 
371
  except Exception as e:
372
- log_message(f" Failed to save transcription JSON: {str(e)}", "ERROR")
373
- log_failed_file(wav_filename, f"Failed to save JSON: {str(e)}")
374
- return False
 
 
 
 
 
 
 
 
 
375
 
376
- # 3. Upload to HF dataset
377
  try:
378
- path_in_repo = f"transcriptions/{json_filename}"
379
- commit_message = f"Add transcription for: {matched_filename}"
 
 
380
 
381
- hf_api.upload_file(
382
- path_or_fileobj=json_output_path,
383
- path_in_repo=path_in_repo,
384
- repo_id=TARGET_REPO_ID,
 
385
  repo_type="dataset",
386
- commit_message=commit_message
387
  )
388
- log_message(f"✅ Successfully uploaded transcription: {json_filename}", "INFO")
389
- processing_status["transcribed_files"] += 1
 
390
 
391
  except Exception as e:
392
- log_message(f" Failed to upload transcription to HF: {str(e)}", "ERROR")
393
- log_failed_file(wav_filename, f"Failed to upload: {str(e)}")
394
  return False
395
-
396
- # 4. Clean up local files
397
- try:
398
- os.remove(json_output_path)
399
- log_message(f"🗑️ Cleaned up local transcription file: {json_output_path}", "INFO")
400
- except:
401
- pass
402
-
403
- return True
404
 
405
- def get_next_file_to_process(repo_id: str, state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
406
- """
407
- Finds the next audio file to process from the source repo in reverse order (oldest to newest).
408
- Returns: { 'filename': str, 'url': str, 'index': int } or None
409
- """
410
- log_message(f"🔍 Searching for next audio file to process in {repo_id}", "INFO")
411
 
412
  try:
413
- files_list = hf_api.list_repo_files(repo_id=repo_id, repo_type="dataset")
414
-
415
- # Filter for audio files and sort in reverse order (descending)
416
- audio_files = sorted([f for f in files_list if f.endswith(('.wav', '.mp3'))], reverse=True)
417
-
418
- if not audio_files:
419
- log_message("ℹ️ No audio files found in the source repository.", "INFO")
420
- return None
421
-
422
- processing_status["total_files"] = len(audio_files)
423
 
424
- start_index = state.get("next_download_index", 0)
425
 
426
- for index in range(start_index, len(audio_files)):
427
- filename = audio_files[index]
428
- file_state = state["file_states"].get(filename)
429
-
430
- if file_state is None or file_state == "failed":
431
- url = hf_hub_url(repo_id=repo_id, filename=filename, repo_type="dataset", subfolder=None)
432
-
433
- log_message(f"✅ Found next audio file: {filename} at index {index}", "INFO")
434
- return {
435
- 'filename': filename,
436
- 'url': url,
437
- 'index': index
438
- }
439
-
440
- elif file_state == "processing":
441
- log_message(f"⚠️ File {filename} is currently marked as 'processing'. Skipping for now.", "WARNING")
442
-
443
- elif file_state == "processed":
444
- log_message(f"ℹ️ File {filename} already processed. Skipping.", "INFO")
445
-
446
- log_message("ℹ️ All files up to the current index have been processed or skipped.", "INFO")
447
 
448
- if start_index >= len(audio_files):
449
- log_message("ℹ️ Reached end of file list. Resetting index to 0 for next loop.", "INFO")
450
- state["next_download_index"] = 0
451
- upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, state)
452
-
453
- return None
454
 
455
- except Exception as e:
456
- log_message(f"❌ Failed to list files from Hugging Face: {str(e)}", "ERROR")
457
- return None
458
-
459
- def main_processing_loop():
460
- """The main loop that orchestrates the download, transcription, and upload cycle."""
461
-
462
- if processing_status["is_running"]:
463
- log_message("⚠️ Processing loop is already running.", "WARNING")
464
- return
465
 
466
- processing_status["is_running"] = True
467
-
468
- try:
469
- log_message("🚀 Starting audio transcription processing loop...", "INFO")
470
 
471
- # Fetch reference files from BG_Vid repo once at the start
472
- reference_map = fetch_reference_files(REFERENCE_REPO_ID)
473
 
474
- if not reference_map:
475
- log_message("❌ No reference files found. Cannot proceed.", "ERROR")
476
- return
477
 
478
- while processing_status["is_running"]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
479
 
480
- current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)
481
- next_file_info = get_next_file_to_process(SOURCE_REPO_ID, current_state)
482
 
483
- if next_file_info is None:
484
- log_message("💤 No new audio files to process. Sleeping for a while...", "INFO")
485
- time.sleep(PROCESSING_DELAY * 5)
486
- continue
487
-
488
- target_file = next_file_info['filename']
489
- audio_url = next_file_info['url']
490
- target_index = next_file_info['index']
491
 
492
- processing_status["current_file"] = target_file
493
- success = False
494
- matched_filename = None
495
-
496
- try:
497
- if not lock_file_for_processing(target_file, current_state):
498
- log_message(f"❌ Failed to lock file {target_file}. Skipping.", "ERROR")
499
- time.sleep(PROCESSING_DELAY)
500
- continue
501
-
502
- local_wav_path = os.path.join(DOWNLOAD_FOLDER, os.path.basename(target_file))
503
- log_message(f"⬇️ Downloading audio file: {target_file}", "INFO")
504
-
505
- if download_with_retry(audio_url, local_wav_path):
506
-
507
- # Extract base filename for matching
508
- base_filename = os.path.basename(target_file)
509
- matched_filename = find_matching_filename(base_filename, reference_map)
510
-
511
- # Use matched filename if found, otherwise use original filename
512
- output_filename = matched_filename if matched_filename else base_filename
513
-
514
- if process_audio_file(local_wav_path, reference_map, output_filename):
515
- success = True
516
- log_message(f" Finished processing: {target_file}", "INFO")
 
517
  else:
518
- log_message(f"❌ Processing failed for: {target_file}", "ERROR")
519
- else:
520
- log_message(f"❌ Download failed for: {target_file}", "ERROR")
521
-
522
- except Exception as e:
523
- log_message(f"🔥 An unhandled error occurred while processing {target_file}: {str(e)}", "ERROR")
524
- log_failed_file(target_file, str(e))
525
-
526
- finally:
527
- next_index_to_save = target_index + 1
528
- current_state = download_hf_state(TARGET_REPO_ID, HF_STATE_FILE)
529
-
530
- if success:
531
- unlock_file_as_processed(target_file, current_state, next_index_to_save)
532
- processing_status["processed_files"] += 1
533
- else:
534
- log_message(f"⚠️ Processing failed for {target_file}. Marking as 'failed' and advancing index.", "WARNING")
535
- current_state["file_states"][target_file] = "failed"
536
- current_state["next_download_index"] = next_index_to_save
537
- upload_hf_state(TARGET_REPO_ID, HF_STATE_FILE, current_state)
538
- processing_status["failed_files"] += 1
539
-
540
- if os.path.exists(local_wav_path):
541
- os.remove(local_wav_path)
542
- log_message(f"🗑️ Cleaned up local file: {local_wav_path}", "INFO")
543
-
544
- time.sleep(PROCESSING_DELAY)
545
-
546
- log_message("🎉 Processing complete!", "INFO")
547
- log_message(f"📊 Final stats: {processing_status['transcribed_files']} audio files transcribed, {processing_status['processed_files']} files processed", "INFO")
548
-
549
- except KeyboardInterrupt:
550
- log_message("⏹️ Processing interrupted by user", "WARNING")
551
- except Exception as e:
552
- log_message(f"❌ Fatal error: {str(e)}", "ERROR")
553
- finally:
554
- processing_status["is_running"] = False
555
- cleanup_temp_files()
556
 
557
- if __name__ == "__main__":
558
- main_processing_loop()
559
 
560
- # ===== FASTAPI ENDPOINTS =====
 
 
 
 
561
 
562
- @app.get("/")
563
- async def root():
564
- """Root endpoint with service info"""
565
- return {
566
- "service": "Audio Transcriber",
567
- "status": "running",
568
- "version": "1.0.0",
569
- "endpoints": {
570
- "status": "/status",
571
- "start": "/start",
572
- "stop": "/stop",
573
- "process": "/process/{filename}",
574
- "logs": "/logs"
575
- }
576
- }
577
 
578
- @app.get("/status")
579
- async def get_status():
580
- """Get current processing status"""
 
 
 
 
 
581
  return {
582
- "is_running": processing_status["is_running"],
583
- "current_file": processing_status["current_file"],
584
- "total_files": processing_status["total_files"],
585
- "processed_files": processing_status["processed_files"],
586
- "transcribed_files": processing_status["transcribed_files"],
587
- "failed_files": processing_status["failed_files"],
588
- "last_update": processing_status["last_update"],
589
- "recent_logs": processing_status["logs"][-10:]
590
  }
591
 
592
- @app.post("/start")
593
- async def start_processing():
594
- """Start the main processing loop"""
595
- if processing_status["is_running"]:
596
- raise HTTPException(status_code=400, detail="Processing already running")
597
 
598
- # Start processing in a separate thread
599
- thread = threading.Thread(target=main_processing_loop, daemon=True)
600
- thread.start()
 
 
 
601
 
602
- return {
603
- "message": "Processing started",
604
- "status": "started"
605
- }
606
-
607
- @app.post("/stop")
608
- async def stop_processing():
609
- """Stop the main processing loop"""
610
- if not processing_status["is_running"]:
611
- raise HTTPException(status_code=400, detail="Processing not running")
612
 
613
- processing_status["is_running"] = False
 
 
614
 
615
- return {
616
- "message": "Processing stopped",
617
- "status": "stopped"
618
- }
619
-
620
- @app.get("/logs")
621
- async def get_logs(limit: int = 50):
622
- """Get recent logs"""
623
- logs = processing_status["logs"][-limit:]
624
- return {
625
- "total_logs": len(processing_status["logs"]),
626
- "recent_logs": logs
627
  }
628
-
629
- @app.post("/process/{filename}")
630
- async def process_single_file(filename: str):
631
- """Process a single audio file manually"""
632
- try:
633
- log_message(f"🎯 Manual processing requested for: {filename}", "INFO")
634
-
635
- # Download and process the file
636
- reference_map = fetch_reference_files(REFERENCE_REPO_ID)
637
- if not reference_map:
638
- raise HTTPException(status_code=500, detail="Could not fetch reference files")
639
-
640
- # Get file URL
641
- audio_url = hf_hub_url(repo_id=SOURCE_REPO_ID, filename=filename, repo_type="dataset", subfolder=None)
642
- local_wav_path = os.path.join(DOWNLOAD_FOLDER, os.path.basename(filename))
643
-
644
- # Download
645
- if not download_with_retry(audio_url, local_wav_path):
646
- raise HTTPException(status_code=500, detail="Failed to download file")
647
 
648
- # Find match
649
- base_filename = os.path.basename(filename)
650
- matched_filename = find_matching_filename(base_filename, reference_map)
 
 
651
 
652
- if not matched_filename:
653
- os.remove(local_wav_path)
654
- raise HTTPException(status_code=404, detail="No matching filename found")
 
655
 
656
- # Process
657
- if process_audio_file(local_wav_path, reference_map, matched_filename):
658
- processing_status["transcribed_files"] += 1
 
 
 
659
 
660
- if os.path.exists(local_wav_path):
661
- os.remove(local_wav_path)
 
 
 
 
662
 
663
- return {
664
- "status": "success",
665
- "file": filename,
666
- "matched": matched_filename,
667
- "message": "Audio transcribed and uploaded successfully"
668
- }
669
- else:
670
- if os.path.exists(local_wav_path):
671
- os.remove(local_wav_path)
672
- raise HTTPException(status_code=500, detail="Processing failed")
673
 
674
- except Exception as e:
675
- log_message(f"❌ Manual processing error: {str(e)}", "ERROR")
676
- raise HTTPException(status_code=500, detail=str(e))
677
-
678
- @app.on_event("startup")
679
- async def startup_event():
680
- """Auto-start processing when server starts"""
681
- log_message("🚀 Server startup: Checking dependencies...", "INFO")
682
-
683
- try:
684
- import transformers
685
- log_message("✅ Transformers found", "INFO")
686
- except ImportError:
687
- log_message("⚠️ WARNING: Transformers not installed!", "WARNING")
688
- log_message(" Install with: pip install transformers librosa torch torchaudio", "WARNING")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
689
 
690
- log_message("🚀 Server startup: Auto-starting processing loop", "INFO")
 
 
 
691
 
692
- # Start processing in a separate thread
693
- thread = threading.Thread(target=main_processing_loop, daemon=True)
694
- thread.start()
 
 
 
 
 
 
 
 
 
 
 
 
695
 
696
- def run_api(host: str = "0.0.0.0", port: int = 8000):
697
- """Run the FastAPI server"""
698
- log_message(f"🚀 Starting FastAPI server on {host}:{port}", "INFO")
699
- uvicorn.run(app, host=host, port=port)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
700
 
701
  if __name__ == "__main__":
702
- # Run API server (processing will auto-start via startup event)
703
- run_api()
 
1
  import os
2
  import json
 
 
 
3
  import time
4
+ import asyncio
5
+ import aiohttp
6
+ import zipfile
7
+ import shutil
8
+ from typing import Dict, List, Set, Optional, Tuple, Any
9
+ from urllib.parse import quote
10
+ from datetime import datetime
11
+ from pathlib import Path
12
+ import io
13
+
14
+ from fastapi import FastAPI, BackgroundTasks, HTTPException, status
15
+ from pydantic import BaseModel, Field
16
+ from huggingface_hub import HfApi, hf_hub_download
17
+
18
+ # --- Configuration ---
19
+ AUTO_START_INDEX = 0 # Hardcoded default start index if no progress is found
20
+ FLOW_ID = os.getenv("FLOW_ID", "flow_default")
21
+ FLOW_PORT = int(os.getenv("FLOW_PORT", 8001))
22
+ HF_TOKEN = os.getenv("HF_TOKEN", "")
23
+ HF_AUDIO_DATASET_ID = os.getenv("HF_AUDIO_DATASET_ID", "Samfredoly/BG_Vid") # Source dataset for audio files
24
+ HF_OUTPUT_DATASET_ID = os.getenv("HF_OUTPUT_DATASET_ID", "samfred2/AT2") # Target dataset for transcriptions
25
+
26
+ # Progress and State Tracking
27
+ PROGRESS_FILE = Path("processing_progress.json")
28
+ HF_STATE_FILE = "processing_state_transcriptions.json" # State file in output dataset
29
+ LOCAL_STATE_FOLDER = Path(".state") # Local folder for state file
30
+ LOCAL_STATE_FOLDER.mkdir(exist_ok=True)
31
+
32
+ # Directory within the HF dataset where audio files are located
33
+ AUDIO_FILE_PREFIX = "audio/"
34
+
35
+ # Reference dataset for filename mapping
36
+ REFERENCE_REPO_ID = os.getenv("REFERENCE_REPO_ID", "Fred808/BG3") # For matching audio to reference files
37
+
38
+ # Whisper server endpoints
39
+ WHISPER_SERVERS = [
40
+ "https://fred1012-switch3.hf.space/transcribe",
41
+ ]
42
+
43
+ MODEL_TYPE = "whisper-small"
44
+ ZIP_UPLOAD_THRESHOLD = 100 # Upload and zip after this many transcriptions
45
+
46
+ # Temporary storage for audio files
47
+ TEMP_DIR = Path(f"temp_audio_{FLOW_ID}")
48
+ TEMP_DIR.mkdir(exist_ok=True)
49
+
50
+ # Temporary storage for transcription results
51
+ RESULTS_DIR = Path(f"transcription_results_{FLOW_ID}")
52
+ RESULTS_DIR.mkdir(exist_ok=True)
53
+
54
+ # --- Models ---
55
+ class WhisperServer:
56
+ def __init__(self, url):
57
+ self.url = url
58
+ self.busy = False
59
+ self.total_processed = 0
60
+ self.total_time = 0
61
+ self.model = MODEL_TYPE
62
+
63
+ @property
64
+ def fps(self):
65
+ return self.total_processed / self.total_time if self.total_time > 0 else 0
66
+
67
+ # Global state for whisper servers
68
+ servers = [WhisperServer(url) for url in WHISPER_SERVERS]
69
+ server_index = 0
70
+
71
+ # --- Progress and State Management Functions ---
72
+
73
+ def load_progress() -> Dict:
74
+ """Loads the local processing progress from the JSON file."""
75
+ if PROGRESS_FILE.exists():
76
+ try:
77
+ with PROGRESS_FILE.open('r') as f:
78
+ return json.load(f)
79
+ except json.JSONDecodeError:
80
+ print(f"[{FLOW_ID}] WARNING: Progress file is corrupted. Starting fresh.")
 
 
 
 
 
 
 
 
 
 
 
 
 
81
 
82
+ # Default structure
83
+ return {
84
+ "last_processed_index": 0,
85
+ "processed_files": {}, # {index: audio_file_path}
86
+ "file_list": [], # Full list of all audio files found in the dataset
87
+ "transcription_count": 0, # Count of transcriptions saved
88
+ "reference_map": {} # Mapping from audio filename to reference filename
89
+ }
90
+
91
+ def save_progress(progress_data: Dict):
92
+ """Saves the local processing progress to the JSON file."""
93
+ try:
94
+ with PROGRESS_FILE.open('w') as f:
95
+ json.dump(progress_data, f, indent=4)
96
+ except Exception as e:
97
+ print(f"[{FLOW_ID}] CRITICAL ERROR: Could not save progress to {PROGRESS_FILE}: {e}")
98
 
99
  def load_json_state(file_path: str, default_value: Dict[str, Any]) -> Dict[str, Any]:
100
  """Load state from JSON file with migration logic for new structure."""
 
103
  with open(file_path, "r") as f:
104
  data = json.load(f)
105
 
106
+ # Migration Logic
107
  if "file_states" not in data or not isinstance(data["file_states"], dict):
108
+ print(f"[{FLOW_ID}] Initializing 'file_states' dictionary.")
109
  data["file_states"] = {}
110
 
111
  if "next_download_index" not in data:
112
  data["next_download_index"] = 0
113
 
114
+ if "transcription_count" not in data:
115
+ data["transcription_count"] = 0
116
+
117
  return data
118
  except json.JSONDecodeError:
119
+ print(f"[{FLOW_ID}] WARNING: Corrupted state file: {file_path}")
120
  return default_value
121
 
122
  def save_json_state(file_path: str, data: Dict[str, Any]):
 
124
  with open(file_path, "w") as f:
125
  json.dump(data, f, indent=2)
126
 
127
async def download_hf_state() -> Dict[str, Any]:
    """Downloads the state file from Hugging Face or returns a default state.

    The hub calls are blocking HTTP requests, so they are executed in the
    default thread-pool executor to keep the event loop (and therefore the
    HTTP endpoints) responsive while the download is in flight.

    Returns:
        The state loaded from the downloaded file, or a fresh default state
        (``next_download_index`` 0, empty ``file_states``,
        ``transcription_count`` 0) when the file does not exist in the output
        repo or any step fails.
    """
    local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE
    default_state = {"next_download_index": 0, "file_states": {}, "transcription_count": 0}

    def _fetch_state_file() -> bool:
        """Blocking part: return False when the repo holds no state file."""
        # Check if the file exists in the output repo
        files = HfApi(token=HF_TOKEN).list_repo_files(
            repo_id=HF_OUTPUT_DATASET_ID,
            repo_type="dataset"
        )
        if HF_STATE_FILE not in files:
            return False

        # Download the file
        hf_hub_download(
            repo_id=HF_OUTPUT_DATASET_ID,
            filename=HF_STATE_FILE,
            repo_type="dataset",
            local_dir=LOCAL_STATE_FOLDER,
            local_dir_use_symlinks=False,
            token=HF_TOKEN
        )
        return True

    try:
        loop = asyncio.get_running_loop()
        found = await loop.run_in_executor(None, _fetch_state_file)

        if not found:
            print(f"[{FLOW_ID}] State file not found in {HF_OUTPUT_DATASET_ID}. Starting fresh.")
            return default_state

        print(f"[{FLOW_ID}] Successfully downloaded state file.")
        return load_json_state(str(local_path), default_state)

    except Exception as e:
        print(f"[{FLOW_ID}] Failed to download state file: {str(e)}. Starting fresh.")
        return default_state
159
 
160
async def upload_hf_state(state: Dict[str, Any]) -> bool:
    """Uploads the state file to Hugging Face.

    The state is first written to the local state folder and then uploaded to
    the output dataset.  The upload is a blocking HTTP call, so it runs in the
    default thread-pool executor instead of on the event loop.

    Args:
        state: State dictionary; ``next_download_index`` must be present
            because it is embedded in the commit message.

    Returns:
        True when the file was saved and uploaded, False on any failure (the
        error is printed).
    """
    local_path = LOCAL_STATE_FOLDER / HF_STATE_FILE

    try:
        # Save state locally first
        save_json_state(str(local_path), state)

        commit_message = (
            f"Update transcription processing state: next_index={state['next_download_index']}, "
            f"count={state.get('transcription_count', 0)}"
        )

        def _push_state_file() -> None:
            """Blocking part: upload the local state file to the output dataset."""
            HfApi(token=HF_TOKEN).upload_file(
                path_or_fileobj=str(local_path),
                path_in_repo=HF_STATE_FILE,
                repo_id=HF_OUTPUT_DATASET_ID,
                repo_type="dataset",
                commit_message=commit_message
            )

        # Upload to output dataset
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _push_state_file)
        print(f"[{FLOW_ID}] Successfully uploaded state file.")
        return True
    except Exception as e:
        print(f"[{FLOW_ID}] Failed to upload state file: {str(e)}")
        return False
181
 
182
async def lock_file_for_processing(audio_filename: str, state: Dict[str, Any]) -> bool:
    """Marks a file as 'processing' in the state file and uploads the lock.

    Args:
        audio_filename: Key under ``state["file_states"]`` to mark.
        state: State dictionary; it is mutated in place.

    Returns:
        True when the updated state was uploaded.  False when the upload
        failed, in which case the entry for ``audio_filename`` is put back to
        exactly what it was before the call (restored if it existed, removed
        if it did not).
    """
    print(f"[{FLOW_ID}] 🔒 Attempting to lock file: {audio_filename}")

    file_states = state["file_states"]
    # Remember the prior value so a failed upload does not erase it.
    missing = object()
    previous = file_states.get(audio_filename, missing)

    # Update state locally
    file_states[audio_filename] = "processing"

    # Upload the updated state file immediately to establish the lock
    if await upload_hf_state(state):
        print(f"[{FLOW_ID}] ✅ Successfully locked file: {audio_filename}")
        return True

    print(f"[{FLOW_ID}] ❌ Failed to lock file: {audio_filename}")
    # Revert local state
    if previous is missing:
        file_states.pop(audio_filename, None)
    else:
        file_states[audio_filename] = previous
    return False
199
 
200
async def unlock_file_as_processed(audio_filename: str, state: Dict[str, Any], next_index: int) -> bool:
    """Record ``audio_filename`` as 'processed', advance the index, and upload the state.

    Args:
        audio_filename: Key under ``state["file_states"]`` to mark.
        state: State dictionary; it is mutated in place.
        next_index: Value stored as ``state["next_download_index"]``.

    Returns:
        True when the state upload succeeded, False otherwise.
    """
    print(f"[{FLOW_ID}] 🔓 Marking file as processed: {audio_filename}")

    # Apply both changes locally before pushing them in a single upload.
    state["file_states"][audio_filename] = "processed"
    state["next_download_index"] = next_index

    uploaded = await upload_hf_state(state)
    if not uploaded:
        print(f"[{FLOW_ID}] ❌ Failed to update state for: {audio_filename}")
        return False

    print(f"[{FLOW_ID}] ✅ Successfully marked as processed: {audio_filename}")
    return True
215
 
216
+ # --- Hugging Face Utility Functions ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
217
 
218
async def get_reference_map(reference_repo_id: str) -> Dict[str, str]:
    """
    Fetches the reference file list from the Hugging Face repo and creates a map
    from filename without extension to reference filename.

    Only ``.txt`` and ``.json`` files (case-insensitive extension) are
    considered.  The key is the repo path with its extension removed, so a
    file inside a sub-directory keeps the directory part in its key.

    The repo listing is a blocking HTTP call and therefore runs in the default
    thread-pool executor rather than on the event loop.

    Args:
        reference_repo_id: Dataset repo id to list.

    Returns:
        The mapping, or an empty dict when the listing fails.
    """
    print(f"[{FLOW_ID}] Fetching reference file list from {reference_repo_id}...")

    try:
        api = HfApi(token=HF_TOKEN)
        loop = asyncio.get_running_loop()
        repo_files = await loop.run_in_executor(
            None,
            lambda: api.list_repo_files(repo_id=reference_repo_id, repo_type="dataset"),
        )

        reference_map = {}
        for repo_file in repo_files:
            base_name, ext = os.path.splitext(repo_file)
            if ext.lower() in ('.txt', '.json'):  # Consider text/json files as reference
                reference_map[base_name] = repo_file

        print(f"[{FLOW_ID}] Successfully created reference map with {len(reference_map)} entries.")
        return reference_map

    except Exception as e:
        print(f"[{FLOW_ID}] ⚠️ Failed to fetch reference map from Hugging Face: {e}")
        return {}
241
 
242
def find_matching_filename(audio_filename: str, reference_map: Dict[str, str]) -> Optional[str]:
    """
    Look up the reference filename that corresponds to an audio filename.

    The audio filename's last extension is stripped and the remainder is used
    as the key into ``reference_map``.

    Returns the mapped reference filename, or None when there is no entry.
    """
    stem = os.path.splitext(audio_filename)[0]
    if stem in reference_map:
        return reference_map[stem]
    return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
249
 
250
async def get_audio_file_list(progress_data: Dict) -> List[str]:
    """
    Fetches the list of all audio files from the dataset, or uses the cached list.
    Updates the progress_data with the file list if a new list is fetched.

    A non-empty ``progress_data['file_list']`` is returned as-is.  Otherwise
    the dataset repo is listed (in the default thread-pool executor, because
    the hub call blocks), filtered to paths that start with
    ``AUDIO_FILE_PREFIX`` and end with a known audio extension, sorted for
    stable indexing, stored in ``progress_data`` and saved.

    Args:
        progress_data: Progress dictionary; mutated when a new list is fetched.

    Returns:
        The list of audio file paths, or an empty list when listing fails or
        no audio file matches.
    """
    # .get(): a progress file written before 'file_list' existed must not crash here.
    cached_files = progress_data.get('file_list')
    if cached_files:
        print(f"[{FLOW_ID}] Using cached file list with {len(cached_files)} files.")
        return cached_files

    print(f"[{FLOW_ID}] Fetching full list of audio files from {HF_AUDIO_DATASET_ID}...")
    try:
        api = HfApi(token=HF_TOKEN)
        loop = asyncio.get_running_loop()
        repo_files = await loop.run_in_executor(
            None,
            lambda: api.list_repo_files(
                repo_id=HF_AUDIO_DATASET_ID,
                repo_type="dataset"
            ),
        )

        # Filter for audio files in the specified directory and sort them alphabetically for consistent indexing
        audio_extensions = ('.mp3', '.wav', '.m4a', '.flac', '.ogg', '.aac')
        audio_files = sorted(
            f for f in repo_files
            if f.startswith(AUDIO_FILE_PREFIX) and f.lower().endswith(audio_extensions)
        )

        if not audio_files:
            raise FileNotFoundError(f"No audio files found in '{AUDIO_FILE_PREFIX}' directory of dataset '{HF_AUDIO_DATASET_ID}'.")

        print(f"[{FLOW_ID}] Found {len(audio_files)} audio files.")

        # Update and save the progress data
        progress_data['file_list'] = audio_files
        save_progress(progress_data)

        return audio_files

    except Exception as e:
        print(f"[{FLOW_ID}] Error fetching file list from Hugging Face: {e}")
        return []
288
 
289
async def download_audio_file(file_index: int, repo_file_full_path: str) -> Optional[Path]:
    """Downloads the audio file for the given index.

    The file is fetched with ``hf_hub_download`` and then copied into
    ``TEMP_DIR`` under its bare filename.  Both steps block, so they run in
    the default thread-pool executor instead of on the event loop.

    Args:
        file_index: Position of the file in the audio list (used for logging only).
        repo_file_full_path: Path of the file inside the audio dataset repo.

    Returns:
        Path of the copy inside ``TEMP_DIR``, or None when the download or
        copy fails (the error is printed).
    """
    audio_filename = Path(repo_file_full_path).name

    print(f"[{FLOW_ID}] Processing audio file #{file_index}: {repo_file_full_path}")

    try:
        loop = asyncio.get_running_loop()

        # Use hf_hub_download to get the file path
        audio_path = await loop.run_in_executor(
            None,
            lambda: hf_hub_download(
                repo_id=HF_AUDIO_DATASET_ID,
                filename=repo_file_full_path,
                repo_type="dataset",
                token=HF_TOKEN,
            ),
        )

        print(f"[{FLOW_ID}] Downloaded audio to {audio_path}.")

        # Copy to temp directory (create it first so a missing folder is not a failure)
        temp_dir = Path(TEMP_DIR)
        temp_dir.mkdir(parents=True, exist_ok=True)
        temp_path = temp_dir / audio_filename
        await loop.run_in_executor(None, shutil.copy2, audio_path, temp_path)

        return temp_path

    except Exception as e:
        print(f"[{FLOW_ID}] Error downloading audio file {repo_file_full_path}: {e}")
        return None
316
+
317
async def upload_transcription_to_hf(audio_filename: str, transcription_data: Dict, reference_filename: Optional[str] = None) -> bool:
    """
    Uploads the transcription JSON file to the output dataset.
    If reference_filename is provided, uses it as the base for the output filename.
    Otherwise, uses the audio filename.

    The JSON document is built in memory and uploaded as ``<stem>.json`` at
    the root of the output dataset.  The upload blocks on HTTP, so it runs in
    the default thread-pool executor rather than on the event loop.

    Args:
        audio_filename: Name of the transcribed audio file.
        transcription_data: JSON-serialisable transcription result.
        reference_filename: Optional filename whose stem names the output file.

    Returns:
        True on success, False when serialisation or upload fails.
    """
    # Use reference filename if provided, otherwise use audio filename
    output_base = Path(reference_filename).stem if reference_filename else Path(audio_filename).stem
    json_filename = f"{output_base}.json"

    try:
        print(f"[{FLOW_ID}] Uploading transcription for {audio_filename} as {json_filename} to {HF_OUTPUT_DATASET_ID}...")

        # Create JSON content in memory
        json_content = json.dumps(transcription_data, indent=2, ensure_ascii=False).encode('utf-8')

        api = HfApi(token=HF_TOKEN)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            None,
            lambda: api.upload_file(
                path_or_fileobj=io.BytesIO(json_content),
                path_in_repo=json_filename,
                repo_id=HF_OUTPUT_DATASET_ID,
                repo_type="dataset",
                commit_message=f"[{FLOW_ID}] Transcription for {audio_filename}"
            ),
        )

        print(f"[{FLOW_ID}] Successfully uploaded transcription for {audio_filename}.")
        return True

    except Exception as e:
        print(f"[{FLOW_ID}] Error uploading transcription for {audio_filename}: {e}")
        return False
 
 
 
 
 
 
 
 
 
348
 
349
async def zip_and_upload_transcriptions(transcription_files: List[Path]) -> bool:
    """Zips transcription JSON files and uploads to dataset.

    Only files that exist on disk are archived.  When none of them exist, no
    archive is created or uploaded.  The temporary zip is removed whether the
    upload succeeds or fails.  Zipping and uploading block, so they run in the
    default thread-pool executor.

    Args:
        transcription_files: Paths of the JSON files to archive.

    Returns:
        True when an archive was uploaded; False when there was nothing to
        archive or an error occurred (the error is printed).
    """
    if not transcription_files:
        print(f"[{FLOW_ID}] No transcription files to zip.")
        return False

    existing_files = [file_path for file_path in transcription_files if file_path.exists()]
    if not existing_files:
        # Avoid pushing an empty archive to the dataset.
        print(f"[{FLOW_ID}] No transcription files to zip.")
        return False

    zip_path = None
    try:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        zip_filename = f"transcriptions_{timestamp}.zip"
        results_dir = Path(RESULTS_DIR)
        results_dir.mkdir(parents=True, exist_ok=True)
        zip_path = results_dir / zip_filename

        print(f"[{FLOW_ID}] Creating zip file: {zip_path} with {len(existing_files)} files...")

        def _build_archive() -> None:
            """Blocking part: write every existing JSON file into the archive."""
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for file_path in existing_files:
                    zipf.write(file_path, arcname=file_path.name)

        def _push_archive() -> None:
            """Blocking part: upload the archive to the output dataset."""
            HfApi(token=HF_TOKEN).upload_file(
                path_or_fileobj=str(zip_path),
                path_in_repo=zip_filename,
                repo_id=HF_OUTPUT_DATASET_ID,
                repo_type="dataset",
                commit_message=f"[{FLOW_ID}] Batch transcriptions: {len(existing_files)} files"
            )

        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, _build_archive)

        print(f"[{FLOW_ID}] Uploading zip file to {HF_OUTPUT_DATASET_ID}...")
        await loop.run_in_executor(None, _push_archive)

        print(f"[{FLOW_ID}] Successfully uploaded zip file: {zip_filename}")
        return True

    except Exception as e:
        print(f"[{FLOW_ID}] Error zipping and uploading transcriptions: {e}")
        return False

    finally:
        # Cleanup: the archive is only a transfer artefact, never keep it.
        if zip_path is not None:
            try:
                if zip_path.exists():
                    os.remove(zip_path)
            except OSError as cleanup_error:
                print(f"[{FLOW_ID}] Error zipping and uploading transcriptions: {cleanup_error}")
388
+
389
+ # --- Core Processing Functions ---
390
+
391
async def get_available_server(timeout: float = 300.0) -> WhisperServer:
    """Pick the next non-busy whisper server in round-robin order.

    Starting at the module-level ``server_index`` cursor, every server is
    inspected once per pass and the cursor is advanced past each inspected
    server.  When a full pass finds no free server the coroutine sleeps for
    half a second and tries again.

    Args:
        timeout: Maximum number of seconds to keep waiting.

    Returns:
        The first server found whose ``busy`` flag is false.

    Raises:
        TimeoutError: If no server became free within ``timeout`` seconds.
    """
    global server_index
    began_at = time.time()
    while True:
        # One full lap over the server list, starting at the shared cursor.
        checks_left = len(servers)
        while checks_left > 0:
            candidate = servers[server_index]
            server_index = (server_index + 1) % len(servers)
            if not candidate.busy:
                return candidate
            checks_left -= 1

        # Everyone is busy: back off briefly before the next lap.
        await asyncio.sleep(0.5)

        waited = time.time() - began_at
        if waited > timeout:
            raise TimeoutError(f"Timeout ({timeout}s) waiting for an available whisper server.")
409
+
410
async def send_audio_for_transcription(audio_path: Path, progress_tracker: Dict) -> Optional[Dict]:
    """Sends a single audio file to a whisper server for transcription.

    Up to three attempts are made.  Each attempt picks an available server,
    marks it busy, posts the file as multipart form data and releases the
    server again (updating its ``total_processed`` / ``total_time`` counters)
    whatever the outcome.

    Args:
        audio_path: Local path of the audio file to send.
        progress_tracker: Dict with ``completed`` and ``total`` counters;
            ``completed`` is incremented on success.

    Returns:
        A dict with ``audio_file``, ``text``, ``language``, ``confidence`` and
        ``duration`` on success, or None when every attempt failed.
    """
    import mimetypes  # local import: only needed to label the upload

    MAX_RETRIES = 3
    # Label the upload with the file's real type; fall back to the previous
    # fixed value when the extension is unknown.
    content_type = mimetypes.guess_type(audio_path.name)[0] or 'audio/mpeg'

    for attempt in range(MAX_RETRIES):
        server = None
        try:
            # 1. Get an available server
            server = await get_available_server()
            server.busy = True
            start_time = time.time()

            if attempt == 0:
                print(f"[{FLOW_ID}] Starting transcription attempt on {audio_path.name}...")

            # 2. Prepare request data.  The handle is opened in a ``with`` block
            #    so it is closed after every attempt instead of leaking.
            with audio_path.open('rb') as audio_handle:
                form_data = aiohttp.FormData()
                form_data.add_field('file',
                                    audio_handle,
                                    filename=audio_path.name,
                                    content_type=content_type)

                # 3. Send request
                async with aiohttp.ClientSession() as session:
                    async with session.post(server.url, data=form_data, timeout=aiohttp.ClientTimeout(total=600)) as resp:
                        if resp.status != 200:
                            error_text = await resp.text()
                            print(f"[{FLOW_ID}] Error from server {server.url} for {audio_path.name}: {resp.status} - {error_text}. Retrying...")
                            continue

                        result = await resp.json()

                        # Check if response contains transcription data
                        text = result.get('text') or result.get('transcription')
                        if not text:
                            print(f"[{FLOW_ID}] Server {server.url} returned invalid response format for {audio_path.name}. Response: {result}")
                            continue

                        # Update progress counter
                        progress_tracker['completed'] += 1
                        if progress_tracker['completed'] % 10 == 0:
                            print(f"[{FLOW_ID}] PROGRESS: {progress_tracker['completed']}/{progress_tracker['total']} transcriptions completed.")

                        print(f"[{FLOW_ID}] Success: {audio_path.name} transcribed by {server.url}")

                        # Store the full transcription result
                        return {
                            "audio_file": audio_path.name,
                            "text": text,
                            "language": result.get('language', 'unknown'),
                            "confidence": result.get('confidence'),
                            "duration": result.get('duration'),
                        }

        except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e:
            print(f"[{FLOW_ID}] Connection/Timeout error for {audio_path.name} on {server.url if server else 'unknown server'}: {e}. Retrying...")
            continue
        except Exception as e:
            print(f"[{FLOW_ID}] Unexpected error during transcription for {audio_path.name}: {e}. Retrying...")
            continue
        finally:
            # Runs on return, continue and exceptions alike, so a server is
            # never left flagged as busy.
            if server:
                end_time = time.time()
                server.busy = False
                server.total_processed += 1
                server.total_time += (end_time - start_time)

    print(f"[{FLOW_ID}] FAILED after {MAX_RETRIES} attempts for {audio_path.name}.")
    return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
477
 
478
+ # --- FastAPI App and Endpoints ---
 
479
 
480
# ASGI application object; the route handlers below register themselves on it.
app = FastAPI(
    version="1.0.0",
    title=f"Flow Server {FLOW_ID} API",
    description="Processes audio files from a dataset, sends to whisper servers for transcription, and tracks progress.",
)
485
 
486
@app.on_event("startup")
async def startup_event():
    """Print a one-line banner with the flow id and port when the app starts."""
    banner = f"Flow Server {FLOW_ID} started on port {FLOW_PORT}."
    print(banner)
 
 
 
 
 
 
 
 
 
 
 
 
489
 
490
@app.post("/process")
async def process_audio_files(background_tasks: BackgroundTasks):
    """
    Kick off the transcription run without blocking the request.

    The actual work (fetching audio from the HF dataset, sending it to the
    Whisper servers and uploading the results, with reference-file based
    output naming) is done by ``process_audio_files_background``, which is
    scheduled here as a background task.

    Returns a small acknowledgement payload containing the flow id.
    """
    background_tasks.add_task(process_audio_files_background)

    acknowledgement = {
        "status": "processing_started",
        "flow_id": FLOW_ID,
        "message": "Background processing task started. Check /status for progress."
    }
    return acknowledgement
503
 
504
async def _release_processing_lock(audio_filename: str) -> None:
    """Remove a stale 'processing' marker so a failed file is not left locked."""
    state = await download_hf_state()
    if state.get('file_states', {}).get(audio_filename) == 'processing':
        del state['file_states'][audio_filename]
        await upload_hf_state(state)


async def process_audio_files_background():
    """Background task that processes audio files with reference mapping.

    For every audio file from ``last_processed_index`` onwards the task:

    1. skips it when the shared state already marks it 'processed';
    2. marks it 'processing' in the shared state;
    3. downloads it, sends it for transcription and uploads the result,
       named after the matching reference file when one exists;
    4. writes the transcription JSON into ``RESULTS_DIR`` so it can be
       included in the periodic zip upload;
    5. marks the file 'processed', or removes the 'processing' marker when
       any step failed;
    6. deletes the temporary audio file and saves the local progress.

    Once ``ZIP_UPLOAD_THRESHOLD`` local JSON files have accumulated they are
    zipped and uploaded; any remainder is uploaded at the end.
    """
    progress_data = load_progress()
    reference_map = progress_data.get('reference_map', {})

    # Fetch reference map if empty
    if not reference_map:
        print(f"[{FLOW_ID}] Reference map is empty. Fetching from {REFERENCE_REPO_ID}...")
        reference_map = await get_reference_map(REFERENCE_REPO_ID)
        progress_data['reference_map'] = reference_map
        save_progress(progress_data)

    audio_files = await get_audio_file_list(progress_data)
    if not audio_files:
        print(f"[{FLOW_ID}] No audio files found. Exiting.")
        return

    start_index = progress_data['last_processed_index']
    transcription_files = []
    results_dir = Path(RESULTS_DIR)

    # Progress tracking for console output
    progress_tracker = {
        'total': len(audio_files) - start_index,
        'completed': 0
    }

    print(f"[{FLOW_ID}] Starting processing from file #{start_index} (out of {len(audio_files)})...")

    for file_index in range(start_index, len(audio_files)):
        repo_file_path = audio_files[file_index]
        audio_filename = Path(repo_file_path).name

        # Check if already processed
        state = await download_hf_state()
        if state.get('file_states', {}).get(audio_filename) == 'processed':
            print(f"[{FLOW_ID}] Skipping already processed: {audio_filename}")
            continue

        # Lock the file for processing
        if not await lock_file_for_processing(audio_filename, state):
            print(f"[{FLOW_ID}] Could not lock file {audio_filename}, skipping.")
            continue

        audio_path = None
        completed = False
        try:
            # Download audio file
            audio_path = await download_audio_file(file_index, repo_file_path)
            if not audio_path:
                print(f"[{FLOW_ID}] Failed to download {audio_filename}")
            else:
                # Get matching reference filename
                reference_filename = find_matching_filename(audio_filename, reference_map)
                if reference_filename:
                    print(f"[{FLOW_ID}] Found reference match: {audio_filename} → {reference_filename}")
                else:
                    print(f"[{FLOW_ID}] No reference match for {audio_filename}, will use audio filename")

                # Send for transcription
                transcription_result = await send_audio_for_transcription(audio_path, progress_tracker)

                # Upload with reference filename if available
                if transcription_result and await upload_transcription_to_hf(
                    audio_filename,
                    transcription_result,
                    reference_filename=reference_filename
                ):
                    # Keep a local copy: the batch zip only archives files
                    # that actually exist in RESULTS_DIR.
                    json_stem = Path(reference_filename).stem if reference_filename else Path(audio_filename).stem
                    local_json = results_dir / f"{json_stem}.json"
                    results_dir.mkdir(parents=True, exist_ok=True)
                    local_json.write_text(
                        json.dumps(transcription_result, indent=2, ensure_ascii=False),
                        encoding='utf-8',
                    )
                    transcription_files.append(local_json)

                    # Mark as processed with fresh state
                    fresh_state = await download_hf_state()
                    if fresh_state.get('file_states', {}).get(audio_filename) != 'processing':
                        # The re-download did not return our lock (e.g. it
                        # failed and produced an empty default state).  Fall
                        # back to the state we locked with instead of
                        # overwriting the remote history with a blank one.
                        fresh_state = state
                    await unlock_file_as_processed(audio_filename, fresh_state, file_index + 1)
                    progress_data['transcription_count'] = progress_data.get('transcription_count', 0) + 1
                    save_progress(progress_data)
                    completed = True

                    # Check if we've reached the batch threshold
                    if len(transcription_files) >= ZIP_UPLOAD_THRESHOLD:
                        print(f"[{FLOW_ID}] Reached batch threshold ({ZIP_UPLOAD_THRESHOLD}). Creating zip...")
                        await zip_and_upload_transcriptions(transcription_files)
                        transcription_files = []

        except Exception as e:
            print(f"[{FLOW_ID}] Error processing {audio_filename}: {e}")

        finally:
            # Cleanup: remove the temporary audio copy on success and failure alike.
            if audio_path is not None:
                try:
                    if audio_path.exists():
                        os.remove(audio_path)
                except OSError as e:
                    print(f"[{FLOW_ID}] Error processing {audio_filename}: {e}")

        if not completed:
            # Do not leave the file marked 'processing' forever.
            await _release_processing_lock(audio_filename)

        # Save progress after each file
        progress_data['last_processed_index'] = file_index + 1
        save_progress(progress_data)

    # Upload remaining transcriptions
    if transcription_files:
        print(f"[{FLOW_ID}] Uploading final batch of {len(transcription_files)} transcriptions...")
        await zip_and_upload_transcriptions(transcription_files)

    print(f"[{FLOW_ID}] Processing complete! Total transcriptions: {progress_data.get('transcription_count', 0)}")
608
+
609
@app.get("/")
async def root():
    """Return a short summary of the locally stored progress and server pool usage."""
    progress = load_progress()

    summary = {"flow_id": FLOW_ID, "status": "ready"}
    summary["last_processed_index"] = progress['last_processed_index']
    summary["total_files_in_list"] = len(progress['file_list'])
    summary["processed_files_count"] = len(progress['processed_files'])
    summary["transcription_count"] = progress.get('transcription_count', 0)
    summary["total_servers"] = len(servers)
    # Count the servers currently flagged as busy.
    summary["busy_servers"] = sum(1 for s in servers if s.busy)
    return summary
622
 
623
@app.get("/status")
async def get_status():
    """Returns detailed processing status with reference map info.

    Combines the local progress file with the shared state downloaded from
    the output dataset.  ``status`` is "processing" while the shared
    ``next_download_index`` is below the number of known files, otherwise
    "idle".  ``files_in_processing`` lists only the files whose shared state
    is currently 'processing'.
    """
    progress = load_progress()
    state = await download_hf_state()

    total_files = len(progress.get('file_list', []))
    current_index = state['next_download_index']

    return {
        "flow_id": FLOW_ID,
        "status": "processing" if current_index < total_files else "idle",
        "progress": {
            "current_index": current_index,
            "total_files": total_files,
            # Guard against division by zero when no file list is cached yet.
            "percentage": (current_index / total_files * 100) if total_files else 0
        },
        "transcription_count": progress.get('transcription_count', 0),
        "reference_map_size": len(progress.get('reference_map', {})),
        "server_stats": {
            "total_servers": len(servers),
            "busy_servers": sum(1 for s in servers if s.busy),
            "details": [
                {
                    "url": s.url,
                    "busy": s.busy,
                    "total_processed": s.total_processed,
                    "avg_time_per_file": s.total_time / s.total_processed if s.total_processed > 0 else 0
                }
                for s in servers
            ]
        },
        # Only entries still marked 'processing'; finished files are excluded.
        "files_in_processing": [
            name for name, file_state in state.get('file_states', {}).items()
            if file_state == 'processing'
        ]
    }
654
 
655
if __name__ == "__main__":
    # Script entry point: serve the API on all interfaces at the configured port.
    from uvicorn import run as run_server
    run_server(app, host="0.0.0.0", port=FLOW_PORT)