MoTTS / app.py
Mo2294's picture
Update app.py
24b3a19 verified
#!/usr/bin/env python3
"""
HuggingFace Spaces app.py for IndexTTS2 with Auto-Processing and Combined Audio
"""
import os
import sys
import subprocess
import gradio as gr
import torch
import numpy as np
import soundfile as sf
from huggingface_hub import (
HfApi,
hf_hub_download,
CommitOperationAdd,
list_repo_files,
CommitOperationDelete,
)
import threading
import time
from pathlib import Path
import tempfile
# Host and port the Gradio server should bind to on HF Spaces.
os.environ.update(
    {
        "GRADIO_SERVER_NAME": "0.0.0.0",
        "GRADIO_SERVER_PORT": "7860",
    }
)

# Make modules that live next to this file importable.
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(current_dir)

# Shared state of the background auto-processing worker.
auto_process_running = False  # cooperative run/stop flag polled by the worker
auto_process_thread = None  # threading.Thread of the latest run, if any
current_status = "Ready"  # human-readable progress text shown in the UI
tts_model = None  # filled in once the IndexTTS2 model has been loaded

# Limits used when stitching the combined audio files.
PAUSE_DURATION = 3.0  # seconds of silence between two audios
MAX_COMBINED_DURATION = 30 * 60  # 30 minutes, expressed in seconds
def download_models():
    """Download the IndexTTS2 checkpoints unless they are already present.

    The download is skipped only when ``./checkpoints/config.yaml`` exists.
    Checking for the directory alone is not enough: an empty or half-created
    ``./checkpoints`` folder (for example after an interrupted download) would
    otherwise suppress every later download attempt.

    Any failure is reported on stdout and swallowed so that application
    start-up can continue.
    """
    checkpoints_dir = "./checkpoints"
    # config.yaml is the file the model loader opens, so it is used as the
    # marker for "checkpoints are available".
    if os.path.isfile(os.path.join(checkpoints_dir, "config.yaml")):
        return

    print("Downloading IndexTTS2 models...")
    try:
        from huggingface_hub import snapshot_download

        snapshot_download(
            repo_id="IndexTeam/IndexTTS-2",
            local_dir=checkpoints_dir,
            allow_patterns=[
                "*.pth",
                "*.pt",
                "*.yaml",
                "*.model",
                "*.vocab",
                "qwen0.6bemo4-merge/**",
            ],
        )
        print("Models downloaded successfully!")
    except Exception as e:
        # Best effort: keep the app starting and tell the operator what to do.
        print(f"Failed to download models: {e}")
        print(
            "Please download models manually from: "
            "https://huggingface.co/IndexTeam/IndexTTS-2"
        )
# Fetch the checkpoints (if needed) before the model is constructed.
download_models()

# Build the global IndexTTS2 instance. On any failure (missing package,
# missing checkpoints, ...) the error is printed and tts_model stays None.
try:
    from indextts.infer_v2 import IndexTTS2

    tts_model = IndexTTS2(
        model_dir="checkpoints",
        cfg_path="checkpoints/config.yaml",
        use_fp16=True,  # Use FP16 for lower VRAM usage
        use_deepspeed=False,
        use_cuda_kernel=False,
    )
except Exception as load_error:
    print(f"Error loading IndexTTS2 model: {load_error}")
    tts_model = None
else:
    print("IndexTTS2 model loaded successfully!")
def add_silence(duration_sec: float, sample_rate: int = 24000) -> np.ndarray:
    """Return a float32 array of zeros lasting ``duration_sec`` seconds.

    The number of samples is ``duration_sec * sample_rate`` truncated to an
    integer.
    """
    sample_count = int(duration_sec * sample_rate)
    silence = np.zeros(shape=sample_count, dtype=np.float32)
    return silence
