import os import tempfile import math import torch import soundfile as sf from transformers import pipeline import gradio as gr from pydub import AudioSegment # ---- Models available ---- MODEL_CHOICES = { "Yoruba (EYEDOL/Yoruba-ASRNEW)": "EYEDOL/Yoruba-ASRNEW", "Naija English (EYEDOL/NAIJA_ENG-ASRNEW)": "EYEDOL/NAIJA_ENG-ASRNEW", } # Device selection for pipeline creation DEVICE = 0 if torch.cuda.is_available() else -1 # Cache created pipelines to avoid reloading PIPELINE_CACHE = {} def get_asr_pipeline(model_id: str): """Return a cached pipeline for model_id or create a new one.""" if model_id in PIPELINE_CACHE: return PIPELINE_CACHE[model_id] # Create and cache asr = pipeline("automatic-speech-recognition", model=model_id, device=DEVICE) PIPELINE_CACHE[model_id] = asr return asr # Utilities def save_numpy_to_wav(np_tuple): samplerate, data = np_tuple tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") sf.write(tmp.name, data, samplerate) return tmp.name def get_duration_seconds(path): try: info = sf.info(path) return info.duration except Exception: seg = AudioSegment.from_file(path) return len(seg) / 1000.0 def split_audio_file(path, chunk_length_ms=25000, overlap_ms=500): audio = AudioSegment.from_file(path) duration_ms = len(audio) chunks = [] start = 0 while start < duration_ms: end = min(start + chunk_length_ms, duration_ms) chunk = audio[start:end] tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") chunk.export(tmp.name, format="wav") chunks.append((tmp.name, start, end)) start += max(1, chunk_length_ms - overlap_ms) return chunks def transcribe_file_with_pipeline(asr_pipeline, path, return_timestamps=False): # wrapper that calls pipeline and returns its output if return_timestamps: return asr_pipeline(path, return_timestamps=True) else: return asr_pipeline(path) def transcribe(audio_input, model_id, allow_longform_with_timestamps=False, chunk_length_seconds=25, overlap_seconds=0.5): """ audio_input: either (sr, numpy_array) from mic (type="numpy") or filepath from upload (type="filepath") model_id: Hugging Face model id string Returns dict: {"full_text": str, "segments": [{start_s,end_s,text}, ...]} """ if audio_input is None: return {"error": "No audio provided."} # Normalize to a filepath created_tmp_input = False if isinstance(audio_input, tuple): audio_path = save_numpy_to_wav(audio_input) # we created this tmp file created_tmp_input = True else: audio_path = audio_input duration_s = get_duration_seconds(audio_path) asr = get_asr_pipeline(model_id) # Short audio: direct call if duration_s <= 30: out = transcribe_file_with_pipeline(asr, audio_path, return_timestamps=False) text = out.get("text", out) if isinstance(out, dict) else str(out) segments = [{"start_s": 0.0, "end_s": duration_s, "text": text}] full_text = text if created_tmp_input: try: os.unlink(audio_path) except: pass return {"full_text": full_text, "segments": segments} # Long audio (>30s) if allow_longform_with_timestamps: try: out = transcribe_file_with_pipeline(asr, audio_path, return_timestamps=True) # Attempt to parse common structures full_text = out.get("text", None) if isinstance(out, dict) else str(out) segments = [] if isinstance(out, dict): if "chunks" in out and isinstance(out["chunks"], list): for c in out["chunks"]: # chunk may contain 'timestamp' e.g. [start, end] or 'start'/'end' ts = c.get("timestamp", None) if isinstance(ts, list) and len(ts) == 2: start_s, end_s = ts[0], ts[1] else: start_s = c.get("start", None) end_s = c.get("end", None) segments.append({"start_s": start_s, "end_s": end_s, "text": c.get("text", "")}) elif "segments" in out and isinstance(out["segments"], list): for s in out["segments"]: segments.append({"start_s": s.get("start", None), "end_s": s.get("end", None), "text": s.get("text", "")}) elif "words" in out and isinstance(out["words"], list): for w in out["words"]: segments.append({"start_s": w.get("start", None), "end_s": w.get("end", None), "text": w.get("word", "")}) else: # no detailed structure -> fall back to full text if full_text is None: full_text = str(out) segments = [{"start_s": 0.0, "end_s": duration_s, "text": full_text}] else: # pipeline returned just a string full_text = str(out) segments = [{"start_s": 0.0, "end_s": duration_s, "text": full_text}] if created_tmp_input: try: os.unlink(audio_path) except: pass return {"full_text": full_text, "segments": segments} except Exception as e: # fallback to chunking print("Long-form timestamps failed; falling back to chunking:", e) # Chunking fallback chunk_length_ms = int(chunk_length_seconds * 1000) overlap_ms = int(overlap_seconds * 1000) chunks = split_audio_file(audio_path, chunk_length_ms=chunk_length_ms, overlap_ms=overlap_ms) stitched = [] segments = [] for chunk_path, start_ms, end_ms in chunks: try: out = transcribe_file_with_pipeline(asr, chunk_path, return_timestamps=False) text = out.get("text", out) if isinstance(out, dict) else str(out) except Exception as e: text = f"[ERROR on chunk: {e}]" start_s = start_ms / 1000.0 end_s = end_ms / 1000.0 segments.append({"start_s": start_s, "end_s": end_s, "text": text}) stitched.append(text) try: os.unlink(chunk_path) except: pass if created_tmp_input: try: os.unlink(audio_path) except: pass full_text = " ".join([s for s in stitched if s]) return {"full_text": full_text, "segments": segments} # ---- Gradio UI ---- with gr.Blocks(title="EYEDOL ASR — Multi-model (Yoruba + Naija English)") as demo: gr.Markdown("## EYEDOL ASR Demo\nSelect model, upload audio or use the microphone. Supports long audio via chunking or model long-form timestamps.") with gr.Row(): with gr.Column(scale=2): model_choice = gr.Dropdown(list(MODEL_CHOICES.keys()), value=list(MODEL_CHOICES.keys())[0], label="Choose model") mic_input = gr.Audio(label="Record (click Record → Stop)", type="numpy") file_input = gr.Audio(label="Or upload audio file", type="filepath") source = gr.Radio(["Use microphone input", "Use uploaded file"], value="Use microphone input", label="Input source") longform = gr.Checkbox(label="Try model's built-in long-form timestamps (if supported)", value=False) chunk_len = gr.Slider(minimum=10, maximum=120, value=25, step=5, label="Chunk length (seconds)") overlap = gr.Slider(minimum=0.0, maximum=5.0, value=0.5, step=0.5, label="Chunk overlap (seconds)") transcribe_btn = gr.Button("Transcribe") gr.Markdown("**Note:** If a model is private add `HF_TOKEN` as a secret in Space settings. GPU recommended for best performance.") with gr.Column(scale=3): full_text_out = gr.Textbox(label="Full transcription", lines=8) segments_out = gr.JSON(label="Segments (start_s, end_s, text)") def handle_transcription(mic_input, file_input, source_choice, model_label, use_longform, chunk_len_s, overlap_s): model_id = MODEL_CHOICES.get(model_label) audio_src = mic_input if source_choice == "Use microphone input" else file_input res = transcribe(audio_src, model_id=model_id, allow_longform_with_timestamps=use_longform, chunk_length_seconds=chunk_len_s, overlap_seconds=overlap_s) if "error" in res: return res["error"], [] return res["full_text"], res["segments"] transcribe_btn.click( fn=handle_transcription, inputs=[mic_input, file_input, source, model_choice, longform, chunk_len, overlap], outputs=[full_text_out, segments_out], ) if __name__ == "__main__": demo.launch()