# Good.KTTS / app.py
# Aid3445's picture
# Update app.py
# a37cabf verified
# raw
# history blame
# 27.9 kB
import gradio as gr
import os
import tempfile
import soundfile as sf
import numpy as np
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import gc
from huggingface_hub import hf_hub_download
import json
import onnxruntime as ort
import warnings
# Silence every warning category for the whole process.
warnings.filterwarnings(action="ignore")
# Process-wide environment overrides:
#   KMP_DUPLICATE_LIB_OK  - works around the OpenMP "duplicate library" error
#   CUDA_VISIBLE_DEVICES  - hides GPUs so ONNX Runtime stays on the CPU
os.environ.update({
    'KMP_DUPLICATE_LIB_OK': 'TRUE',
    'CUDA_VISIBLE_DEVICES': '-1',
})
class DirectKittenTTS:
    """Direct implementation of KittenTTS using ONNX Runtime.

    Loads an ONNX model plus an ``.npz`` archive of voice embeddings and
    exposes :meth:`generate`, which turns text into a 1-D audio array.

    Optional helpers (``g2p_en``, ``phonemizer``, ``transformers``) are
    imported lazily and cached after the first successful load so they are
    not rebuilt for every sentence.
    """

    # Sample rate (Hz) of the fallback tone returned when generation fails.
    SAMPLE_RATE = 24000
    # Fixed length the encoded text is truncated / zero-padded to.
    MAX_INPUT_LENGTH = 512
    # Everything except word characters, whitespace and basic punctuation.
    _DISALLOWED_CHARS = re.compile(r'[^\w\s\.\,\!\?\;\:\-\'\"]')

    # Lazy caches; the "*_resolved" flags record that a lookup was already
    # attempted so a missing optional package is not re-imported per call.
    _g2p = None
    _g2p_resolved = False
    _tokenizer = None
    _tokenizer_resolved = False

    def __init__(self, model_path, voices_path):
        """Initialize with direct paths to model and voices files.

        Args:
            model_path: Path to the ``.onnx`` model file.
            voices_path: Path to the NumPy archive holding one embedding
                per voice name.
        """
        self.session = ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])
        self.voices_data = np.load(voices_path)
        self.voice_list = list(self.voices_data.keys())
        print(f"Loaded model with voices: {self.voice_list}")

    def _basic_clean(self, text):
        """Lower-case *text* and strip characters outside the allowed set."""
        return self._DISALLOWED_CHARS.sub('', text.lower())

    def _get_g2p(self):
        """Return a cached ``g2p_en.G2p`` instance, or ``None`` if not installed."""
        if not self._g2p_resolved:
            try:
                from g2p_en import G2p
                self._g2p = G2p()
                self._g2p_resolved = True
            except ImportError:
                print("g2p_en not available, trying phonemizer")
                self._g2p_resolved = True
        return self._g2p

    def _get_tokenizer(self):
        """Return a cached tokenizer, or ``None`` when it cannot be loaded."""
        if not self._tokenizer_resolved:
            self._tokenizer_resolved = True
            try:
                from transformers import AutoTokenizer
                # NOTE(review): bert-base-uncased ids are unlikely to match the
                # vocabulary the ONNX model was trained with - confirm.
                self._tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
            except Exception as e:
                print(f"Tokenizer unavailable, using character-level encoding: {e}")
        return self._tokenizer

    def text_to_phonemes(self, text):
        """Convert text to phonemes with multiple fallback strategies.

        Order of preference: ``g2p_en`` -> ``phonemizer`` (espeak backend)
        -> plain lower-casing and character filtering.

        Args:
            text: Input text.

        Returns:
            A phoneme string, or the cleaned text when no phonemizer works.
        """
        try:
            g2p = self._get_g2p()
            if g2p is not None:
                # g2p yields a list of phoneme tokens; join into one string.
                return ' '.join(g2p(text))
            try:
                from phonemizer import phonemize
                return phonemize(text, backend='espeak', language='en-us')
            except ImportError:
                print("phonemizer not available, using basic cleaning")
            except Exception as e:
                print(f"phonemizer failed: {e}")
        except Exception as e:
            print(f"Error in phoneme conversion: {e}")
        # Last resort: return cleaned text
        return self._basic_clean(text)

    def _encode(self, phonemes):
        """Encode *phonemes* into exactly ``MAX_INPUT_LENGTH`` integer ids."""
        max_length = self.MAX_INPUT_LENGTH
        ids = None
        tokenizer = self._get_tokenizer()
        if tokenizer is not None:
            try:
                ids = tokenizer.encode(phonemes, truncation=True, max_length=max_length)
            except Exception as e:
                print(f"Tokenizer failed, using character-level encoding: {e}")
        if ids is None:
            # Fallback to character-level encoding
            ids = [ord(c) for c in phonemes[:max_length]]
        return ids + [0] * (max_length - len(ids))

    def _fallback_tone(self):
        """Return one second of a quiet 440 Hz sine wave."""
        duration = 1.0
        t = np.linspace(0, duration, int(self.SAMPLE_RATE * duration))
        return np.sin(2 * np.pi * 440 * t) * 0.3

    def generate(self, text, voice='expr-voice-2-m', speed=1.0):
        """Generate audio from text.

        Args:
            text: Text to synthesize.
            voice: Key into the voices archive; an unknown key falls back to
                the first available voice.
            speed: Speed factor. It is passed to the model when the model
                exposes a speed input; otherwise the output is resampled.

        Returns:
            A 1-D NumPy array of audio samples. On any error a one-second
            sine tone is returned instead of raising.
        """
        try:
            # Get voice embedding
            if voice not in self.voices_data:
                print(f"Voice {voice} not found, using first available voice")
                voice = self.voice_list[0]
            voice_embedding = self.voices_data[voice]

            phonemes = self.text_to_phonemes(text)
            text_input = np.array([self._encode(phonemes)], dtype=np.int64)

            # Route each model input by (case-insensitive) name.
            inputs = {}
            model_handles_speed = False
            for model_input in self.session.get_inputs():
                name = model_input.name
                lowered = name.lower()
                if 'text' in lowered or 'input' in lowered:
                    inputs[name] = text_input
                # NOTE(review): 'style' is accepted as a voice-embedding input
                # name as well - confirm against the model's actual inputs.
                elif 'voice' in lowered or 'speaker' in lowered or 'style' in lowered:
                    inputs[name] = voice_embedding.reshape(1, -1)
                elif 'speed' in lowered:
                    inputs[name] = np.array([[speed]], dtype=np.float32)
                    model_handles_speed = True

            # A single inference call; no warm-up/"reset" run with zeroed
            # inputs is made, since that only doubled the work per sentence.
            outputs = self.session.run(None, inputs)

            # Audio is taken from the first output and flattened to 1-D
            # (atleast_1d keeps a single-sample result indexable).
            audio = np.atleast_1d(np.squeeze(outputs[0]))

            # Resample only when the model did not consume the speed value,
            # otherwise the speed change would be applied twice.
            if speed != 1.0 and not model_handles_speed and audio.size:
                original_length = len(audio)
                new_length = int(original_length / speed)
                if new_length > 0:
                    indices = np.linspace(0, original_length - 1, new_length)
                    audio = np.interp(indices, np.arange(original_length), audio)
            return audio
        except Exception as e:
            print(f"Error in generate: {e}")
            # Return a simple sine wave as fallback
            return self._fallback_tone()
