| import gradio as gr |
| import numpy as np |
| from transformers import pipeline |
|
|
# Checkpoint identifier handed to the speech-recognition pipeline below.
MODEL_ID = "openai/whisper-tiny"

# Sample rate (Hz) that streamed audio is resampled to before transcription.
TARGET_SR = 16000

# Single recognizer instance, created once at import time and shared by every
# call to run_asr().
# NOTE(review): device=-1 is understood to select the CPU in transformers
# pipelines, and chunk_length_s to be the long-form chunk size in seconds —
# confirm against the installed transformers version.
asr = pipeline(
    task="automatic-speech-recognition",
    model=MODEL_ID,
    device=-1,
    chunk_length_s=10,
)
|
|
|
|
def to_mono(audio_tuple):
    """Convert a ``(sample_rate, samples)`` tuple to mono float32 audio.

    Integer PCM is scaled by the maximum value of its dtype *before* the
    channels are averaged. Averaging first promotes integer samples to
    float64, which would make the integer check fail and leave multi-channel
    PCM at its raw (e.g. +/-32767) amplitude.

    Args:
        audio_tuple: ``(sample_rate, samples)`` or ``None``. ``samples`` may
            be ``None``, 1-D, or 2-D. A 2-D array is averaged over axis 1,
            which is assumed to be the channel axis, i.e. shape
            ``(n_samples, n_channels)`` — TODO confirm against the caller.

    Returns:
        ``(sample_rate, samples)`` where ``samples`` is a float32 array, or
        ``(None, None)`` when there is no audio.
    """
    if audio_tuple is None:
        return None, None

    sr, data = audio_tuple
    if data is None:
        return None, None

    data = np.asarray(data)

    # Normalise while the original dtype is still known.
    if np.issubdtype(data.dtype, np.integer):
        max_val = np.iinfo(data.dtype).max
        data = data.astype(np.float32) / max_val
    else:
        data = data.astype(np.float32)

    # Down-mix after scaling so every channel layout ends up in the same range.
    if data.ndim == 2:
        data = data.mean(axis=1)

    return sr, data
|
|
|
|
def linear_resample(audio, orig_sr, target_sr=TARGET_SR):
    """Resample 1-D audio from ``orig_sr`` to ``target_sr`` by linear interpolation.

    No low-pass filtering is applied, so this is a lightweight resampler
    rather than a high-fidelity one.

    Args:
        audio: 1-D sequence of samples.
        orig_sr: Sample rate of ``audio`` in Hz.
        target_sr: Desired sample rate in Hz.

    Returns:
        ``audio`` itself when the two rates are equal; otherwise a new float32
        array holding ``floor(len(audio) * target_sr / orig_sr)`` samples
        (possibly zero).
    """
    if orig_sr == target_sr:
        return audio

    n_in = len(audio)
    # Multiply before dividing: the product is exact in double precision, so a
    # whole-number result is never truncated one sample short the way
    # ``int((n / orig_sr) * target_sr)`` can be.
    n_out = int(n_in * float(target_sr) / float(orig_sr))

    if n_in == 0 or n_out == 0:
        # np.interp raises ValueError when given no sample points.
        return np.zeros(0, dtype=np.float32)

    # Place input and output samples on their true time grids so the output
    # spacing is exactly 1 / target_sr even when the length was floored.
    old_times = np.arange(n_in) / orig_sr
    new_times = np.arange(n_out) / target_sr
    return np.interp(new_times, old_times, audio).astype(np.float32)
|
|
|
|
def run_asr(audio_np, sr):
    """Transcribe a block of audio with the module-level ``asr`` pipeline.

    Args:
        audio_np: Audio samples, or ``None``.
        sr: Sample rate of ``audio_np`` in Hz.

    Returns:
        The stripped transcription text. An empty string is returned without
        calling the recognizer when ``audio_np`` is ``None`` or holds fewer
        than ``sr * 0.4`` samples (0.4 s of audio).
    """
    too_short = audio_np is None or len(audio_np) < sr * 0.4
    if too_short:
        return ""

    output = asr({"sampling_rate": sr, "raw": audio_np})

    # A dict result carries the transcript under "text"; anything else is
    # stringified as-is.
    if not isinstance(output, dict):
        return str(output).strip()
    return output.get("text", "").strip()
