# Process bootstrap: standard-library imports, UTF-8 console setup, and the
# runtime environment variables that must be in place before the third-party
# imports at the bottom of this section are executed.
import gc
import os
import re
import shutil
import subprocess
import sys
import tempfile
import threading
from pathlib import Path

# Exported so that child processes (for example the pip subprocess) run in
# UTF-8 mode. The interpreter reads these at start-up, so the already-running
# process is switched by the explicit reconfigure calls below instead.
os.environ["PYTHONUTF8"] = "1"
os.environ["PYTHONIOENCODING"] = "utf-8"


def _force_utf8(stream):
    """Switch *stream* to UTF-8 when it supports ``reconfigure``.

    ``sys.stdout`` / ``sys.stderr`` can be ``None`` or a replacement object
    without ``reconfigure``; in that case the stream is left untouched rather
    than aborting start-up with an ``AttributeError``.
    """
    reconfigure = getattr(stream, "reconfigure", None)
    if reconfigure is not None:
        reconfigure(encoding="utf-8")


_force_utf8(sys.stdout)
_force_utf8(sys.stderr)

# These must be set before the heavy third-party libraries are imported.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["OMP_NUM_THREADS"] = str(os.cpu_count() or 1)
os.environ["CUDA_VISIBLE_DEVICES"] = ""  # hide all CUDA devices: CPU only

import gradio as gr  # noqa: E402  (deliberately after the env setup above)
from pydub import AudioSegment  # noqa: E402
|
|
# Shared secret that requests must present; an empty value disables the check.
SECRET = os.getenv("API_SECRET", "")

# Directory holding the XTTS-v2 weights, overridable via XTTS_MODEL_DIR.
# NOTE(review): the default is a Windows-style path; on a POSIX host pathlib
# treats it as a single relative directory name — confirm this is intended.
MODEL_DIR = Path(os.getenv("XTTS_MODEL_DIR", r"C:\tmp\xtts-v2-weights"))

# Lazily populated model handle (see get_tts) and the lock that serialises
# synthesis requests.
_tts_instance = None
inference_lock = threading.Lock()

print("=== Chronis XTTS-v2 Space Booting ===", flush=True)
|
|
|
|
def setup():
    """Install Coqui TTS if needed and download XTTS-v2 weights once.

    The weights are considered present only when the config, the model
    checkpoint, and the vocabulary file all exist in ``MODEL_DIR``, so an
    interrupted download is retried instead of being mistaken for a complete
    one.

    Raises:
        subprocess.CalledProcessError: If ``pip install TTS`` fails.
        Exception: Whatever ``snapshot_download`` raises on a failed download.
    """
    import importlib

    try:
        import TTS  # noqa: F401  (availability probe only)
    except ImportError:
        print("[setup] Installing TTS library ...", flush=True)
        subprocess.run([sys.executable, "-m", "pip", "install", "TTS", "-q"], check=True)
        # The package appeared on disk after interpreter start-up; drop the
        # import system's cached directory listings so it becomes importable.
        importlib.invalidate_caches()
        print("[setup] TTS library installed.", flush=True)
    else:
        print("[setup] TTS library already installed.", flush=True)

    MODEL_DIR.mkdir(parents=True, exist_ok=True)
    # Files the XTTS loader reads when it is given a checkpoint directory.
    required_files = ("config.json", "model.pth", "vocab.json")
    if all((MODEL_DIR / name).exists() for name in required_files):
        print("[setup] XTTS-v2 weights already present.", flush=True)
        return

    print("[setup] Downloading XTTS-v2 weights ...", flush=True)
    from huggingface_hub import snapshot_download

    snapshot_download(
        repo_id="coqui/XTTS-v2",
        local_dir=str(MODEL_DIR),
        local_dir_use_symlinks=False,
    )
    print("[setup] XTTS-v2 weights downloaded.", flush=True)
|
|
|
|
def get_tts():
    """Return the process-wide XTTS-v2 model, building it on first use."""
    global _tts_instance

    # Fast path: the model was already constructed by an earlier call.
    if _tts_instance is not None:
        return _tts_instance

    # Imported lazily so the module can start before the TTS package exists.
    from TTS.api import TTS

    weights_dir = str(MODEL_DIR)
    config_file = str(MODEL_DIR / "config.json")

    print("[tts] Loading XTTS-v2 model ...", flush=True)
    _tts_instance = TTS(
        model_path=weights_dir,
        config_path=config_file,
        progress_bar=False,
        gpu=False,
    )
    print("[tts] Model loaded", flush=True)
    return _tts_instance
