"""Voxo: a Gradio front end for faster-whisper (single-file and batch transcription)."""

import os
import re
import shutil
import subprocess
import tempfile
import time
import zipfile

import gradio as gr
from faster_whisper import WhisperModel

# ---------- Device & Model (lazy load so startup is instant) ----------
# The presence of the /dev/nvidia0 device node is used as the "GPU attached" signal.
DEVICE = "cuda" if os.path.exists("/dev/nvidia0") else "cpu"
COMPUTE = "float16" if DEVICE == "cuda" else "int8"
MODEL_ID = os.getenv(
    "VOXO_MODEL",
    "Systran/faster-whisper-large-v3" if DEVICE == "cuda" else "Systran/faster-whisper-small",
)

# Language choices shared by both tabs; "auto" is forwarded to the model as language=None.
LANGUAGES = ["auto", "en", "es", "fr", "de", "it", "pt", "ja", "ko", "zh"]

# Name of the combined transcript inside the batch ZIP; reserved so that an
# uploaded file with the same stem cannot overwrite it.
_COMBINED_NAME = "_ALL_TRANSCRIPTS.txt"

_model = None


def get_model():
    """Return the shared WhisperModel, constructing it on first use."""
    global _model
    if _model is None:
        _model = WhisperModel(MODEL_ID, device=DEVICE, compute_type=COMPUTE)
    return _model


# ---------- Helpers ----------
def _ts(t: float) -> str:
    """Format a position in seconds as zero-padded ``HH:MM:SS`` (fraction truncated)."""
    m, s = divmod(int(t), 60)
    h, m = divmod(m, 60)
    return f"{h:02d}:{m:02d}:{s:02d}"


def _fmt_hms(seconds: float) -> str:
    """Format a duration compactly: ``5s``, ``3m 07s`` or ``1h 02m 09s``.

    Negative inputs are clamped to zero.
    """
    seconds = max(0, int(seconds))
    m, s = divmod(seconds, 60)
    h, m = divmod(m, 60)
    if h:
        return f"{h}h {m:02d}m {s:02d}s"
    if m:
        return f"{m}m {s:02d}s"
    return f"{s}s"


def _fmt_bytes(n: int) -> str:
    """Format a byte count with a binary-scaled unit (B, KB, MB, GB, TB).

    Plain bytes are printed without decimals; larger units use one decimal.
    Values beyond the TB range are still expressed in TB.
    """
    for unit in ["B", "KB", "MB", "GB", "TB"]:
        if n < 1024 or unit == "TB":
            return f"{n:.1f} {unit}" if unit != "B" else f"{n} {unit}"
        n /= 1024


def _safe(name: str) -> str:
    """Return the basename of *name* with runs of unusual characters replaced by ``_``."""
    return re.sub(r"[^A-Za-z0-9._-]+", "_", os.path.basename(name))


def _unique_txt_name(stem: str, used: set) -> str:
    """Return ``<stem>.txt``, suffixed with ``_2``, ``_3``… until unused, and record it.

    Comparison is case-insensitive so the archive also extracts cleanly on
    case-insensitive file systems.
    """
    candidate = f"{stem}.txt"
    counter = 2
    while candidate.lower() in used:
        candidate = f"{stem}_{counter}.txt"
        counter += 1
    used.add(candidate.lower())
    return candidate


def _duration_secs(path: str) -> float:
    """Return the media duration reported by ``ffprobe``, or ``0.0`` if unavailable.

    This is best-effort: a missing ffprobe binary, a non-zero exit status or
    unparseable output all yield ``0.0`` rather than raising.
    """
    cmd = [
        "ffprobe", "-v", "error", "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1", path,
    ]
    try:
        # stderr is discarded (not merged into stdout) so that a diagnostic
        # message cannot corrupt the numeric value being parsed.
        out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
        return float(out.decode().strip())
    except (OSError, subprocess.SubprocessError, ValueError, TypeError):
        return 0.0


