Spaces:
Sleeping
Sleeping
| import os, time, re, shutil, zipfile, subprocess | |
| import gradio as gr | |
| from faster_whisper import WhisperModel | |
| # ---------- Device & Model (lazy load so startup is instant) ---------- | |
| DEVICE = "cuda" if os.path.exists("/dev/nvidia0") else "cpu" | |
| COMPUTE = "float16" if DEVICE == "cuda" else "int8" | |
| MODEL_ID = os.getenv( | |
| "VOXO_MODEL", | |
| "Systran/faster-whisper-large-v3" if DEVICE == "cuda" else "Systran/faster-whisper-small" | |
| ) | |
| _model = None | |
def get_model():
    """Return the shared WhisperModel instance, loading it lazily on first call."""
    global _model
    if _model is not None:
        return _model
    _model = WhisperModel(MODEL_ID, device=DEVICE, compute_type=COMPUTE)
    return _model
| # ---------- Helpers ---------- | |
| def _ts(t: float) -> str: | |
| m, s = divmod(int(t), 60); h, m = divmod(m, 60) | |
| return f"{h:02d}:{m:02d}:{s:02d}" | |
| def _fmt_hms(seconds: float) -> str: | |
| seconds = max(0, int(seconds)) | |
| m, s = divmod(seconds, 60); h, m = divmod(m, 60) | |
| if h: return f"{h}h {m:02d}m {s:02d}s" | |
| if m: return f"{m}m {s:02d}s" | |
| return f"{s}s" | |
| def _fmt_bytes(n: int) -> str: | |
| for unit in ["B","KB","MB","GB","TB"]: | |
| if n < 1024 or unit == "TB": | |
| return f"{n:.1f} {unit}" if unit != "B" else f"{n} {unit}" | |
| n /= 1024 | |
| def _safe(name: str) -> str: | |
| return re.sub(r"[^A-Za-z0-9._-]+", "_", os.path.basename(name)) | |
| def _duration_secs(path: str) -> float: | |
| try: | |
| out = subprocess.check_output( | |
| ["ffprobe","-v","error","-show_entries","format=duration", | |
| "-of","default=noprint_wrappers=1:nokey=1", path], | |
| stderr=subprocess.STDOUT, | |
| ).decode().strip() | |
| return float(out) | |
| except Exception: | |
| return 0.0 | |
| # ---------- Core Transcribe ---------- | |
def transcribe(audio_path, language="auto", timestamps=True):
    """Transcribe one audio file and return the transcript as a string.

    audio_path: filesystem path to the audio (falsy -> empty string returned).
    language: ISO code, or "auto" to let the model detect it.
    timestamps: when True, prefix each segment with "[HH:MM:SS – HH:MM:SS]".
    """
    if not audio_path:
        return ""
    segments, _info = get_model().transcribe(
        audio_path,
        language=(None if language == "auto" else language),
        vad_filter=True,
        vad_parameters=dict(min_silence_duration_ms=500),
        beam_size=1,  # fast; bump to 3–5 for more accuracy
        best_of=1,
        condition_on_previous_text=False,
        no_speech_threshold=0.3,
    )
    if timestamps:
        rendered = (
            f"[{_ts(seg.start)} – {_ts(seg.end)}] {seg.text.strip()}"
            for seg in segments
        )
    else:
        rendered = (seg.text.strip() for seg in segments)
    return "\n".join(rendered)
| # ---------- Batch with live ETA (streams updates) ---------- | |
def files_added_status(file_paths, progress=gr.Progress(track_tqdm=True)):
    """Summarize newly added files and enable/disable the Run Batch button.

    Returns a (markdown_summary, gr.update) pair: the markdown lists each
    file's duration and size plus totals; the update toggles interactivity.
    """
    if not file_paths:
        return "No files yet. Add some audio to get started.", gr.update(interactive=False)
    bytes_total = 0
    audio_total = 0.0
    report = ["### Files added"]
    count = len(file_paths)
    for pos, path in enumerate(file_paths, start=1):
        label = _safe(path)
        progress(pos / count, desc=f"Scanning {label}")
        nbytes = os.path.getsize(path) if os.path.exists(path) else 0
        secs = _duration_secs(path)
        bytes_total += nbytes
        audio_total += secs
        # "…" stands in for the duration when ffprobe couldn't determine it.
        report.append(f"- ✅ **{label}** — {(_fmt_hms(secs) if secs else '…')} | {_fmt_bytes(nbytes)}")
    report.append("")
    report.append(f"**Total:** {count} files — {_fmt_hms(audio_total)} audio — {_fmt_bytes(bytes_total)}")
    report.append("Ready to run the batch.")
    return "\n".join(report), gr.update(interactive=True)
