# NOTE: Hugging Face Spaces status banner ("Spaces: Sleeping") removed from web capture.
# app.py
import os
import tempfile
import uuid
import soundfile as sf
from pathlib import Path
import numpy as np
import logging
import gradio as gr
from transformers import pipeline

# Load the Hugging Face automatic speech recognition pipeline.
# The model "openai/whisper-small" is public and works on CPU (smaller memory footprint).
# Loading may take a few seconds at startup.
ASR_MODEL = "openai/whisper-small"

# Use Whisper's translate task so output is English regardless of input language
asr = pipeline(
    "automatic-speech-recognition",
    model=ASR_MODEL,
    chunk_length_s=30,  # split long inputs into 30 s chunks for the pipeline
    ignore_warning=True,
    generate_kwargs={"task": "translate"},
)

# Debug flag: set True to print audio shapes/dtypes and save resampled temp WAVs
DEBUG = False
logger = logging.getLogger(__name__)
if DEBUG:
    logging.basicConfig(level=logging.DEBUG)
def save_audio_to_wav(audio, sr):
    """Write audio samples to a uniquely named WAV file in the temp directory.

    audio: numpy array (samples,) or (frames, channels); may also arrive
           wrapped in a list/tuple such as (array, sr) or (sr, array).
    sr: sample rate to write the file at.
    Returns the path of the written WAV file as a string.
    """
    # Unwrap common tuple forms by picking the first array-like element.
    if isinstance(audio, (list, tuple)):
        candidate = next(
            (item for item in audio if isinstance(item, (list, tuple, np.ndarray))),
            None,
        )
        if candidate is None:
            # Fallback: assume the first element holds the samples.
            audio = np.asarray(audio[0])
        else:
            audio = np.asarray(candidate)

    data = np.asarray(audio)

    # A (channels, frames) layout is flipped to (frames, channels) so that
    # soundfile interprets the axes correctly.
    if data.ndim == 2 and data.shape[0] <= 2 and data.shape[1] > data.shape[0]:
        data = data.T

    # Integer PCM is rescaled into [-1, 1]; anything else is cast to float32.
    if np.issubdtype(data.dtype, np.integer):
        data = data.astype("float32") / float(np.iinfo(data.dtype).max)
    else:
        data = data.astype("float32")

    out_path = Path(tempfile.gettempdir()) / f"hf_audio_{uuid.uuid4().hex}.wav"
    sf.write(str(out_path), data, sr, format="WAV")
    return str(out_path)
def _extract_samples_and_rate(audio):
    """Normalize a non-path Gradio audio value to a (samples, sr) pair.

    Accepts (samples, sr), (sr, samples), other tuple/list shapes, or a bare
    array. Either element of the result may be None when it cannot be found.
    """
    samples = None
    sr = None
    if isinstance(audio, (list, tuple)):
        # common forms: (samples, sr) or (sr, samples)
        if len(audio) >= 2:
            first, second = audio[0], audio[1]
            if isinstance(first, (list, tuple, np.ndarray)):
                samples, sr = first, second
            elif isinstance(second, (list, tuple, np.ndarray)):
                samples, sr = second, first
        # fallback: find any array-like and any int within the tuple
        if samples is None:
            samples = next((x for x in audio if isinstance(x, (list, tuple, np.ndarray))), None)
            sr = next((x for x in audio if isinstance(x, int)), None)
    else:
        samples = audio
    return samples, sr


def _to_mono_float32(audio_array):
    """Collapse 2-D audio to mono and convert samples to float32 in [-1, 1]."""
    if audio_array.ndim == 2:
        # If shape looks like (channels, frames), transpose to (frames, channels) first.
        if audio_array.shape[0] <= 2 and audio_array.shape[1] > audio_array.shape[0]:
            audio_array = audio_array.T
        # Average channels to mono.
        audio_array = np.mean(audio_array, axis=1)
    # Integer PCM is rescaled by the dtype's max value; floats are cast directly.
    if np.issubdtype(audio_array.dtype, np.integer):
        maxv = np.iinfo(audio_array.dtype).max
        audio_array = audio_array.astype("float32") / float(maxv)
    else:
        audio_array = audio_array.astype("float32")
    return audio_array


def _linear_resample(audio_array, src_sr, dst_sr):
    """Resample 1-D float audio from src_sr to dst_sr via numpy.interp.

    Returns the resampled float32 array, or None when the target length
    would be non-positive (e.g. empty input).
    """
    orig_len = audio_array.shape[0]
    new_len = int(round(orig_len * float(dst_sr) / float(src_sr)))
    if new_len <= 0:
        return None
    new_indices = np.linspace(0, orig_len - 1, new_len)
    old_indices = np.arange(orig_len)
    return np.interp(new_indices, old_indices, audio_array).astype("float32")