|
|
|
|
def stream_transcribe(audio, state):
    """Handle one streamed audio chunk and refresh the live transcript.

    The chunk is converted to mono, resampled to ``TARGET_SR`` and appended
    to ``state["buffer"]`` (capped at the most recent 20 s). The last 8 s of
    the buffer are then transcribed and stored as ``state["partial"]``.
    ``state["stable"]`` is read here but never modified.

    Args:
        audio: ``(sample_rate, samples)`` tuple for the new chunk, or ``None``.
        state: Session dict with keys ``"buffer"``, ``"stable"`` and
            ``"partial"``, or ``None`` to start from a fresh one. The dict is
            updated in place.

    Returns:
        ``(state, partial, live)`` where ``live`` is the stable text followed
        by the current partial text. When the chunk carries no samples the
        cached texts are returned in that same shape and the recognizer is
        not run.
    """
    if state is None:
        state = {"buffer": np.zeros(0, dtype=np.float32), "stable": "", "partial": ""}

    sr, chunk = to_mono(audio)
    if chunk is None or len(chunk) == 0:
        # Nothing new to transcribe. Build the third output exactly like the
        # normal path so the transcript box does not briefly drop the partial.
        cached_partial = state.get("partial", "")
        cached_live = (state.get("stable", "") + " " + cached_partial).strip()
        return state, cached_partial, cached_live

    chunk = linear_resample(chunk, sr, TARGET_SR)
    state["buffer"] = np.concatenate([state["buffer"], chunk])

    # Keep the rolling buffer bounded to the most recent 20 seconds.
    max_samples = TARGET_SR * 20
    if len(state["buffer"]) > max_samples:
        state["buffer"] = state["buffer"][-max_samples:]

    # Only the trailing 8 seconds are re-transcribed on each update.
    preview_samples = TARGET_SR * 8
    preview_audio = state["buffer"][-preview_samples:]

    partial = run_asr(preview_audio, TARGET_SR)
    state["partial"] = partial
    live = (state["stable"] + " " + partial).strip()
    return state, partial, live
|
|
|
|
def finalize(state):
    """Commit the pending partial text and reset the audio buffer.

    Args:
        state: Session dict with keys ``"buffer"``, ``"stable"`` and
            ``"partial"``, or ``None`` to start from a fresh one. The dict is
            updated in place.

    Returns:
        ``(state, "", stable)``: the updated state, an empty partial text and
        the stable transcript with any non-empty partial text appended.
    """
    if state is None:
        state = {"buffer": np.zeros(0, dtype=np.float32), "stable": "", "partial": ""}

    committed = state.get("stable", "").strip()
    pending = state.get("partial", "").strip()
    if pending:
        committed = " ".join((committed, pending)).strip()

    # The partial text is now part of the stable transcript, so drop it
    # together with the buffered audio.
    state.update(
        stable=committed,
        partial="",
        buffer=np.zeros(0, dtype=np.float32),
    )
    return state, "", committed
|
|
|
|
def clear():
    """Build the values that reset the whole interface.

    Returns:
        ``(None, state, "", "")``: no audio, a fresh session dict with an
        empty float32 buffer and empty texts, and empty strings for the two
        text outputs.
    """
    fresh_state = dict(
        buffer=np.zeros(0, dtype=np.float32),
        stable="",
        partial="",
    )
    return None, fresh_state, "", ""
|
|
|
|
# Build the interface: a streaming microphone input, two text outputs and a
# reset button, all sharing one per-session state dict.
with gr.Blocks() as demo:
    gr.Markdown(
        """
# NeoScribe (pseudo-live)
Stream audio from the browser microphone and transcribe in near real time.

Next step: send audio chunks from your browser extension to this backend.
"""
    )

    # Initial session state: empty audio buffer and empty transcripts.
    state = gr.State({"buffer": np.zeros(0, dtype=np.float32), "stable": "", "partial": ""})

    audio = gr.Audio(
        label="Input audio",
        sources=["microphone"],
        type="numpy",
        streaming=True,
    )
    partial = gr.Textbox(label="Partial text", lines=3)
    final = gr.Textbox(label="Stable transcript", lines=10)
    clear_btn = gr.Button("Clear")

    # Each streamed chunk updates the state and both text boxes.
    # NOTE(review): stream_every and time_limit are presumably in seconds —
    # confirm against the installed Gradio version.
    audio.stream(
        fn=stream_transcribe,
        inputs=[audio, state],
        outputs=[state, partial, final],
        stream_every=0.8,
        time_limit=120,
    )

    # When recording stops, fold the pending partial text into the transcript.
    audio.stop_recording(
        fn=finalize,
        inputs=[state],
        outputs=[state, partial, final],
    )

    # The Clear button resets the audio widget, the state and both text boxes.
    clear_btn.click(fn=clear, outputs=[audio, state, partial, final])

# Start the app as soon as the module is executed.
demo.launch()
|
|