| """ |
| Voice Clone Bench — Chatterbox Multilingual zero-shot voice cloning. |
| |
| Standalone prototype to A/B open-weight voice cloning against ElevenLabs: |
| upload a reference voice -> type arbitrary text -> get speech in the cloned voice. |
| |
| Mirrors the official ResembleAI/Chatterbox-Multilingual-TTS inference path, with: |
| - a clone-first UI (reference upload is the primary input), |
| - long-text sentence chunking (so JOI-length scripts work, not just 300 chars), |
| - a clean programmatic endpoint (api_name="/clone") for later bot integration. |
| """ |
| import os |
| import random |
| import re |
| import tempfile |
| import threading |
| import uuid |
|
|
| import numpy as np |
| import soundfile as sf |
| import torch |
| import gradio as gr |
| import spaces |
|
|
| from src.chatterbox.mtl_tts import ChatterboxMultilingualTTS, SUPPORTED_LANGUAGES |
|
|
# Inference device: use the GPU whenever torch can see one, otherwise the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
print(f"Running on device: {DEVICE}")

# Process-wide model singleton; filled in lazily by get_or_load_model().
MODEL = None

# Held by clone_and_speak() around seeding, conditioning and generation.
_MODEL_LOCK = threading.Lock()

# Default generation settings, shared by the clone_and_speak() signature and
# the UI sliders.
DEFAULT_EXAGGERATION = 0.4
DEFAULT_CFG_WEIGHT = 0.5
DEFAULT_TEMPERATURE = 0.7
DEFAULT_REPETITION_PENALTY = 2.0
DEFAULT_MIN_P = 0.05
DEFAULT_TOP_P = 1.0

# Built-in sample voice and sample sentence per language code.
LANGUAGE_CONFIG = dict(
    en=dict(
        audio="https://storage.googleapis.com/chatterbox-demo-samples/mtl_prompts/en_f1.flac",
        text="Last month, we reached a new milestone with two billion views on our YouTube channel.",
    ),
    fr=dict(
        audio="https://storage.googleapis.com/chatterbox-demo-samples/mtl_prompts/fr_f1.flac",
        text="Le mois dernier, nous avons atteint un nouveau jalon avec deux milliards de vues.",
    ),
    es=dict(
        audio="https://storage.googleapis.com/chatterbox-demo-samples/mtl_prompts/es_f1.flac",
        text="El mes pasado alcanzamos un nuevo hito: dos mil millones de visualizaciones.",
    ),
    de=dict(
        audio="https://storage.googleapis.com/chatterbox-demo-samples/mtl_prompts/de_f1.flac",
        text="Letzten Monat haben wir einen neuen Meilenstein erreicht: zwei Milliarden Aufrufe.",
    ),
)

# Default upper bound on characters per chunk for split_into_chunks().
CHUNK_CHARS = 280
|
|
|
|
def get_or_load_model():
    """Return the shared TTS model, loading it on first use.

    The loaded instance is cached in the module-level ``MODEL`` so later
    calls return it without reloading.
    """
    global MODEL
    if MODEL is not None:
        return MODEL

    print("Loading ChatterboxMultilingualTTS ...")
    MODEL = ChatterboxMultilingualTTS.from_pretrained(DEVICE)

    # Move the model only if it exposes .to() and reports a different device.
    reported_device = str(getattr(MODEL, "device", ""))
    if hasattr(MODEL, "to") and reported_device != DEVICE:
        MODEL.to(DEVICE)

    print(f"Model loaded. Internal device: {getattr(MODEL, 'device', 'N/A')}")
    return MODEL
|
|
|
|
| |
def _preload_model_at_startup():
    """Try to load the model at import time; log instead of failing on error."""
    try:
        get_or_load_model()
    except Exception as load_error:
        # MODEL stays None here, so the next get_or_load_model() call retries.
        print(f"WARNING: model failed to load at startup: {load_error}")


_preload_model_at_startup()
|
|
|
|
def set_seed(seed: int):
    """Seed torch, Python's ``random`` and NumPy with the same value.

    The CUDA generators are seeded only when the module runs on "cuda".
    """
    use_cuda = DEVICE == "cuda"
    torch.manual_seed(seed)
    if use_cuda:
        for seed_cuda_rng in (torch.cuda.manual_seed, torch.cuda.manual_seed_all):
            seed_cuda_rng(seed)
    for seed_host_rng in (random.seed, np.random.seed):
        seed_host_rng(seed)
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
# Cache for the lazily imported separator, managed by _ensure_separator():
#   None  -> import not attempted yet
#   False -> import failed (voice isolation disabled)
#   else  -> the imported ``separate_stem`` callable
_SEPARATOR_READY = None

