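"""Gradio ASR demo for EYEDOL's Yoruba and Naija English models.

Clips up to 30 s are transcribed in a single pipeline call; longer audio uses
either the model's built-in long-form timestamps or overlapping-chunk stitching.
"""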
import os
import tempfile
import torch
import soundfile as sf
from transformers import pipeline
import gradio as gr
from pydub import AudioSegment
# ---- Models available ----
MODEL_CHOICES = {
"Yoruba (EYEDOL/Yoruba-ASRNEW)": "EYEDOL/Yoruba-ASRNEW",
"Naija English (EYEDOL/NAIJA_ENG-ASRNEW)": "EYEDOL/NAIJA_ENG-ASRNEW",
}
# Device for pipeline creation: 0 selects the first CUDA GPU, -1 the CPU
DEVICE = 0 if torch.cuda.is_available() else -1
# Cache created pipelines to avoid reloading
PIPELINE_CACHE = {}
def get_asr_pipeline(model_id: str):
"""Return a cached pipeline for model_id or create a new one."""
if model_id in PIPELINE_CACHE:
return PIPELINE_CACHE[model_id]
# Create and cache
asr = pipeline("automatic-speech-recognition", model=model_id, device=DEVICE)
PIPELINE_CACHE[model_id] = asr
return asr
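# Optional warm-up (a sketch, not part of the original flow): pre-loading both
# models at startup trades memory for a faster first request.
# for _model_id in MODEL_CHOICES.values():
#     get_asr_pipeline(_model_id)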
# Utilities
def save_numpy_to_wav(np_tuple):
    """Write a Gradio-style (sample_rate, numpy_array) tuple to a temporary WAV file."""
    samplerate, data = np_tuple
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
sf.write(tmp.name, data, samplerate)
return tmp.name
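# NamedTemporaryFile(delete=False) leaves the file on disk; callers must
# os.unlink() it themselves (transcribe() does this for mic recordings).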
def get_duration_seconds(path):
    """Return the audio duration in seconds, falling back to pydub if soundfile cannot read the file."""
try:
info = sf.info(path)
return info.duration
except Exception:
seg = AudioSegment.from_file(path)
return len(seg) / 1000.0
def split_audio_file(path, chunk_length_ms=25000, overlap_ms=500):
    """Split audio into overlapping WAV chunks; returns [(path, start_ms, end_ms), ...].

    The overlap gives neighbouring chunks shared context, so words are less
    likely to be cut exactly at a chunk boundary.
    """
audio = AudioSegment.from_file(path)
duration_ms = len(audio)
chunks = []
start = 0
while start < duration_ms:
end = min(start + chunk_length_ms, duration_ms)
chunk = audio[start:end]
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
chunk.export(tmp.name, format="wav")
chunks.append((tmp.name, start, end))
start += max(1, chunk_length_ms - overlap_ms)
return chunks
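# With the defaults (25 s chunks, 0.5 s overlap), a 60 s file splits into
# chunks covering roughly 0-25 s, 24.5-49.5 s, and 49-60 s.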
def transcribe_file_with_pipeline(asr_pipeline, path, return_timestamps=False):
    """Thin wrapper around the pipeline call so both code paths share one entry point."""
    if return_timestamps:
        return asr_pipeline(path, return_timestamps=True)
    return asr_pipeline(path)
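# The transformers ASR pipeline typically returns {"text": "..."}; with
# return_timestamps=True, Whisper-style models also include
# "chunks": [{"text": ..., "timestamp": (start, end)}, ...].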
def transcribe(audio_input, model_id, allow_longform_with_timestamps=False, chunk_length_seconds=25, overlap_seconds=0.5):
"""
audio_input: either (sr, numpy_array) from mic (type="numpy") or filepath from upload (type="filepath")
model_id: Hugging Face model id string
Returns dict: {"full_text": str, "segments": [{start_s,end_s,text}, ...]}
"""
if audio_input is None:
return {"error": "No audio provided."}
# Normalize to a filepath
created_tmp_input = False
if isinstance(audio_input, tuple):
audio_path = save_numpy_to_wav(audio_input) # we created this tmp file
created_tmp_input = True
else:
audio_path = audio_input
duration_s = get_duration_seconds(audio_path)
asr = get_asr_pipeline(model_id)
    # Short audio (<= 30 s, roughly the single-pass window of Whisper-style
    # models): one direct pipeline call is enough
    if duration_s <= 30:
        out = transcribe_file_with_pipeline(asr, audio_path, return_timestamps=False)
        text = out.get("text", "") if isinstance(out, dict) else str(out)
        segments = [{"start_s": 0.0, "end_s": duration_s, "text": text}]
        full_text = text
        if created_tmp_input:
            try:
                os.unlink(audio_path)
            except OSError:
                pass
        return {"full_text": full_text, "segments": segments}
# Long audio (>30s)
if allow_longform_with_timestamps:
try:
out = transcribe_file_with_pipeline(asr, audio_path, return_timestamps=True)
# Attempt to parse common structures
full_text = out.get("text", None) if isinstance(out, dict) else str(out)
segments = []
if isinstance(out, dict):
if "chunks" in out and isinstance(out["chunks"], list):
for c in out["chunks"]:
                        # 'timestamp' is typically a (start, end) tuple; accept a list too
                        ts = c.get("timestamp", None)
                        if isinstance(ts, (list, tuple)) and len(ts) == 2:
                            start_s, end_s = ts[0], ts[1]
else:
start_s = c.get("start", None)
end_s = c.get("end", None)
segments.append({"start_s": start_s, "end_s": end_s, "text": c.get("text", "")})
elif "segments" in out and isinstance(out["segments"], list):
for s in out["segments"]:
segments.append({"start_s": s.get("start", None), "end_s": s.get("end", None), "text": s.get("text", "")})
elif "words" in out and isinstance(out["words"], list):
for w in out["words"]:
segments.append({"start_s": w.get("start", None), "end_s": w.get("end", None), "text": w.get("word", "")})
else:
# no detailed structure -> fall back to full text
if full_text is None:
full_text = str(out)
segments = [{"start_s": 0.0, "end_s": duration_s, "text": full_text}]
else:
# pipeline returned just a string
full_text = str(out)
segments = [{"start_s": 0.0, "end_s": duration_s, "text": full_text}]
            if created_tmp_input:
                try:
                    os.unlink(audio_path)
                except OSError:
                    pass
return {"full_text": full_text, "segments": segments}
except Exception as e:
# fallback to chunking
print("Long-form timestamps failed; falling back to chunking:", e)
# Chunking fallback
chunk_length_ms = int(chunk_length_seconds * 1000)
overlap_ms = int(overlap_seconds * 1000)
chunks = split_audio_file(audio_path, chunk_length_ms=chunk_length_ms, overlap_ms=overlap_ms)
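    # Note: overlapping chunks are transcribed independently and then joined,
    # so a word straddling a chunk boundary may appear twice in the stitched text.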
stitched = []
segments = []
    for chunk_path, start_ms, end_ms in chunks:
        try:
            out = transcribe_file_with_pipeline(asr, chunk_path, return_timestamps=False)
            text = out.get("text", "") if isinstance(out, dict) else str(out)
        except Exception as e:
            text = f"[ERROR on chunk: {e}]"
        start_s = start_ms / 1000.0
        end_s = end_ms / 1000.0
        segments.append({"start_s": start_s, "end_s": end_s, "text": text})
        stitched.append(text)
        try:
            os.unlink(chunk_path)
        except OSError:
            pass
    if created_tmp_input:
        try:
            os.unlink(audio_path)
        except OSError:
            pass
full_text = " ".join([s for s in stitched if s])
return {"full_text": full_text, "segments": segments}
# ---- Gradio UI ----
with gr.Blocks(title="EYEDOL ASR — Multi-model (Yoruba + Naija English)") as demo:
gr.Markdown("## EYEDOL ASR Demo\nSelect model, upload audio or use the microphone. Supports long audio via chunking or model long-form timestamps.")
with gr.Row():
with gr.Column(scale=2):
model_choice = gr.Dropdown(list(MODEL_CHOICES.keys()), value=list(MODEL_CHOICES.keys())[0], label="Choose model")
            # 'sources' (Gradio 4.x) restricts each component to a single input method
            mic_input = gr.Audio(label="Record (click Record → Stop)", type="numpy", sources=["microphone"])
            file_input = gr.Audio(label="Or upload audio file", type="filepath", sources=["upload"])
source = gr.Radio(["Use microphone input", "Use uploaded file"], value="Use microphone input", label="Input source")
longform = gr.Checkbox(label="Try model's built-in long-form timestamps (if supported)", value=False)
chunk_len = gr.Slider(minimum=10, maximum=120, value=25, step=5, label="Chunk length (seconds)")
overlap = gr.Slider(minimum=0.0, maximum=5.0, value=0.5, step=0.5, label="Chunk overlap (seconds)")
transcribe_btn = gr.Button("Transcribe")
gr.Markdown("**Note:** If a model is private add `HF_TOKEN` as a secret in Space settings. GPU recommended for best performance.")
with gr.Column(scale=3):
full_text_out = gr.Textbox(label="Full transcription", lines=8)
segments_out = gr.JSON(label="Segments (start_s, end_s, text)")
def handle_transcription(mic_input, file_input, source_choice, model_label, use_longform, chunk_len_s, overlap_s):
model_id = MODEL_CHOICES.get(model_label)
audio_src = mic_input if source_choice == "Use microphone input" else file_input
res = transcribe(audio_src, model_id=model_id, allow_longform_with_timestamps=use_longform, chunk_length_seconds=chunk_len_s, overlap_seconds=overlap_s)
if "error" in res:
return res["error"], []
return res["full_text"], res["segments"]
transcribe_btn.click(
fn=handle_transcription,
inputs=[mic_input, file_input, source, model_choice, longform, chunk_len, overlap],
outputs=[full_text_out, segments_out],
)
if __name__ == "__main__":
demo.launch()