# chronisai / app.py -- commit 58730a4 ("Update app.py", verified)
# Process bootstrap: the environment is configured in between the imports on
# purpose, so the statement order below is significant.
import os
# Request UTF-8 text I/O through the environment.
# NOTE(review): CPython reads these variables at interpreter start-up, so they
# presumably only influence child processes (e.g. the pip call in setup()) --
# confirm that is the intent.
os.environ["PYTHONUTF8"] = "1"
os.environ["PYTHONIOENCODING"] = "utf-8"
import sys
# Switch the already-open standard streams of this process to UTF-8 so the
# log lines printed below cannot fail on encoding.
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
import re
import gc
import tempfile
import subprocess
import shutil
import threading
from pathlib import Path
# Runtime tuning knobs, set before gradio/pydub (and, lazily, TTS) are imported.
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# One OpenMP thread per reported CPU; falls back to 1 when the count is unknown.
os.environ["OMP_NUM_THREADS"] = str(os.cpu_count() or 1)
# An empty device list hides every CUDA device; get_tts() also loads with gpu=False.
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import gradio as gr
from pydub import AudioSegment
# Shared secret callers must present to synthesize(); an empty value disables
# the check entirely.
SECRET = os.environ.get("API_SECRET", "")

# Default location of the XTTS-v2 weights when XTTS_MODEL_DIR is not set.
# The historical Windows default is kept on Windows; everywhere else the
# system temp directory is used, because a "C:\..." string on POSIX is just a
# relative directory with that literal name under the current working dir.
_DEFAULT_MODEL_DIR = (
    Path(r"C:\tmp\xtts-v2-weights")
    if os.name == "nt"
    else Path(tempfile.gettempdir()) / "xtts-v2-weights"
)
MODEL_DIR = Path(os.environ.get("XTTS_MODEL_DIR", str(_DEFAULT_MODEL_DIR)))

# Held by synthesize() while the model is being set up or is running.
inference_lock = threading.Lock()
_tts_instance = None  # lazy-loaded TTS object
print("=== Chronis XTTS-v2 Space Booting ===", flush=True)
def setup():
    """Install Coqui TTS if needed and download XTTS-v2 weights once.

    The weights are considered present when ``MODEL_DIR / "config.json"``
    exists; otherwise the ``coqui/XTTS-v2`` snapshot is downloaded into
    ``MODEL_DIR``.

    Raises:
        subprocess.CalledProcessError: if the ``pip install`` command fails.
        Exception: whatever ``snapshot_download`` raises on a failed download.
    """
    try:
        import TTS  # noqa: F401
    except ImportError:
        import importlib

        print("[setup] Installing TTS library ...", flush=True)
        subprocess.run([sys.executable, "-m", "pip", "install", "TTS", "-q"], check=True)
        # The package was installed while this process is running; the import
        # finders cache directory listings, so refresh them or the next
        # ``import TTS`` may still fail to see the new package.
        importlib.invalidate_caches()
        print("[setup] TTS library installed.", flush=True)
    else:
        print("[setup] TTS library already installed.", flush=True)

    MODEL_DIR.mkdir(parents=True, exist_ok=True)
    config_path = MODEL_DIR / "config.json"
    if config_path.exists():
        print("[setup] XTTS-v2 weights already present.", flush=True)
        return

    print("[setup] Downloading XTTS-v2 weights ...", flush=True)
    from huggingface_hub import snapshot_download

    snapshot_download(
        repo_id="coqui/XTTS-v2",
        local_dir=str(MODEL_DIR),
        local_dir_use_symlinks=False,
    )
    print("[setup] XTTS-v2 weights downloaded.", flush=True)
def get_tts():
    """Return the process-wide XTTS-v2 model, building it on the first call.

    The model is loaded from ``MODEL_DIR`` with its ``config.json`` and with
    ``gpu=False``; later calls return the cached object.
    """
    global _tts_instance
    if _tts_instance is not None:
        return _tts_instance

    # Imported lazily so the heavy TTS package is only pulled in when needed.
    from TTS.api import TTS

    print("[tts] Loading XTTS-v2 model ...", flush=True)
    weights_dir = str(MODEL_DIR)
    config_file = str(MODEL_DIR / "config.json")
    _tts_instance = TTS(
        model_path=weights_dir,
        config_path=config_file,
        progress_bar=False,
        gpu=False,
    )
    print("[tts] Model loaded", flush=True)
    return _tts_instance