class KittenTTSGradio:
    """Gradio-facing wrapper that loads a KittenTTS model lazily and
    converts text to a WAV file, one sentence chunk at a time.

    The model is loaded on first use. Text is split into sentences, grouped
    into chunks, synthesized sequentially or in a thread pool, concatenated
    in order and written to a temporary ``.wav`` file.
    """

    # Sample rate (Hz) used for the output file and for fallback silence.
    SAMPLE_RATE = 24000
    # Length of the silence substituted for a chunk that failed to render.
    SILENCE_SECONDS = 0.5

    def __init__(self):
        """Initialize the KittenTTS model and settings"""
        self.model = None
        self.available_voices = [
            'expr-voice-2-m', 'expr-voice-2-f', 'expr-voice-3-m', 'expr-voice-3-f',
            'expr-voice-4-m', 'expr-voice-4-f', 'expr-voice-5-m', 'expr-voice-5-f'
        ]
        # Limit workers to avoid conflicts: one less than the CPU count,
        # clamped to 1..4, or 2 when the CPU count is unknown.
        cpu_count = os.cpu_count()
        self.max_workers = min(4, max(1, cpu_count - 1)) if cpu_count else 2
        self.model_loaded = False

    def ensure_model_loaded(self):
        """Ensure model is loaded before use"""
        if not self.model_loaded:
            self.load_model()

    def download_and_load_model(self, repo_id):
        """Download model files and load them directly.

        Args:
            repo_id: Hub repository id to download from.

        Returns:
            True when the model was loaded, False on any failure (the error
            is printed, not raised, so the caller can try the next repo).
        """
        try:
            print(f"Downloading model files from {repo_id}...")
            config_path = hf_hub_download(repo_id=repo_id, filename="config.json")
            with open(config_path, 'r', encoding='utf-8') as f:
                config = json.load(f)

            # Get model filename from config or guess it from the repo name.
            model_filename = config.get("model_file")
            if not model_filename:
                if "mini" in repo_id:
                    model_filename = "kitten_tts_mini_v0_1.onnx"
                elif "nano" in repo_id and "0.2" in repo_id:
                    model_filename = "kitten_tts_nano_v0_2.onnx"
                else:
                    model_filename = "kitten_tts_nano_v0_1.onnx"

            print(f"Downloading model file: {model_filename}")
            model_path = hf_hub_download(repo_id=repo_id, filename=model_filename)

            voices_filename = config.get("voices", "voices.npz")
            print(f"Downloading voices file: {voices_filename}")
            voices_path = hf_hub_download(repo_id=repo_id, filename=voices_filename)
            print(f"Files downloaded: {model_path}, {voices_path}")

            # Create our direct ONNX model
            self.model = DirectKittenTTS(model_path, voices_path)
            # Update available voices based on what's actually in the file
            if hasattr(self.model, 'voice_list'):
                self.available_voices = self.model.voice_list
            return True
        except Exception as e:
            print(f"Failed to download and load {repo_id}: {e}")
            return False

    def load_model(self):
        """Load the TTS model with multiple fallback options.

        Tries the ``kittentts`` library first (if importable), then direct
        ONNX loading for each known repository.

        Raises:
            Exception: when no strategy succeeds.
        """
        if self.model_loaded:
            return
        try:
            print("Loading KittenTTS model...")
            try:
                from kittentts import KittenTTS
            except ImportError:
                print("KittenTTS library not available, using direct ONNX loading")
            else:
                for repo_id in ("KittenML/kitten-tts-mini-0.1", "KittenML/kitten-tts-nano-0.2"):
                    try:
                        print(f"Trying to load {repo_id} with KittenTTS library...")
                        self.model = KittenTTS(repo_id)
                    except Exception as e:
                        # Log instead of silently skipping so failures are diagnosable.
                        print(f"KittenTTS library could not load {repo_id}: {e}")
                        continue
                    self.model_loaded = True
                    print(f"Successfully loaded {repo_id} with KittenTTS!")
                    return

            # If library loading failed, use our direct implementation
            strategies = [
                ("KittenML/kitten-tts-mini-0.1", "mini"),
                ("KittenML/kitten-tts-nano-0.2", "nano v0.2"),
                ("KittenML/kitten-tts-nano-0.1", "nano v0.1"),
            ]
            for repo_id, name in strategies:
                print(f"Trying to load {name} model directly...")
                if self.download_and_load_model(repo_id):
                    self.model_loaded = True
                    print(f"Successfully loaded {name} model!")
                    return
            raise RuntimeError("Failed to load any KittenTTS model")
        except Exception as e:
            print(f"Error loading model: {e}")
            self.model_loaded = False
            raise

    def split_into_sentences(self, text):
        """Split text into sentences.

        Whitespace is collapsed, the text is split after ``.``, ``!`` or
        ``?``, and a trailing period is added to fragments lacking one.
        """
        text = re.sub(r'\s+', ' ', text).strip()
        processed_sentences = []
        for sentence in re.split(r'(?<=[.!?])\s+', text):
            sentence = sentence.strip()
            if not sentence:
                continue
            if not sentence.endswith(('.', '!', '?')):
                sentence += '.'
            processed_sentences.append(sentence)
        return processed_sentences

    def group_sentences_into_chunks(self, sentences, chunk_size):
        """Group sentences into chunks of specified size.

        ``chunk_size`` is coerced to an int >= 1 because UI sliders may
        deliver floats, which ``range`` rejects as a step.
        """
        chunk_size = max(1, int(chunk_size))
        return [
            ' '.join(sentences[i:i + chunk_size])
            for i in range(0, len(sentences), chunk_size)
        ]

    def clean_text_for_model(self, text):
        """Clean text for the TTS model.

        Removes characters outside word/whitespace/basic punctuation and
        collapses whitespace. Empty or very short (< 5 chars) results are
        replaced by ``"Hello."``.
        """
        if not text:
            return "Hello."
        text = re.sub(r'[^\w\s\.\,\!\?\;\:\-\'\"]', '', text)
        text = re.sub(r'\s+', ' ', text).strip()
        # NOTE(review): this also replaces legitimate short sentences such as
        # "Yes." with "Hello." - confirm this is intended.
        if len(text) < 5:
            text = "Hello."
        return text

    def safe_generate_audio(self, text, voice, speed):
        """Generate audio with fallback strategies.

        Attempts, in order: the text as given, the cleaned text, and the
        first five words of the text.

        Raises:
            Exception: when the model is missing or every attempt fails.
        """
        self.ensure_model_loaded()
        if not self.model:
            raise RuntimeError("Model not loaded")

        last_error = None
        # Try original text
        try:
            return self.model.generate(text, voice=voice, speed=speed)
        except Exception as e:
            last_error = e
            print(f"Original attempt failed: {e}")
        # Try cleaned text
        try:
            cleaned_text = self.clean_text_for_model(text)
            return self.model.generate(cleaned_text, voice=voice, speed=speed)
        except Exception as e:
            last_error = e
            print(f"Cleaned attempt failed: {e}")
        # Try basic fallback
        try:
            basic_text = ' '.join(text.split()[:5])
            if not basic_text:
                # Decide the default before appending punctuation, otherwise
                # an empty input would be synthesized as a lone ".".
                basic_text = "Hello."
            elif not basic_text.endswith(('.', '!', '?')):
                basic_text += '.'
            return self.model.generate(basic_text, voice=voice, speed=speed)
        except Exception as e:
            last_error = e
            print(f"Basic attempt failed: {e}")
        raise RuntimeError("All audio generation attempts failed") from last_error

    def _silence(self):
        """Return the short silence used in place of a failed chunk."""
        return np.zeros(int(self.SAMPLE_RATE * self.SILENCE_SECONDS))

    def process_single_sentence(self, sentence, voice, speed):
        """Process a single sentence (or chunk) with error handling.

        Returns the generated audio, or a short silence if generation fails.
        """
        try:
            cleaned_sentence = self.clean_text_for_model(sentence)
            # Add a small delay between processing to avoid potential state issues
            time.sleep(0.1)
            audio = self.safe_generate_audio(cleaned_sentence, voice=voice, speed=speed)
            # Explicit garbage collection
            gc.collect()
            return audio
        except Exception as e:
            print(f"Error processing sentence: '{sentence[:30]}...': {e}")
            return self._silence()

    def _generate_chunks_parallel(self, chunks, voice, speed, chunk_label, progress):
        """Synthesize chunks in a thread pool; results keep input order."""
        total_chunks = len(chunks)
        audio_chunks = [None] * total_chunks
        with ThreadPoolExecutor(max_workers=min(self.max_workers, 4)) as executor:
            future_to_index = {
                executor.submit(self.process_single_sentence, chunk, voice, speed): i
                for i, chunk in enumerate(chunks)
            }
            for completed, future in enumerate(as_completed(future_to_index), start=1):
                index = future_to_index[future]
                try:
                    # Place at the correct index regardless of completion order.
                    audio_chunks[index] = future.result()
                except Exception as e:
                    print(f"Error processing chunk at index {index}: {e}")
                    audio_chunks[index] = self._silence()
                progress(completed / total_chunks,
                         desc=f"Processed {completed}/{total_chunks} {chunk_label}")
        return audio_chunks

    def _generate_chunks_sequential(self, chunks, voice, speed, chunk_label, progress):
        """Synthesize chunks one after another in the calling thread."""
        total_chunks = len(chunks)
        audio_chunks = [None] * total_chunks
        for i, chunk in enumerate(chunks):
            try:
                audio_chunks[i] = self.process_single_sentence(chunk, voice, speed)
            except Exception as e:
                print(f"Error processing chunk at index {i}: {e}")
                audio_chunks[i] = self._silence()
            progress((i + 1) / total_chunks,
                     desc=f"Processed {i + 1}/{total_chunks} {chunk_label}")
        return audio_chunks

    def convert_text_to_speech(self, text, voice, speed, chunk_size, use_multithreading, progress=gr.Progress()):
        """Main conversion function for Gradio.

        Args:
            text: Text to convert.
            voice: Voice name passed to the model.
            speed: Speed factor passed to the model.
            chunk_size: Number of sentences synthesized per model call.
            use_multithreading: Synthesize chunks in a thread pool when there
                is more than one chunk.
            progress: Gradio progress tracker.

        Returns:
            ``(wav_path, status_message)``.

        Raises:
            gr.Error: for empty input, model-load failure or any conversion
                failure.
        """
        try:
            self.ensure_model_loaded()
        except Exception as e:
            raise gr.Error(f"Failed to load model: {str(e)}")
        if not text or not text.strip():
            raise gr.Error("Please enter some text to convert.")

        try:
            # Sliders may deliver floats; everything below needs an int >= 1.
            chunk_size = max(1, int(chunk_size))
            sentences = self.split_into_sentences(text)
            if not sentences:
                raise gr.Error("No valid sentences found in the text.")
            chunks = self.group_sentences_into_chunks(sentences, chunk_size)
            total_chunks = len(chunks)
            total_sentences = len(sentences)
            chunk_label = "chunks" if chunk_size == 1 else f"chunks ({chunk_size} sentences each)"
            progress(0, desc=f"Processing {total_sentences} sentences in {total_chunks} {chunk_label}...")

            # No dummy "reset" inference is issued around chunks: it only
            # added an extra model run per chunk and hid its own failures.
            run_parallel = bool(use_multithreading) and total_chunks > 1
            if run_parallel:
                audio_chunks = self._generate_chunks_parallel(chunks, voice, speed, chunk_label, progress)
            else:
                audio_chunks = self._generate_chunks_sequential(chunks, voice, speed, chunk_label, progress)

            # A model may still hand back None; never let that reach concatenate.
            if any(chunk is None for chunk in audio_chunks):
                print("Warning: Some audio chunks were not generated properly")
                audio_chunks = [self._silence() if chunk is None else chunk for chunk in audio_chunks]

            progress(0.9, desc="Concatenating audio...")
            if len(audio_chunks) == 1:
                final_audio = audio_chunks[0]
            else:
                final_audio = np.concatenate(audio_chunks)

            # Reserve a path and close the handle before writing, so the file
            # is not held open while soundfile writes to it by name.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as output_file:
                output_path = output_file.name
            sf.write(output_path, final_audio, self.SAMPLE_RATE)

            progress(1.0, desc="Complete!")
            gc.collect()
            processing_method = "multithreading" if run_parallel else "sequential"
            chunk_description = f"{chunk_size} sentence(s) per chunk" if chunk_size > 1 else "sentence-by-sentence"
            status_message = f"✅ Successfully converted {total_sentences} sentences ({total_chunks} chunks) using {processing_method} processing with {chunk_description}!"
            return output_path, status_message
        except gr.Error:
            # Already user-facing; do not wrap it in "Conversion failed".
            raise
        except Exception as e:
            raise gr.Error(f"Conversion failed: {str(e)}") from e
