Spaces:
Build error
Build error
| """ | |
| Stage 4 — Transcription Service | |
| PRIMARY: Local faster-whisper (CPU) — tried first | |
| FALLBACK: HuggingFace Inference API (free GPU — whisper-large-v3) | |
| For long videos: splits audio into 30s chunks and sends to HF API. | |
| This gives us GPU-quality transcription for FREE. | |
| """ | |
| import logging | |
| import subprocess | |
| import json | |
| import time | |
| import requests | |
| from pathlib import Path | |
| from typing import List, Dict, Optional | |
| from config import HF_API_URL, HF_TOKEN, HF_CHUNK_DURATION_SEC, WHISPER_MODEL_SIZE | |
| logger = logging.getLogger(__name__) | |
def transcribe_audio(
    audio_path: Path,
    output_dir: Path,
    source_language: Optional[str] = None,
    device: str = "cpu",
    progress_callback=None,
) -> List[Dict]:
    """Transcribe an audio file into a list of segment dicts.

    Chain: local faster-whisper (CPU) -> HuggingFace Inference API -> error.

    Args:
        audio_path: Audio file to transcribe.
        output_dir: Directory handed to the backends; each backend writes
            ``transcript.json`` into it.
        source_language: Language code forwarded to the backends, or None.
        device: Accepted for interface compatibility only; it is not used
            (the local backend always runs on the CPU).
        progress_callback: Optional callable forwarded to the backends, which
            call it with an integer percentage.

    Returns:
        The segments produced by the first backend that succeeds.

    Raises:
        RuntimeError: If every backend fails. The most recent backend error is
            attached as ``__cause__`` so the real reason is not lost.
    """
    # Remember the most recent failure so the final error can be chained to it
    # (the name bound by ``except ... as`` is cleared when the handler exits).
    last_error: Optional[Exception] = None

    # Try local faster-whisper FIRST (more reliable on Spaces).
    try:
        import faster_whisper  # noqa: F401 -- availability probe only
        logger.info("Using local faster-whisper (tiny model, CPU)...")
        return _transcribe_local(audio_path, output_dir, source_language, progress_callback)
    except ImportError as e:
        last_error = e
        logger.info("faster-whisper not available, trying HF API...")
    except Exception as e:
        last_error = e
        logger.warning(f"Local transcription failed: {e}. Trying HF API...")

    # Fallback: HuggingFace Inference API (free GPU).
    try:
        logger.info("Attempting HuggingFace API transcription (whisper-small)...")
        segments = _transcribe_hf_api(audio_path, output_dir, source_language, progress_callback)
        if segments:
            logger.info(f"HF API transcription success: {len(segments)} segments")
            return segments
    except Exception as e:
        last_error = e
        logger.error(f"HF API also failed: {e}")

    raise RuntimeError(
        "Transcription failed with all methods. "
        "The audio file may be too large or the service is overloaded. Try again."
    ) from last_error
def _transcribe_hf_api(
    audio_path: Path,
    output_dir: Path,
    source_language: Optional[str],
    progress_callback=None,
) -> List[Dict]:
    """Transcribe audio through the HuggingFace Inference API.

    The audio is split into fixed-length WAV chunks under
    ``output_dir / "audio_chunks"``; each chunk is POSTed to ``HF_API_URL`` and
    the responses are merged into one segment list, which is also written to
    ``output_dir / "transcript.json"``.

    Args:
        audio_path: Audio file to transcribe.
        output_dir: Directory for the chunk files and ``transcript.json``.
        source_language: Language code recorded in the transcript; when falsy,
            ``"hi"`` is recorded (no language detection is performed here).
        progress_callback: Optional callable invoked with an integer percentage.

    Returns:
        Segment dicts with ``start``, ``end``, ``text``, ``speaker`` and
        ``words`` keys, with times in seconds from the start of the audio.

    Raises:
        RuntimeError: If no chunk produced any text.
    """
    duration = _get_duration(audio_path)
    logger.info(f"Audio duration: {duration:.1f}s ({duration/60:.1f} min)")

    # Split into chunks.
    chunk_dir = output_dir / "audio_chunks"
    chunk_dir.mkdir(parents=True, exist_ok=True)
    chunk_duration = HF_CHUNK_DURATION_SEC
    chunks = _split_audio(audio_path, chunk_dir, chunk_duration)
    logger.info(f"Split into {len(chunks)} chunks ({chunk_duration}s each)")

    headers = {}
    if HF_TOKEN:
        headers["Authorization"] = f"Bearer {HF_TOKEN}"

    all_segments: List[Dict] = []
    for idx, chunk_path in enumerate(chunks):
        if progress_callback:
            progress_callback(int((idx / len(chunks)) * 100))

        # _split_audio drops missing/too-small chunk files, so the list index
        # is not a reliable time offset; prefer the start second encoded in
        # the chunk file name.
        chunk_start = _chunk_start_offset(chunk_path, idx * chunk_duration)
        # The final chunk is usually shorter than chunk_duration: never report
        # an end time past the end of the audio.
        chunk_end = max(chunk_start, min(chunk_start + chunk_duration, duration))

        with open(chunk_path, "rb") as f:
            audio_bytes = f.read()

        result = _post_hf_chunk(audio_bytes, headers, idx)
        if result is not None:
            all_segments.extend(_segments_from_hf_result(result, chunk_start, chunk_end))

        if idx % 10 == 0:
            logger.info(f"Transcribed chunk {idx+1}/{len(chunks)}")

    if not all_segments:
        raise RuntimeError("HF API returned no transcription results")

    # No language detection happens on this path: record the caller's language,
    # or "hi" when none was given.
    detected_lang = source_language or "hi"

    # Save transcript.
    transcript_path = output_dir / "transcript.json"
    with open(transcript_path, "w", encoding="utf-8") as f:
        json.dump({
            "language": detected_lang,
            "segments": all_segments,
            "total_segments": len(all_segments),
            "method": "huggingface_gpu_api",
        }, f, ensure_ascii=False, indent=2)

    if progress_callback:
        progress_callback(100)
    return all_segments


