video-dubbing-agent / services /transcriber.py
dashhdata's picture
Upload folder using huggingface_hub
ea2dffa verified
"""
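Stage 4 — Transcription Service
PRIMARY: Local faster-whisper (CPU) — tried first because it is more reliable on Spaces
FALLBACK: HuggingFace Inference API (free GPU — Whisper model selected by HF_API_URL, e.g. whisper-large-v3)
For the HF fallback: splits audio into fixed-length chunks (HF_CHUNK_DURATION_SEC seconds each, e.g. 30s) and sends each chunk to the HF API.
The fallback gives us GPU-quality transcription for FREE when local transcription is unavailable or fails.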
"""
import logging
import subprocess
import json
import time
import requests
from pathlib import Path
from typing import List, Dict, Optional
from config import HF_API_URL, HF_TOKEN, HF_CHUNK_DURATION_SEC, WHISPER_MODEL_SIZE
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
def transcribe_audio(
    audio_path: Path,
    output_dir: Path,
    source_language: Optional[str] = None,
    device: str = "cpu",
    progress_callback=None,
) -> List[Dict]:
    """Transcribe an audio file into a list of timed text segments.

    Backends are tried in this order:

    1. Local faster-whisper via ``_transcribe_local`` (tried first because it
       is more reliable on Spaces).
    2. HuggingFace Inference API via ``_transcribe_hf_api``.

    Args:
        audio_path: Audio file to transcribe.
        output_dir: Directory handed to the backends; they write
            ``transcript.json`` into it.
        source_language: Language code forwarded to the backend, or ``None``
            to leave the choice to the backend.
        device: Accepted for interface compatibility only; this function does
            not read it.
        progress_callback: Optional callable forwarded to the backend, which
            calls it with an integer percentage.

    Returns:
        The segment dicts produced by the first backend that succeeds. A
        result from the local backend is returned as-is, even when empty.

    Raises:
        RuntimeError: If every backend failed. The most recent backend error
            is attached as ``__cause__`` so the root cause is not lost.
    """
    # Remember the latest failure so the final error can be chained to it.
    last_error: Optional[BaseException] = None

    # Try local faster-whisper FIRST (more reliable on Spaces).
    try:
        import faster_whisper  # noqa: F401 -- availability probe only

        logger.info("Using local faster-whisper (tiny model, CPU)...")
        return _transcribe_local(audio_path, output_dir, source_language, progress_callback)
    except ImportError as e:
        last_error = e
        logger.info("faster-whisper not available, trying HF API...")
    except Exception as e:
        last_error = e
        logger.warning(f"Local transcription failed: {e}. Trying HF API...")

    # Fallback: HuggingFace Inference API (free GPU).
    try:
        logger.info("Attempting HuggingFace API transcription (whisper-small)...")
        segments = _transcribe_hf_api(audio_path, output_dir, source_language, progress_callback)
        if segments:
            logger.info(f"HF API transcription success: {len(segments)} segments")
            return segments
    except Exception as e:
        last_error = e
        logger.error(f"HF API also failed: {e}")

    raise RuntimeError(
        "Transcription failed with all methods. "
        "The audio file may be too large or the service is overloaded. Try again."
    ) from last_error
def _transcribe_hf_api(
    audio_path: Path,
    output_dir: Path,
    source_language: Optional[str],
    progress_callback=None,
) -> List[Dict]:
    """Transcribe audio through the HuggingFace Inference API.

    The audio is split into ``HF_CHUNK_DURATION_SEC``-second WAV chunks under
    ``output_dir / "audio_chunks"``. Each chunk is POSTed as raw bytes to
    ``HF_API_URL`` (with a bearer token when ``HF_TOKEN`` is set), and the
    responses are converted into segments on the timeline of the full audio.
    The result is also written to ``output_dir / "transcript.json"``.

    Args:
        audio_path: Audio file to transcribe.
        output_dir: Directory for the chunk files and ``transcript.json``.
        source_language: Language code recorded in the transcript; when
            falsy, the hard-coded default ``"hi"`` is recorded instead.
        progress_callback: Optional callable invoked with an integer
            percentage before each chunk and with ``100`` at the end.

    Returns:
        List of segment dicts with ``start``, ``end``, ``text``, ``speaker``
        and ``words`` keys.

    Raises:
        RuntimeError: If no chunk produced any text.
    """
    duration = _get_duration(audio_path)
    logger.info(f"Audio duration: {duration:.1f}s ({duration/60:.1f} min)")

    chunk_dir = output_dir / "audio_chunks"
    chunk_dir.mkdir(parents=True, exist_ok=True)
    chunk_duration = HF_CHUNK_DURATION_SEC
    chunks = _split_audio(audio_path, chunk_dir, chunk_duration)
    logger.info(f"Split into {len(chunks)} chunks ({chunk_duration}s each)")

    headers = {}
    if HF_TOKEN:
        headers["Authorization"] = f"Bearer {HF_TOKEN}"

    all_segments: List[Dict] = []
    for idx, chunk_path in enumerate(chunks):
        if progress_callback:
            progress_callback(int((idx / len(chunks)) * 100))

        # The splitter may drop chunks, so the list index is not a reliable
        # time offset; prefer the start second encoded in the file name.
        chunk_start = _hf_chunk_start(chunk_path, idx * chunk_duration)
        # The final chunk is usually shorter than chunk_duration; never report
        # an end time beyond the end of the audio.
        chunk_end = max(chunk_start, min(chunk_start + chunk_duration, duration))

        with open(chunk_path, "rb") as f:
            audio_bytes = f.read()

        chunk_segments = _hf_request_segments(
            audio_bytes, headers, idx, chunk_start, chunk_end
        )
        if chunk_segments is None:
            # Every attempt failed: make the resulting gap visible in the logs.
            logger.error(
                f"Chunk {idx} failed after all retries; "
                f"transcript will have a gap starting at {chunk_start}s"
            )
        else:
            all_segments.extend(chunk_segments)

        if idx % 10 == 0:
            logger.info(f"Transcribed chunk {idx+1}/{len(chunks)}")

    if not all_segments:
        raise RuntimeError("HF API returned no transcription results")

    # No language detection happens on this path: record the caller-supplied
    # language, or the hard-coded default when none was given.
    detected_lang = source_language or "hi"

    transcript_path = output_dir / "transcript.json"
    with open(transcript_path, "w", encoding="utf-8") as f:
        json.dump({
            "language": detected_lang,
            "segments": all_segments,
            "total_segments": len(all_segments),
            "method": "huggingface_gpu_api",
        }, f, ensure_ascii=False, indent=2)

    if progress_callback:
        progress_callback(100)
    return all_segments


def _hf_chunk_start(chunk_path: Path, fallback: float) -> float:
    """Return a chunk's start offset in seconds, parsed from its file name.

    Chunk files are named ``chunk_<start>.wav``; ``fallback`` is returned when
    the name does not end in an integer.
    """
    try:
        return int(Path(chunk_path).stem.rsplit("_", 1)[-1])
    except ValueError:
        return fallback


def _hf_request_segments(
    audio_bytes: bytes,
    headers: Dict[str, str],
    idx: int,
    chunk_start: float,
    chunk_end: float,
    retries: int = 3,
) -> Optional[List[Dict]]:
    """POST one chunk to the HF API, retrying on failure.

    Returns the chunk's segments (possibly an empty list when the response
    holds no text), or ``None`` when every attempt failed.
    """
    for attempt in range(retries):
        try:
            resp = requests.post(
                HF_API_URL,
                headers=headers,
                data=audio_bytes,
                timeout=120,
            )
            if resp.status_code == 503:
                # Model is loading: wait for the advertised time, capped at 60s.
                wait_time = resp.json().get("estimated_time", 30)
                logger.info(f"Model loading, waiting {wait_time:.0f}s...")
                time.sleep(min(wait_time, 60))
                continue
            if resp.status_code == 429:
                # Rate limited.
                logger.info("Rate limited, waiting 10s...")
                time.sleep(10)
                continue
            resp.raise_for_status()
            return _hf_result_to_segments(resp.json(), chunk_start, chunk_end)
        except requests.exceptions.Timeout:
            logger.warning(f"Chunk {idx} timed out (attempt {attempt+1})")
            time.sleep(5)
        except Exception as e:
            logger.warning(f"Chunk {idx} error: {e} (attempt {attempt+1})")
            time.sleep(5)
    return None


def _hf_result_to_segments(result: Dict, chunk_start: float, chunk_end: float) -> List[Dict]:
    """Convert one HF API JSON response into segments on the full-audio timeline.

    Timed entries under ``"chunks"`` are preferred. When none of them carries
    text, the response's top-level ``"text"`` becomes a single segment that
    spans the whole chunk, so text is never dropped just because timed entries
    are missing or duplicate the full text.
    """
    def make_segment(start: float, end: float, text: str) -> Dict:
        return {
            "start": round(start, 3),
            "end": round(end, 3),
            "text": text,
            "speaker": "SPEAKER_00",
            "words": [],
        }

    timed: List[Dict] = []
    for piece in result.get("chunks") or []:
        piece_text = (piece.get("text") or "").strip()
        if not piece_text:
            continue
        # Either timestamp may be missing/None; fall back to the chunk bounds.
        rel_start, rel_end = piece.get("timestamp") or (None, None)
        start = chunk_start + (rel_start or 0)
        end = chunk_start + rel_end if rel_end else chunk_end
        timed.append(make_segment(start, end, piece_text))
    if timed:
        return timed

    text = (result.get("text") or "").strip()
    return [make_segment(chunk_start, chunk_end, text)] if text else []
def _transcribe_local(
    audio_path: Path,
    output_dir: Path,
    source_language: Optional[str],
    progress_callback=None,
) -> List[Dict]:
    """Transcribe audio locally with faster-whisper on CPU.

    Loads a ``WHISPER_MODEL_SIZE`` model (``int8`` compute type), transcribes
    with beam size 5 and VAD filtering, and writes the result to
    ``output_dir / "transcript.json"``.

    Args:
        audio_path: Audio file to transcribe.
        output_dir: Directory that receives ``transcript.json``.
        source_language: Language code passed to the model, or ``None``.
        progress_callback: Optional callable invoked with an integer
            percentage (0-99) as segments arrive and with ``100`` at the end.

    Returns:
        List of segment dicts with ``start``, ``end``, ``text``, ``speaker``
        and ``words`` keys.

    Raises:
        RuntimeError: If faster-whisper is not installed.
    """
    try:
        from faster_whisper import WhisperModel
    except ImportError as err:
        raise RuntimeError("faster-whisper not installed. Run: pip install faster-whisper") from err

    model = WhisperModel(WHISPER_MODEL_SIZE, device="cpu", compute_type="int8")
    raw_segments, info = model.transcribe(
        str(audio_path),
        language=source_language,
        beam_size=5,
        vad_filter=True,
    )

    # Total audio length as reported by faster-whisper; used only to turn a
    # segment's end time into a percentage. Progress is skipped if unavailable.
    total_seconds = getattr(info, "duration", None) or 0

    segments = []
    last_pct = -1
    # faster-whisper documents `raw_segments` as a lazy generator, so the
    # decoding work happens while this loop runs.
    for seg in raw_segments:
        segments.append({
            "start": round(seg.start, 3),
            "end": round(seg.end, 3),
            "text": seg.text.strip(),
            "speaker": "SPEAKER_00",
            "words": [],
        })
        if progress_callback and total_seconds > 0:
            # Cap at 99 so 100 is only reported once the transcript is saved.
            pct = min(99, int(seg.end / total_seconds * 100))
            if pct > last_pct:
                progress_callback(pct)
                last_pct = pct

    transcript_path = output_dir / "transcript.json"
    with open(transcript_path, "w", encoding="utf-8") as f:
        json.dump({
            "language": info.language,
            "segments": segments,
            "total_segments": len(segments),
            "method": "local_faster_whisper",
        }, f, ensure_ascii=False, indent=2)

    if progress_callback:
        progress_callback(100)
    return segments
def _get_duration(audio_path: Path) -> float:
    """Return the duration of ``audio_path`` in seconds, as reported by ffprobe.

    Raises:
        ValueError: If ffprobe's output is not a number (for example because
            the file is missing or unreadable). The message names the file,
            the exit code and the raw output.
        subprocess.TimeoutExpired: If ffprobe does not finish within 30 s.
    """
    cmd = ["ffprobe", "-v", "quiet", "-show_entries", "format=duration", "-of", "csv=p=0", str(audio_path)]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    raw = result.stdout.strip()
    try:
        return float(raw)
    except ValueError:
        # Same exception type as before, but with enough context to diagnose
        # the failure instead of "could not convert string to float: ''".
        raise ValueError(
            f"ffprobe could not determine the duration of {audio_path} "
            f"(exit code {result.returncode}, output {raw!r})"
        ) from None
def _split_audio(audio_path: Path, output_dir: Path, chunk_sec: int) -> List[Path]:
    """Split audio into fixed-duration 16 kHz mono PCM WAV chunks.

    One ffmpeg invocation is made per chunk. Each chunk is written to
    ``output_dir / "chunk_<start>.wav"``, where ``<start>`` is the chunk's
    start offset in whole seconds, zero-padded to six digits.

    Args:
        audio_path: Audio file to split.
        output_dir: Existing directory that receives the chunk files.
        chunk_sec: Length of each chunk in seconds.

    Returns:
        Paths of the chunks that exist and are larger than 1000 bytes, in
        timeline order. Chunks that ffmpeg failed to produce, or that are
        effectively empty, are omitted.
    """
    duration = _get_duration(audio_path)
    chunks = []
    for start in range(0, int(duration) + 1, chunk_sec):
        chunk_path = output_dir / f"chunk_{start:06d}.wav"
        # Remove any file left over from an earlier run so a failed ffmpeg
        # call cannot make stale audio look like fresh output.
        if chunk_path.exists():
            chunk_path.unlink()
        cmd = [
            "ffmpeg", "-y", "-i", str(audio_path),
            "-ss", str(start), "-t", str(chunk_sec),
            "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
            str(chunk_path)
        ]
        proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if proc.returncode != 0:
            # Surface the failure; the size check below still decides whether
            # the chunk is usable.
            logger.warning(
                f"ffmpeg failed for chunk at {start}s (exit code {proc.returncode}): "
                f"{proc.stderr.strip()[-300:]}"
            )
        # Files at or below 1000 bytes are treated as empty (e.g. a chunk that
        # starts at the very end of the audio).
        if chunk_path.exists() and chunk_path.stat().st_size > 1000:
            chunks.append(chunk_path)
    return chunks