File size: 3,536 Bytes
b0c3a57
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
372d5c5
 
 
 
b0c3a57
 
 
 
 
 
 
372d5c5
b0c3a57
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""OphthalmoCapture — Whisper Transcription Service

Encapsulates all Whisper-related logic: model loading, transcription,
and segment-level timestamps.  Temporary files are ALWAYS cleaned up.
"""

import os
import shutil
import tempfile
import streamlit as st
import whisper

# ── Ensure ffmpeg is available ───────────────────────────────────────────────
# If system ffmpeg is not in PATH, use the bundled one from imageio-ffmpeg.
if shutil.which("ffmpeg") is None:
    try:
        import imageio_ffmpeg

        _ffmpeg_real = imageio_ffmpeg.get_ffmpeg_exe()
        _ffmpeg_dir = os.path.dirname(_ffmpeg_real)
        # Whisper invokes the literal command "ffmpeg", and the bundled
        # executable is not necessarily called that, so expose it under the
        # expected name right next to the real binary.
        _alias_name = "ffmpeg.exe" if os.name == "nt" else "ffmpeg"
        _ffmpeg_alias = os.path.join(_ffmpeg_dir, _alias_name)
        if not os.path.exists(_ffmpeg_alias):
            try:
                os.link(_ffmpeg_real, _ffmpeg_alias)   # hard link (no admin)
            except OSError:
                try:
                    shutil.copy2(_ffmpeg_real, _ffmpeg_alias)  # fallback: copy
                except OSError:
                    # Directory not writable (e.g. read-only install).  This
                    # setup is best-effort: do not abort the module import;
                    # the missing ffmpeg surfaces later when Whisper runs.
                    pass
        # Prepend so the alias is found by PATH lookups of "ffmpeg".
        os.environ["PATH"] = (
            _ffmpeg_dir + os.pathsep + os.environ.get("PATH", "")
        )
    except ImportError:
        pass  # Will fail later with a clear Whisper error


@st.cache_resource
def load_whisper_model(model_size: str):
    """Return the Whisper model identified by *model_size*.

    The function is wrapped in ``st.cache_resource``.  A progress line is
    printed to stdout each time the body actually runs.
    """
    print(f"Loading Whisper model: {model_size}...")
    model = whisper.load_model(model_size)
    return model


def transcribe_audio(model, audio_bytes: bytes, language: str = "es") -> str:
    """Transcribe raw WAV bytes and return plain text.

    The bytes are written to a temporary ``.wav`` file whose path is handed
    to ``model.transcribe``.  The temporary file is **always** deleted
    (try/finally), including when writing the bytes fails.

    Args:
        model: Object exposing ``transcribe(path, language=...)`` that
            returns a mapping with a ``"text"`` entry.
        audio_bytes: Audio content to transcribe.
        language: Language code forwarded to the model.

    Returns:
        The stripped transcript, or ``""`` if any error occurred (the error
        is reported through ``st.error``).
    """
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            # Record the path *before* writing: with delete=False the file
            # already exists on disk, so a failed write must still be
            # cleaned up by the finally clause below.
            tmp_path = tmp.name
            tmp.write(audio_bytes)

        # The file is closed at this point, so the model can open it by path.
        result = model.transcribe(tmp_path, language=language)
        return result.get("text", "").strip()
    except Exception as e:
        st.error(f"Error de transcripción: {e}")
        return ""
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)


def transcribe_audio_with_timestamps(
    model, audio_bytes: bytes, language: str = "es"
) -> tuple[str, list[dict]]:
    """Transcribe raw WAV bytes and return (plain_text, segments).

    Each segment dict contains:
        {"start": float, "end": float, "text": str}

    ``start`` and ``end`` are rounded to two decimals and ``text`` is
    stripped.  Useful for syncing transcript highlights with audio playback.

    The bytes are written to a temporary ``.wav`` file that is always
    deleted (try/finally), including when writing the bytes fails.

    Args:
        model: Object exposing ``transcribe(path, language=...)`` that
            returns a mapping with ``"text"`` and ``"segments"`` entries.
        audio_bytes: Audio content to transcribe.
        language: Language code forwarded to the model.

    Returns:
        ``(text, segments)``; ``("", [])`` if any error occurred (the error
        is reported through ``st.error``).
    """
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            # Record the path *before* writing: with delete=False the file
            # already exists on disk, so a failed write must still be
            # cleaned up by the finally clause below.
            tmp_path = tmp.name
            tmp.write(audio_bytes)

        # The file is closed at this point, so the model can open it by path.
        result = model.transcribe(tmp_path, language=language)
        text = result.get("text", "").strip()

        # Keep only the fields the UI needs from each segment.
        segments = [
            {
                "start": round(seg["start"], 2),
                "end": round(seg["end"], 2),
                "text": seg["text"].strip(),
            }
            for seg in result.get("segments", [])
        ]

        return text, segments
    except Exception as e:
        st.error(f"Error de transcripción: {e}")
        return "", []
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)


def format_timestamp(seconds: float) -> str:
    """Convert seconds to MM:SS format.

    Fractional seconds are truncated.  Minutes are not wrapped at 60, so
    durations of an hour or more render as e.g. ``"61:40"``.  Negative
    values are rendered as the magnitude with a leading ``-`` (``-5`` ->
    ``"-00:05"``) rather than the malformed output floor division would
    produce.
    """
    total = int(seconds)
    sign = "-" if total < 0 else ""
    # divmod floors on negatives, so split the magnitude and add the sign back.
    m, s = divmod(abs(total), 60)
    return f"{sign}{m:02d}:{s:02d}"