|
|
|
|
# Typographic punctuation mapped to its ASCII equivalent, so contractions and
# quotations survive the ASCII-only filter in clean_text.
_TYPOGRAPHIC_TO_ASCII = str.maketrans({
    "\u2018": "'",
    "\u2019": "'",
    "\u201c": '"',
    "\u201d": '"',
})


def clean_text(text: str) -> str:
    """Reduce *text* to a whitespace-normalised ASCII string of at most 500 chars.

    Curly quotes are converted to straight quotes and accented letters are
    reduced to their base letter; any remaining non-ASCII run becomes a single
    space. ``None`` is treated as empty input.

    Args:
        text: Raw user text (may be ``None`` or empty).

    Returns:
        The cleaned text, or the fallback sentence ``"I am here with you."``
        when fewer than three characters remain.
    """
    import unicodedata

    text = (text or "").translate(_TYPOGRAPHIC_TO_ASCII)

    # Decompose accented characters and drop the combining marks, so that
    # "café" becomes "cafe" instead of losing the letter entirely.
    decomposed = unicodedata.normalize("NFKD", text)
    text = "".join(ch for ch in decomposed if not unicodedata.combining(ch))

    text = re.sub(r"[^\x00-\x7F]+", " ", text)
    text = re.sub(r"\s+", " ", text).strip()
    if len(text) < 3:
        return "I am here with you."
    # The cut can land right after a space; do not return trailing whitespace.
    return text[:500].rstrip()
|
|
|
|
def split_sentences(text: str, max_chars: int = 200) -> list[str]:
    """Split *text* into chunks of at most *max_chars* characters.

    The text is cut after sentence-ending punctuation (``.``, ``!``, ``?``)
    and consecutive sentences are packed together while they fit. A single
    sentence longer than *max_chars* is wrapped at word boundaries (and a
    single over-long word is hard-split), so no chunk exceeds the limit.

    Args:
        text: Text to split.
        max_chars: Upper bound on chunk length. A non-positive value disables
            the wrapping of over-long sentences.

    Returns:
        The list of chunks, or ``[text]`` when no non-empty chunk is produced.
    """
    import textwrap

    # Step 1: sentence pieces, each guaranteed to fit within max_chars.
    pieces: list[str] = []
    for sentence in re.split(r"(?<=[.!?])\s+", text):
        if 0 < max_chars < len(sentence):
            pieces.extend(
                textwrap.wrap(
                    sentence,
                    width=max_chars,
                    break_long_words=True,
                    break_on_hyphens=False,
                )
            )
        else:
            pieces.append(sentence)

    # Step 2: greedily pack pieces. The strict "<" leaves room for the joining
    # space, so a packed chunk never exceeds max_chars.
    chunks: list[str] = []
    buf = ""
    for piece in pieces:
        if len(buf) + len(piece) < max_chars:
            buf = f"{buf} {piece}".strip()
            continue
        if buf:
            chunks.append(buf)
        buf = piece
    if buf:
        chunks.append(buf)
    return chunks or [text]
|
|
|
|
def prepare_ref_audio(ref_path: str) -> str:
    """Normalize to mono 24k WAV and cap to 10 seconds.

    Args:
        ref_path: Path of the uploaded reference recording.

    Returns:
        Path of a newly created temporary ``.wav`` file. The caller owns the
        file and is responsible for deleting it.

    Raises:
        ValueError: If the recording is shorter than one second.
    """
    audio = AudioSegment.from_file(ref_path)
    audio = audio.set_channels(1).set_frame_rate(24000).normalize()

    duration_ms = len(audio)  # len() of an AudioSegment is in milliseconds
    if duration_ms < 1000:
        raise ValueError(f"Reference audio too short ({duration_ms} ms). Need at least 1 second.")
    if duration_ms > 10000:
        audio = audio[:10000]

    fd, tmp_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    try:
        # export() hands back the open output file; close it explicitly so the
        # handle does not linger until garbage collection.
        audio.export(tmp_path, format="wav").close()
    except BaseException:
        # Do not leave an orphaned temp file behind when the export fails.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
    return tmp_path