def transcribe(audio):
    """Transcribe/translate audio to English text with the global `asr` pipeline.

    audio: either a file path string (Gradio sometimes returns a path)
           or a tuple (np_array, sample_rate) from Gradio's audio component.
    Returns the transcription text, or a human-readable error string on failure
    (this function never raises; all errors are reported as strings).
    """
    if audio is None:
        return "No audio provided."
    # If Gradio gives a filepath (str), read it with soundfile to avoid ffmpeg requirement
    if isinstance(audio, str):
        try:
            audio_array, sampling_rate = sf.read(audio)
        except Exception as e:
            return f"Could not read audio file: {e}"
    else:
        samples, sr = _extract_samples_and_rate(audio)
        if samples is None:
            return "Unsupported audio format."
        # default sr if missing
        if sr is None:
            sr = 16000
        audio_array = samples
        sampling_rate = sr
    # Ensure numpy array and float32 mono
    try:
        audio_array = np.asarray(audio_array)
    except Exception:
        return "Unsupported audio data - cannot convert to numpy array."
    audio_array = _to_mono_float32(audio_array)
    # Resample to the model's expected sampling rate if needed (avoid passing sampling_rate kwarg)
    try:
        model_sr = getattr(getattr(asr, "feature_extractor", None), "sampling_rate", None)
    except Exception:
        model_sr = None
    if model_sr is None:
        model_sr = 16000
    # if incoming sampling_rate is missing, assume model rate
    if sampling_rate is None:
        sampling_rate = model_sr
    if sampling_rate != model_sr:
        # simple linear resampling via numpy.interp
        try:
            resampled = _linear_resample(audio_array, sampling_rate, model_sr)
            if resampled is None:
                return "Transcription failed: invalid resample length"
            audio_array = resampled
            sampling_rate = model_sr
        except Exception as e:
            return f"Transcription failed during resampling: {e}"
    # Debug: log and optionally save the resampled audio
    if DEBUG:
        try:
            logger.debug(f"Calling ASR with audio_array.shape={audio_array.shape}, dtype={audio_array.dtype}, sampling_rate={sampling_rate}")
            dbg_fname = Path(tempfile.gettempdir()) / f"hf_debug_audio_{uuid.uuid4().hex}.wav"
            sf.write(str(dbg_fname), audio_array, sampling_rate, format="WAV")
            logger.debug(f"Wrote debug WAV to {dbg_fname}")
        except Exception as e:
            logger.debug(f"Debug save failed: {e}")
    # Use the pipeline to transcribe by passing just the numpy array (model expects array at its sampling rate)
    try:
        result = asr(audio_array)
    except Exception as e:
        return f"Transcription failed: {e}"
    text = result.get("text", "").strip()
    if not text:
        return "No speech detected / transcription empty."
    return text
def clear_audio():
    """Reset handler: clears the audio widget and empties the transcript box."""
    cleared_widget, cleared_text = None, ""
    return cleared_widget, cleared_text
# Build the Gradio UI: record/upload inputs on the left, transcript on the right.
with gr.Blocks(title="Whisper-Small Speech-to-English") as demo:
    gr.Markdown(
        """
# 🎙️ Whisper-Small Speech-to-English
Record or upload audio and click **Transcribe**.
This app uses `openai/whisper-small` in translate mode and returns English text.
"""
    )
    with gr.Row():
        with gr.Column(scale=1):
            audio_input = gr.Audio(type="numpy", label="Record or upload audio")
            upload_input = gr.Audio(type="numpy", label="Or upload an audio file")
            transcribe_btn = gr.Button("Transcribe")
            clear_btn = gr.Button("Clear")
        with gr.Column(scale=1):
            transcript = gr.Textbox(label="Transcription", lines=8)
            copy_btn = gr.Button("Copy transcript")

    # When clicking the transcribe button, prefer recorded audio if present,
    # otherwise use uploaded audio.
    def _get_preferred_audio(recorded, uploaded):
        # Use explicit None checks rather than truthiness: a bare numpy array
        # raises "truth value of an array is ambiguous" under `if recorded:`,
        # and tuple/array inputs should be forwarded even when "falsy".
        if recorded is not None:
            return recorded
        if uploaded is not None:
            return uploaded
        return None

    transcribe_btn.click(
        fn=lambda rec, up: transcribe(_get_preferred_audio(rec, up)),
        inputs=[audio_input, upload_input],
        outputs=transcript,
    )
    clear_btn.click(
        fn=clear_audio,
        inputs=None,
        outputs=[audio_input, transcript],
    )
    # NOTE(review): this identity callback does not actually reach the
    # clipboard; gr.Textbox(show_copy_button=True) would. Kept as-is to avoid
    # a visible UI behavior change.
    copy_btn.click(
        fn=lambda txt: txt,
        inputs=transcript,
        outputs=None,
    )
    gr.Markdown(
        "Notes: The app translates spoken audio to English using Whisper (translate task). "
        "Small model runs on CPU and may take time for longer files. For lower latency or other target languages, consider the HF Inference API or additional translation pipelines."
    )
if __name__ == "__main__":
    # Bind to all interfaces; Spaces/containers inject the port via $PORT.
    serve_port = int(os.environ.get("PORT", 7860))
    demo.launch(server_name="0.0.0.0", server_port=serve_port, share=True)