voxo / app.py
frankmcmahen's picture
Update app.py
d0f8862 verified
import os, time, re, shutil, zipfile, subprocess
import gradio as gr
from faster_whisper import WhisperModel
# ---------- Device & Model (lazy load so startup is instant) ----------
DEVICE = "cuda" if os.path.exists("/dev/nvidia0") else "cpu"
COMPUTE = "float16" if DEVICE == "cuda" else "int8"
MODEL_ID = os.getenv(
"VOXO_MODEL",
"Systran/faster-whisper-large-v3" if DEVICE == "cuda" else "Systran/faster-whisper-small"
)
_model = None
def get_model():
global _model
if _model is None:
_model = WhisperModel(MODEL_ID, device=DEVICE, compute_type=COMPUTE)
return _model
# ---------- Helpers ----------
def _ts(t: float) -> str:
m, s = divmod(int(t), 60); h, m = divmod(m, 60)
return f"{h:02d}:{m:02d}:{s:02d}"
def _fmt_hms(seconds: float) -> str:
seconds = max(0, int(seconds))
m, s = divmod(seconds, 60); h, m = divmod(m, 60)
if h: return f"{h}h {m:02d}m {s:02d}s"
if m: return f"{m}m {s:02d}s"
return f"{s}s"
def _fmt_bytes(n: int) -> str:
for unit in ["B","KB","MB","GB","TB"]:
if n < 1024 or unit == "TB":
return f"{n:.1f} {unit}" if unit != "B" else f"{n} {unit}"
n /= 1024
def _safe(name: str) -> str:
return re.sub(r"[^A-Za-z0-9._-]+", "_", os.path.basename(name))
def _duration_secs(path: str) -> float:
try:
out = subprocess.check_output(
["ffprobe","-v","error","-show_entries","format=duration",
"-of","default=noprint_wrappers=1:nokey=1", path],
stderr=subprocess.STDOUT,
).decode().strip()
return float(out)
except Exception:
return 0.0
# ---------- Core Transcribe ----------
def transcribe(audio_path, language="auto", timestamps=True):
if not audio_path:
return ""
lang = None if language == "auto" else language
segments, _info = get_model().transcribe(
audio_path,
language=lang,
vad_filter=True,
vad_parameters=dict(min_silence_duration_ms=500),
beam_size=1, # fast; bump to 3–5 for more accuracy
best_of=1,
condition_on_previous_text=False,
no_speech_threshold=0.3,
)
lines = ([f"[{_ts(s.start)}{_ts(s.end)}] {s.text.strip()}" for s in segments]
if timestamps else [s.text.strip() for s in segments])
return "\n".join(lines)
# ---------- Batch with live ETA (streams updates) ----------
def files_added_status(file_paths, progress=gr.Progress(track_tqdm=True)):
if not file_paths:
return "No files yet. Add some audio to get started.", gr.update(interactive=False)
total_size, total_audio = 0, 0.0
lines = ["### Files added"]
for i, p in enumerate(file_paths, 1):
name = _safe(p)
progress(i/len(file_paths), desc=f"Scanning {name}")
size = os.path.getsize(p) if os.path.exists(p) else 0
dur = _duration_secs(p)
total_size += size
total_audio += dur
lines.append(f"- ✅ **{name}** — {(_fmt_hms(dur) if dur else '…')} | {_fmt_bytes(size)}")
lines += ["", f"**Total:** {len(file_paths)} files — {_fmt_hms(total_audio)} audio — {_fmt_bytes(total_size)}", "Ready to run the batch."]
return "\n".join(lines), gr.update(interactive=True)
def batch_transcribe_stream(file_paths, language="auto", timestamps=True, progress=gr.Progress(track_tqdm=True)):
if not file_paths:
yield "No files selected.", None
return
start = time.time()
n = len(file_paths)
durations = [(_duration_secs(p) or 0.0) for p in file_paths]
total_audio = sum(durations)
workdir = f"/tmp/voxo_batch_{int(start)}"
os.makedirs(workdir, exist_ok=True)
summary_parts, processed_audio, completed = [], 0.0, 0
def status_md(note: str = "") -> str:
elapsed = time.time() - start
rtf = elapsed / processed_audio if processed_audio > 0 else 0.0
remaining = max(0.0, total_audio - processed_audio)
eta = remaining * rtf if processed_audio > 0 else 0.0
header = [
"### Batch Progress",
f"- Files: **{completed}/{n}**",
f"- Elapsed: **{_fmt_hms(elapsed)}**",
f"- Audio processed: **{_fmt_hms(processed_audio)}** / {_fmt_hms(total_audio)}",
f"- Est. RTF: **{rtf:.2f}**" if processed_audio else "- Est. RTF: **…**",
f"- ETA: **{_fmt_hms(eta)}**" if processed_audio else "- ETA: **…**",
]
if note: header.append(f"\n{note}")
tail = "\n".join(summary_parts[-2:]) if summary_parts else ""
return "\n".join(header) + ("\n\n" + tail if tail else "")
progress(0.0, desc="Starting…")
yield status_md("Preparing files…"), None
for idx, path in enumerate(file_paths):
name = _safe(path)
file_dur = durations[idx]
t0 = time.time()
text = transcribe(path, language=language, timestamps=timestamps)
out_txt = os.path.join(workdir, f"{os.path.splitext(name)[0]}.txt")
with open(out_txt, "w", encoding="utf-8") as f:
f.write(text)
wall = time.time() - t0
summary_parts.append(
f"#### ✅ {name}\n- Audio: {_fmt_hms(file_dur)} | Wall: {_fmt_hms(wall)} | RTF: {(wall/max(1e-6,file_dur)):.2f}\n\n{text}\n"
)
processed_audio += file_dur
completed += 1
progress(completed / n, desc=f"Processed {completed}/{n}")
yield status_md(), None
# combined + zip
combined_path = os.path.join(workdir, "_ALL_TRANSCRIPTS.txt")
with open(combined_path, "w", encoding="utf-8") as f:
f.write("\n\n".join(summary_parts))
zip_path = f"{workdir}.zip"
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as z:
for fname in os.listdir(workdir):
z.write(os.path.join(workdir, fname), arcname=fname)
shutil.rmtree(workdir, ignore_errors=True)
yield status_md("All done. Download the ZIP for every transcript."), zip_path
# ---------- UI ----------
with gr.Blocks(title="Voxo – Audio to Text") as demo:
gr.Markdown("# 🎧 Voxo\nDrop audio, get text. GPU = fast, CPU = free.")
with gr.Tabs():
# Single
with gr.Tab("Single file"):
with gr.Row():
audio = gr.Audio(sources=["upload","microphone"], type="filepath", label="Audio (mp3/wav)")
lang = gr.Dropdown(["auto","en","es","fr","de","it","pt","ja","ko","zh"], value="auto", label="Language")
ts = gr.Checkbox(value=True, label="Show timestamps")
btn = gr.Button("Transcribe", variant="primary")
out = gr.Textbox(lines=20, label="Transcript", show_copy_button=True)
btn.click(transcribe, inputs=[audio, lang, ts], outputs=out, concurrency_limit=1)
# Batch (simple uploader + live ETA)
with gr.Tab("Batch"):
files = gr.File(file_count="multiple", type="filepath", file_types=["audio"], label="Select multiple audio files")
upload_status = gr.Markdown("No files yet. Add some audio.")
with gr.Row():
lang2 = gr.Dropdown(["auto","en","es","fr","de","it","pt","ja","ko","zh"], value="auto", label="Language")
ts2 = gr.Checkbox(value=True, label="Show timestamps")
batch_btn = gr.Button("Run Batch", variant="primary", interactive=False)
batch_out = gr.Markdown("Ready.")
zip_out = gr.File(label="Download transcripts (ZIP)")
# Enable the Run button after files are added + show a summary
files.change(files_added_status, inputs=[files], outputs=[upload_status, batch_btn])
# Stream progress + final ZIP
batch_btn.click(
batch_transcribe_stream,
inputs=[files, lang2, ts2],
outputs=[batch_out, zip_out],
concurrency_limit=1
)
gr.Markdown(f"**Engine**: `{MODEL_ID}` on `{DEVICE}` ({COMPUTE}). Tip: Use an L4 GPU for large-v3 fast runs; switch back to CPU Basic to save dollars.")
# Start Gradio server (Spaces-friendly)
demo.queue(default_concurrency_limit=1).launch()