|
|
|
|
def run_chunk(tts, text: str, ref_audio: str, out_path: str, language: str = "en"):
    """Synthesise one text chunk to *out_path* using the reference voice.

    Args:
        tts: Model object exposing ``tts_to_file``.
        text: Text of this chunk.
        ref_audio: Path of the reference voice recording.
        out_path: Destination path for the generated audio.
        language: Language code forwarded to the model; defaults to ``"en"``.
    """
    tts.tts_to_file(
        text=text,
        speaker_wav=ref_audio,
        language=language,
        file_path=out_path,
    )
|
|
|
|
def _remove_quietly(path):
    """Best-effort delete of *path*; a missing file or OS error is ignored."""
    if not path:
        return
    try:
        os.unlink(path)
    except OSError:
        pass


def synthesize(text: str, ref_audio_path: str, secret: str):
    """Generate speech for *text* in the voice of the reference recording.

    Cheap request validation (secret, reference file) runs before the
    inference lock is taken, so rejected requests never wait behind a running
    synthesis. Setup, model loading, and inference are serialised by the lock.

    Args:
        text: Text to speak; cleaned and split into chunks before synthesis.
        ref_audio_path: Path of the uploaded reference voice recording.
        secret: Value that must equal ``SECRET`` when ``SECRET`` is non-empty.

    Returns:
        ``(wav_path, "ok")`` on success, otherwise ``(None, message)``. The
        returned wav file is not deleted by this function.
    """
    import hmac

    if SECRET:
        # Constant-time comparison: do not leak how much of the secret matched.
        authorised = isinstance(secret, str) and hmac.compare_digest(
            secret.encode("utf-8"), SECRET.encode("utf-8")
        )
        if not authorised:
            return None, "Unauthorized"

    if not ref_audio_path or not Path(ref_audio_path).exists():
        return None, "Reference audio missing or not uploaded"

    with inference_lock:
        try:
            setup()
        except Exception as e:
            return None, f"Setup failed: {e}"

        workdir = None
        clean_ref = None
        tmp_out = None
        try:
            # Text preparation and temp-dir creation sit inside the try so any
            # failure is reported through the status output.
            chunks = split_sentences(clean_text(text or ""))
            workdir = Path(tempfile.mkdtemp(prefix="chronis_xtts_"))

            clean_ref = prepare_ref_audio(ref_audio_path)
            tts = get_tts()
            combined = AudioSegment.empty()

            for i, chunk in enumerate(chunks):
                print(f"[synth] chunk {i+1}/{len(chunks)}: {chunk[:80]!r}", flush=True)
                out_wav = str(workdir / f"chunk_{i}.wav")
                run_chunk(tts, chunk, clean_ref, out_wav)
                combined += AudioSegment.from_wav(out_wav)
                gc.collect()

            fd, tmp_out = tempfile.mkstemp(suffix=".wav")
            os.close(fd)
            # export() returns the open output file; close it right away.
            combined.export(tmp_out, format="wav").close()

            # Ownership of the file passes to the caller: clear tmp_out so the
            # cleanup below keeps it.
            final_audio_path, tmp_out = tmp_out, None
            return final_audio_path, "ok"
        except Exception as e:
            print(f"[synth] ERROR: {e}", flush=True)
            return None, str(e)
        finally:
            _remove_quietly(clean_ref)
            _remove_quietly(tmp_out)
            if workdir is not None:
                shutil.rmtree(workdir, ignore_errors=True)
|
|
|
|
# Input widgets, in the positional order expected by synthesize().
_text_input = gr.Textbox(label="Text to synthesise")
_voice_input = gr.Audio(type="filepath", label="Reference Voice (3-10 second voice note)")
_secret_input = gr.Textbox(label="Secret", type="password")

# Output widgets, matching synthesize()'s (audio path, status) return pair.
_audio_output = gr.Audio(type="filepath", label="Generated Audio")
_status_output = gr.Textbox(label="Status")

demo = gr.Interface(
    fn=synthesize,
    inputs=[_text_input, _voice_input, _secret_input],
    outputs=[_audio_output, _status_output],
    api_name="predict",
    title="Chronis XTTS-v2",
    description="Voice cloning TTS - send a voice note, get the cloned voice back.",
    flagging_mode="never",
)

# Enable the request queue, then serve on all interfaces at port 7860.
demo.queue()
demo.launch(server_name="0.0.0.0", server_port=7860)
|
|