fred1012 commited on
Commit
777ea53
·
verified ·
1 Parent(s): 143cb23

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +232 -720
app.py CHANGED
@@ -1,720 +1,232 @@
1
- import os
2
- import json
3
- import requests
4
- import subprocess
5
- import shutil
6
- import time
7
- import sys
8
- import threading
9
- from typing import Dict, List, Optional, Any
10
- from fastapi import FastAPI, HTTPException
11
- from fastapi.responses import JSONResponse
12
- import uvicorn
13
- import torch
14
- import librosa
15
- from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
16
-
17
- # Fix Unicode encoding for Windows
18
- if sys.platform == 'win32':
19
- import io
20
- sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
21
-
22
- # Initialize FastAPI app
23
- app = FastAPI(title="Audio Transcriber", description="Audio transcription and upload service")
24
-
25
- # ==== CONFIGURATION ====
26
- # The new backend URL for state management and transcription upload
27
- # It is now read from an environment variable, falling back to the default if not set.
28
- BACKEND_URL = os.environ.get("BACKEND_URL", "https://samfredoly-acp.hf.space")
29
- # The original Hugging Face repo IDs are still needed for fetching the audio files
30
- # and the reference file list, as the backend only handles transcription storage.
31
- SOURCE_REPO_ID = "Samfredoly/BG_Vid" # Fetch audio files from here
32
- TARGET_REPO_ID = "samfred2/A_Text" # Target repo ID is now a constant for the backend
33
- REFERENCE_REPO_ID = "Fred808/BG3" # Reference repo to match audio filenames
34
-
35
- # Path Configuration
36
- DOWNLOAD_FOLDER = "downloads_audio"
37
- TRANSCRIPTIONS_FOLDER = "transcriptions"
38
- LOCAL_STATE_FOLDER = ".state_audio"
39
-
40
- os.makedirs(DOWNLOAD_FOLDER, exist_ok=True)
41
- os.makedirs(TRANSCRIPTIONS_FOLDER, exist_ok=True)
42
- os.makedirs(LOCAL_STATE_FOLDER, exist_ok=True)
43
-
44
- # Whisper Model Setup (using transformers)
45
- DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
46
- TORCH_DTYPE = torch.float16 if torch.cuda.is_available() else torch.float32
47
- WHISPER_MODEL_ID = f"openai/whisper-small"
48
-
49
- # Global model cache
50
- _whisper_model = None
51
- _whisper_processor = None
52
- _whisper_pipeline = None
53
-
54
- def get_whisper_pipeline():
55
- """Get or initialize the Whisper pipeline."""
56
- global _whisper_model, _whisper_processor, _whisper_pipeline
57
-
58
- if _whisper_pipeline is not None:
59
- return _whisper_pipeline
60
-
61
- try:
62
- log_message(f"Loading Whisper model {WHISPER_MODEL_ID}...", "INFO")
63
-
64
- model = AutoModelForSpeechSeq2Seq.from_pretrained(
65
- WHISPER_MODEL_ID,
66
- torch_dtype=TORCH_DTYPE,
67
- low_cpu_mem_usage=True,
68
- use_safetensors=True
69
- )
70
- model = model.to(DEVICE)
71
-
72
- processor = AutoProcessor.from_pretrained(WHISPER_MODEL_ID)
73
-
74
- _whisper_pipeline = pipeline(
75
- "automatic-speech-recognition",
76
- model=model,
77
- tokenizer=processor.tokenizer,
78
- feature_extractor=processor.feature_extractor,
79
- torch_dtype=TORCH_DTYPE,
80
- device=DEVICE
81
- )
82
-
83
- log_message(f"✅ Whisper model loaded successfully on {DEVICE.upper()}", "INFO")
84
- return _whisper_pipeline
85
-
86
- except Exception as e:
87
- log_message(f"❌ Failed to load Whisper model: {str(e)}", "ERROR")
88
- raise
89
-
90
- # State Files
91
- FAILED_FILES_LOG = "failed_audio_files.log"
92
- HF_STATE_FILE = "processing_audio_state.json" # This is the filename the backend uses
93
-
94
- # Processing Parameters
95
- PROCESSING_DELAY = 2
96
- MAX_RETRIES = 3
97
- MIN_FREE_SPACE_GB = 1
98
- WHISPER_MODEL = "small" # Whisper model size
99
-
100
- # NOTE: The Hugging Face API is still required for listing files in SOURCE_REPO_ID and REFERENCE_REPO_ID
101
- from huggingface_hub import HfApi, hf_hub_url
102
- HF_TOKEN = os.environ.get("HF_TOKEN", "")
103
- hf_api = HfApi(token=HF_TOKEN)
104
-
105
- # Global State
106
- processing_status = {
107
- "is_running": False,
108
- "current_file": None,
109
- "total_files": 0,
110
- "processed_files": 0,
111
- "failed_files": 0,
112
- "transcribed_files": 0,
113
- "last_update": None,
114
- "logs": []
115
- }
116
-
117
- def log_message(message: str, level: str = "INFO"):
118
- """Log messages with timestamp"""
119
- timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
120
- log_entry = f"[{timestamp}] {level}: {message}"
121
- print(log_entry)
122
- processing_status["logs"].append(log_entry)
123
- processing_status["last_update"] = timestamp
124
- if len(processing_status["logs"]) > 100:
125
- processing_status["logs"] = processing_status["logs"][-100:]
126
-
127
- def log_failed_file(filename: str, error: str):
128
- """Log failed files to persistent file"""
129
- with open(FAILED_FILES_LOG, "a") as f:
130
- f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - {filename}: {error}\n")
131
-
132
- def get_disk_usage(path: str) -> Dict[str, float]:
133
- """Get disk usage statistics in GB"""
134
- statvfs = os.statvfs(path)
135
- total = statvfs.f_frsize * statvfs.f_blocks / (1024**3)
136
- free = statvfs.f_frsize * statvfs.f_bavail / (1024**3)
137
- used = total - free
138
- return {"total": total, "free": free, "used": used}
139
-
140
- def check_disk_space(path: str = ".") -> bool:
141
- """Check if there's enough disk space"""
142
- disk_info = get_disk_usage(path)
143
- if disk_info["free"] < MIN_FREE_SPACE_GB:
144
- log_message(f'⚠️ Low disk space: {disk_info["free"]:.2f}GB free, {disk_info["used"]:.2f}GB used')
145
- return False
146
- return True
147
-
148
- def cleanup_temp_files():
149
- """Clean up temporary files to free space"""
150
- log_message("🧹 Cleaning up temporary files...", "INFO")
151
-
152
- current_file = processing_status.get("current_file")
153
- for file in os.listdir(DOWNLOAD_FOLDER):
154
- if file != current_file and file.endswith((".wav", ".mp3")):
155
- try:
156
- os.remove(os.path.join(DOWNLOAD_FOLDER, file))
157
- log_message(f"🗑️ Removed old download: {file}", "INFO")
158
- except:
159
- pass
160
-
161
- # Helper function to save state locally
162
- def save_json_state(file_path: str, data: Dict[str, Any]):
163
- """Save state to JSON file"""
164
- with open(file_path, "w") as f:
165
- json.dump(data, f, indent=2)
166
-
167
- # --- NEW API FUNCTIONS FOR STATE MANAGEMENT AND UPLOAD ---
168
-
169
- def download_state_from_api() -> Dict[str, Any]:
170
- """Downloads the state file from the backend API."""
171
- url = f"{BACKEND_URL}/state/"
172
- default_state = {"next_download_index": 0, "file_states": {}}
173
-
174
- try:
175
- response = requests.get(url, timeout=10)
176
- response.raise_for_status()
177
-
178
- # The API returns {"state": {...}}
179
- state_data = response.json().get("state", default_state)
180
-
181
- # Ensure the structure is correct (migration logic from original load_json_state)
182
- if "file_states" not in state_data or not isinstance(state_data["file_states"], dict):
183
- state_data["file_states"] = {}
184
- if "next_download_index" not in state_data:
185
- state_data["next_download_index"] = 0
186
-
187
- log_message(f"✅ Downloaded state: next_download_index={state_data['next_download_index']}, processed_files={len([f for f,s in state_data['file_states'].items() if s=='processed'])}", "INFO")
188
- return state_data
189
-
190
- except requests.exceptions.RequestException as e:
191
- log_message(f"⚠️ Failed to download state from API ({url}): {str(e)}. Starting from default state.", "WARNING")
192
- return default_state
193
-
194
- def upload_state_to_api(state: Dict[str, Any]) -> bool:
195
- """
196
- Saves the state locally and uploads it to the backend API's /upload/ endpoint.
197
- This simulates the original HF state upload for locking/unlocking.
198
- """
199
- local_path = os.path.join(LOCAL_STATE_FOLDER, HF_STATE_FILE)
200
- url = f"{BACKEND_URL}/upload/"
201
-
202
- try:
203
- # 1. Save the current state locally
204
- save_json_state(local_path, state)
205
-
206
- # 2. Upload the state file to the backend
207
- with open(local_path, "rb") as f:
208
- files = {'file': (HF_STATE_FILE, f, 'application/json')}
209
-
210
- response = requests.post(url, files=files, timeout=30)
211
- response.raise_for_status()
212
-
213
- log_message(f"✅ Successfully uploaded state file to API: {HF_STATE_FILE}", "INFO")
214
- return True
215
-
216
- except requests.exceptions.HTTPError as e:
217
- if hasattr(e, 'response') and e.response.status_code == 409:
218
- log_message(f"⚠️ State file already exists on server (409 Conflict) - Treating as success.", "INFO")
219
- return True
220
- log_message(f"❌ Failed to upload state file to API ({url}): {str(e)}", "ERROR")
221
- return False
222
- except requests.exceptions.RequestException as e:
223
- log_message(f"❌ Failed to upload state file to API ({url}): {str(e)}", "ERROR")
224
- return False
225
- except Exception as e:
226
- log_message(f"❌ An unexpected error occurred during API state upload: {str(e)}", "ERROR")
227
- return False
228
-
229
- def upload_transcription_to_api(json_output_path: str, matched_filename: str) -> bool:
230
- """Uploads the transcription JSON file to the backend API's /upload/ endpoint."""
231
- url = f"{BACKEND_URL}/upload/"
232
-
233
- try:
234
- with open(json_output_path, "rb") as f:
235
- files = {'file': (os.path.basename(json_output_path), f, 'application/json')}
236
-
237
- response = requests.post(url, files=files, timeout=30)
238
- response.raise_for_status()
239
-
240
- log_message(f"✅ Successfully uploaded transcription to API: {os.path.basename(json_output_path)}", "INFO")
241
- return True
242
-
243
- except requests.exceptions.HTTPError as e:
244
- if hasattr(e, 'response') and e.response.status_code == 409:
245
- log_message(f"⚠️ File already exists on server (409 Conflict) - Treating as success.", "INFO")
246
- return True
247
- log_message(f"❌ Failed to upload transcription to API ({url}): {str(e)}", "ERROR")
248
- return False
249
- except requests.exceptions.RequestException as e:
250
- log_message(f"❌ Failed to upload transcription to API ({url}): {str(e)}", "ERROR")
251
- return False
252
- except Exception as e:
253
- log_message(f"❌ An unexpected error occurred during API upload: {str(e)}", "ERROR")
254
- return False
255
-
256
- def lock_file_for_processing(wav_filename: str, state: Dict[str, Any]) -> bool:
257
- """Marks a file as 'processing' in the state file and uploads the lock via API."""
258
- log_message(f"🔒 Attempting to lock file: {wav_filename} (Marking as 'processing')", "INFO")
259
-
260
- state["file_states"][wav_filename] = "processing"
261
-
262
- if upload_state_to_api(state):
263
- log_message(f"✅ Successfully locked file: {wav_filename} via API state upload", "INFO")
264
- return True
265
- else:
266
- log_message(f"❌ Failed to upload lock for file: {wav_filename}. Aborting processing.", "ERROR")
267
- # Revert local state change if upload fails
268
- if wav_filename in state["file_states"]:
269
- del state["file_states"][wav_filename]
270
- return False
271
-
272
- def unlock_file_as_processed(wav_filename: str, state: Dict[str, Any], next_index: int) -> bool:
273
- """Marks a file as 'processed', updates the index, and uploads the state via API."""
274
- log_message(f"🔓 Attempting to unlock file: {wav_filename} (Marking as 'processed')", "INFO")
275
-
276
- state["file_states"][wav_filename] = "processed"
277
- state["next_download_index"] = next_index
278
-
279
- if upload_state_to_api(state):
280
- log_message(f"✅ Successfully unlocked and marked as processed: {wav_filename} via API state upload", "INFO")
281
- return True
282
- else:
283
- log_message(f"❌ Failed to upload final state for file: {wav_filename}.", "ERROR")
284
- return False
285
-
286
- # --- END NEW API FUNCTIONS ---
287
-
288
- def download_with_retry(url: str, dest_path: str, max_retries: int = 3) -> bool:
289
- """Download file with retry logic and disk space checking"""
290
- if not check_disk_space():
291
- cleanup_temp_files()
292
- if not check_disk_space():
293
- log_message("❌ Insufficient disk space even after cleanup", "ERROR")
294
- return False
295
-
296
- try:
297
- os.makedirs(os.path.dirname(dest_path), exist_ok=True)
298
- except Exception as e:
299
- log_message(f"❌ Failed to create directory for download path {os.path.dirname(dest_path)}: {str(e)}", "ERROR")
300
- return False
301
-
302
- # The original code used HF_TOKEN for authorization headers, which is only needed
303
- # if the source repo is private. We keep it for compatibility.
304
- headers = {"Authorization": f"Bearer {HF_TOKEN}"}
305
- for attempt in range(max_retries):
306
- try:
307
- with requests.get(url, headers=headers, stream=True) as r:
308
- r.raise_for_status()
309
-
310
- with open(dest_path, "wb") as f:
311
- for chunk in r.iter_content(chunk_size=8192):
312
- if chunk:
313
- f.write(chunk)
314
-
315
- log_message(f"✅ Download successful: {os.path.basename(dest_path)}", "INFO")
316
- return True
317
- except requests.exceptions.RequestException as e:
318
- log_message(f"⚠️ Download attempt {attempt + 1}/{max_retries} failed for {url}: {str(e)}", "WARNING")
319
- if attempt < max_retries - 1:
320
- time.sleep(2 ** attempt) # Exponential backoff
321
- else:
322
- log_message(f"❌ Download failed after {max_retries} attempts for {url}", "ERROR")
323
- return False
324
- except Exception as e:
325
- log_message(f"❌ An unexpected error occurred during download: {str(e)}", "ERROR")
326
- return False
327
- return False
328
-
329
- def get_reference_map(reference_repo_id: str) -> Dict[str, str]:
330
- """
331
- Downloads the reference file list from the Hugging Face repo and creates a map
332
- from audio filename (without extension) to the reference filename.
333
- """
334
- log_message(f"Fetching reference file list from {reference_repo_id}...", "INFO")
335
-
336
- # This is a placeholder for the actual logic to get the file list.
337
- # Assuming the reference repo contains a list of files that match the audio files.
338
- # In a real scenario, this would involve listing files in the repo.
339
- # For now, we'll assume a simple list of files can be retrieved.
340
-
341
- try:
342
- # Use HfApi to list files in the reference repo
343
- repo_files = hf_api.list_repo_files(repo_id=reference_repo_id, repo_type="dataset")
344
-
345
- reference_map = {}
346
- for file in repo_files:
347
- # Assuming the reference files are named like 'audio_file_name.txt'
348
- # and we want to map the audio file name (e.g., 'audio_file_name.wav') to it.
349
- base_name, ext = os.path.splitext(file)
350
- if ext.lower() in ['.txt', '.json']: # Only consider text/json files as reference
351
- # The key is the audio file name without extension
352
- reference_map[base_name] = file
353
-
354
- log_message(f"✅ Successfully created reference map with {len(reference_map)} entries.", "INFO")
355
- return reference_map
356
-
357
- except Exception as e:
358
- log_message(f"❌ Failed to fetch reference map from Hugging Face: {str(e)}", "ERROR")
359
- return {}
360
-
361
- def find_matching_filename(audio_filename: str, reference_map: Dict[str, str]) -> Optional[str]:
362
- """Finds the matching reference filename for a given audio filename."""
363
- base_name, _ = os.path.splitext(audio_filename)
364
- return reference_map.get(base_name)
365
-
366
- def get_next_file_to_process(source_repo_id: str, state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
367
- """
368
- Determines the next file to process based on the current state and the file list
369
- from the source Hugging Face repository.
370
- """
371
- log_message(f"Determining next file to process from {source_repo_id}...", "INFO")
372
-
373
- try:
374
- # 1. Get the list of all files in the source repo
375
- repo_files = hf_api.list_repo_files(repo_id=source_repo_id, repo_type="dataset")
376
-
377
- # Filter for audio files (e.g., .wav, .mp3)
378
- audio_files = sorted([f for f in repo_files if f.lower().endswith(('.wav', '.mp3'))])
379
-
380
- processing_status["total_files"] = len(audio_files)
381
-
382
- if not audio_files:
383
- log_message("No audio files found in the source repository.", "INFO")
384
- return None
385
-
386
- # 2. Get the next index from the state
387
- next_index = state.get("next_download_index", 0)
388
- file_states = state.get("file_states", {})
389
-
390
- # 3. Skip forward past all processed and processing files starting from next_index
391
- # This ensures we don't repeatedly find files that have already been handled
392
- current_index = next_index
393
- while current_index < len(audio_files):
394
- filename = audio_files[current_index]
395
- status = file_states.get(filename, "unprocessed")
396
-
397
- # If this file is processed or currently processing, skip it
398
- if status in ["processed", "processing"]:
399
- current_index += 1
400
- continue
401
-
402
- # If this file failed, we can retry it, so return it
403
- if status == "failed":
404
- file_url = hf_hub_url(repo_id=source_repo_id, filename=filename, repo_type="dataset")
405
- log_message(f"Found failed file for retry at index {current_index}: {filename}", "INFO")
406
- return {
407
- "filename": filename,
408
- "url": file_url,
409
- "index": current_index
410
- }
411
-
412
- # If this file is unprocessed, we found our next file
413
- file_url = hf_hub_url(repo_id=source_repo_id, filename=filename, repo_type="dataset")
414
- log_message(f"Found next file at index {current_index}: {filename}", "INFO")
415
- return {
416
- "filename": filename,
417
- "url": file_url,
418
- "index": current_index
419
- }
420
-
421
- log_message("All files have been processed or are locked. Checking for any failed files from the start.", "INFO")
422
-
423
- # 4. If we've processed all files from next_index to end, check from beginning for failed files
424
- for i in range(0, next_index):
425
- filename = audio_files[i]
426
- status = file_states.get(filename, "unprocessed")
427
-
428
- if status == "failed":
429
- file_url = hf_hub_url(repo_id=source_repo_id, filename=filename, repo_type="dataset")
430
- log_message(f"Found failed file for retry at index {i}: {filename}", "INFO")
431
- return {
432
- "filename": filename,
433
- "url": file_url,
434
- "index": i
435
- }
436
-
437
- log_message("All files have been processed. Waiting for new files...", "INFO")
438
- return None
439
-
440
- except Exception as e:
441
- log_message(f"❌ Failed to get next file to process: {str(e)}", "ERROR")
442
- return None
443
-
444
- def run_whisper_transcription(audio_path: str, output_dir: str, model: str) -> Optional[str]:
445
- """
446
- Runs Whisper transcription using the transformers library.
447
- Returns the path to the generated JSON file on success.
448
- No ffmpeg dependency required.
449
- """
450
- log_message(f"🎙️ Starting transcription for {os.path.basename(audio_path)} with model {model}...", "INFO")
451
-
452
- try:
453
- # Get the Whisper pipeline
454
- pipe = get_whisper_pipeline()
455
-
456
- # Load audio using librosa
457
- log_message(f"Loading audio file: {audio_path}", "INFO")
458
- audio_data, sample_rate = librosa.load(audio_path, sr=16000)
459
-
460
- # Run transcription
461
- log_message(f"Running transcription...", "INFO")
462
- result = pipe(
463
- audio_data,
464
- chunk_length_s=30,
465
- batch_size=8,
466
- return_timestamps=True
467
- )
468
-
469
- # Extract text and chunks
470
- transcription_text = result.get("text", "")
471
- chunks = result.get("chunks", [])
472
-
473
- log_message(f"✅ Transcription successful: {len(transcription_text)} characters", "INFO")
474
-
475
- # Prepare output JSON structure
476
- output_json = {
477
- "text": transcription_text,
478
- "chunks": chunks,
479
- "language": result.get("language", "en")
480
- }
481
-
482
- # Save to JSON file
483
- base_name, _ = os.path.splitext(os.path.basename(audio_path))
484
- json_output_path = os.path.join(output_dir, f"{base_name}.json")
485
-
486
- with open(json_output_path, "w", encoding="utf-8") as f:
487
- json.dump(output_json, f, indent=2, ensure_ascii=False)
488
-
489
- log_message(f"✅ Saved transcription to: {json_output_path}", "INFO")
490
- return json_output_path
491
-
492
- except Exception as e:
493
- log_message(f"❌ An error occurred during transcription: {str(e)}", "ERROR")
494
- import traceback
495
- log_message(f"Traceback: {traceback.format_exc()}", "ERROR")
496
- return None
497
-
498
- def process_audio_file(audio_path: str, reference_map: Dict[str, str], output_filename: str) -> bool:
499
- """
500
- Transcribes the audio file, renames the output JSON to match the reference,
501
- and uploads the result to the API.
502
- """
503
-
504
- # 1. Run transcription
505
- json_output_path = run_whisper_transcription(audio_path, TRANSCRIPTIONS_FOLDER, WHISPER_MODEL)
506
-
507
- if not json_output_path:
508
- return False
509
-
510
- # 2. Rename the JSON file to the matched filename
511
- # The output_filename already includes the correct extension (e.g., .txt or .json)
512
- # We assume the reference map provides the full target filename.
513
-
514
- # The whisper output is a JSON file named after the audio file.
515
- # We need to rename it to the target filename (which should be a JSON file for the backend).
516
-
517
- # The output_filename is the matched filename from the reference map (e.g., 'audio_file_name.txt')
518
- # The backend expects a JSON file. Let's assume the matched filename should be used as the base
519
- # but with a .json extension for the upload.
520
-
521
- # Let's stick to the original logic: the backend expects a JSON file with the name
522
- # of the audio file (or the matched reference file) with a .json extension.
523
-
524
- # Since the whisper output is already a JSON file, we just need to rename it
525
- # to the desired final name.
526
-
527
- # The output_filename passed here is the base name of the audio file or the matched reference file.
528
- # If it's a reference file name (e.g., 'file.txt'), we should probably use 'file.json'.
529
-
530
- # For simplicity and to match the backend's expectation (which handles JSON),
531
- # we will rename the whisper output JSON to the base name of the audio file
532
- # and ensure it has a .json extension.
533
-
534
- base_name, _ = os.path.splitext(output_filename)
535
- final_json_filename = f"{base_name}.json"
536
- final_json_path = os.path.join(TRANSCRIPTIONS_FOLDER, final_json_filename)
537
-
538
- try:
539
- if json_output_path != final_json_path:
540
- shutil.move(json_output_path, final_json_path)
541
- log_message(f"✅ Renamed transcription to: {final_json_filename}", "INFO")
542
- except Exception as e:
543
- log_message(f"❌ Failed to rename transcription file: {str(e)}", "ERROR")
544
- return False
545
-
546
- # 3. Upload transcription to API
547
- if upload_transcription_to_api(final_json_path, final_json_filename):
548
- processing_status["transcribed_files"] += 1
549
- # Clean up the local transcription file after successful upload
550
- try:
551
- os.remove(final_json_path)
552
- log_message(f"🗑️ Cleaned up local transcription file: {final_json_path}", "INFO")
553
- except Exception as e:
554
- log_message(f"❌ Failed to clean up transcription file: {str(e)}", "ERROR")
555
- return True
556
- else:
557
- log_message(f"❌ Failed to upload transcription to API: {final_json_filename}", "ERROR")
558
- return False
559
-
560
- def main_processing_loop():
561
- """The main loop that continuously checks for and processes new audio files."""
562
- global processing_status
563
-
564
- if processing_status["is_running"]:
565
- log_message("Processing loop is already running.", "WARNING")
566
- return
567
-
568
- processing_status["is_running"] = True
569
- log_message("🚀 Audio transcription processing loop started.", "INFO")
570
-
571
- # 1. Get the reference map once
572
- reference_map = get_reference_map(REFERENCE_REPO_ID)
573
- if not reference_map:
574
- log_message("❌ Could not get reference map. Stopping loop.", "CRITICAL")
575
- processing_status["is_running"] = False
576
- return
577
-
578
- try:
579
- while processing_status["is_running"]:
580
- time.sleep(PROCESSING_DELAY)
581
-
582
- # 1. Download FRESH state from the API at the start of each iteration
583
- # This ensures we respect the next_download_index that other workers may have set
584
- current_state = download_state_from_api()
585
- next_file_info = get_next_file_to_process(SOURCE_REPO_ID, current_state)
586
-
587
- if next_file_info is None:
588
- log_message("💤 No new audio files to process. Sleeping for a while...", "INFO")
589
- time.sleep(PROCESSING_DELAY * 5)
590
- continue
591
-
592
- target_file = next_file_info['filename']
593
- audio_url = next_file_info['url']
594
- target_index = next_file_info['index']
595
-
596
- processing_status["current_file"] = target_file
597
- success = False
598
- matched_filename = None
599
-
600
- try:
601
- # 2. Lock file by updating state on the API
602
- # IMPORTANT: Update next_download_index when locking to prevent other workers from picking same file
603
- old_index = current_state["next_download_index"]
604
- current_state["next_download_index"] = target_index + 1
605
- log_message(f"📍 Incrementing next_download_index from {old_index} to {current_state['next_download_index']}", "INFO")
606
-
607
- if not lock_file_for_processing(target_file, current_state):
608
- log_message(f"❌ Failed to lock file {target_file}. Skipping.", "ERROR")
609
- time.sleep(PROCESSING_DELAY)
610
- continue
611
-
612
- local_wav_path = os.path.join(DOWNLOAD_FOLDER, os.path.basename(target_file))
613
- log_message(f"⬇️ Downloading audio file: {target_file}", "INFO")
614
-
615
- if download_with_retry(audio_url, local_wav_path):
616
-
617
- # Extract base filename for matching
618
- base_filename = os.path.basename(target_file)
619
- matched_filename = find_matching_filename(base_filename, reference_map)
620
-
621
- # Use matched filename if found, otherwise use original filename
622
- output_filename = matched_filename if matched_filename else base_filename
623
-
624
- # 3. Process and Upload transcription to API
625
- if process_audio_file(local_wav_path, reference_map, output_filename):
626
- success = True
627
- log_message(f"✅ Finished processing: {target_file}", "INFO")
628
- else:
629
- log_message(f"❌ Processing failed for: {target_file}", "ERROR")
630
- else:
631
- log_message(f"❌ Download failed for: {target_file}", "ERROR")
632
-
633
- except Exception as e:
634
- log_message(f"🔥 An unhandled error occurred while processing {target_file}: {str(e)}", "ERROR")
635
- log_failed_file(target_file, str(e))
636
-
637
- finally:
638
- # 4. Unlock/Mark as processed by updating state on the API
639
- # IMPORTANT: Keep the incremented next_download_index from locking
640
-
641
- if success:
642
- # Mark as processed and keep the incremented index, then upload state
643
- unlock_file_as_processed(target_file, current_state, current_state["next_download_index"])
644
- processing_status["processed_files"] += 1
645
- else:
646
- # Mark as failed but keep the incremented index so next worker can proceed
647
- log_message(f"⚠️ File {target_file} failed. Marking as 'failed' and updating state.", "WARNING")
648
- current_state["file_states"][target_file] = "failed"
649
- # Keep the incremented next_download_index - don't change it
650
- upload_state_to_api(current_state)
651
-
652
- # Clean up the downloaded audio file regardless of success
653
- try:
654
- if os.path.exists(local_wav_path):
655
- os.remove(local_wav_path)
656
- log_message(f"🗑️ Cleaned up local audio file: {local_wav_path}", "INFO")
657
- except Exception as e:
658
- log_message(f"❌ Failed to clean up audio file: {str(e)}", "ERROR")
659
-
660
- processing_status["current_file"] = None
661
- time.sleep(PROCESSING_DELAY)
662
-
663
- except Exception as e:
664
- log_message(f"🔥 Critical error in main processing loop: {str(e)}", "CRITICAL")
665
-
666
- finally:
667
- processing_status["is_running"] = False
668
- log_message("🛑 Audio transcription processing loop stopped.", "INFO")
669
-
670
- # --- FastAPI Endpoints (Unchanged) ---
671
-
672
- # Add to configuration section
673
- AUTO_START_PROCESSING = os.environ.get("AUTO_START_PROCESSING", "true").lower() == "true"
674
-
675
- @app.on_event("startup")
676
- async def startup_event():
677
- """Conditionally start processing based on environment variable."""
678
- if AUTO_START_PROCESSING:
679
- log_message("🚀 AUTO_START_PROCESSING enabled - Starting processing loop...", "INFO")
680
- thread = threading.Thread(target=main_processing_loop, daemon=True)
681
- thread.start()
682
- log_message("✅ Background processing thread started", "INFO")
683
- else:
684
- log_message("⏸️ AUTO_START_PROCESSING disabled - Use /start endpoint to begin", "INFO")
685
-
686
- @app.get("/")
687
- async def root():
688
- """Root endpoint to check service status."""
689
- return {"message": "Audio Transcriber Service is running", "status": processing_status}
690
-
691
- @app.get("/status")
692
- async def get_status():
693
- """Get the current processing status."""
694
- return processing_status
695
-
696
- @app.post("/start")
697
- async def start_processing():
698
- """Start the background processing loop."""
699
- if processing_status["is_running"]:
700
- return JSONResponse(status_code=200, content={"message": "Processing already running."})
701
-
702
- thread = threading.Thread(target=main_processing_loop)
703
- thread.start()
704
- return JSONResponse(status_code=200, content={"message": "Processing started in background."})
705
-
706
- @app.post("/stop")
707
- async def stop_processing():
708
- """Stop the background processing loop."""
709
- if not processing_status["is_running"]:
710
- return JSONResponse(status_code=200, content={"message": "Processing is not running."})
711
-
712
- processing_status["is_running"] = False
713
- return JSONResponse(status_code=200, content={"message": "Processing stop requested. Will stop after current file."})
714
-
715
- # --- Main Execution ---
716
-
717
- if __name__ == "__main__":
718
- # This block is for local testing and won't be used in the final sandbox execution
719
- # but is good practice for a runnable script.
720
- uvicorn.run(app, host="0.0.0.0", port=8000)
 
1
+ import os
2
+ import json
3
+ import torch
4
+ import librosa
5
+ import numpy as np
6
+ from pathlib import Path
7
+ from typing import Dict, Optional, List
8
+ from datetime import datetime
9
+
10
+ from fastapi import FastAPI, File, UploadFile, HTTPException
11
+ from pydantic import BaseModel
12
+ from transformers import pipeline
13
+
14
+ # --- Configuration ---
15
+ WHISPER_MODEL = os.getenv("WHISPER_MODEL", "small") # small, medium, large
16
+ WHISPER_PORT = int(os.getenv("WHISPER_PORT", 8000))
17
+ DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
18
+ TORCH_DTYPE = torch.float16 if torch.cuda.is_available() else torch.float32
19
+
20
+ # Global model cache
21
+ _whisper_pipeline = None
22
+ _model_info = {
23
+ "model_name": WHISPER_MODEL,
24
+ "device": DEVICE,
25
+ "dtype": str(TORCH_DTYPE),
26
+ "cuda_available": torch.cuda.is_available()
27
+ }
28
+
29
+ # --- Models ---
30
+ class TranscriptionResponse(BaseModel):
31
+ text: str
32
+ language: str = "en"
33
+ confidence: Optional[float] = None
34
+ duration: float = 0.0
35
+ timestamp: str = ""
36
+
37
+ # --- Utility Functions ---
38
+ def get_whisper_pipeline():
39
+ """Get or initialize the Whisper pipeline (cached)."""
40
+ global _whisper_pipeline
41
+ if _whisper_pipeline is not None:
42
+ return _whisper_pipeline
43
+
44
+ print(f"🔄 Loading Whisper model: {WHISPER_MODEL} on {DEVICE} with dtype {TORCH_DTYPE}")
45
+
46
+ _whisper_pipeline = pipeline(
47
+ "automatic-speech-recognition",
48
+ model=f"openai/whisper-{WHISPER_MODEL}",
49
+ device=DEVICE,
50
+ torch_dtype=TORCH_DTYPE
51
+ )
52
+
53
+ print(f"✅ Whisper model loaded successfully")
54
+ return _whisper_pipeline
55
+
56
+ def load_and_resample_audio(audio_path: str, target_sr: int = 16000) -> tuple:
57
+ """Load audio file and resample to 16kHz (required by Whisper)."""
58
+ try:
59
+ # Load audio file with librosa (no ffmpeg needed)
60
+ audio, sr = librosa.load(audio_path, sr=target_sr, mono=True)
61
+ duration = librosa.get_duration(y=audio, sr=sr)
62
+ print(f"📁 Loaded audio: {Path(audio_path).name} | Duration: {duration:.2f}s | SR: {sr}Hz")
63
+ return audio, sr, duration
64
+ except Exception as e:
65
+ print(f"❌ Error loading audio: {e}")
66
+ raise
67
+
68
+ async def transcribe_audio(audio_path: str) -> Dict:
69
+ """Transcribe audio file using Whisper."""
70
+ try:
71
+ # Load audio
72
+ audio, sr, duration = load_and_resample_audio(audio_path)
73
+
74
+ # Get pipeline
75
+ pipeline_model = get_whisper_pipeline()
76
+
77
+ print(f"🎤 Transcribing {Path(audio_path).name}...")
78
+
79
+ # Transcribe
80
+ result = pipeline_model(
81
+ audio,
82
+ chunk_length_s=30,
83
+ stride_length_s=(4, 2),
84
+ language="en", # Specify language or remove for auto-detection
85
+ batch_size=24 if torch.cuda.is_available() else 4
86
+ )
87
+
88
+ print(f"✅ Transcription complete")
89
+
90
+ return {
91
+ "text": result.get("text", "").strip(),
92
+ "language": "en", # Whisper doesn't return language detection reliably
93
+ "confidence": None, # Whisper doesn't provide per-segment confidence
94
+ "duration": duration,
95
+ "timestamp": datetime.now().isoformat()
96
+ }
97
+
98
+ except Exception as e:
99
+ print(f"❌ Transcription error: {e}")
100
+ raise
101
+
102
+ # --- FastAPI App ---
103
+ app = FastAPI(
104
+ title="Whisper Transcription Server",
105
+ description="FastAPI server for audio transcription using OpenAI Whisper",
106
+ version="1.0.0"
107
+ )
108
+
109
+ @app.on_event("startup")
110
+ async def startup():
111
+ print(f"🚀 Whisper Server starting on port {WHISPER_PORT}")
112
+ print(f"📊 Configuration:")
113
+ print(f" - Model: {WHISPER_MODEL}")
114
+ print(f" - Device: {DEVICE}")
115
+ print(f" - CUDA Available: {torch.cuda.is_available()}")
116
+ print(f" - Torch Dtype: {TORCH_DTYPE}")
117
+
118
+ # Pre-load model
119
+ get_whisper_pipeline()
120
+
121
+ @app.get("/health")
122
+ async def health_check():
123
+ """Check server health and model status."""
124
+ return {
125
+ "status": "healthy",
126
+ "model_info": _model_info,
127
+ "cuda_available": torch.cuda.is_available(),
128
+ "device": DEVICE
129
+ }
130
+
131
+ @app.get("/")
132
+ async def root():
133
+ """Root endpoint with server info."""
134
+ return {
135
+ "server": "Whisper Transcription Backend",
136
+ "model": WHISPER_MODEL,
137
+ "device": DEVICE,
138
+ "endpoints": {
139
+ "/health": "Server health check",
140
+ "/transcribe": "POST - Transcribe audio file",
141
+ "/transcribe_file": "POST - Alternative transcribe endpoint"
142
+ }
143
+ }
144
+
145
+ @app.post("/transcribe")
146
+ async def transcribe(file: UploadFile = File(...)):
147
+ """
148
+ Transcribe an uploaded audio file.
149
+ Accepts: mp3, wav, m4a, flac, ogg, aac
150
+ """
151
+ if not file.filename:
152
+ raise HTTPException(status_code=400, detail="No file provided")
153
+
154
+ # Check file extension
155
+ allowed_extensions = {'.mp3', '.wav', '.m4a', '.flac', '.ogg', '.aac'}
156
+ file_ext = Path(file.filename).suffix.lower()
157
+
158
+ if file_ext not in allowed_extensions:
159
+ raise HTTPException(
160
+ status_code=400,
161
+ detail=f"Unsupported file format: {file_ext}. Allowed: {allowed_extensions}"
162
+ )
163
+
164
+ temp_file = None
165
+ try:
166
+ # Save uploaded file temporarily
167
+ temp_path = Path(f"temp_{file.filename}")
168
+ with open(temp_path, 'wb') as f:
169
+ content = await file.read()
170
+ f.write(content)
171
+
172
+ temp_file = temp_path
173
+
174
+ print(f"📤 Processing uploaded file: {file.filename} ({len(content)} bytes)")
175
+
176
+ # Transcribe
177
+ result = await transcribe_audio(str(temp_path))
178
+
179
+ return {
180
+ "audio_file": file.filename,
181
+ "text": result["text"],
182
+ "language": result["language"],
183
+ "duration": result["duration"],
184
+ "timestamp": result["timestamp"]
185
+ }
186
+
187
+ except Exception as e:
188
+ print(f"❌ Transcription failed: {e}")
189
+ raise HTTPException(status_code=500, detail=str(e))
190
+
191
+ finally:
192
+ # Cleanup
193
+ if temp_file and temp_file.exists():
194
+ temp_file.unlink()
195
+ print(f"🧹 Cleaned up temp file: {temp_file}")
196
+
197
+ @app.post("/transcribe_file")
198
+ async def transcribe_file(file: UploadFile = File(...)):
199
+ """Alternative endpoint name for transcription."""
200
+ return await transcribe(file)
201
+
202
+ @app.post("/transcribe_batch")
203
+ async def transcribe_batch(files: List[UploadFile] = File(...)):
204
+ """
205
+ Transcribe multiple audio files in parallel.
206
+ """
207
+ if not files:
208
+ raise HTTPException(status_code=400, detail="No files provided")
209
+
210
+ results = []
211
+ for file in files:
212
+ try:
213
+ result = await transcribe(file)
214
+ results.append({
215
+ "status": "success",
216
+ "data": result
217
+ })
218
+ except Exception as e:
219
+ results.append({
220
+ "status": "error",
221
+ "filename": file.filename,
222
+ "error": str(e)
223
+ })
224
+
225
+ return {
226
+ "total": len(files),
227
+ "results": results
228
+ }
229
+
230
+ if __name__ == "__main__":
231
+ import uvicorn
232
+ uvicorn.run(app, host="0.0.0.0", port=WHISPER_PORT)