def _chunk_start_offset(chunk_path: Path, fallback: float) -> float:
    """Return the start second encoded in a ``chunk_<seconds>.wav`` file name, else *fallback*."""
    prefix, _, digits = chunk_path.stem.rpartition("_")
    if prefix == "chunk" and digits.isdigit():
        return float(int(digits))
    return float(fallback)


def _post_hf_chunk(audio_bytes: bytes, headers: Dict, idx: int, retries: int = 3) -> Optional[Dict]:
    """POST one audio chunk to the HF API; return the JSON dict, or None if every attempt failed."""
    for attempt in range(retries):
        try:
            resp = requests.post(
                HF_API_URL,
                headers=headers,
                data=audio_bytes,
                timeout=120,
            )
            if resp.status_code == 503:
                # Model is loading: wait for the advertised time, capped at 60s.
                wait_time = resp.json().get("estimated_time", 30)
                logger.info(f"Model loading, waiting {wait_time:.0f}s...")
                time.sleep(min(wait_time, 60))
                continue
            if resp.status_code == 429:
                # Rate limited.
                logger.info("Rate limited, waiting 10s...")
                time.sleep(10)
                continue
            resp.raise_for_status()
            result = resp.json()
            if not isinstance(result, dict):
                # Treated like any other failed attempt by the handler below.
                raise ValueError(f"unexpected response type {type(result).__name__}")
            return result
        except requests.exceptions.Timeout:
            logger.warning(f"Chunk {idx} timed out (attempt {attempt+1})")
            time.sleep(5)
        except Exception as e:
            logger.warning(f"Chunk {idx} error: {e} (attempt {attempt+1})")
            time.sleep(5)
    logger.error(f"Chunk {idx} failed after {retries} attempts; it is missing from the transcript")
    return None


def _segments_from_hf_result(result: Dict, chunk_start: float, chunk_end: float) -> List[Dict]:
    """Convert one HF API response into segments offset to absolute time.

    Timestamped sub-chunks are used when the response has any with text;
    otherwise the whole-chunk ``text`` becomes a single segment. Choosing one
    or the other up front means a sub-chunk can never be confused with the
    whole-chunk segment and deleted along with it.
    """
    span = chunk_end - chunk_start
    segments: List[Dict] = []
    for piece in result.get("chunks") or []:
        piece_text = (piece.get("text") or "").strip()
        if not piece_text:
            continue  # an empty segment carries no information downstream
        stamps = list(piece.get("timestamp") or ())
        # A missing/None start falls back to the chunk start, a missing/None
        # end to the chunk end.
        rel_start = stamps[0] if len(stamps) > 0 and stamps[0] else 0
        rel_end = stamps[1] if len(stamps) > 1 and stamps[1] else span
        segments.append({
            "start": round(chunk_start + rel_start, 3),
            "end": round(chunk_start + rel_end, 3),
            "text": piece_text,
            "speaker": "SPEAKER_00",
            "words": [],
        })
    if segments:
        return segments

    text = (result.get("text") or "").strip()
    if not text:
        return []
    return [{
        "start": round(chunk_start, 3),
        "end": round(chunk_end, 3),
        "text": text,
        "speaker": "SPEAKER_00",
        "words": [],
    }]