def clean_text(text: str) -> str:
    """Normalise user text before it is sent to the synthesiser.

    Every run of non-ASCII characters is replaced by a space, whitespace is
    collapsed to single spaces, and the result is capped at 500 characters.

    Args:
        text: Raw input; ``None`` is treated as an empty string so that a
            missing field yields the fallback phrase instead of a TypeError.

    Returns:
        The cleaned text, or the fallback phrase ``"I am here with you."``
        when fewer than 3 characters remain.
    """
    if text is None:
        text = ""
    elif not isinstance(text, str):
        text = str(text)
    text = re.sub(r"[^\x00-\x7F]+", " ", text)
    text = re.sub(r"\s+", " ", text).strip()
    if len(text) < 3:
        return "I am here with you."
    # The cut can land right after a space; strip it so the result stays
    # whitespace-normalised like the untruncated case.
    return text[:500].rstrip()
def _wrap_words(sentence: str, max_chars: int) -> list[str]:
    """Break one over-long sentence into word-aligned pieces of at most *max_chars*."""
    pieces: list[str] = []
    line = ""
    for word in sentence.split():
        # A token longer than the limit has no word boundary to break on, so
        # it is sliced into fixed-size runs.
        while len(word) > max_chars:
            if line:
                pieces.append(line)
                line = ""
            pieces.append(word[:max_chars])
            word = word[max_chars:]
        if not word:
            continue
        candidate = f"{line} {word}" if line else word
        if len(candidate) <= max_chars:
            line = candidate
        else:
            pieces.append(line)
            line = word
    if line:
        pieces.append(line)
    return pieces


def split_sentences(text: str, max_chars: int = 200) -> list[str]:
    """Group sentences into chunks of at most *max_chars* characters.

    The text is split after ``.``, ``!`` or ``?`` followed by whitespace, and
    consecutive sentences are packed together while they fit. A sentence that
    is itself longer than *max_chars* is first wrapped at word boundaries, so
    it no longer passes through as one oversized chunk.

    Args:
        text: Text to split.
        max_chars: Upper bound for the length of each chunk. Values below 1
            disable the wrapping of long sentences.

    Returns:
        The list of chunks, or ``[text]`` when nothing non-empty was produced.
    """
    parts: list[str] = []
    for sentence in re.split(r"(?<=[.!?])\s+", text):
        if max_chars >= 1 and len(sentence) > max_chars:
            parts.extend(_wrap_words(sentence, max_chars))
        else:
            parts.append(sentence)

    chunks: list[str] = []
    buf = ""
    for part in parts:
        # Strict "<" leaves room for the joining space, keeping the joined
        # chunk within max_chars.
        if len(buf) + len(part) < max_chars:
            buf = (buf + " " + part).strip()
        else:
            if buf:
                chunks.append(buf)
            buf = part
    if buf:
        chunks.append(buf)
    return chunks or [text]
def prepare_ref_audio(ref_path: str) -> str:
    """Normalize to mono 24k WAV and cap to 10 seconds.

    Args:
        ref_path: Path of the uploaded reference recording, in any format
            ``AudioSegment.from_file`` can read.

    Returns:
        Path of a new temporary ``.wav`` file; the caller must delete it.

    Raises:
        ValueError: if the recording is shorter than 1 second.
    """
    audio = AudioSegment.from_file(ref_path)
    audio = audio.set_channels(1).set_frame_rate(24000).normalize()
    duration_ms = len(audio)  # the code treats len() of a segment as milliseconds
    if duration_ms > 10000:
        audio = audio[:10000]
    elif duration_ms < 1000:
        raise ValueError(f"Reference audio too short ({duration_ms} ms). Need at least 1 second.")

    fd, tmp_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    try:
        audio.export(tmp_path, format="wav")
    except Exception:
        # Do not leave the placeholder file behind when the export fails.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
    return tmp_path
