#!/usr/bin/env python3 """ HuggingFace Spaces app.py for IndexTTS2 with Auto-Processing and Combined Audio """ import os import sys import subprocess import gradio as gr import torch import numpy as np import soundfile as sf from huggingface_hub import ( HfApi, hf_hub_download, CommitOperationAdd, list_repo_files, CommitOperationDelete, ) import threading import time from pathlib import Path import tempfile # Set environment variables for HF Spaces os.environ["GRADIO_SERVER_NAME"] = "0.0.0.0" os.environ["GRADIO_SERVER_PORT"] = "7860" # Set up paths current_dir = os.path.dirname(os.path.abspath(__file__)) sys.path.append(current_dir) # Global state for auto-processing auto_process_running = False auto_process_thread = None current_status = "Ready" tts_model = None # Constants MAX_COMBINED_DURATION = 30 * 60 # 30 minutes in seconds PAUSE_DURATION = 3.0 # 3 seconds pause between audios def download_models(): """Download models if they don't exist""" checkpoints_dir = "./checkpoints" if not os.path.exists(checkpoints_dir): print("Downloading IndexTTS2 models...") try: from huggingface_hub import snapshot_download snapshot_download( repo_id="IndexTeam/IndexTTS-2", local_dir=checkpoints_dir, allow_patterns=[ "*.pth", "*.pt", "*.yaml", "*.model", "*.vocab", "qwen0.6bemo4-merge/**", ], ) print("Models downloaded successfully!") except Exception as e: print(f"Failed to download models: {e}") print( "Please download models manually from: " "https://huggingface.co/IndexTeam/IndexTTS-2" ) # Download models on startup download_models() # Initialize IndexTTS2 model after download try: from indextts.infer_v2 import IndexTTS2 tts_model = IndexTTS2( cfg_path="checkpoints/config.yaml", model_dir="checkpoints", use_fp16=True, # Use FP16 for lower VRAM usage use_cuda_kernel=False, use_deepspeed=False, ) print("IndexTTS2 model loaded successfully!") except Exception as e: print(f"Error loading IndexTTS2 model: {e}") tts_model = None def add_silence(duration_sec: float, sample_rate: int = 24000) -> np.ndarray: """Generate silence of specified duration in seconds.""" return np.zeros(int(duration_sec * sample_rate), dtype=np.float32) def parse_audio_duration_from_log(log_line: str): """Parse audio duration from log line like '>> Generated audio length: 4.89 seconds'""" if "Generated audio length:" in log_line: try: duration_str = ( log_line.split("Generated audio length:")[1] .split("seconds")[0] .strip() ) return float(duration_str) except Exception: return None return None def create_combined_audios(audio_files_info): """ Create combined audio file(s) with 3-second pauses, without changing pitch, samplerate or bitdepth. audio_files_info: List[(file_path, duration_in_seconds)] """ # 1) Samplerate der ersten Datei korrekt auslesen (z.B. 22050 Hz von BigVGAN) first_file = audio_files_info[0][0] _, sr = sf.read(first_file, dtype="int16") # 3 Sekunden Stille in ORIGINAL-SAMPLERATE erzeugen silence_3s = np.zeros(int(sr * PAUSE_DURATION), dtype=np.int16) combined_files = [] current_files = [] current_duration = 0.0 combined_index = 1 for file_path, duration in audio_files_info: # "Was wäre die Länge, wenn wir diese Datei hinzufügen?" new_length = current_duration if current_files: new_length += PAUSE_DURATION new_length += duration # Wenn zu lang → speichern & neue Combined beginnen if new_length > MAX_COMBINED_DURATION and current_files: combined_name = ( "temp_combined.wav" if combined_index == 1 and len(audio_files_info) <= 30 else f"temp_combined_{combined_index:03d}.wav" ) audio_out = [] # 1.5 Sekunden Intro-Stille vor der ersten Audio silence_intro = np.zeros(int(sr * 1.5), dtype=np.int16) audio_out.append(silence_intro) for i, fp in enumerate(current_files): data, _ = sf.read(fp, dtype='int16') audio_out.append(data) # Zwischen Affirmationen 3 Sekunden Pause if i < len(current_files) - 1: audio_out.append(silence_3s) final_audio = np.concatenate(audio_out) sf.write(combined_name, final_audio, sr, subtype="PCM_16") combined_files.append((combined_name, current_duration)) print( f"Created combined file {combined_index}: " f"{int(current_duration // 60)}:{int(current_duration % 60):02d}" ) combined_index += 1 # Neue Combined-Gruppe beginnen mit aktueller Datei current_files = [file_path] current_duration = duration else: current_files.append(file_path) if len(current_files) == 1: current_duration = duration else: current_duration += PAUSE_DURATION + duration # Letzte Combined-Datei speichern if current_files: combined_name = ( "temp_combined.wav" if combined_index == 1 and len(audio_files_info) <= 30 else f"temp_combined_{combined_index:03d}.wav" ) audio_out = [] # 1.5 Sekunden Intro-Stille vor der ersten Audio silence_intro = np.zeros(int(sr * 1.5), dtype=np.int16) audio_out.append(silence_intro) for i, fp in enumerate(current_files): data, _ = sf.read(fp, dtype='int16') audio_out.append(data) # Zwischen Affirmationen 3 Sekunden Pause if i < len(current_files) - 1: audio_out.append(silence_3s) final_audio = np.concatenate(audio_out) sf.write(combined_name, final_audio, sr, subtype="PCM_16") combined_files.append((combined_name, current_duration)) print( f"Created combined file {combined_index}: " f"{int(current_duration // 60)}:{int(current_duration % 60):02d}" ) return combined_files def auto_process_dataset(): """ Auto-process TXT files from Monarchtaba22/rawAffirmation Generate audio for each sentence (split by .-) and upload to output dataset Create combined audio(s) with 3s pauses, max 30 min each Move processed TXT files to /done folder """ global auto_process_running, current_status, tts_model if tts_model is None: current_status = "Error: TTS model not loaded" return try: token = os.getenv("HF_TOKEN") if not token: current_status = "Error: HF_TOKEN not found in environment" return api = HfApi(token=token) input_dataset_id = "Mo2294/rawAffirmation" output_dataset_id = "Mo2294/outputAffirmation" # Download reference voice current_status = "Downloading reference voice Mo.wav..." reference_voice_path = hf_hub_download( repo_id=output_dataset_id, filename="Mo.wav", repo_type="dataset", token=token, ) # Get list of TXT files from input dataset (excluding /done folder) current_status = "Scanning for TXT files..." try: repo_files = list_repo_files( repo_id=input_dataset_id, repo_type="dataset", token=token ) # Filter for TXT files not in /done folder txt_files = [ f for f in repo_files if f.endswith(".txt") and not f.startswith("done/") ] except Exception as e: current_status = f"Error listing files: {e}" return if not txt_files: current_status = "No TXT files found to process" return current_status = f"Found {len(txt_files)} TXT files to process" # Process each TXT file for txt_file in txt_files: if not auto_process_running: current_status = "Processing stopped by user" break txt_name = Path(txt_file).stem current_status = f"Processing: {txt_name}" try: # Download TXT file txt_path = hf_hub_download( repo_id=input_dataset_id, filename=txt_file, repo_type="dataset", token=token, ) # Read and parse TXT content with open(txt_path, "r", encoding="utf-8") as f: content = f.read() # IMPROVED SPLITTING - preserve the actual text raw_sentences = content.split(".-") sentences = [] for s in raw_sentences: cleaned = s.strip() if cleaned: # Remove only trailing punctuation if it's a single dash or dot if cleaned.endswith("-") or cleaned.endswith("."): cleaned = cleaned[:-1].rstrip() sentences.append(cleaned) if not sentences: current_status = f"No sentences found in {txt_name}" continue current_status = ( f"Found {len(sentences)} sentences in {txt_name}" ) print(f"Processing sentences from {txt_name}:") temp_files = [] audio_files_info = [] # Store (filepath, duration) tuples commit_operations = [] # Process each sentence for idx, sentence in enumerate(sentences): if not auto_process_running: break current_status = ( f"Processing {txt_name}: sentence " f"{idx + 1}/{len(sentences)}" ) try: if not sentence: # Skip empty sentences continue # Add a period at the end if missing (helps with TTS prosody) if sentence[-1] not in ".!?": sentence = sentence + "." print(f" Sentence {idx+1}: '{sentence}'") # Generate audio using IndexTTS2 output_filename = f"temp_{txt_name}_{idx+1:03d}.wav" # Capture stdout to get audio duration import io from contextlib import redirect_stdout buf = io.StringIO() with redirect_stdout(buf): tts_model.infer( spk_audio_prompt=reference_voice_path, text=sentence, output_path=output_filename, verbose=True, # Enable verbose to get duration ) # Parse duration from output output_log = buf.getvalue() duration = None for line in output_log.split("\n"): dur = parse_audio_duration_from_log(line) if dur: duration = dur break if duration is None: # Fallback: read the file to get duration audio_data, sr = sf.read(output_filename) duration = len(audio_data) / sr print(f" Generated audio: {duration:.2f} seconds") # Store file info for combined audio audio_files_info.append((output_filename, duration)) temp_files.append(output_filename) # Prepare upload operation for individual file output_path = ( f"Affirmations/{txt_name}/" f"{txt_name}_{idx+1:03d}.wav" ) commit_operations.append( CommitOperationAdd( path_in_repo=output_path, path_or_fileobj=output_filename, ) ) except Exception as e: current_status = ( f"Error generating audio for sentence {idx+1}: {e}" ) print(f"Generation error: {e}") continue # Create combined audio file(s) if audio_files_info and auto_process_running: current_status = ( f"Creating combined audio(s) for {txt_name}..." ) combined_files = create_combined_audios(audio_files_info) # Add combined files to upload operations for i, (combined_file, duration) in enumerate( combined_files ): if len(combined_files) == 1: combined_path = ( f"Affirmations/{txt_name}/" f"{txt_name}_combined.wav" ) else: combined_path = ( f"Affirmations/{txt_name}/" f"{txt_name}_combined_{i+1:03d}.wav" ) commit_operations.append( CommitOperationAdd( path_in_repo=combined_path, path_or_fileobj=combined_file, ) ) temp_files.append(combined_file) duration_min = int(duration // 60) duration_sec = int(duration % 60) print( f" Combined file {i+1}: " f"{duration_min}:{duration_sec:02d}" ) # Upload all generated files if commit_operations and auto_process_running: total_individual = len(audio_files_info) total_combined = ( len(combined_files) if audio_files_info else 0 ) current_status = ( f"Uploading {total_individual} individual + " f"{total_combined} combined files for {txt_name}..." ) try: api.create_commit( repo_id=output_dataset_id, repo_type="dataset", operations=commit_operations, commit_message=( f"Add audio files for {txt_name} - " f"{total_individual} individual + " f"{total_combined} combined" ), token=token, ) current_status = ( f"Successfully uploaded files for {txt_name}" ) # Move TXT file to /done folder current_status = ( f"Moving {txt_name}.txt to /done folder..." ) # Read file content with open(txt_path, "rb") as f: file_content = f.read() # Create operations to move file move_operations = [ CommitOperationAdd( path_in_repo=f"done/{txt_file}", path_or_fileobj=file_content, ), CommitOperationDelete(path_in_repo=txt_file), ] api.create_commit( repo_id=input_dataset_id, repo_type="dataset", operations=move_operations, commit_message=( f"Move {txt_name}.txt to /done after processing" ), token=token, ) current_status = ( f"✅ Completed {txt_name}: " f"{total_individual} individual + " f"{total_combined} combined audio files" ) except Exception as e: current_status = ( f"Upload/Move error for {txt_name}: {e}" ) print(f"Error: {e}") # Cleanup temporary files for temp_file in temp_files: try: if os.path.exists(temp_file): os.remove(temp_file) except Exception: pass time.sleep(2) # Small delay between files except Exception as e: current_status = f"Error processing {txt_name}: {e}" print(f"Error: {e}") continue if auto_process_running: current_status = "✅ Auto-processing completed successfully!" else: current_status = "⏹️ Auto-processing stopped" except Exception as e: current_status = f"❌ Fatal error: {str(e)}" print(f"Fatal error: {e}") finally: auto_process_running = False def start_auto_process(): """Start the auto-processing thread""" global auto_process_running, auto_process_thread if auto_process_running: return "Auto-processing already running!", current_status auto_process_running = True auto_process_thread = threading.Thread(target=auto_process_dataset) auto_process_thread.start() return "✅ Auto-processing started!", "Starting..." def stop_auto_process(): """Stop the auto-processing""" global auto_process_running auto_process_running = False return "⏹️ Stop signal sent!", current_status def get_status(): """Get current processing status""" global auto_process_running if auto_process_running: return current_status + " 🔄" return current_status def manual_generate(text, reference_audio, emotion_audio, emo_alpha, use_emo_text): """Manual TTS generation""" global tts_model if tts_model is None: return None if not reference_audio: return None try: output_path = "manual_output.wav" if emotion_audio: tts_model.infer( spk_audio_prompt=reference_audio, text=text, output_path=output_path, emo_audio_prompt=emotion_audio, emo_alpha=emo_alpha, verbose=False, ) else: tts_model.infer( spk_audio_prompt=reference_audio, text=text, output_path=output_path, use_emo_text=use_emo_text, emo_alpha=emo_alpha if use_emo_text else 1.0, verbose=False, ) # Read the generated file audio_data, sample_rate = sf.read(output_path) return (sample_rate, audio_data) except Exception as e: print(f"Generation error: {e}") return None # Create Gradio interface with gr.Blocks(title="IndexTTS2 with Auto-Processing") as demo: gr.Markdown("# 🎤 IndexTTS2 Voice Synthesis") gr.Markdown( "State-of-the-art TTS with auto-processing and combined audio generation" ) # Manual tab with gr.Tab("Manual Processing"): with gr.Row(): with gr.Column(): text_input = gr.Textbox( label="Text to synthesize", placeholder="Enter text here...", lines=3, value="大家好,我现在正在体验AI科技!", ) reference_audio = gr.Audio( sources=["upload"], type="filepath", label="Voice reference (required)", ) emotion_audio = gr.Audio( sources=["upload"], type="filepath", label="Emotion reference (optional)", ) with gr.Row(): emo_alpha = gr.Slider( minimum=0.0, maximum=1.0, value=0.6, step=0.1, label="Emotion strength", ) use_emo_text = gr.Checkbox( label="Use text-based emotion", value=False ) with gr.Column(): generate_btn = gr.Button( "🎙️ Generate", variant="primary", size="lg" ) output_audio = gr.Audio(label="Generated audio", type="numpy") generate_btn.click( manual_generate, inputs=[ text_input, reference_audio, emotion_audio, emo_alpha, use_emo_text, ], outputs=output_audio, ) # Auto-processing tab with gr.Tab("Auto Processing"): gr.Markdown("### 🚀 Automatic Dataset Processing with Combined Audio") with gr.Row(): with gr.Column(scale=1): gr.Markdown( """ **Configuration:** - 📁 Input: `Mo2294/rawAffirmation` - 📂 Output: `Mo2294/outputAffirmation` - 🎙️ Voice: `Mo.wav` - ✂️ Delimiter: `.-` - 📝 Structure: `/Affirmations/[name]/` - ⏰ Combined: Max 30 min chunks - ⏸️ Pauses: 3 seconds between audios """ ) with gr.Column(scale=2): status_display = gr.Textbox( label="📊 Processing Status", value=get_status(), interactive=False, lines=3, ) with gr.Row(): start_btn = gr.Button( "▶️ Start Processing", variant="primary", scale=2 ) stop_btn = gr.Button("⏹️ Stop", variant="stop", scale=1) refresh_btn = gr.Button("🔄 Refresh", scale=1) message_display = gr.Textbox( label="Message", interactive=False, visible=False ) # Event handlers start_btn.click( start_auto_process, outputs=[message_display, status_display] ) stop_btn.click( stop_auto_process, outputs=[message_display, status_display] ) refresh_btn.click(get_status, outputs=status_display) # Footer gr.Markdown( """ ---
""" ) if __name__ == "__main__": demo.launch()