"""
Stage 4 — Transcription Service

PRIMARY:  Local faster-whisper on CPU (tried first: more reliable on Spaces).
FALLBACK: HuggingFace Inference API (model selected by HF_API_URL).

On the HF API path the audio is split into fixed-duration chunks
(HF_CHUNK_DURATION_SEC seconds each) and every chunk is sent to the API
separately; the per-chunk results are stitched back onto one timeline.

Both paths write ``transcript.json`` into the output directory and return the
same list-of-segment-dicts structure.
"""

import json
import logging
import subprocess
import time
from pathlib import Path
from typing import List, Dict, Optional

import requests

from config import HF_API_URL, HF_TOKEN, HF_CHUNK_DURATION_SEC, WHISPER_MODEL_SIZE

logger = logging.getLogger(__name__)

# Number of POST attempts per chunk before the chunk is given up on.
_HF_MAX_ATTEMPTS = 3
# Chunk files at or below this size are treated as empty and skipped.
_MIN_CHUNK_BYTES = 1000


def transcribe_audio(
    audio_path: Path,
    output_dir: Path,
    source_language: Optional[str] = None,
    device: str = "cpu",
    progress_callback=None,
) -> List[Dict]:
    """Transcribe audio. Chain: local faster-whisper → HF API → error.

    Args:
        audio_path: Audio file to transcribe.
        output_dir: Directory that receives ``transcript.json`` (and, on the
            HF path, the ``audio_chunks`` sub-directory).
        source_language: Language code passed to the transcriber; ``None``
            lets the local model pick the language itself.
        device: Currently unused — the local path always runs on CPU. Kept so
            existing callers that pass it keep working.
        progress_callback: Optional callable taking an int percentage (0-100).

    Returns:
        List of segment dicts with keys ``start``, ``end``, ``text``,
        ``speaker`` and ``words``.

    Raises:
        RuntimeError: If both the local and the HF API path fail. The last
            underlying exception is attached as ``__cause__``.
    """
    last_error: Optional[BaseException] = None

    # Try local faster-whisper FIRST (more reliable on Spaces).
    try:
        import faster_whisper  # noqa: F401 - availability probe only

        logger.info(
            "Using local faster-whisper (%s model, CPU)...", WHISPER_MODEL_SIZE
        )
        return _transcribe_local(
            audio_path, output_dir, source_language, progress_callback
        )
    except ImportError as err:
        last_error = err
        logger.info("faster-whisper not available, trying HF API...")
    except Exception as err:
        last_error = err
        logger.warning("Local transcription failed: %s. Trying HF API...", err)

    # Fallback: HuggingFace Inference API.
    try:
        logger.info("Attempting HuggingFace API transcription...")
        segments = _transcribe_hf_api(
            audio_path, output_dir, source_language, progress_callback
        )
        if segments:
            logger.info("HF API transcription success: %d segments", len(segments))
            return segments
    except Exception as err:
        last_error = err
        logger.error("HF API also failed: %s", err)

    raise RuntimeError(
        "Transcription failed with all methods. "
        "The audio file may be too large or the service is overloaded. Try again."
    ) from last_error


def _make_segment(start: float, end: float, text: str) -> Dict:
    """Build one transcript segment dict in the shape both paths return."""
    return {
        "start": round(start, 3),
        "end": round(end, 3),
        "text": text,
        "speaker": "SPEAKER_00",
        "words": [],
    }


def _write_transcript(
    output_dir: Path, language: Optional[str], segments: List[Dict], method: str
) -> None:
    """Write ``transcript.json`` (language, segments, count, method) to output_dir."""
    transcript_path = output_dir / "transcript.json"
    with open(transcript_path, "w", encoding="utf-8") as f:
        json.dump(
            {
                "language": language,
                "segments": segments,
                "total_segments": len(segments),
                "method": method,
            },
            f,
            ensure_ascii=False,
            indent=2,
        )


def _chunk_start_sec(chunk_path: Path, fallback):
    """Return the start offset encoded in a ``chunk_<start>.wav`` filename.

    Using the filename instead of ``index * chunk_duration`` keeps timestamps
    correct when ``_split_audio`` dropped an earlier chunk. Falls back to
    ``fallback`` if the name does not end in an integer.
    """
    try:
        return int(chunk_path.stem.rsplit("_", 1)[-1])
    except ValueError:
        return fallback