def parse_audio_duration_from_log(log_line: str):
    """Extract the duration from a ``Generated audio length:`` log line.

    Example input: ``'>> Generated audio length: 4.89 seconds'``.

    Returns the duration as a float, or ``None`` when the marker is absent or
    the text after it is not a number.
    """
    marker = "Generated audio length:"
    if marker not in log_line:
        return None

    # Text between the marker and the word "seconds" is the number.
    after_marker = log_line.split(marker)[1]
    number_text = after_marker.split("seconds")[0].strip()
    try:
        return float(number_text)
    except ValueError:
        return None
def _write_combined_file(combined_name, file_paths, sample_rate, pause):
    """Write intro silence plus ``file_paths`` separated by ``pause`` to ``combined_name``."""
    # 1.5 seconds of lead-in silence before the first audio.
    parts = [np.zeros(int(sample_rate * 1.5), dtype=np.int16)]
    last_index = len(file_paths) - 1
    for i, path in enumerate(file_paths):
        data, _ = sf.read(path, dtype="int16")
        parts.append(data)
        # Pause between two audios, but not after the last one.
        if i < last_index:
            parts.append(pause)
    sf.write(combined_name, np.concatenate(parts), sample_rate, subtype="PCM_16")


def create_combined_audios(audio_files_info):
    """Create combined audio file(s) with pauses between the individual clips.

    Clips are written as 16-bit PCM at the sample rate of the first input
    file, so pitch and sample rate are not altered. Each combined file starts
    with 1.5 s of silence and has ``PAUSE_DURATION`` seconds of silence
    between consecutive clips. A new combined file is started whenever adding
    the next clip would push the running length past
    ``MAX_COMBINED_DURATION``.

    Args:
        audio_files_info: list of ``(file_path, duration_in_seconds)`` tuples.

    Returns:
        List of ``(combined_file_name, duration_in_seconds)`` tuples, one per
        file written to the current working directory. The reported duration
        covers clips and pauses but not the 1.5 s intro silence. An empty
        input yields an empty list.
    """
    if not audio_files_info:
        return []

    # Read only the header of the first file to learn the sample rate.
    sr = sf.info(audio_files_info[0][0]).samplerate
    # Pause silence generated at the ORIGINAL sample rate.
    silence_3s = np.zeros(int(sr * PAUSE_DURATION), dtype=np.int16)
    few_inputs = len(audio_files_info) <= 30

    combined_files = []

    def emit(index, files, duration):
        # The un-numbered name is only used for the first file of a small job.
        if index == 1 and few_inputs:
            combined_name = "temp_combined.wav"
        else:
            combined_name = f"temp_combined_{index:03d}.wav"
        _write_combined_file(combined_name, files, sr, silence_3s)
        combined_files.append((combined_name, duration))
        print(
            f"Created combined file {index}: "
            f"{int(duration // 60)}:{int(duration % 60):02d}"
        )

    current_files = []
    current_duration = 0.0
    combined_index = 1
    for file_path, duration in audio_files_info:
        # Length of the current group if this clip were added to it.
        if current_files:
            projected = current_duration + PAUSE_DURATION + duration
        else:
            projected = duration

        if current_files and projected > MAX_COMBINED_DURATION:
            # Too long: write the current group and start a new one with
            # this clip.
            emit(combined_index, current_files, current_duration)
            combined_index += 1
            current_files = [file_path]
            current_duration = duration
        else:
            current_files.append(file_path)
            current_duration = projected

    # Write the last (possibly only) group.
    if current_files:
        emit(combined_index, current_files, current_duration)
    return combined_files
def _split_sentences(content):
    """Split TXT content on the ``.-`` delimiter and tidy each piece."""
    sentences = []
    for piece in content.split(".-"):
        cleaned = piece.strip()
        if not cleaned:
            continue
        # Remove a single trailing dash or dot, keeping the actual text.
        if cleaned.endswith(("-", ".")):
            cleaned = cleaned[:-1].rstrip()
        sentences.append(cleaned)
    return sentences