# Longest stretch of reference audio (seconds) handed to the separator;
# longer clips are cut down to their loudest window of this length.
_CLEAN_TRIM_SECONDS = 10.0
|
|
|
|
def _ensure_separator():
    """Import demucs-onnx on first use and cache the outcome.

    Returns the ``separate_stem`` callable, or None when the import failed
    (the failure is remembered, so the import is not retried).
    """
    global _SEPARATOR_READY
    if _SEPARATOR_READY is None:
        try:
            from demucs_onnx import separate_stem
        except Exception as import_error:
            print(f"WARNING: demucs-onnx unavailable, voice isolation disabled: {import_error}")
            _SEPARATOR_READY = False
        else:
            _SEPARATOR_READY = separate_stem
    # The cached False (import failed) is reported to callers as None.
    return _SEPARATOR_READY if _SEPARATOR_READY else None
|
|
|
|
def _remove_quietly(path):
    """Best-effort delete of a temp file; a leftover file must not fail a request."""
    if path and os.path.exists(path):
        try:
            os.remove(path)
        except OSError:
            # Deliberately ignored: cleanup is opportunistic.
            pass


def _loudest_window_start(mono, win, step):
    """Return the start index of the ``win``-sample window of ``mono`` with most energy.

    Candidate windows start every ``step`` samples; ties go to the earliest.
    Returns 0 when the signal is not longer than one window.
    """
    if mono.size <= win:
        return 0
    # Prefix sums of the squared signal make each window's energy one subtraction.
    csum = np.concatenate([[0.0], np.cumsum(mono.astype(np.float64) ** 2)])
    starts = np.arange(0, mono.size - win + 1, step)
    energies = csum[starts + win] - csum[starts]
    # A NaN energy (NaN/inf samples) must never win; real energies are >= 0.
    energies = np.nan_to_num(energies, nan=-1.0)
    return int(starts[int(np.argmax(energies))])


def _trim_reference_for_cleaning(audio_path, info):
    """Write the loudest ``_CLEAN_TRIM_SECONDS`` of a long clip to a temp WAV.

    Returns the temp file path, or None when the clip is already short
    enough to be separated as-is. Exceptions propagate to the caller.
    """
    if info.frames <= int(_CLEAN_TRIM_SECONDS * info.samplerate):
        return None

    full, file_sr = sf.read(audio_path, dtype="float32")
    # Multi-channel reads are 2-D; average the channels for the energy scan only.
    mono = full.mean(axis=1) if full.ndim == 2 else full
    win = int(_CLEAN_TRIM_SECONDS * file_sr)
    step = max(1, int(0.25 * file_sr))
    best_start = _loudest_window_start(mono, win, step)

    trim_path = os.path.join(tempfile.gettempdir(), f"cleantrim_{uuid.uuid4().hex}.wav")
    try:
        sf.write(trim_path, full[best_start:best_start + win], file_sr)
    except Exception:
        # Do not leave a partially written temp file behind.
        _remove_quietly(trim_path)
        raise
    print(
        f"Trimmed reference for cleaning: {info.frames/info.samplerate:.1f}s "
        f"-> {_CLEAN_TRIM_SECONDS:.1f}s (energy window @ {best_start/file_sr:.1f}s)"
    )
    return trim_path


