File size: 8,855 Bytes
822cbeb
 
 
 
 
 
86fcf07
 
822cbeb
100ae16
822cbeb
 
 
 
 
 
1095508
 
 
 
 
 
 
 
822cbeb
86fcf07
 
 
 
 
 
822cbeb
 
 
 
 
 
86fcf07
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
822cbeb
 
 
 
 
 
 
 
 
 
 
 
 
86fcf07
 
 
 
822cbeb
86fcf07
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
822cbeb
86fcf07
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
822cbeb
86fcf07
 
 
 
 
 
 
 
 
 
 
 
 
822cbeb
 
 
 
86fcf07
822cbeb
 
 
 
 
 
 
 
 
 
1095508
 
822cbeb
 
1095508
 
 
822cbeb
 
 
 
 
40b8c7a
 
 
822cbeb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
100ae16
822cbeb
 
1095508
 
 
822cbeb
100ae16
822cbeb
1095508
 
822cbeb
100ae16
822cbeb
40b8c7a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# app.py
import os
import tempfile
import uuid
import soundfile as sf
from pathlib import Path
import numpy as np
import logging

import gradio as gr
from transformers import pipeline

# Load the Hugging Face automatic speech recognition pipeline.
# The model "openai/whisper-small" is public and works on CPU (smaller memory footprint).
# Loading may take a few seconds at startup.
ASR_MODEL = "openai/whisper-small"
# Use Whisper's translate task so output is English regardless of input language
asr = pipeline(
    "automatic-speech-recognition",
    model=ASR_MODEL,
    chunk_length_s=30,  # split long inputs into 30 s chunks for the model
    ignore_warning=True,  # NOTE(review): presumably silences the chunk_length_s warning — confirm against the installed transformers version
    generate_kwargs={"task": "translate"},
)

# Debug flag: set True to print audio shapes/dtypes and save resampled temp WAVs
DEBUG = False
logger = logging.getLogger(__name__)
if DEBUG:
    logging.basicConfig(level=logging.DEBUG)

def save_audio_to_wav(audio, sr):
    """
    Write audio samples to a uniquely named WAV file in the system temp dir.

    audio: numpy array of samples, a path-like value, or a list/tuple
           wrapping an array (e.g. (array, sr) / (sr, array))
    sr: sample rate in Hz
    Returns the path of the written WAV file as a string.
    """
    # Unwrap common container forms: keep the first array-like member.
    if isinstance(audio, (list, tuple)):
        candidate = next(
            (item for item in audio if isinstance(item, (list, tuple, np.ndarray))),
            None,
        )
        if isinstance(candidate, (list, tuple)):
            audio = np.asarray(candidate)
        elif isinstance(candidate, np.ndarray):
            audio = candidate
        else:
            # No array-like member found; fall back to the first element.
            audio = np.asarray(audio[0])

    audio = np.asarray(audio)

    # soundfile expects (frames, channels); flip a (channels, frames) layout.
    if audio.ndim == 2 and audio.shape[0] <= 2 and audio.shape[1] > audio.shape[0]:
        audio = audio.T

    # Normalise to float32; integer PCM is scaled into [-1, 1].
    if np.issubdtype(audio.dtype, np.integer):
        audio = audio.astype("float32") / float(np.iinfo(audio.dtype).max)
    else:
        audio = audio.astype("float32")

    out_path = Path(tempfile.gettempdir()) / f"hf_audio_{uuid.uuid4().hex}.wav"
    sf.write(str(out_path), audio, sr, format="WAV")
    return str(out_path)