def _synthesize_sentence(sentence, reference_voice_path, output_filename):
    """Render ``sentence`` to ``output_filename`` and return its duration in seconds."""
    import io
    from contextlib import redirect_stdout

    # NOTE: redirect_stdout replaces sys.stdout for the whole process, so
    # output printed by other threads during inference is captured as well.
    log_buffer = io.StringIO()
    with redirect_stdout(log_buffer):
        tts_model.infer(
            spk_audio_prompt=reference_voice_path,
            text=sentence,
            output_path=output_filename,
            verbose=True,  # Enable verbose to get duration
        )

    # Prefer the duration reported in the captured log output.
    for line in log_buffer.getvalue().split("\n"):
        parsed = parse_audio_duration_from_log(line)
        if parsed:
            return parsed

    # Fallback: read the file to get duration.
    audio_data, sr = sf.read(output_filename)
    return len(audio_data) / sr


def _remove_temp_files(paths):
    """Delete temporary files, ignoring files that cannot be removed."""
    for path in paths:
        try:
            if os.path.exists(path):
                os.remove(path)
        except OSError:
            # Best effort: a leftover temp file must not abort processing.
            pass


def auto_process_dataset():
    """Process every pending TXT file of the input dataset (worker thread body).

    For each ``.txt`` file in ``Mo2294/rawAffirmation`` outside ``done/``:

    * split the content into sentences on the ``.-`` delimiter,
    * synthesize one WAV per sentence with the ``Mo.wav`` reference voice,
    * build combined audio file(s) via ``create_combined_audios``,
    * upload everything to ``Mo2294/outputAffirmation`` under
      ``Affirmations/<name>/``,
    * move the TXT file into the ``done/`` folder of the input dataset.

    Progress and errors are published through the global ``current_status``.
    The loop stops cooperatively when ``auto_process_running`` becomes False.
    ``auto_process_running`` is always reset to False when the function exits,
    including on the early error returns.
    """
    global auto_process_running, current_status, tts_model
    try:
        # Checked inside the try block so the finally clause clears the
        # running flag; otherwise the UI would report "running" forever.
        if tts_model is None:
            current_status = "Error: TTS model not loaded"
            return

        token = os.getenv("HF_TOKEN")
        if not token:
            current_status = "Error: HF_TOKEN not found in environment"
            return

        api = HfApi(token=token)
        input_dataset_id = "Mo2294/rawAffirmation"
        output_dataset_id = "Mo2294/outputAffirmation"

        # Download reference voice
        current_status = "Downloading reference voice Mo.wav..."
        reference_voice_path = hf_hub_download(
            repo_id=output_dataset_id,
            filename="Mo.wav",
            repo_type="dataset",
            token=token,
        )

        # Get list of TXT files from input dataset (excluding /done folder)
        current_status = "Scanning for TXT files..."
        try:
            repo_files = list_repo_files(
                repo_id=input_dataset_id, repo_type="dataset", token=token
            )
            txt_files = [
                f
                for f in repo_files
                if f.endswith(".txt") and not f.startswith("done/")
            ]
        except Exception as e:
            current_status = f"Error listing files: {e}"
            return

        if not txt_files:
            current_status = "No TXT files found to process"
            return
        current_status = f"Found {len(txt_files)} TXT files to process"

        # Process each TXT file
        for txt_file in txt_files:
            if not auto_process_running:
                current_status = "Processing stopped by user"
                break

            txt_name = Path(txt_file).stem
            current_status = f"Processing: {txt_name}"
            temp_files = []
            try:
                # Download TXT file
                txt_path = hf_hub_download(
                    repo_id=input_dataset_id,
                    filename=txt_file,
                    repo_type="dataset",
                    token=token,
                )
                with open(txt_path, "r", encoding="utf-8") as f:
                    content = f.read()

                sentences = _split_sentences(content)
                if not sentences:
                    current_status = f"No sentences found in {txt_name}"
                    continue
                current_status = (
                    f"Found {len(sentences)} sentences in {txt_name}"
                )
                print(f"Processing sentences from {txt_name}:")

                audio_files_info = []  # (filepath, duration) tuples
                commit_operations = []
                combined_files = []  # always bound, even if combining is skipped

                # Process each sentence
                for idx, sentence in enumerate(sentences):
                    if not auto_process_running:
                        break
                    current_status = (
                        f"Processing {txt_name}: sentence "
                        f"{idx + 1}/{len(sentences)}"
                    )
                    if not sentence:  # Skip empty sentences
                        continue
                    try:
                        # Add a period at the end if missing (helps with TTS prosody)
                        if sentence[-1] not in ".!?":
                            sentence = sentence + "."
                        print(f" Sentence {idx+1}: '{sentence}'")

                        output_filename = f"temp_{txt_name}_{idx+1:03d}.wav"
                        # Registered before synthesis so a partially written
                        # file is cleaned up as well.
                        temp_files.append(output_filename)
                        duration = _synthesize_sentence(
                            sentence, reference_voice_path, output_filename
                        )
                        print(f" Generated audio: {duration:.2f} seconds")

                        # Store file info for combined audio
                        audio_files_info.append((output_filename, duration))
                        # Prepare upload operation for individual file
                        output_path = (
                            f"Affirmations/{txt_name}/"
                            f"{txt_name}_{idx+1:03d}.wav"
                        )
                        commit_operations.append(
                            CommitOperationAdd(
                                path_in_repo=output_path,
                                path_or_fileobj=output_filename,
                            )
                        )
                    except Exception as e:
                        current_status = (
                            f"Error generating audio for sentence {idx+1}: {e}"
                        )
                        print(f"Generation error: {e}")

                # Create combined audio file(s)
                if audio_files_info and auto_process_running:
                    current_status = (
                        f"Creating combined audio(s) for {txt_name}..."
                    )
                    combined_files = create_combined_audios(audio_files_info)
                    for i, (combined_file, duration) in enumerate(
                        combined_files
                    ):
                        if len(combined_files) == 1:
                            combined_path = (
                                f"Affirmations/{txt_name}/"
                                f"{txt_name}_combined.wav"
                            )
                        else:
                            combined_path = (
                                f"Affirmations/{txt_name}/"
                                f"{txt_name}_combined_{i+1:03d}.wav"
                            )
                        commit_operations.append(
                            CommitOperationAdd(
                                path_in_repo=combined_path,
                                path_or_fileobj=combined_file,
                            )
                        )
                        temp_files.append(combined_file)
                        duration_min = int(duration // 60)
                        duration_sec = int(duration % 60)
                        print(
                            f" Combined file {i+1}: "
                            f"{duration_min}:{duration_sec:02d}"
                        )

                # Upload all generated files
                if commit_operations and auto_process_running:
                    total_individual = len(audio_files_info)
                    total_combined = len(combined_files)
                    current_status = (
                        f"Uploading {total_individual} individual + "
                        f"{total_combined} combined files for {txt_name}..."
                    )
                    try:
                        api.create_commit(
                            repo_id=output_dataset_id,
                            repo_type="dataset",
                            operations=commit_operations,
                            commit_message=(
                                f"Add audio files for {txt_name} - "
                                f"{total_individual} individual + "
                                f"{total_combined} combined"
                            ),
                            token=token,
                        )
                        current_status = (
                            f"Successfully uploaded files for {txt_name}"
                        )

                        # Move TXT file to /done folder: add it under done/
                        # and delete the original in a single commit.
                        current_status = (
                            f"Moving {txt_name}.txt to /done folder..."
                        )
                        with open(txt_path, "rb") as f:
                            file_content = f.read()
                        move_operations = [
                            CommitOperationAdd(
                                path_in_repo=f"done/{txt_file}",
                                path_or_fileobj=file_content,
                            ),
                            CommitOperationDelete(path_in_repo=txt_file),
                        ]
                        api.create_commit(
                            repo_id=input_dataset_id,
                            repo_type="dataset",
                            operations=move_operations,
                            commit_message=(
                                f"Move {txt_name}.txt to /done after processing"
                            ),
                            token=token,
                        )
                        current_status = (
                            f"✅ Completed {txt_name}: "
                            f"{total_individual} individual + "
                            f"{total_combined} combined audio files"
                        )
                    except Exception as e:
                        current_status = (
                            f"Upload/Move error for {txt_name}: {e}"
                        )
                        print(f"Error: {e}")

                time.sleep(2)  # Small delay between files
            except Exception as e:
                current_status = f"Error processing {txt_name}: {e}"
                print(f"Error: {e}")
            finally:
                # Runs on success, failure and early `continue` alike so no
                # temporary WAV files are left behind.
                _remove_temp_files(temp_files)

        if auto_process_running:
            current_status = "✅ Auto-processing completed successfully!"
        else:
            current_status = "⏹️ Auto-processing stopped"
    except Exception as e:
        current_status = f"❌ Fatal error: {str(e)}"
        print(f"Fatal error: {e}")
    finally:
        auto_process_running = False
def start_auto_process():
    """Start the auto-processing worker thread unless one is still alive.

    The guard is based on the liveness of the previous worker thread rather
    than on the ``auto_process_running`` flag alone: after a stop request the
    flag is already False while the old worker is still finishing its current
    step, and starting a second worker at that moment would re-arm the flag
    and leave two workers running side by side.

    Returns:
        ``(message, status)`` tuple for the message and status text boxes.
    """
    global auto_process_running, auto_process_thread

    worker_alive = (
        auto_process_thread is not None and auto_process_thread.is_alive()
    )
    if worker_alive:
        if auto_process_running:
            return "Auto-processing already running!", current_status
        # Stop was requested but the worker has not exited yet.
        return "Previous run is still stopping, please wait...", current_status

    auto_process_running = True
    auto_process_thread = threading.Thread(target=auto_process_dataset)
    auto_process_thread.start()
    return "✅ Auto-processing started!", "Starting..."
def stop_auto_process():
    """Clear the run flag so the worker stops at its next check.

    Returns:
        ``(message, status)`` tuple for the message and status text boxes.
    """
    global auto_process_running
    auto_process_running = False
    reply = ("⏹️ Stop signal sent!", current_status)
    return reply
def get_status():
    """Return the status text, with a spinner emoji appended while running."""
    suffix = " 🔄" if auto_process_running else ""
    return current_status + suffix
def manual_generate(text, reference_audio, emotion_audio, emo_alpha, use_emo_text):
    """Synthesize ``text`` once for the manual tab.

    Args:
        text: text to synthesize; blank text is rejected.
        reference_audio: path of the speaker reference audio (required).
        emotion_audio: optional path of an emotion reference audio. When
            given, it is used together with ``emo_alpha``.
        emo_alpha: emotion strength.
        use_emo_text: when no emotion audio is given, whether emotion is
            derived from the text; ``emo_alpha`` is only applied in that
            case, otherwise 1.0 is passed.

    Returns:
        ``(sample_rate, audio_data)`` on success, ``None`` when the model is
        not loaded, an input is missing, or generation fails.
    """
    global tts_model
    if tts_model is None or not reference_audio:
        return None
    if not text or not text.strip():
        return None

    infer_kwargs = {
        "spk_audio_prompt": reference_audio,
        "text": text,
        "verbose": False,
    }
    if emotion_audio:
        infer_kwargs["emo_audio_prompt"] = emotion_audio
        infer_kwargs["emo_alpha"] = emo_alpha
    else:
        infer_kwargs["use_emo_text"] = use_emo_text
        infer_kwargs["emo_alpha"] = emo_alpha if use_emo_text else 1.0

    try:
        # A private temporary directory per call: concurrent requests no
        # longer overwrite each other's output, and the file is removed once
        # the audio has been read into memory.
        with tempfile.TemporaryDirectory() as work_dir:
            output_path = os.path.join(work_dir, "manual_output.wav")
            tts_model.infer(output_path=output_path, **infer_kwargs)
            audio_data, sample_rate = sf.read(output_path)
        return (sample_rate, audio_data)
    except Exception as e:
        print(f"Generation error: {e}")
        return None
# Gradio UI: one tab for manual synthesis, one for the dataset worker.
with gr.Blocks(title="IndexTTS2 with Auto-Processing") as demo:
    gr.Markdown("# 🎤 IndexTTS2 Voice Synthesis")
    gr.Markdown(
        "State-of-the-art TTS with auto-processing and combined audio generation"
    )

    # --- Manual tab -------------------------------------------------------
    with gr.Tab("Manual Processing"):
        with gr.Row():
            with gr.Column():
                text_input = gr.Textbox(
                    value="大家好,我现在正在体验AI科技!",
                    label="Text to synthesize",
                    placeholder="Enter text here...",
                    lines=3,
                )
                reference_audio = gr.Audio(
                    label="Voice reference (required)",
                    sources=["upload"],
                    type="filepath",
                )
                emotion_audio = gr.Audio(
                    label="Emotion reference (optional)",
                    sources=["upload"],
                    type="filepath",
                )
                with gr.Row():
                    emo_alpha = gr.Slider(
                        label="Emotion strength",
                        minimum=0.0,
                        maximum=1.0,
                        step=0.1,
                        value=0.6,
                    )
                    use_emo_text = gr.Checkbox(
                        value=False, label="Use text-based emotion"
                    )
            with gr.Column():
                generate_btn = gr.Button("🎙️ Generate", size="lg", variant="primary")
                output_audio = gr.Audio(type="numpy", label="Generated audio")

        # Inputs are passed to manual_generate in its parameter order.
        generate_btn.click(
            manual_generate,
            inputs=[
                text_input,
                reference_audio,
                emotion_audio,
                emo_alpha,
                use_emo_text,
            ],
            outputs=output_audio,
        )

    # --- Auto-processing tab ----------------------------------------------
    with gr.Tab("Auto Processing"):
        gr.Markdown("### 🚀 Automatic Dataset Processing with Combined Audio")
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown(
                    "\n"
                    "**Configuration:**\n"
                    "- 📁 Input: `Mo2294/rawAffirmation`\n"
                    "- 📂 Output: `Mo2294/outputAffirmation`\n"
                    "- 🎙️ Voice: `Mo.wav`\n"
                    "- ✂️ Delimiter: `.-`\n"
                    "- 📝 Structure: `/Affirmations/[name]/`\n"
                    "- ⏰ Combined: Max 30 min chunks\n"
                    "- ⏸️ Pauses: 3 seconds between audios\n"
                )
            with gr.Column(scale=2):
                # The initial value is a snapshot; the Refresh button below
                # re-reads the status on demand.
                status_display = gr.Textbox(
                    value=get_status(),
                    label="📊 Processing Status",
                    lines=3,
                    interactive=False,
                )
                with gr.Row():
                    start_btn = gr.Button(
                        "▶️ Start Processing", scale=2, variant="primary"
                    )
                    stop_btn = gr.Button("⏹️ Stop", scale=1, variant="stop")
                    refresh_btn = gr.Button("🔄 Refresh", scale=1)
                message_display = gr.Textbox(
                    visible=False, interactive=False, label="Message"
                )

        # Start/stop both return (message, status).
        control_outputs = [message_display, status_display]
        start_btn.click(start_auto_process, outputs=control_outputs)
        stop_btn.click(stop_auto_process, outputs=control_outputs)
        refresh_btn.click(get_status, outputs=status_display)

    # --- Footer -----------------------------------------------------------
    gr.Markdown(
        "\n"
        "---\n"
        '<div align="center">\n'
        '<a href="https://github.com/index-tts/index-tts">GitHub</a> |\n'
        '<a href="https://arxiv.org/abs/2506.21619">Paper</a> |\n'
        '<a href="https://index-tts.github.io/index-tts2.github.io/">Demo</a>\n'
        "</div>\n"
    )

# Launch the server only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()