OphtalmoCapture / interface /services /whisper_service.py
TheBug95's picture
cambios realizados para despliegue con docker en Huggingface
372d5c5
"""OphthalmoCapture — Whisper Transcription Service
Encapsulates all Whisper-related logic: model loading, transcription,
and segment-level timestamps. Temporary files are ALWAYS cleaned up.
"""
import os
import shutil
import tempfile
import streamlit as st
import whisper
# ── Ensure ffmpeg is available ───────────────────────────────────────────────
# If system ffmpeg is not in PATH, use the bundled one from imageio-ffmpeg:
# expose it under the name "ffmpeg" and prepend its directory to PATH.
# Everything here is best-effort; a failure must not break importing this
# module.
if shutil.which("ffmpeg") is None:
    try:
        import imageio_ffmpeg

        _ffmpeg_real = imageio_ffmpeg.get_ffmpeg_exe()
        _ffmpeg_dir = os.path.dirname(_ffmpeg_real)
        # Create an alias so that Whisper (which calls "ffmpeg") can find it.
        _alias_name = "ffmpeg.exe" if os.name == "nt" else "ffmpeg"
        _ffmpeg_alias = os.path.join(_ffmpeg_dir, _alias_name)
        if not os.path.exists(_ffmpeg_alias):
            try:
                os.link(_ffmpeg_real, _ffmpeg_alias)  # hard link (no admin)
            except OSError:
                try:
                    # Fallback: copy. Uses the module-level ``shutil`` import.
                    shutil.copy2(_ffmpeg_real, _ffmpeg_alias)
                except OSError:
                    # The directory is not writable (or the copy failed for
                    # another OS-level reason). Do not crash at import time;
                    # transcription will report the missing ffmpeg later.
                    pass
        os.environ["PATH"] = (
            _ffmpeg_dir + os.pathsep + os.environ.get("PATH", "")
        )
    except ImportError:
        pass  # Will fail later with a clear Whisper error
@st.cache_resource
def load_whisper_model(model_size: str):
    """Load the Whisper model identified by ``model_size`` and return it.

    Caching is provided by the ``st.cache_resource`` decorator. A progress
    line is printed to stdout whenever the function body runs.
    """
    announcement = f"Loading Whisper model: {model_size}..."
    print(announcement)
    loaded_model = whisper.load_model(model_size)
    return loaded_model
def transcribe_audio(model, audio_bytes: bytes, language: str = "es") -> str:
    """Transcribe raw WAV bytes and return plain text.

    The bytes are written to a temporary ``.wav`` file whose path is handed
    to ``model.transcribe``. The temporary file is **always** deleted
    (try/finally): its path is recorded before any data is written, so the
    file is removed even when the write itself fails.

    Args:
        model: Object exposing ``transcribe(path, language=...)`` and
            returning a mapping with an optional ``"text"`` key.
        audio_bytes: Audio content to transcribe.
        language: Language code forwarded to ``model.transcribe``.

    Returns:
        The stripped transcription, or ``""`` when any exception occurred
        (the error is shown to the user through ``st.error``).
    """
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            # Record the path first: with delete=False the file already
            # exists, so a failing write must still be cleaned up below.
            tmp_path = tmp.name
            tmp.write(audio_bytes)
        # Transcribe after the ``with`` block so the data is flushed and the
        # handle is closed before the file is read by path.
        result = model.transcribe(tmp_path, language=language)
        return result.get("text", "").strip()
    except Exception as e:
        st.error(f"Error de transcripción: {e}")
        return ""
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
def transcribe_audio_with_timestamps(
    model, audio_bytes: bytes, language: str = "es"
) -> tuple[str, list[dict]]:
    """Transcribe raw WAV bytes and return (plain_text, segments).

    Each segment dict contains:
        {"start": float, "end": float, "text": str}
    with ``start``/``end`` rounded to two decimals and ``text`` stripped.
    Useful for syncing transcript highlights with audio playback.

    The bytes are written to a temporary ``.wav`` file that is always
    removed in ``finally``; its path is recorded before any data is written
    so the file is deleted even when the write itself fails.

    Args:
        model: Object exposing ``transcribe(path, language=...)`` and
            returning a mapping with optional ``"text"`` and ``"segments"``
            keys.
        audio_bytes: Audio content to transcribe.
        language: Language code forwarded to ``model.transcribe``.

    Returns:
        ``(text, segments)``, or ``("", [])`` when any exception occurred
        (the error is shown to the user through ``st.error``).
    """
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            # Record the path first: with delete=False the file already
            # exists, so a failing write must still be cleaned up below.
            tmp_path = tmp.name
            tmp.write(audio_bytes)
        # Transcribe after the ``with`` block so the data is flushed and the
        # handle is closed before the file is read by path.
        result = model.transcribe(tmp_path, language=language)
        text = result.get("text", "").strip()
        segments = [
            {
                "start": round(seg["start"], 2),
                "end": round(seg["end"], 2),
                "text": seg["text"].strip(),
            }
            for seg in result.get("segments", [])
        ]
        return text, segments
    except Exception as e:
        st.error(f"Error de transcripción: {e}")
        return "", []
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
def format_timestamp(seconds: float) -> str:
    """Convert seconds to a zero-padded ``MM:SS`` string.

    Fractional seconds are truncated. Minutes are not wrapped at one hour,
    so ``3600`` yields ``"60:00"``. Negative values are clamped to
    ``"00:00"``, because floor-based ``divmod`` on a negative number would
    otherwise produce a malformed string.
    """
    whole_seconds = max(int(seconds), 0)
    minutes, remainder = divmod(whole_seconds, 60)
    return f"{minutes:02d}:{remainder:02d}"