# ---------- Core Transcribe ----------
def transcribe(audio_path, language="auto", timestamps=True):
    """Transcribe one audio file and return the text, one segment per line.

    Args:
        audio_path: Path of the audio file; a falsy value returns ``""``.
        language: Language code, or ``"auto"`` to pass ``language=None`` to the model.
        timestamps: When true, each line is prefixed with ``[HH:MM:SS – HH:MM:SS]``.
    """
    if not audio_path:
        return ""
    lang = None if language == "auto" else language
    segments, _info = get_model().transcribe(
        audio_path,
        language=lang,
        vad_filter=True,
        vad_parameters=dict(min_silence_duration_ms=500),
        beam_size=1,  # fast; bump to 3–5 for more accuracy
        best_of=1,
        condition_on_previous_text=False,
        no_speech_threshold=0.3,
    )
    if timestamps:
        lines = [f"[{_ts(s.start)} – {_ts(s.end)}] {s.text.strip()}" for s in segments]
    else:
        lines = [s.text.strip() for s in segments]
    return "\n".join(lines)


# ---------- Batch with live ETA (streams updates) ----------
def files_added_status(file_paths, progress=gr.Progress(track_tqdm=True)):
    """Summarise the selected files and enable the batch button.

    Returns:
        A ``(markdown, update)`` pair: the per-file/total summary and a
        ``gr.update`` toggling the run button's ``interactive`` flag.
    """
    if not file_paths:
        return "No files yet. Add some audio to get started.", gr.update(interactive=False)

    count = len(file_paths)
    total_size, total_audio = 0, 0.0
    lines = ["### Files added"]
    for i, p in enumerate(file_paths, 1):
        name = _safe(p)
        progress(i / count, desc=f"Scanning {name}")
        try:
            size = os.path.getsize(p)
        except OSError:
            size = 0  # a missing/unreadable file is listed with zero size
        dur = _duration_secs(p)
        total_size += size
        total_audio += dur
        lines.append(f"- ✅ **{name}** — {(_fmt_hms(dur) if dur else '…')} | {_fmt_bytes(size)}")
    lines += [
        "",
        f"**Total:** {count} files — {_fmt_hms(total_audio)} audio — {_fmt_bytes(total_size)}",
        "Ready to run the batch.",
    ]
    return "\n".join(lines), gr.update(interactive=True)


def batch_transcribe_stream(file_paths, language="auto", timestamps=True,
                            progress=gr.Progress(track_tqdm=True)):
    """Transcribe several files, yielding ``(status_markdown, zip_path)`` updates.

    ``zip_path`` is ``None`` on every intermediate update and is set only on the
    final one. Each transcript is written to its own ``.txt``; a combined file
    is added and everything is zipped next to the temporary work directory,
    which is removed afterwards (also when the run is interrupted).

    A file whose transcription raises is reported in the summary and the batch
    continues with the remaining files.
    """
    if not file_paths:
        yield "No files selected.", None
        return

    start = time.time()
    n = len(file_paths)
    durations = [_duration_secs(p) for p in file_paths]
    total_audio = sum(durations)

    # mkdtemp guarantees a fresh directory even when two batches start within
    # the same second.
    workdir = tempfile.mkdtemp(prefix="voxo_batch_")
    zip_path = f"{workdir}.zip"
    used_names = {_COMBINED_NAME.lower()}

    summary_parts, processed_audio, completed, failed = [], 0.0, 0, 0

    def status_md(note: str = "") -> str:
        """Render the progress panel from the enclosing counters."""
        elapsed = time.time() - start
        rtf = elapsed / processed_audio if processed_audio > 0 else 0.0
        remaining = max(0.0, total_audio - processed_audio)
        eta = remaining * rtf if processed_audio > 0 else 0.0
        header = [
            "### Batch Progress",
            f"- Files: **{completed}/{n}**",
            f"- Elapsed: **{_fmt_hms(elapsed)}**",
            f"- Audio processed: **{_fmt_hms(processed_audio)}** / {_fmt_hms(total_audio)}",
            f"- Est. RTF: **{rtf:.2f}**" if processed_audio else "- Est. RTF: **…**",
            f"- ETA: **{_fmt_hms(eta)}**" if processed_audio else "- ETA: **…**",
        ]
        if failed:
            header.append(f"- Failed: **{failed}**")
        if note:
            header.append(f"\n{note}")
        tail = "\n".join(summary_parts[-2:]) if summary_parts else ""
        return "\n".join(header) + ("\n\n" + tail if tail else "")

    try:
        progress(0.0, desc="Starting…")
        yield status_md("Preparing files…"), None

        for idx, path in enumerate(file_paths):
            name = _safe(path)
            file_dur = durations[idx]
            t0 = time.time()
            try:
                text = transcribe(path, language=language, timestamps=timestamps)
            except Exception as exc:  # batch boundary: report and keep going
                failed += 1
                summary_parts.append(f"#### ❌ {name}\n- Failed: {exc}\n")
            else:
                # Distinct uploads may share a stem (a.mp3 / a.wav); keep both.
                out_name = _unique_txt_name(os.path.splitext(name)[0], used_names)
                with open(os.path.join(workdir, out_name), "w", encoding="utf-8") as f:
                    f.write(text)
                wall = time.time() - t0
                audio_txt = _fmt_hms(file_dur) if file_dur else "…"
                # Without a known duration a real-time factor is meaningless.
                rtf_txt = f"{wall / file_dur:.2f}" if file_dur > 0 else "n/a"
                summary_parts.append(
                    f"#### ✅ {name}\n- Audio: {audio_txt} | Wall: {_fmt_hms(wall)} | RTF: {rtf_txt}\n\n{text}\n"
                )

            processed_audio += file_dur
            completed += 1
            progress(completed / n, desc=f"Processed {completed}/{n}")
            yield status_md(), None

        # combined + zip
        combined_path = os.path.join(workdir, _COMBINED_NAME)
        with open(combined_path, "w", encoding="utf-8") as f:
            f.write("\n\n".join(summary_parts))

        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as z:
            for fname in sorted(os.listdir(workdir)):
                z.write(os.path.join(workdir, fname), arcname=fname)
    finally:
        # Runs on success, on error and when the generator is closed early.
        shutil.rmtree(workdir, ignore_errors=True)

    if failed:
        note = (f"Finished with {failed} failed file(s). "
                "Download the ZIP for the transcripts that succeeded.")
    else:
        note = "All done. Download the ZIP for every transcript."
    yield status_md(note), zip_path