def isolate_voice(audio_path: str) -> str:
    """Return a path to a new WAV holding only the vocals of ``audio_path``.

    Clips longer than ``_CLEAN_TRIM_SECONDS`` are first cut down to their
    highest-energy window. If that trimming step fails, the full clip is
    separated instead and a warning is printed.

    This function does NOT fall back to the original clip when separation
    itself is unavailable or fails; callers wanting that behaviour (see
    ``_maybe_clean_reference``) must catch the exception.

    Args:
        audio_path: path of the reference clip. A falsy value is returned
            unchanged.

    Returns:
        Path of the isolated-vocals WAV in the system temp directory. The
        caller owns the file.

    Raises:
        gr.Error: demucs-onnx is not available, or separation returned no
            audio.
        Exception: whatever ``separate_stem`` or the final write raises.
    """
    if not audio_path:
        return audio_path
    separate_stem = _ensure_separator()
    if separate_stem is None:
        raise gr.Error("Voice isolation is unavailable (demucs-onnx not installed).")

    sr = 44100  # used only when the file header cannot be read
    trim_path = None
    try:
        info = sf.info(audio_path)
        sr = info.samplerate
        trim_path = _trim_reference_for_cleaning(audio_path, info)
    except Exception as e:
        print(f"WARNING: reference trim failed, separating full clip: {e}")
    sep_input = trim_path or audio_path

    try:
        vocals = separate_stem(sep_input, "vocals", providers="cpu")
    finally:
        # The trimmed copy is only an input to the separator.
        _remove_quietly(trim_path)

    vocals = np.asarray(vocals, dtype=np.float32)
    if vocals.ndim == 2:
        # Assumes separate_stem returns (channels, samples) - TODO confirm.
        vocals = vocals.mean(axis=0)
    if vocals.size == 0:
        # A zero-length WAV would only fail later with a less helpful error.
        raise gr.Error("Voice isolation produced no audio.")
    peak = float(np.max(np.abs(vocals)))
    if peak > 1.0:
        # Scale down only when the signal would clip; never amplify.
        vocals = vocals / peak

    # NOTE(review): the output is written at the input file's sample rate;
    # confirm that separate_stem returns audio at that same rate.
    out_path = os.path.join(tempfile.gettempdir(), f"isolated_{uuid.uuid4().hex}.wav")
    sf.write(out_path, vocals, sr)
    print(f"Isolated voice -> {out_path} ({len(vocals)/sr:.1f}s @ {sr}Hz)")
    return out_path
|
|
|
|
def isolate_voice_ui(audio_path: str):
    """Preview handler (api_name=/isolate_voice): return the cleaned reference.

    Raises ``gr.Error`` when no reference clip has been provided.
    """
    if audio_path:
        return isolate_voice(audio_path)
    raise gr.Error("Upload a reference clip first.")
|
|
|
|
def default_audio_for_ui(lang: str):
    """Return the built-in sample audio URL for ``lang``, or None if there is none."""
    try:
        entry = LANGUAGE_CONFIG[lang]
    except KeyError:
        return None
    return entry.get("audio")
|
|
|
|
def default_text_for_ui(lang: str) -> str:
    """Return the built-in sample sentence for ``lang``, or "" if there is none."""
    try:
        entry = LANGUAGE_CONFIG[lang]
    except KeyError:
        return ""
    return entry.get("text", "")
|
|
|
|
def split_into_chunks(text: str, max_chars: int = CHUNK_CHARS):
    """Split text into <= max_chars chunks, preferring sentence boundaries.

    Whitespace is collapsed first. Sentences are packed greedily into
    chunks. A single sentence longer than ``max_chars`` is hard-split at
    the last space inside the window, or mid-word when it has no space.
    Chunks are always returned in the order of the input text.

    Args:
        text: text to split; None or blank yields an empty list.
        max_chars: maximum length of each returned chunk (must be >= 1).

    Returns:
        List of non-empty chunk strings.

    Raises:
        ValueError: ``max_chars`` is smaller than 1 and ``text`` is not blank.
    """
    text = " ".join((text or "").split())
    if not text:
        return []
    if max_chars < 1:
        # A non-positive window would make the hard-split loop spin forever.
        raise ValueError("max_chars must be >= 1")
    if len(text) <= max_chars:
        return [text]

    # Sentence terminators: ASCII . ! ? plus the CJK full stop (U+3002),
    # fullwidth exclamation mark (U+FF01) and fullwidth question mark
    # (U+FF1F). Escapes keep the pattern intact across file re-encoding.
    sentences = re.split(r"(?<=[.!?\u3002\uff01\uff1f])\s+", text)

    chunks, cur = [], ""
    for sent in sentences:
        if len(sent) > max_chars:
            # Emit the pending chunk first so output keeps the input order.
            if cur:
                chunks.append(cur.strip())
                cur = ""
            while len(sent) > max_chars:
                window = sent[:max_chars]
                head = window.rsplit(" ", 1)[0] or window
                chunks.append(head.strip())
                sent = sent[len(head):].strip()
        if not cur:
            cur = sent
        elif len(cur) + 1 + len(sent) <= max_chars:
            cur = f"{cur} {sent}"
        else:
            chunks.append(cur.strip())
            cur = sent
    if cur.strip():
        chunks.append(cur.strip())
    return [c for c in chunks if c]
|
|
|
|
def _maybe_clean_reference(ref: str, clean_reference: bool) -> str:
    """Return an isolated-voice version of ``ref`` when cleaning is requested.

    Any failure in isolation is shown as a Gradio warning and the raw
    reference is returned instead.
    """
    if clean_reference and ref:
        try:
            cleaned = isolate_voice(ref)
        except Exception as isolation_error:
            gr.Warning(f"Background-audio removal failed, using raw reference: {isolation_error}")
        else:
            return cleaned
    return ref