def batch_transcribe_stream(file_paths, language="auto", timestamps=True, progress=gr.Progress(track_tqdm=True)):
    """Generator: transcribe each file in turn, streaming progress to the UI.

    Yields (markdown_status, zip_path_or_None) pairs — one after setup, one
    after every file, and a final one with the path to a ZIP containing all
    per-file transcripts plus a combined text file.
    """
    if not file_paths:
        yield "No files selected.", None
        return
    start = time.time()
    n = len(file_paths)
    # Pre-probe every duration up front so the ETA math has a fixed total.
    durations = [(_duration_secs(p) or 0.0) for p in file_paths]
    total_audio = sum(durations)
    # Timestamped scratch dir under /tmp; removed after zipping.
    workdir = f"/tmp/voxo_batch_{int(start)}"
    os.makedirs(workdir, exist_ok=True)
    summary_parts, processed_audio, completed = [], 0.0, 0

    def status_md(note: str = "") -> str:
        # Build the progress panel. RTF = wall-clock seconds per audio second;
        # ETA extrapolates the observed RTF over the remaining audio.
        elapsed = time.time() - start
        rtf = elapsed / processed_audio if processed_audio > 0 else 0.0
        remaining = max(0.0, total_audio - processed_audio)
        eta = remaining * rtf if processed_audio > 0 else 0.0
        header = [
            "### Batch Progress",
            f"- Files: **{completed}/{n}**",
            f"- Elapsed: **{_fmt_hms(elapsed)}**",
            f"- Audio processed: **{_fmt_hms(processed_audio)}** / {_fmt_hms(total_audio)}",
            # Before the first file finishes there is no RTF/ETA to show.
            f"- Est. RTF: **{rtf:.2f}**" if processed_audio else "- Est. RTF: **…**",
            f"- ETA: **{_fmt_hms(eta)}**" if processed_audio else "- ETA: **…**",
        ]
        if note: header.append(f"\n{note}")
        # Only the two most recent transcripts are echoed, keeping the panel short.
        tail = "\n".join(summary_parts[-2:]) if summary_parts else ""
        return "\n".join(header) + ("\n\n" + tail if tail else "")

    progress(0.0, desc="Starting…")
    yield status_md("Preparing files…"), None
    for idx, path in enumerate(file_paths):
        name = _safe(path)
        file_dur = durations[idx]
        t0 = time.time()
        text = transcribe(path, language=language, timestamps=timestamps)
        # One .txt per input file, named after the sanitized basename.
        out_txt = os.path.join(workdir, f"{os.path.splitext(name)[0]}.txt")
        with open(out_txt, "w", encoding="utf-8") as f:
            f.write(text)
        wall = time.time() - t0
        # max(1e-6, …) guards the RTF division when ffprobe reported 0 duration.
        summary_parts.append(
            f"#### ✅ {name}\n- Audio: {_fmt_hms(file_dur)} | Wall: {_fmt_hms(wall)} | RTF: {(wall/max(1e-6,file_dur)):.2f}\n\n{text}\n"
        )
        processed_audio += file_dur
        completed += 1
        progress(completed / n, desc=f"Processed {completed}/{n}")
        yield status_md(), None
    # combined + zip
    combined_path = os.path.join(workdir, "_ALL_TRANSCRIPTS.txt")
    with open(combined_path, "w", encoding="utf-8") as f:
        f.write("\n\n".join(summary_parts))
    zip_path = f"{workdir}.zip"
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as z:
        for fname in os.listdir(workdir):
            z.write(os.path.join(workdir, fname), arcname=fname)
    # The per-file txts now live inside the ZIP; the scratch dir can go.
    shutil.rmtree(workdir, ignore_errors=True)
    yield status_md("All done. Download the ZIP for every transcript."), zip_path
# ---------- UI ----------
# Two-tab Gradio app: single-file transcription and a batch mode that
# streams progress and ends with a downloadable ZIP of transcripts.
with gr.Blocks(title="Voxo – Audio to Text") as demo:
    gr.Markdown("# 🎧 Voxo\nDrop audio, get text. GPU = fast, CPU = free.")
    with gr.Tabs():
        # Single
        with gr.Tab("Single file"):
            with gr.Row():
                audio = gr.Audio(sources=["upload","microphone"], type="filepath", label="Audio (mp3/wav)")
                lang = gr.Dropdown(["auto","en","es","fr","de","it","pt","ja","ko","zh"], value="auto", label="Language")
                ts = gr.Checkbox(value=True, label="Show timestamps")
            btn = gr.Button("Transcribe", variant="primary")
            out = gr.Textbox(lines=20, label="Transcript", show_copy_button=True)
            # concurrency_limit=1: only one transcription job at a time (single model instance).
            btn.click(transcribe, inputs=[audio, lang, ts], outputs=out, concurrency_limit=1)
        # Batch (simple uploader + live ETA)
        with gr.Tab("Batch"):
            files = gr.File(file_count="multiple", type="filepath", file_types=["audio"], label="Select multiple audio files")
            upload_status = gr.Markdown("No files yet. Add some audio.")
            with gr.Row():
                lang2 = gr.Dropdown(["auto","en","es","fr","de","it","pt","ja","ko","zh"], value="auto", label="Language")
                ts2 = gr.Checkbox(value=True, label="Show timestamps")
            # Disabled until files_added_status re-enables it on upload.
            batch_btn = gr.Button("Run Batch", variant="primary", interactive=False)
            batch_out = gr.Markdown("Ready.")
            zip_out = gr.File(label="Download transcripts (ZIP)")
            # Enable the Run button after files are added + show a summary
            files.change(files_added_status, inputs=[files], outputs=[upload_status, batch_btn])
            # Stream progress + final ZIP
            # batch_transcribe_stream is a generator, so the click handler
            # streams each yielded (markdown, zip) pair into the outputs.
            batch_btn.click(
                batch_transcribe_stream,
                inputs=[files, lang2, ts2],
                outputs=[batch_out, zip_out],
                concurrency_limit=1
            )
    gr.Markdown(f"**Engine**: `{MODEL_ID}` on `{DEVICE}` ({COMPUTE}). Tip: Use an L4 GPU for large-v3 fast runs; switch back to CPU Basic to save dollars.")
# Start Gradio server (Spaces-friendly)
demo.queue(default_concurrency_limit=1).launch()