# EAG_s9_sample / app.py
# (Hugging Face Spaces page residue preserved as comments: author "Rahuluni",
#  commit message "add eng only", commit id 1095508)
# app.py
import os
import tempfile
import uuid
import soundfile as sf
from pathlib import Path
import numpy as np
import logging
import gradio as gr
from transformers import pipeline
# Load the Hugging Face automatic speech recognition pipeline.
# The model "openai/whisper-small" is public and works on CPU (smaller memory footprint).
# Loading may take a few seconds at startup.
ASR_MODEL = "openai/whisper-small"
# Use Whisper's translate task so output is English regardless of input language
asr = pipeline(
    "automatic-speech-recognition",
    model=ASR_MODEL,
    # Decode long inputs in 30-second chunks.
    chunk_length_s=30,
    # NOTE(review): presumably silences the pipeline's chunking warning — confirm
    # this kwarg is still accepted by the installed transformers version.
    ignore_warning=True,
    generate_kwargs={"task": "translate"},
)
# Debug flag: set True to print audio shapes/dtypes and save resampled temp WAVs
DEBUG = False
logger = logging.getLogger(__name__)
if DEBUG:
    logging.basicConfig(level=logging.DEBUG)
def save_audio_to_wav(audio, sr):
    """
    Persist audio samples to a uniquely named WAV file in the temp directory.

    audio: numpy array (samples,), list/tuple of samples, or a container
           holding an array (e.g. (array, sr) or (sr, array))
    sr: sample rate in Hz
    Returns the path of the written WAV file as a string.
    """
    # Unwrap common container forms by taking the first array-like member.
    if isinstance(audio, (list, tuple)):
        candidate = None
        for item in audio:
            if isinstance(item, (list, tuple, np.ndarray)):
                candidate = item
                break
        if isinstance(candidate, (list, tuple)):
            audio = np.asarray(candidate)
        elif isinstance(candidate, np.ndarray):
            audio = candidate
        else:
            # No array-like member found; fall back to the first element.
            audio = np.asarray(audio[0])
    audio = np.asarray(audio)
    # A (channels, frames) layout (1-2 rows, more columns) is flipped to
    # soundfile's expected (frames, channels) orientation.
    if audio.ndim == 2 and audio.shape[0] <= 2 and audio.shape[1] > audio.shape[0]:
        audio = audio.T
    # Integer PCM is normalized into float32 [-1, 1]; other dtypes just cast.
    if np.issubdtype(audio.dtype, np.integer):
        audio = audio.astype("float32") / float(np.iinfo(audio.dtype).max)
    else:
        audio = audio.astype("float32")
    out_path = Path(tempfile.gettempdir()) / f"hf_audio_{uuid.uuid4().hex}.wav"
    sf.write(str(out_path), audio, sr, format="WAV")
    return str(out_path)
def _extract_samples_and_rate(audio):
    """Pull (samples, sample_rate) out of Gradio's tuple/list audio payload.

    Handles both (samples, sr) and (sr, samples) orderings; otherwise scans the
    container for the first array-like and the first integer member.
    Either element of the returned pair may be None when it cannot be found.
    """
    samples = None
    sr = None
    if isinstance(audio, (list, tuple)):
        if len(audio) >= 2:
            first, second = audio[0], audio[1]
            if isinstance(first, (list, tuple, np.ndarray)):
                samples, sr = first, second
            elif isinstance(second, (list, tuple, np.ndarray)):
                samples, sr = second, first
        if samples is None:
            samples = next((x for x in audio if isinstance(x, (list, tuple, np.ndarray))), None)
            # Accept numpy integer scalars as well as plain ints for the rate.
            sr = next((x for x in audio if isinstance(x, (int, np.integer))), None)
    else:
        samples = audio
    return samples, sr


def _to_mono_float32(audio_array):
    """Collapse 2-D audio to mono and normalize dtype to float32.

    (channels, frames) layouts (1-2 rows, more columns) are transposed first,
    then channels are averaged; integer PCM is scaled into [-1, 1].
    """
    if audio_array.ndim == 2:
        if audio_array.shape[0] <= 2 and audio_array.shape[1] > audio_array.shape[0]:
            audio_array = audio_array.T
        audio_array = np.mean(audio_array, axis=1)
    if np.issubdtype(audio_array.dtype, np.integer):
        maxv = np.iinfo(audio_array.dtype).max
        audio_array = audio_array.astype("float32") / float(maxv)
    else:
        audio_array = audio_array.astype("float32")
    return audio_array


def _resample_linear(audio_array, src_sr, dst_sr):
    """Linearly resample 1-D audio from src_sr to dst_sr via np.interp.

    Returns the resampled float32 array, or None when the target length would
    be non-positive (degenerate input).
    """
    orig_len = audio_array.shape[0]
    new_len = int(round(orig_len * float(dst_sr) / float(src_sr)))
    if new_len <= 0:
        return None
    new_indices = np.linspace(0, orig_len - 1, new_len)
    old_indices = np.arange(orig_len)
    return np.interp(new_indices, old_indices, audio_array).astype("float32")