def transcribe(audio):
    """
    audio: either a file path string (Gradio sometimes returns a path)
           or a tuple (np_array, sample_rate) from Gradio's audio component.
    """
    if audio is None:
        return "No audio provided."

    # If Gradio gives a filepath (str), read it with soundfile to avoid ffmpeg requirement
    audio_array = None
    sampling_rate = None

    if isinstance(audio, str):
        try:
            audio_array, sampling_rate = sf.read(audio)
        except Exception as e:
            return f"Could not read audio file: {e}"
    else:
        # Normalize audio to (samples, sr)
        samples = None
        sr = None
        if isinstance(audio, (list, tuple)):
            # common forms: (samples, sr) or (sr, samples)
            if len(audio) >= 2:
                a0, a1 = audio[0], audio[1]
                if isinstance(a0, (list, tuple, np.ndarray)):
                    samples, sr = a0, a1
                elif isinstance(a1, (list, tuple, np.ndarray)):
                    samples, sr = a1, a0
            # fallback: try to find array and int within the tuple
            if samples is None:
                samples = next((x for x in audio if isinstance(x, (list, tuple, np.ndarray))), None)
                sr = next((x for x in audio if isinstance(x, int)), None)
        else:
            samples = audio

        if samples is None:
            return "Unsupported audio format."

        # default sr if missing
        if sr is None:
            sr = 16000

        audio_array = np.asarray(samples)
        sampling_rate = sr

    # Ensure numpy array and float32
    try:
        audio_array = np.asarray(audio_array)
    except Exception:
        return "Unsupported audio data - cannot convert to numpy array."

    # If 2D (frames, channels) or (channels, frames), make mono by averaging channels
    if audio_array.ndim == 2:
        # If shape looks like (channels, frames), transpose first
        if audio_array.shape[0] <= 2 and audio_array.shape[1] > audio_array.shape[0]:
            audio_array = audio_array.T
        # average channels to mono
        audio_array = np.mean(audio_array, axis=1)

    # Convert integer audio to float32 in [-1, 1] or ensure float32
    if np.issubdtype(audio_array.dtype, np.integer):
        maxv = np.iinfo(audio_array.dtype).max
        audio_array = audio_array.astype("float32") / float(maxv)
    else:
        audio_array = audio_array.astype("float32")
    # Resample to the model's expected sampling rate if needed (avoid passing sampling_rate kwarg)
    try:
        model_sr = getattr(getattr(asr, "feature_extractor", None), "sampling_rate", None)
    except Exception:
        model_sr = None

    if model_sr is None:
        model_sr = 16000

    # if incoming sampling_rate is missing, assume model rate
    if sampling_rate is None:
        sampling_rate = model_sr

    if sampling_rate != model_sr:
        # simple linear resampling via numpy.interp
        try:
            orig_len = audio_array.shape[0]
            new_len = int(round(orig_len * float(model_sr) / float(sampling_rate)))
            if new_len <= 0:
                return "Transcription failed: invalid resample length"
            new_indices = np.linspace(0, orig_len - 1, new_len)
            old_indices = np.arange(orig_len)
            audio_array = np.interp(new_indices, old_indices, audio_array).astype("float32")
            sampling_rate = model_sr
        except Exception as e:
            return f"Transcription failed during resampling: {e}"

    # Debug: log and optionally save the resampled audio
    if DEBUG:
        try:
            logger.debug(f"Calling ASR with audio_array.shape={audio_array.shape}, dtype={audio_array.dtype}, sampling_rate={sampling_rate}")
            tmpdir = tempfile.gettempdir()
            dbg_fname = Path(tmpdir) / f"hf_debug_audio_{uuid.uuid4().hex}.wav"
            sf.write(str(dbg_fname), audio_array, sampling_rate, format="WAV")
            logger.debug(f"Wrote debug WAV to {dbg_fname}")
        except Exception as e:
            logger.debug(f"Debug save failed: {e}")

    # Use the pipeline to transcribe by passing just the numpy array (model expects array at its sampling rate)
    try:
        result = asr(audio_array)
    except Exception as e:
        return f"Transcription failed: {e}"
    text = result.get("text", "").strip()

    # cleanup temporary file
    try:
            pass  # Removed cleanup code referencing undefined audio_path
    except Exception:
        pass

    if not text:
        return "No speech detected / transcription empty."
    return text

def clear_audio():
    """Reset callback for the Clear button: empty the audio widget and the transcript box."""
    return (None, "")


with gr.Blocks(title="Whisper-Small Speech-to-English") as demo:
    # Intro text rendered at the top of the app.
    gr.Markdown(
        """
        # 🎙️ Whisper-Small Speech-to-English
        Record or upload audio and click **Transcribe**.
        This app uses `openai/whisper-small` in translate mode and returns English text.
        """
    )

    with gr.Row():
        with gr.Column(scale=1):
            # Two audio widgets; both return (sample_rate, np_array) tuples (type="numpy").
            # NOTE(review): neither restricts `sources`, so both accept mic AND upload —
            # confirm whether the second was meant to be upload-only.
            audio_input = gr.Audio(type="numpy", label="Record or upload audio")
            upload_input = gr.Audio(type="numpy", label="Or upload an audio file")

            transcribe_btn = gr.Button("Transcribe")
            clear_btn = gr.Button("Clear")
        with gr.Column(scale=1):
            transcript = gr.Textbox(label="Transcription", lines=8)
            copy_btn = gr.Button("Copy transcript")

    # When clicking the transcribe button, prefer recorded audio if present,
    # otherwise use uploaded audio.
    def _get_preferred_audio(recorded, uploaded):
        """Return whichever audio value is non-empty, preferring the recorded one."""
        # recorded or uploaded may be numpy tuples or file paths depending on Gradio
        if recorded:
            return recorded
        if uploaded:
            return uploaded
        return None

    transcribe_btn.click(
        fn=lambda rec, up: transcribe(_get_preferred_audio(rec, up)),
        inputs=[audio_input, upload_input],
        outputs=transcript,
    )

    # Clear resets the first audio widget and the transcript
    # (the upload widget is intentionally left untouched — TODO confirm).
    clear_btn.click(
        fn=clear_audio,
        inputs=None,
        outputs=[audio_input, transcript],
    )

    # Copy transcript to clipboard (Gradio has `copy` action for buttons)
    # NOTE(review): with outputs=None this callback discards its return value and
    # performs no visible action — verify whether gr.Textbox(show_copy_button=True)
    # was intended instead.
    copy_btn.click(
        fn=lambda txt: txt,
        inputs=transcript,
        outputs=None,
    )

    gr.Markdown(
        "Notes: The app translates spoken audio to English using Whisper (translate task). "
        "Small model runs on CPU and may take time for longer files. For lower latency or other target languages, consider the HF Inference API or additional translation pipelines."
    )

if __name__ == "__main__":
    # Bind to all interfaces; port comes from $PORT (default 7860, the HF Spaces convention).
    # NOTE(review): share=True requests a public Gradio tunnel — confirm this is wanted
    # outside of Spaces, where it exposes the app publicly.
    demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)), share=True)