|
|
|
|
@spaces.GPU(duration=120)
def clone_and_speak(
    text: str,
    language_id: str = "en",
    audio_prompt_path: str = None,
    exaggeration: float = DEFAULT_EXAGGERATION,
    cfg_weight: float = DEFAULT_CFG_WEIGHT,
    temperature: float = DEFAULT_TEMPERATURE,
    seed: int = 0,
    clean_reference: bool = False,
    repetition_penalty: float = DEFAULT_REPETITION_PENALTY,
    min_p: float = DEFAULT_MIN_P,
    top_p: float = DEFAULT_TOP_P,
):
    """
    Clone the voice in `audio_prompt_path` and speak `text` in language `language_id`.

    Args:
        text: text to synthesize (long scripts are auto-chunked).
        language_id: language code (en, fr, de, es, it, pt, hi, ja, zh, ...).
            Case-insensitive; a missing value is treated as "en".
        audio_prompt_path: path/URL to a reference voice clip. If omitted, a
            built-in sample voice for the language is used.
        exaggeration: expressiveness (0.25-2.0; ~0.4 = neutral/faithful clone).
        cfg_weight: CFG / pacing (0.0-1.0; lower ~0.3 = faster pace, 0.0 for
            cross-lingual transfer; 0.5 = balanced default).
        temperature: sampling randomness (0.05-2.0; lower = more consistent).
        seed: 0 for random, otherwise reproducible.
        clean_reference: if True, isolate the voice (remove background music/
            noise) from the uploaded reference before cloning. Built-in
            sample voices are never cleaned.
        repetition_penalty: discourages repeated tokens (model default 2.0).
        min_p: min-p nucleus floor (model default 0.05).
        top_p: top-p nucleus threshold (model default 1.0).

    Returns:
        (sample_rate, waveform) tuple consumable by gr.Audio.

    Raises:
        RuntimeError: the TTS model could not be obtained.
        gr.Error: `text` is blank, or no reference voice is available.
    """
    model = get_or_load_model()
    if model is None:
        raise RuntimeError("TTS model is not loaded.")

    if not text or not text.strip():
        raise gr.Error("Please enter some text to speak.")

    # Normalise the language code BEFORE the built-in sample lookup so that
    # None or "EN" resolves to the same sample voice as "en".
    lang = (language_id or "en").lower()

    ref = audio_prompt_path or default_audio_for_ui(lang)
    if not ref:
        raise gr.Error("Upload a reference audio clip to clone (or pick a language with a built-in sample).")

    # Only user-supplied references are cleaned. When cleaning produced a
    # new temp file, remember it so it can be deleted afterwards.
    cleaned_tmp = None
    if audio_prompt_path:
        ref = _maybe_clean_reference(ref, clean_reference)
        if ref != audio_prompt_path:
            cleaned_tmp = ref

    try:
        chunks = split_into_chunks(text)
        print(f"Cloning voice | lang={lang} | chunks={len(chunks)} | clean_ref={clean_reference} | ref={ref}")

        sr = model.sr
        # 150 ms of silence is inserted between consecutive chunks.
        silence = np.zeros(int(0.15 * sr), dtype=np.float32)
        pieces = []
        seed_value = int(seed or 0)

        # Seeding, conditioning and generation all touch shared model/RNG
        # state, so they run under a single lock.
        with _MODEL_LOCK:
            if seed_value != 0:
                set_seed(seed_value)

            # Build the voice conditionals once; every chunk then reuses
            # them (audio_prompt_path=None below).
            model.prepare_conditionals(ref, exaggeration=exaggeration)

            last_index = len(chunks) - 1
            for i, chunk in enumerate(chunks):
                wav = model.generate(
                    chunk,
                    language_id=lang,
                    audio_prompt_path=None,
                    exaggeration=exaggeration,
                    cfg_weight=cfg_weight,
                    temperature=temperature,
                    repetition_penalty=repetition_penalty,
                    min_p=min_p,
                    top_p=top_p,
                )
                pieces.append(wav.squeeze(0).detach().cpu().numpy().astype(np.float32))
                if i != last_index:
                    pieces.append(silence)
    finally:
        # Best-effort removal of the cleaned-reference temp file, which
        # would otherwise accumulate in the temp directory on every call.
        if cleaned_tmp and os.path.exists(cleaned_tmp):
            try:
                os.remove(cleaned_tmp)
            except OSError:
                pass

    full = np.concatenate(pieces) if pieces else np.zeros(1, dtype=np.float32)
    print("Generation complete.")
    return (sr, full)