# Initialize the app
# A single module-level KittenTTSGradio instance is shared by the UI callbacks.
# Constructing it does not load a model: __init__ only sets `model = None` and
# `model_loaded = False`; loading happens later via ensure_model_loaded().
print("Initializing KittenTTS app...")
app = KittenTTSGradio()
print("App initialized, model will load on first use")
# Create Gradio interface
def create_interface():
    """Build and return the Gradio ``Blocks`` UI for the TTS app.

    Layout: a text/upload column next to a settings column, an output row
    with the audio player and status line, example texts, and a help
    section. The convert button is wired to ``app.convert_text_to_speech``.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks(title="KittenTTS - Text to Speech") as demo:
        gr.Markdown("""
# 🎙️ KittenTTS Text-to-Speech Converter
Convert text to natural-sounding speech using KittenTTS - a lightweight TTS model that runs on CPU.
**Note:** First conversion will download and load the model (~170MB for mini, ~25MB for nano).
If you encounter issues, please try refreshing the page.
""")
        with gr.Row():
            with gr.Column(scale=2):
                text_input = gr.Textbox(
                    label="Text to Convert",
                    placeholder="Enter your text here or upload a file...",
                    lines=10,
                    max_lines=20,
                    value=""
                )
                with gr.Row():
                    file_upload = gr.File(
                        label="Or Upload Text File",
                        file_types=[".txt"],
                        type="filepath"
                    )
                    clear_btn = gr.Button("Clear Text", size="sm")

                def load_file(file_path):
                    """Return the uploaded file's text (display-truncated at 50,000 chars)."""
                    if file_path:
                        try:
                            with open(file_path, 'r', encoding='utf-8') as f:
                                content = f.read()
                            if len(content) > 50000:
                                display_text = content[:50000] + "\n\n... (truncated for display)"
                            else:
                                display_text = content
                            return display_text
                        except Exception as e:
                            return f"Error loading file: {str(e)}"
                    return ""

                def clear_text():
                    """Return an empty string to reset the text box."""
                    return ""

                file_upload.change(fn=load_file, inputs=[file_upload], outputs=[text_input])
                clear_btn.click(fn=clear_text, inputs=[], outputs=[text_input])
            with gr.Column(scale=1):
                voice_dropdown = gr.Dropdown(
                    choices=app.available_voices,
                    value=app.available_voices[0],
                    label="Voice Selection",
                    info="Choose the voice for speech synthesis"
                )
                speed_slider = gr.Slider(
                    minimum=0.5,
                    maximum=2.0,
                    value=1.0,
                    step=0.1,
                    label="Speech Speed",
                    info="Adjust the speed of speech (1.0 = normal)"
                )
                chunk_size_slider = gr.Slider(
                    minimum=1,
                    maximum=10,
                    value=1,
                    step=1,
                    label="Sentences per Chunk",
                    info="Group sentences together (1 = best quality, higher = faster processing)"
                )
                multithread_checkbox = gr.Checkbox(
                    value=True,
                    label=f"Enable Multithreading ({app.max_workers} workers)",
                    info="Process multiple chunks in parallel"
                )
                convert_btn = gr.Button(
                    "🎀 Convert to Speech",
                    variant="primary",
                    size="lg"
                )
        with gr.Row():
            with gr.Column():
                audio_output = gr.Audio(
                    label="Generated Audio",
                    type="filepath",
                    autoplay=False
                )
                status_output = gr.Markdown(
                    value="Ready to convert text to speech."
                )
        gr.Examples(
            examples=[
                ["Hello! This is a test of the KittenTTS system. It can convert text to natural sounding speech."],
                ["The quick brown fox jumps over the lazy dog. This sentence contains every letter of the alphabet."],
                ["Welcome to our presentation. Today we'll discuss artificial intelligence. Let's begin with the basics."]
            ],
            inputs=text_input,
            label="Example Texts"
        )
        # Inputs are passed positionally in the order the callback declares them.
        convert_btn.click(
            fn=app.convert_text_to_speech,
            inputs=[text_input, voice_dropdown, speed_slider, chunk_size_slider, multithread_checkbox],
            outputs=[audio_output, status_output]
        )
        gr.Markdown("""
---
### ⚙️ Chunk Size Guide:
- **1 sentence**: Best quality, natural pauses (recommended for short texts)
- **2-3 sentences**: Good balance of speed and quality
- **5+ sentences**: Faster processing for long texts (may sound more continuous)
### 🎭 Available Voices:
- **expr-voice-2-m/f**: Expressive male/female voices
- **expr-voice-3-m/f**: Natural male/female voices
- **expr-voice-4-m/f**: Clear male/female voices
- **expr-voice-5-m/f**: Warm male/female voices
### 📝 Notes:
- For best quality with longer texts, use chunk size 1
- The model uses phoneme conversion for more natural speech
- First use will download the model (may take a moment)
""")
    return demo
# Build the interface at import time so `demo` exists as a module-level name;
# the server itself is only started when this file is run as a script.
print("Creating Gradio interface...")
demo = create_interface()
print("Launching app...")
if __name__ == "__main__":
    # Cap the number of queued requests, then serve on every network
    # interface at port 7860 without creating a public share link.
    launch_options = {
        "share": False,
        "show_error": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
    }
    demo.queue(max_size=5)
    demo.launch(**launch_options)