def _debug_dump(audio_array, sampling_rate):
    """Best-effort debug aid: log array stats and save the exact audio sent to the model."""
    try:
        logger.debug(f"Calling ASR with audio_array.shape={audio_array.shape}, dtype={audio_array.dtype}, sampling_rate={sampling_rate}")
        tmpdir = tempfile.gettempdir()
        dbg_fname = Path(tmpdir) / f"hf_debug_audio_{uuid.uuid4().hex}.wav"
        sf.write(str(dbg_fname), audio_array, sampling_rate, format="WAV")
        logger.debug(f"Wrote debug WAV to {dbg_fname}")
    except Exception as e:
        logger.debug(f"Debug save failed: {e}")


def transcribe(audio):
    """Transcribe/translate audio to English with the module-level Whisper pipeline.

    audio: either a file path string (Gradio sometimes returns a path)
           or a tuple (np_array, sample_rate) from Gradio's audio component
           (either ordering is accepted).
    Returns the transcription text, or a human-readable error message string;
    this function never raises.
    """
    if audio is None:
        return "No audio provided."
    # If Gradio gives a filepath (str), read it with soundfile to avoid ffmpeg requirement
    if isinstance(audio, str):
        try:
            audio_array, sampling_rate = sf.read(audio)
        except Exception as e:
            return f"Could not read audio file: {e}"
    else:
        samples, sampling_rate = _extract_samples_and_rate(audio)
        if samples is None:
            return "Unsupported audio format."
        if sampling_rate is None:
            # Default to 16 kHz when the payload carried no rate.
            sampling_rate = 16000
        audio_array = samples
    try:
        audio_array = np.asarray(audio_array)
    except Exception:
        return "Unsupported audio data - cannot convert to numpy array."
    audio_array = _to_mono_float32(audio_array)
    # The pipeline is called with a bare array (no sampling_rate kwarg), so the
    # data must already be at the model's expected rate; resample if needed.
    try:
        model_sr = getattr(getattr(asr, "feature_extractor", None), "sampling_rate", None)
    except Exception:
        model_sr = None
    if model_sr is None:
        model_sr = 16000
    # if incoming sampling_rate is missing, assume model rate
    if sampling_rate is None:
        sampling_rate = model_sr
    if sampling_rate != model_sr:
        try:
            resampled = _resample_linear(audio_array, sampling_rate, model_sr)
        except Exception as e:
            return f"Transcription failed during resampling: {e}"
        if resampled is None:
            return "Transcription failed: invalid resample length"
        audio_array = resampled
        sampling_rate = model_sr
    # Debug: log and optionally save the resampled audio
    if DEBUG:
        _debug_dump(audio_array, sampling_rate)
    # Use the pipeline to transcribe by passing just the numpy array
    try:
        result = asr(audio_array)
    except Exception as e:
        return f"Transcription failed: {e}"
    text = result.get("text", "").strip()
    if not text:
        return "No speech detected / transcription empty."
    return text
def clear_audio():
    """Return the reset values for (audio widget, transcript box)."""
    return (None, "")
# Build the Gradio UI: two audio inputs (record / upload), transcribe + clear
# buttons, and a transcript textbox with a copy button.
with gr.Blocks(title="Whisper-Small Speech-to-English") as demo:
    gr.Markdown(
        """
# 🎙️ Whisper-Small Speech-to-English
Record or upload audio and click **Transcribe**.
This app uses `openai/whisper-small` in translate mode and returns English text.
"""
    )
    with gr.Row():
        with gr.Column(scale=1):
            audio_input = gr.Audio(type="numpy", label="Record or upload audio")
            upload_input = gr.Audio(type="numpy", label="Or upload an audio file")
            transcribe_btn = gr.Button("Transcribe")
            clear_btn = gr.Button("Clear")
        with gr.Column(scale=1):
            transcript = gr.Textbox(label="Transcription", lines=8)
            copy_btn = gr.Button("Copy transcript")

    # When clicking the transcribe button, prefer recorded audio if present,
    # otherwise use uploaded audio.
    def _get_preferred_audio(recorded, uploaded):
        # recorded or uploaded may be numpy tuples or file paths depending on Gradio.
        # NOTE(review): plain truthiness is used here — if Gradio ever passes a bare
        # numpy array (not a tuple/path), `if recorded:` would raise; confirm the
        # component always yields a (sr, array) tuple or a path.
        if recorded:
            return recorded
        if uploaded:
            return uploaded
        return None

    transcribe_btn.click(
        fn=lambda rec, up: transcribe(_get_preferred_audio(rec, up)),
        inputs=[audio_input, upload_input],
        outputs=transcript,
    )
    # Clear resets both the recorder widget and the transcript textbox.
    clear_btn.click(
        fn=clear_audio,
        inputs=None,
        outputs=[audio_input, transcript],
    )
    # Copy transcript to clipboard (Gradio has `copy` action for buttons)
    # NOTE(review): as wired, this handler is a no-op (returns the text to no
    # output and never touches the clipboard); consider
    # gr.Textbox(show_copy_button=True) instead — verify against the installed
    # Gradio version.
    copy_btn.click(
        fn=lambda txt: txt,
        inputs=transcript,
        outputs=None,
    )
    gr.Markdown(
        "Notes: The app translates spoken audio to English using Whisper (translate task). "
        "Small model runs on CPU and may take time for longer files. For lower latency or other target languages, consider the HF Inference API or additional translation pipelines."
    )
if __name__ == "__main__":
    # Bind to all interfaces (required inside containers); the PORT env var
    # overrides the default 7860.
    # NOTE(review): share=True also requests a public tunnel link — confirm
    # that is intended for this deployment.
    demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)), share=True)