def _estimated_wait(resp, default: float = 30.0) -> float:
    """Read ``estimated_time`` from a 503 response body, or return ``default``.

    Tolerates a non-JSON body, a non-dict payload and a non-numeric value.
    """
    try:
        return max(0.0, float(resp.json().get("estimated_time", default)))
    except (ValueError, TypeError, AttributeError):
        return default


def _segments_from_result(result: Dict, chunk_start, chunk_duration) -> List[Dict]:
    """Convert one HF API response into segments on the global timeline.

    If the response carries timestamped ``chunks`` those are used (empty-text
    pieces are dropped); otherwise a single segment spanning the whole audio
    chunk is built from ``text``. Returns an empty list when there is no text.
    """
    segments = []
    for piece in result.get("chunks") or []:
        piece_text = (piece.get("text") or "").strip()
        if not piece_text:
            continue
        # "timestamp" may be missing, null, or contain null entries.
        stamps = piece.get("timestamp") or ()
        rel_start = (stamps[0] if len(stamps) > 0 else None) or 0
        rel_end = (stamps[1] if len(stamps) > 1 else None) or chunk_duration
        segments.append(
            _make_segment(chunk_start + rel_start, chunk_start + rel_end, piece_text)
        )

    if not segments:
        text = (result.get("text") or "").strip()
        if text:
            segments.append(
                _make_segment(chunk_start, chunk_start + chunk_duration, text)
            )
    return segments


def _transcribe_chunk(
    audio_bytes: bytes,
    headers: Dict[str, str],
    idx: int,
    chunk_start,
    chunk_duration,
) -> Optional[List[Dict]]:
    """POST one audio chunk to the HF API, retrying up to _HF_MAX_ATTEMPTS times.

    Returns the chunk's segments (possibly an empty list) on success, or
    ``None`` if every attempt failed. Segments are only produced from a fully
    parsed response, so a failure mid-parse cannot leave duplicates behind.
    """
    for attempt in range(1, _HF_MAX_ATTEMPTS + 1):
        try:
            resp = requests.post(
                HF_API_URL,
                headers=headers,
                data=audio_bytes,
                timeout=120,
            )

            if resp.status_code == 503:
                # Model is loading
                wait_time = _estimated_wait(resp)
                logger.info("Model loading, waiting %.0fs...", wait_time)
                time.sleep(min(wait_time, 60))
                continue

            if resp.status_code == 429:
                # Rate limited
                logger.info("Rate limited, waiting 10s...")
                time.sleep(10)
                continue

            resp.raise_for_status()
            return _segments_from_result(resp.json(), chunk_start, chunk_duration)
        except requests.exceptions.Timeout:
            logger.warning("Chunk %d timed out (attempt %d)", idx, attempt)
            time.sleep(5)
        except Exception as err:
            logger.warning("Chunk %d error: %s (attempt %d)", idx, err, attempt)
            time.sleep(5)
    return None


def _transcribe_hf_api(
    audio_path: Path,
    output_dir: Path,
    source_language: Optional[str],
    progress_callback=None,
) -> List[Dict]:
    """
    Transcribe using the HuggingFace Inference API.

    Splits the audio into chunks of HF_CHUNK_DURATION_SEC seconds under
    ``output_dir / "audio_chunks"``, sends each chunk to HF_API_URL and merges
    the results. A chunk whose attempts all fail is skipped (best effort).

    Raises:
        RuntimeError: If no chunk produced any transcription.
    """
    # Get audio duration
    duration = _get_duration(audio_path)
    logger.info("Audio duration: %.1fs (%.1f min)", duration, duration / 60)

    # Split into chunks
    chunk_dir = output_dir / "audio_chunks"
    chunk_dir.mkdir(parents=True, exist_ok=True)
    chunk_duration = HF_CHUNK_DURATION_SEC
    chunks = _split_audio(audio_path, chunk_dir, chunk_duration, duration=duration)
    logger.info("Split into %d chunks (%ss each)", len(chunks), chunk_duration)

    headers = {}
    if HF_TOKEN:
        headers["Authorization"] = f"Bearer {HF_TOKEN}"

    all_segments: List[Dict] = []
    for idx, chunk_path in enumerate(chunks):
        if progress_callback:
            progress_callback(int((idx / len(chunks)) * 100))

        chunk_start = _chunk_start_sec(chunk_path, idx * chunk_duration)
        chunk_segments = _transcribe_chunk(
            chunk_path.read_bytes(), headers, idx, chunk_start, chunk_duration
        )
        if chunk_segments is None:
            logger.warning(
                "Chunk %d skipped after %d failed attempts", idx, _HF_MAX_ATTEMPTS
            )
        else:
            all_segments.extend(chunk_segments)

        if idx % 10 == 0:
            logger.info("Transcribed chunk %d/%d", idx + 1, len(chunks))

    if not all_segments:
        raise RuntimeError("HF API returned no transcription results")

    # No language detection happens on this path: use the caller's language,
    # defaulting to "hi" when none was given.
    detected_lang = source_language or "hi"

    # Save transcript
    _write_transcript(output_dir, detected_lang, all_segments, "huggingface_gpu_api")

    if progress_callback:
        progress_callback(100)

    return all_segments