def run_chunk(tts, text: str, ref_audio: str, out_path: str):
    """Synthesise one text chunk in the reference voice and write it to *out_path*.

    The call is forwarded to ``tts.tts_to_file`` with the language fixed to
    English (``"en"``).
    """
    request = {
        "text": text,
        "speaker_wav": ref_audio,
        "language": "en",
        "file_path": out_path,
    }
    tts.tts_to_file(**request)
def _remove_quietly(path) -> None:
    """Best-effort removal of a temporary file; missing paths and OS errors are ignored."""
    if not path:
        return
    try:
        os.unlink(path)
    except OSError:
        pass


def synthesize(text: str, ref_audio_path: str, secret: str):
    """Clone the reference voice and speak *text*; the Gradio endpoint handler.

    Args:
        text: Text to speak; it is cleaned and split into chunks first.
        ref_audio_path: Path of the uploaded reference recording.
        secret: Shared secret supplied by the caller; only checked when the
            module-level ``SECRET`` is non-empty.

    Returns:
        ``(wav_path, "ok")`` on success, otherwise ``(None, message)``. The
        returned WAV file is not deleted here; it is handed to the caller.
    """
    import hmac  # local import keeps this block self-contained

    # Cheap rejections happen before taking the lock, so bad requests do not
    # queue behind a running inference.
    if SECRET:
        supplied = str(secret or "").encode("utf-8")
        # Constant-time comparison: do not leak how much of the secret matched.
        if not hmac.compare_digest(supplied, SECRET.encode("utf-8")):
            return None, "Unauthorized"
    if not ref_audio_path or not Path(ref_audio_path).exists():
        return None, "Reference audio missing or not uploaded"

    with inference_lock:
        try:
            setup()
        except Exception as e:
            return None, f"Setup failed: {e}"

        workdir = Path(tempfile.mkdtemp(prefix="chronis_xtts_"))
        clean_ref = None
        tmp_out = None
        try:
            # Inside the try so malformed text becomes an error status rather
            # than an unhandled exception.
            chunks = split_sentences(clean_text(text or ""))
            clean_ref = prepare_ref_audio(ref_audio_path)
            tts = get_tts()
            combined = AudioSegment.empty()
            for i, chunk in enumerate(chunks):
                print(f"[synth] chunk {i+1}/{len(chunks)}: {chunk[:80]!r}", flush=True)
                out_wav = str(workdir / f"chunk_{i}.wav")
                run_chunk(tts, chunk, clean_ref, out_wav)
                combined += AudioSegment.from_wav(out_wav)
                gc.collect()
            fd, tmp_out = tempfile.mkstemp(suffix=".wav")
            os.close(fd)
            combined.export(tmp_out, format="wav")
            # Ownership of the file passes to the caller; clearing tmp_out
            # stops the cleanup below from deleting it.
            final_audio_path, tmp_out = tmp_out, None
            return final_audio_path, "ok"
        except Exception as e:
            print(f"[synth] ERROR: {e}", flush=True)
            return None, str(e)
        finally:
            _remove_quietly(clean_ref)
            _remove_quietly(tmp_out)
            shutil.rmtree(workdir, ignore_errors=True)
# Request fields, in the positional order synthesize() expects them.
_request_fields = [
    gr.Textbox(label="Text to synthesise"),
    gr.Audio(type="filepath", label="Reference Voice (3-10 second voice note)"),
    gr.Textbox(label="Secret", type="password"),
]
# Response fields, matching the (audio path, status) pair synthesize() returns.
_response_fields = [
    gr.Audio(type="filepath", label="Generated Audio"),
    gr.Textbox(label="Status"),
]

demo = gr.Interface(
    fn=synthesize,
    inputs=_request_fields,
    outputs=_response_fields,
    api_name="predict",
    title="Chronis XTTS-v2",
    description="Voice cloning TTS - send a voice note, get the cloned voice back.",
    flagging_mode="never",
)

# Enable the request queue, then serve on all interfaces on port 7860.
demo.queue()
demo.launch(server_name="0.0.0.0", server_port=7860)