# ---------- UI ----------
with gr.Blocks(title="Voxo – Audio to Text") as demo:
    gr.Markdown("# 🎧 Voxo\nDrop audio, get text. GPU = fast, CPU = free.")

    with gr.Tabs():
        # Single
        with gr.Tab("Single file"):
            with gr.Row():
                audio = gr.Audio(sources=["upload", "microphone"], type="filepath",
                                 label="Audio (mp3/wav)")
                lang = gr.Dropdown(LANGUAGES, value="auto", label="Language")
                ts = gr.Checkbox(value=True, label="Show timestamps")
            btn = gr.Button("Transcribe", variant="primary")
            out = gr.Textbox(lines=20, label="Transcript", show_copy_button=True)
            btn.click(transcribe, inputs=[audio, lang, ts], outputs=out, concurrency_limit=1)

        # Batch (simple uploader + live ETA)
        with gr.Tab("Batch"):
            files = gr.File(file_count="multiple", type="filepath", file_types=["audio"],
                            label="Select multiple audio files")
            upload_status = gr.Markdown("No files yet. Add some audio.")
            with gr.Row():
                lang2 = gr.Dropdown(LANGUAGES, value="auto", label="Language")
                ts2 = gr.Checkbox(value=True, label="Show timestamps")
            batch_btn = gr.Button("Run Batch", variant="primary", interactive=False)
            batch_out = gr.Markdown("Ready.")
            zip_out = gr.File(label="Download transcripts (ZIP)")

            # Enable the Run button after files are added + show a summary
            files.change(files_added_status, inputs=[files], outputs=[upload_status, batch_btn])

            # Stream progress + final ZIP
            batch_btn.click(
                batch_transcribe_stream,
                inputs=[files, lang2, ts2],
                outputs=[batch_out, zip_out],
                concurrency_limit=1,
            )

    gr.Markdown(f"**Engine**: `{MODEL_ID}` on `{DEVICE}` ({COMPUTE}). Tip: Use an L4 GPU for large-v3 fast runs; switch back to CPU Basic to save dollars.")

# Start Gradio server (Spaces-friendly)
demo.queue(default_concurrency_limit=1).launch()