def _transcribe_local(
    audio_path: Path,
    output_dir: Path,
    source_language: Optional[str],
    progress_callback=None,
) -> List[Dict]:
    """Transcribe with local faster-whisper on CPU (int8, WHISPER_MODEL_SIZE).

    Writes ``transcript.json`` to ``output_dir`` and reports progress only
    once, at 100, when finished.

    Raises:
        RuntimeError: If faster-whisper is not installed.
    """
    try:
        from faster_whisper import WhisperModel
    except ImportError as err:
        raise RuntimeError(
            "faster-whisper not installed. Run: pip install faster-whisper"
        ) from err

    model = WhisperModel(WHISPER_MODEL_SIZE, device="cpu", compute_type="int8")

    raw_segments, info = model.transcribe(
        str(audio_path),
        language=source_language,
        beam_size=5,
        vad_filter=True,
    )

    segments = [
        _make_segment(seg.start, seg.end, seg.text.strip()) for seg in raw_segments
    ]

    _write_transcript(output_dir, info.language, segments, "local_faster_whisper")

    if progress_callback:
        progress_callback(100)

    return segments


def _get_duration(audio_path: Path) -> float:
    """Return the duration of ``audio_path`` in seconds, as reported by ffprobe.

    Raises:
        ValueError: If ffprobe printed nothing parseable as a number (for
            example because the file is missing or not a media file).
    """
    cmd = [
        "ffprobe", "-v", "quiet", "-show_entries", "format=duration",
        "-of", "csv=p=0", str(audio_path),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    raw = result.stdout.strip()
    try:
        return float(raw)
    except ValueError as err:
        raise ValueError(
            f"ffprobe could not determine duration of {audio_path} "
            f"(exit code {result.returncode}, output {raw!r})"
        ) from err


def _split_audio(
    audio_path: Path,
    output_dir: Path,
    chunk_sec: int,
    duration: Optional[float] = None,
) -> List[Path]:
    """Split audio into fixed-duration 16 kHz mono PCM WAV chunks.

    Chunks are written as ``chunk_<start:06d>.wav`` where ``<start>`` is the
    chunk's offset in whole seconds. Chunks that ffmpeg did not produce, or
    that are no larger than _MIN_CHUNK_BYTES, are left out of the result.

    Args:
        audio_path: Source audio file.
        output_dir: Existing directory to write the chunk files into.
        chunk_sec: Chunk length in seconds; must be positive.
        duration: Total duration in seconds if the caller already knows it;
            when ``None`` it is probed with ffprobe.

    Raises:
        ValueError: If ``chunk_sec`` is not positive.
    """
    if chunk_sec <= 0:
        raise ValueError(f"chunk_sec must be positive, got {chunk_sec}")
    if duration is None:
        duration = _get_duration(audio_path)

    chunks = []
    for start in range(0, int(duration) + 1, chunk_sec):
        chunk_path = output_dir / f"chunk_{start:06d}.wav"
        # -ss before -i seeks in the input instead of decoding from the start
        # of the file for every chunk, which matters for long recordings.
        cmd = [
            "ffmpeg", "-y", "-ss", str(start), "-i", str(audio_path),
            "-t", str(chunk_sec),
            "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
            str(chunk_path),
        ]
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if proc.returncode != 0:
            logger.warning(
                "ffmpeg exited with code %d for chunk at %ds",
                proc.returncode,
                start,
            )
        if chunk_path.exists() and chunk_path.stat().st_size > _MIN_CHUNK_BYTES:
            chunks.append(chunk_path)
    return chunks