|
|
|
|
def on_language_change(lang):
    """Return the (sample audio, sample text) pair for the selected language."""
    sample_audio = default_audio_for_ui(lang)
    sample_text = default_text_for_ui(lang)
    return sample_audio, sample_text
|
|
|
|
# UI definition. The visible strings below had been corrupted by an encoding
# round-trip (emoji, circled digits, dashes and the "≈" sign); they are
# restored here. Component values, ranges and event wiring are unchanged.
with gr.Blocks(title="Voice Clone Bench") as demo:
    gr.Markdown(
        """
        # 🎙️ Voice Clone Bench — Chatterbox (zero-shot)
        Upload a **reference voice**, type **any text**, get speech **in that cloned voice**.
        Built to A/B against ElevenLabs. Model: Chatterbox Multilingual (Resemble AI, MIT).
        """
    )
    with gr.Row():
        # Left column: inputs and controls.
        with gr.Column():
            ref_wav = gr.Audio(
                sources=["upload", "microphone"],
                type="filepath",
                label="① Reference voice to clone (5–20s clean speech). Empty = built-in sample.",
                value=default_audio_for_ui("en"),
            )
            clean_reference = gr.Checkbox(
                value=False,
                label="🧹 Remove background audio from reference (isolate voice before cloning)",
                info="Strips music/noise with HT-Demucs so the clone is built from clean speech.",
            )
            preview_btn = gr.Button("🧹 Preview cleaned reference", size="sm")
            cleaned_preview = gr.Audio(label="Isolated voice (preview)", visible=True)
            language_id = gr.Dropdown(
                choices=list(ChatterboxMultilingualTTS.get_supported_languages().keys()),
                value="en",
                label="② Language",
            )
            text = gr.Textbox(
                value=default_text_for_ui("en"),
                label="③ Text to speak (long scripts are auto-chunked)",
                lines=5,
                max_lines=20,
            )
            with gr.Accordion("Cloning controls (tuned for faithful voice cloning)", open=True):
                exaggeration = gr.Slider(
                    0.0, 2.0, step=0.05, value=DEFAULT_EXAGGERATION,
                    label="Exaggeration — lower = more neutral/faithful (≈0.4); 0.7+ = expressive",
                )
                cfg_weight = gr.Slider(
                    0.0, 1.0, step=0.05, value=DEFAULT_CFG_WEIGHT,
                    label="CFG / Pace — 0.5 balanced; ~0.3 faster; 0.0 for cross-lingual",
                )
                temperature = gr.Slider(
                    0.05, 2.0, step=0.05, value=DEFAULT_TEMPERATURE,
                    label="Temperature — lower = more consistent/faithful (≈0.7)",
                )
                seed = gr.Number(value=0, label="Seed (0 = random)")
            with gr.Accordion("Sampling (advanced)", open=False):
                repetition_penalty = gr.Slider(
                    1.0, 2.5, step=0.05, value=DEFAULT_REPETITION_PENALTY,
                    label="Repetition penalty (default 2.0)",
                )
                min_p = gr.Slider(0.0, 0.5, step=0.01, value=DEFAULT_MIN_P, label="min_p (default 0.05)")
                top_p = gr.Slider(0.1, 1.0, step=0.05, value=DEFAULT_TOP_P, label="top_p (default 1.0)")
            run_btn = gr.Button("Clone & Speak", variant="primary")
        # Right column: generated audio.
        with gr.Column():
            audio_output = gr.Audio(label="Cloned speech output")

    # Switching language swaps in that language's sample voice and sentence.
    language_id.change(fn=on_language_change, inputs=[language_id], outputs=[ref_wav, text], show_progress=False)

    preview_btn.click(
        fn=isolate_voice_ui,
        inputs=[ref_wav],
        outputs=[cleaned_preview],
        api_name="isolate_voice",
    )

    # Input order must match the positional parameters of clone_and_speak().
    run_btn.click(
        fn=clone_and_speak,
        inputs=[
            text, language_id, ref_wav, exaggeration, cfg_weight, temperature, seed,
            clean_reference, repetition_penalty, min_p, top_p,
        ],
        outputs=[audio_output],
        api_name="clone",
    )
|
|
# Script entry point: enable the request queue (at most 20 waiting jobs),
# then launch the app with the MCP server turned on.
if __name__ == "__main__":
    queued_app = demo.queue(max_size=20)
    queued_app.launch(mcp_server=True)
|
|