|
|
import os |
|
|
import tempfile |
|
|
import math |
|
|
import torch |
|
|
import soundfile as sf |
|
|
from transformers import pipeline |
|
|
import gradio as gr |
|
|
from pydub import AudioSegment |
|
|
|
|
|
|
|
|
MODEL_CHOICES = { |
|
|
"Yoruba (EYEDOL/Yoruba-ASRNEW)": "EYEDOL/Yoruba-ASRNEW", |
|
|
"Naija English (EYEDOL/NAIJA_ENG-ASRNEW)": "EYEDOL/NAIJA_ENG-ASRNEW", |
|
|
} |
|
|
|
|
|
|
|
|
DEVICE = 0 if torch.cuda.is_available() else -1 |
|
|
|
|
|
|
|
|
PIPELINE_CACHE = {} |
|
|
|
|
|
def get_asr_pipeline(model_id: str): |
|
|
"""Return a cached pipeline for model_id or create a new one.""" |
|
|
if model_id in PIPELINE_CACHE: |
|
|
return PIPELINE_CACHE[model_id] |
|
|
|
|
|
asr = pipeline("automatic-speech-recognition", model=model_id, device=DEVICE) |
|
|
PIPELINE_CACHE[model_id] = asr |
|
|
return asr |
|
|
|
|
|
|
|
|
def save_numpy_to_wav(np_tuple): |
|
|
samplerate, data = np_tuple |
|
|
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") |
|
|
sf.write(tmp.name, data, samplerate) |
|
|
return tmp.name |
|
|
|
|
|
def get_duration_seconds(path): |
|
|
try: |
|
|
info = sf.info(path) |
|
|
return info.duration |
|
|
except Exception: |
|
|
seg = AudioSegment.from_file(path) |
|
|
return len(seg) / 1000.0 |
|
|
|
|
|
def split_audio_file(path, chunk_length_ms=25000, overlap_ms=500): |
|
|
audio = AudioSegment.from_file(path) |
|
|
duration_ms = len(audio) |
|
|
chunks = [] |
|
|
start = 0 |
|
|
while start < duration_ms: |
|
|
end = min(start + chunk_length_ms, duration_ms) |
|
|
chunk = audio[start:end] |
|
|
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") |
|
|
chunk.export(tmp.name, format="wav") |
|
|
chunks.append((tmp.name, start, end)) |
|
|
start += max(1, chunk_length_ms - overlap_ms) |
|
|
return chunks |
|
|
|
|
|
def transcribe_file_with_pipeline(asr_pipeline, path, return_timestamps=False): |
|
|
|
|
|
if return_timestamps: |
|
|
return asr_pipeline(path, return_timestamps=True) |
|
|
else: |
|
|
return asr_pipeline(path) |
|
|
|
|
|
def transcribe(audio_input, model_id, allow_longform_with_timestamps=False, chunk_length_seconds=25, overlap_seconds=0.5): |
|
|
""" |
|
|
audio_input: either (sr, numpy_array) from mic (type="numpy") or filepath from upload (type="filepath") |
|
|
model_id: Hugging Face model id string |
|
|
Returns dict: {"full_text": str, "segments": [{start_s,end_s,text}, ...]} |
|
|
""" |
|
|
if audio_input is None: |
|
|
return {"error": "No audio provided."} |
|
|
|
|
|
|
|
|
created_tmp_input = False |
|
|
if isinstance(audio_input, tuple): |
|
|
audio_path = save_numpy_to_wav(audio_input) |
|
|
created_tmp_input = True |
|
|
else: |
|
|
audio_path = audio_input |
|
|
|
|
|
duration_s = get_duration_seconds(audio_path) |
|
|
asr = get_asr_pipeline(model_id) |
|
|
|
|
|
|
|
|
if duration_s <= 30: |
|
|
out = transcribe_file_with_pipeline(asr, audio_path, return_timestamps=False) |
|
|
text = out.get("text", out) if isinstance(out, dict) else str(out) |
|
|
segments = [{"start_s": 0.0, "end_s": duration_s, "text": text}] |
|
|
full_text = text |
|
|
if created_tmp_input: |
|
|
try: os.unlink(audio_path) |
|
|
except: pass |
|
|
return {"full_text": full_text, "segments": segments} |
|
|
|
|
|
|
|
|
if allow_longform_with_timestamps: |
|
|
try: |
|
|
out = transcribe_file_with_pipeline(asr, audio_path, return_timestamps=True) |
|
|
|
|
|
full_text = out.get("text", None) if isinstance(out, dict) else str(out) |
|
|
segments = [] |
|
|
|
|
|
if isinstance(out, dict): |
|
|
if "chunks" in out and isinstance(out["chunks"], list): |
|
|
for c in out["chunks"]: |
|
|
|
|
|
ts = c.get("timestamp", None) |
|
|
if isinstance(ts, list) and len(ts) == 2: |
|
|
start_s, end_s = ts[0], ts[1] |
|
|
else: |
|
|
start_s = c.get("start", None) |
|
|
end_s = c.get("end", None) |
|
|
segments.append({"start_s": start_s, "end_s": end_s, "text": c.get("text", "")}) |
|
|
elif "segments" in out and isinstance(out["segments"], list): |
|
|
for s in out["segments"]: |
|
|
segments.append({"start_s": s.get("start", None), "end_s": s.get("end", None), "text": s.get("text", "")}) |
|
|
elif "words" in out and isinstance(out["words"], list): |
|
|
for w in out["words"]: |
|
|
segments.append({"start_s": w.get("start", None), "end_s": w.get("end", None), "text": w.get("word", "")}) |
|
|
else: |
|
|
|
|
|
if full_text is None: |
|
|
full_text = str(out) |
|
|
segments = [{"start_s": 0.0, "end_s": duration_s, "text": full_text}] |
|
|
else: |
|
|
|
|
|
full_text = str(out) |
|
|
segments = [{"start_s": 0.0, "end_s": duration_s, "text": full_text}] |
|
|
|
|
|
if created_tmp_input: |
|
|
try: os.unlink(audio_path) |
|
|
except: pass |
|
|
return {"full_text": full_text, "segments": segments} |
|
|
except Exception as e: |
|
|
|
|
|
print("Long-form timestamps failed; falling back to chunking:", e) |
|
|
|
|
|
|
|
|
chunk_length_ms = int(chunk_length_seconds * 1000) |
|
|
overlap_ms = int(overlap_seconds * 1000) |
|
|
chunks = split_audio_file(audio_path, chunk_length_ms=chunk_length_ms, overlap_ms=overlap_ms) |
|
|
stitched = [] |
|
|
segments = [] |
|
|
for chunk_path, start_ms, end_ms in chunks: |
|
|
try: |
|
|
out = transcribe_file_with_pipeline(asr, chunk_path, return_timestamps=False) |
|
|
text = out.get("text", out) if isinstance(out, dict) else str(out) |
|
|
except Exception as e: |
|
|
text = f"[ERROR on chunk: {e}]" |
|
|
start_s = start_ms / 1000.0 |
|
|
end_s = end_ms / 1000.0 |
|
|
segments.append({"start_s": start_s, "end_s": end_s, "text": text}) |
|
|
stitched.append(text) |
|
|
try: os.unlink(chunk_path) |
|
|
except: pass |
|
|
|
|
|
if created_tmp_input: |
|
|
try: os.unlink(audio_path) |
|
|
except: pass |
|
|
|
|
|
full_text = " ".join([s for s in stitched if s]) |
|
|
return {"full_text": full_text, "segments": segments} |
|
|
|
|
|
|
|
|
with gr.Blocks(title="EYEDOL ASR — Multi-model (Yoruba + Naija English)") as demo: |
|
|
gr.Markdown("## EYEDOL ASR Demo\nSelect model, upload audio or use the microphone. Supports long audio via chunking or model long-form timestamps.") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=2): |
|
|
model_choice = gr.Dropdown(list(MODEL_CHOICES.keys()), value=list(MODEL_CHOICES.keys())[0], label="Choose model") |
|
|
mic_input = gr.Audio(label="Record (click Record → Stop)", type="numpy") |
|
|
file_input = gr.Audio(label="Or upload audio file", type="filepath") |
|
|
source = gr.Radio(["Use microphone input", "Use uploaded file"], value="Use microphone input", label="Input source") |
|
|
longform = gr.Checkbox(label="Try model's built-in long-form timestamps (if supported)", value=False) |
|
|
chunk_len = gr.Slider(minimum=10, maximum=120, value=25, step=5, label="Chunk length (seconds)") |
|
|
overlap = gr.Slider(minimum=0.0, maximum=5.0, value=0.5, step=0.5, label="Chunk overlap (seconds)") |
|
|
transcribe_btn = gr.Button("Transcribe") |
|
|
gr.Markdown("**Note:** If a model is private add `HF_TOKEN` as a secret in Space settings. GPU recommended for best performance.") |
|
|
with gr.Column(scale=3): |
|
|
full_text_out = gr.Textbox(label="Full transcription", lines=8) |
|
|
segments_out = gr.JSON(label="Segments (start_s, end_s, text)") |
|
|
|
|
|
def handle_transcription(mic_input, file_input, source_choice, model_label, use_longform, chunk_len_s, overlap_s): |
|
|
model_id = MODEL_CHOICES.get(model_label) |
|
|
audio_src = mic_input if source_choice == "Use microphone input" else file_input |
|
|
res = transcribe(audio_src, model_id=model_id, allow_longform_with_timestamps=use_longform, chunk_length_seconds=chunk_len_s, overlap_seconds=overlap_s) |
|
|
if "error" in res: |
|
|
return res["error"], [] |
|
|
return res["full_text"], res["segments"] |
|
|
|
|
|
transcribe_btn.click( |
|
|
fn=handle_transcription, |
|
|
inputs=[mic_input, file_input, source, model_choice, longform, chunk_len, overlap], |
|
|
outputs=[full_text_out, segments_out], |
|
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
demo.launch() |
|
|
|