def _transcribe_local(
    audio_path: Path,
    output_dir: Path,
    source_language: Optional[str],
    progress_callback=None,
) -> List[Dict]:
    """Transcribe with a local faster-whisper model on the CPU.

    Loads ``WHISPER_MODEL_SIZE`` with int8 compute, transcribes *audio_path*
    (beam size 5, VAD filter on), writes the result to
    ``output_dir / "transcript.json"`` and returns the segment list.

    Raises:
        RuntimeError: If the ``faster_whisper`` package cannot be imported.
    """
    try:
        from faster_whisper import WhisperModel
    except ImportError:
        raise RuntimeError("faster-whisper not installed. Run: pip install faster-whisper")

    whisper = WhisperModel(WHISPER_MODEL_SIZE, device="cpu", compute_type="int8")
    segment_iter, info = whisper.transcribe(
        str(audio_path),
        language=source_language,
        beam_size=5,
        vad_filter=True,
    )

    # Iterating the returned segments is what builds the list; every segment
    # is tagged with the same placeholder speaker and an empty word list.
    segments = [
        {
            "start": round(item.start, 3),
            "end": round(item.end, 3),
            "text": item.text.strip(),
            "speaker": "SPEAKER_00",
            "words": [],
        }
        for item in segment_iter
    ]

    payload = {
        "language": info.language,
        "segments": segments,
        "total_segments": len(segments),
        "method": "local_faster_whisper",
    }
    with open(output_dir / "transcript.json", "w", encoding="utf-8") as handle:
        json.dump(payload, handle, ensure_ascii=False, indent=2)

    # Progress is only reported once, on completion.
    if progress_callback:
        progress_callback(100)
    return segments
def _get_duration(audio_path: Path) -> float:
    """Return the duration of *audio_path* in seconds, as reported by ffprobe.

    Raises:
        ValueError: If ffprobe did not print a number (for example the file is
            missing or unreadable). The message includes the path, ffprobe's
            exit code and its raw output instead of a bare float-parsing error.
        subprocess.TimeoutExpired: If ffprobe runs for more than 30 seconds.
    """
    cmd = ["ffprobe", "-v", "quiet", "-show_entries", "format=duration", "-of", "csv=p=0", str(audio_path)]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    raw = result.stdout.strip()
    try:
        return float(raw)
    except ValueError as err:
        # Same exception type as before, so existing handlers keep working.
        raise ValueError(
            f"ffprobe could not determine the duration of {audio_path} "
            f"(exit code {result.returncode}, output {raw!r})"
        ) from err
def _split_audio(audio_path: Path, output_dir: Path, chunk_sec: int) -> List[Path]:
    """Split audio into fixed-duration 16 kHz mono 16-bit PCM WAV chunks.

    Chunk files are named ``chunk_<start second, 6 digits>.wav`` inside
    *output_dir*. A chunk is returned only if its file exists and is larger
    than 1000 bytes; skipped chunks are logged.

    Args:
        audio_path: Audio file to split.
        output_dir: Existing directory that receives the chunk files.
        chunk_sec: Chunk length in whole seconds; must be positive.

    Returns:
        Paths of the usable chunks, in time order.

    Raises:
        ValueError: If *chunk_sec* is not positive.
        subprocess.TimeoutExpired: If one ffmpeg call exceeds 30 seconds.
    """
    if chunk_sec <= 0:
        raise ValueError(f"chunk_sec must be positive, got {chunk_sec}")

    duration = _get_duration(audio_path)
    chunks = []
    for start in range(0, int(duration) + 1, chunk_sec):
        if start >= duration:
            # Nothing left to cut (duration is an exact multiple of chunk_sec).
            break
        chunk_path = output_dir / f"chunk_{start:06d}.wav"
        cmd = [
            # -ss before -i seeks in the input rather than decoding and
            # discarding everything before `start` on every call, which made
            # late chunks of long files slow enough to hit the timeout.
            "ffmpeg", "-y", "-ss", str(start), "-i", str(audio_path),
            "-t", str(chunk_sec),
            "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
            str(chunk_path)
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if chunk_path.exists() and chunk_path.stat().st_size > 1000:
            chunks.append(chunk_path)
        else:
            logger.warning(
                f"Skipping chunk at {start}s: ffmpeg exit code {result.returncode}, "
                "output missing or too small"
            )
    return chunks