import gradio as gr
import torch
import soundfile as sf
import numpy as np
import os
import re
import warnings
from datetime import datetime
from typing import Tuple, Optional, Any
from huggingface_hub import snapshot_download, login
# --- Hugging Face Authentication ---
HF_TOKEN = os.environ.get('HF_TOKEN')
if HF_TOKEN:
login(token=HF_TOKEN)
# --- Hugging Face Spaces GPU Support ---
try:
import spaces
HAS_SPACES = True
print(f"spaces package found, version: {getattr(spaces, '__version__', 'unknown')}")
except ImportError:
print("Warning: spaces package not found. Install: pip install spaces")
HAS_SPACES = False
    # Dummy stand-in for local development (no GPU scheduling)
class spaces:
@staticmethod
def GPU(func=None, duration=None):
if func is None:
return lambda f: f
return func
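# Note: with this local stub, @spaces.GPU(duration=100) returns a decorator that hands
# the wrapped function back unchanged, so the app still runs without the spaces package
# (just without any GPU scheduling).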
# Attempt to import optional dependencies
try:
import whisper
WHISPER_AVAILABLE = True
print("Whisper available")
except ImportError as e:
print(f"Whisper not available: {e}")
WHISPER_AVAILABLE = False
try:
import librosa
LIBROSA_AVAILABLE = True
print("Librosa available")
except ImportError as e:
print(f"Librosa not available: {e}")
LIBROSA_AVAILABLE = False
# --- Global Model State ---
# IMPORTANT: These are only accessed inside GPU-wrapped functions
_GPU_MODEL = None
_GPU_MODEL_PATH = None
# --- Configuration ---
DEFAULT_MODEL_PATH = "Qwen/Qwen3-TTS-12Hz-1.7B-Base"
# --- Helper Functions (CPU-safe) ---
def chunk_text(text: str, max_chars: int = 200) -> list[str]:
"""Chunking: Trenne bevorzugt an Absätzen, dann an Satzenden."""
if not text or not text.strip():
return [""]
paragraphs = re.split(r"\n\s*\n", text.strip())
chunks = []
current_chunk = ""
for para in paragraphs:
para = para.strip()
if not para:
continue
sentences = []
remaining = para
while remaining:
match = re.search(r"[.!?]+(?:\s+|$)", remaining)
if match:
sentence = remaining[: match.end()].strip()
if sentence:
sentences.append(sentence)
remaining = remaining[match.end() :]
else:
if remaining.strip():
sentences.append(remaining.strip())
break
for s in sentences:
s = s.strip()
if not s:
continue
if not current_chunk:
current_chunk = s
elif len(current_chunk) + 1 + len(s) <= max_chars:
current_chunk += " " + s
else:
if len(current_chunk) < max_chars or not chunks:
current_chunk += " " + s
chunks.append(current_chunk.strip())
current_chunk = ""
else:
chunks.append(current_chunk.strip())
current_chunk = s
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = ""
chunks = [c for c in chunks if c]
return chunks if chunks else [text]
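# Illustrative example (not executed): chunk_text("Hello world. How are you?", max_chars=15)
# first splits the paragraph into the sentences "Hello world." and "How are you?" and then
# regroups them; because a sentence is still merged in whenever the running chunk is below
# max_chars, the result here is a single chunk that slightly exceeds the limit.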
def trim_and_fade(audio: np.ndarray, is_first: bool, is_last: bool, sr: int) -> np.ndarray:
"""Am Ende jedes Chunks 30ms abschneiden, dann 80ms Stille anhängen."""
result = audio.copy()
cut_samples = int(0.03 * sr)
if len(result) > cut_samples:
result = result[:-cut_samples]
silence_samples = int(0.08 * sr)
silence = np.zeros(silence_samples)
result = np.concatenate([result, silence])
return result
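# For reference (assuming the model's output rate is e.g. 24 kHz): 0.03 s ≈ 720 samples are
# trimmed from the end and 0.08 s ≈ 1920 samples of silence are appended; the actual counts
# scale with whatever sample rate the model returns.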
# --- CPU-SAFE: Transcription (runs on CPU) ---
def transcribe_audio_logic(audio_path: str, progress=None) -> Tuple[str, str]:
"""Transcribes reference audio using Whisper."""
if not WHISPER_AVAILABLE:
return "", "Error: Whisper not installed. Run: pip install openai-whisper"
if not audio_path:
return "", "Error: No audio file provided."
if not os.path.exists(audio_path):
return "", f"Error: Audio file not found: {audio_path}"
p = progress if progress is not None else gr.Progress()
try:
p(0.2, desc="Loading Whisper model...")
whisper_model = whisper.load_model("base", device="cpu")
p(0.5, desc="Reading audio file...")
audio_data, sample_rate = sf.read(audio_path, dtype="float32")
if len(audio_data.shape) > 1:
audio_data = audio_data.mean(axis=1)
audio_data = audio_data.astype(np.float32)
if sample_rate != 16000:
if not LIBROSA_AVAILABLE:
return "", "Error: librosa not installed for resampling. Run: pip install librosa"
audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=16000)
p(0.8, desc="Transcribing...")
result = whisper_model.transcribe(audio_data, fp16=False)
transcript = result["text"].strip()
del whisper_model
return transcript, "Transcription completed successfully!"
except Exception as e:
import traceback
error_detail = traceback.format_exc()
print(f"Transcription error:\n{error_detail}")
return "", f"Transcription error: {str(e)}"
# --- GPU-ONLY FUNCTIONS: Everything below runs ONLY inside @spaces.GPU ---
def _load_model_gpu_only(model_path: str, progress_fn) -> Tuple[Any, str]:
"""
Load model inside GPU context only using snapshot_download.
NEVER call this from main process!
"""
global _GPU_MODEL, _GPU_MODEL_PATH
# Now safe to import and use CUDA
try:
from qwen_tts import Qwen3TTSModel
except ImportError as e:
return None, f"Error: qwen_tts library not found: {e}"
p = progress_fn
# CUDA settings - safe to do here inside GPU context
p(0.1, desc="Initializing Torch settings for CUDA stability...")
torch.backends.cuda.matmul.allow_tf32 = False
torch.backends.cudnn.allow_tf32 = False
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
torch.set_float32_matmul_precision("highest")
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
warnings.filterwarnings("ignore", message=".*flash_attention.*")
warnings.filterwarnings("ignore", message=".*FlashAttention.*")
try:
# Use snapshot_download as in the reference space
p(0.2, desc="Locating model files...")
local_model_path = snapshot_download(repo_id=model_path)
p(0.3, desc="Loading model weights (this may take a while on first run)...")
device = "cuda" # We know we're in GPU context
print(f"Using device: {device}")
capability = torch.cuda.get_device_capability()
major, minor = capability
print(f"GPU compute capability: {major}.{minor}")
print(f"GPU name: {torch.cuda.get_device_name(0)}")
# Use bfloat16 as in the reference space (faster and memory efficient)
dtype = torch.bfloat16
print(f"Using {dtype} for stability and speed")
p(0.4, desc=f"Loading with dtype={dtype}...")
model = Qwen3TTSModel.from_pretrained(
local_model_path,
device_map=device,
dtype=dtype,
token=HF_TOKEN,
attn_implementation="eager", # Using eager for stability on Zero GPU
)
# SKIP WARMUP to save GPU time
p(0.9, desc="Model loaded, skipping warmup to save GPU quota...")
_GPU_MODEL = model
_GPU_MODEL_PATH = model_path
p(1.0, desc="Model loaded successfully!")
return model, f"Status: Model Loaded Successfully ✓ (using {dtype} on {device})"
except Exception as e:
import traceback
error_detail = traceback.format_exc()
print(f"Detailed error during model loading:\n{error_detail}")
return None, f"Error loading model: {str(e)}\n\nDetails: {error_detail[:500]}"
def _ensure_model_loaded_gpu(model_path: str, progress_fn):
"""Ensure model is loaded, inside GPU context only."""
global _GPU_MODEL, _GPU_MODEL_PATH
if _GPU_MODEL is None or _GPU_MODEL_PATH != model_path:
progress_fn(0.05, desc="Loading model (first run)...")
model, msg = _load_model_gpu_only(model_path, progress_fn)
if model is None:
return None, msg
return model, msg
return _GPU_MODEL, "Model already loaded"
def _generate_voice_gpu_only(
model_path: str,
cache_state: Optional[Tuple],
ref_audio: str,
ref_txt: str,
synth_txt: str,
lang: str,
chk: bool,
size: int,
fast: bool,
strm: bool,
progress_fn
) -> Tuple[str, Optional[Tuple], str]:
"""
Generate voice inside GPU context only.
NEVER call this from main process!
"""
# Helper to move objects between devices safely
def move_to_device(obj, device):
if isinstance(obj, torch.Tensor):
return obj.to(device)
elif hasattr(obj, 'to'):
try:
return obj.to(device)
except Exception:
# If it doesn't support .to(), return as is
return obj
return obj
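    # Example of the round trip below (illustrative): the voice-clone prompt is moved to
    # "cpu" before it is stored in gr.State and back to "cuda" before generation; objects
    # without a usable .to() method are simply returned unchanged.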
# Step 1: Ensure model is loaded (inside GPU context)
progress_fn(0, desc="Checking/loading model...")
model, status_msg = _ensure_model_loaded_gpu(model_path, progress_fn)
if model is None:
return None, cache_state, f"Model loading failed: {status_msg}"
# Validate inputs
if not ref_audio or not os.path.exists(ref_audio):
return None, cache_state, f"Error: Reference audio file not found: {ref_audio}"
if not ref_txt or not ref_txt.strip():
return None, cache_state, "Error: Reference text is empty."
if not synth_txt or not synth_txt.strip():
return None, cache_state, "Error: Synthesis text is empty."
try:
# Step 2: Create voice clone prompt
current_cache_valid = False
if cache_state:
cached_prompt, c_audio, c_text, c_fast = cache_state
if (c_audio == ref_audio and
c_text == ref_txt and
c_fast == fast):
current_cache_valid = True
voice_clone_prompt = None
if current_cache_valid:
progress_fn(0.1, desc="Using cached voice prompt...")
# CRITICAL FIX: Move cached prompt from CPU to GPU for generation
voice_clone_prompt = move_to_device(cache_state[0], "cuda")
else:
progress_fn(0.1, desc="Creating voice clone prompt from reference audio...")
print(f"Creating voice clone with ref_audio={ref_audio}, fast_mode={fast}")
try:
voice_clone_prompt = model.create_voice_clone_prompt(
ref_audio=ref_audio,
ref_text=ref_txt if not fast else None,
x_vector_only_mode=fast,
)
# CRITICAL FIX: Move prompt to CPU before caching to prevent CUDA init in main process
voice_clone_prompt_cpu = move_to_device(voice_clone_prompt, "cpu")
cache_state = (voice_clone_prompt_cpu, ref_audio, ref_txt, fast)
print("Voice clone prompt created successfully and moved to CPU for caching")
except Exception as e:
print(f"Error creating voice clone prompt: {e}")
import traceback
print(traceback.format_exc())
return None, cache_state, f"Error creating voice clone prompt: {str(e)}"
# Step 3: Process text
progress_fn(0.2, desc="Processing text...")
if chk:
chunks = chunk_text(synth_txt, max_chars=min(max(size, 50), 1000))
else:
chunks = [synth_txt]
print(f"Text split into {len(chunks)} chunks")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = f"voice_clone_{timestamp}.wav"
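        # Example output filename (illustrative): voice_clone_20250101_120000.wav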
progress_fn(0.4, desc="Generating audio with model...")
with torch.inference_mode():
if len(chunks) > 1:
progress_fn(0.45, desc=f"Generating batch audio for {len(chunks)} chunks...")
languages = [lang] * len(chunks)
print(f"Calling generate_voice_clone with {len(chunks)} chunks")
wavs, sr = model.generate_voice_clone(
text=chunks,
language=languages,
voice_clone_prompt=voice_clone_prompt,
)
progress_fn(0.8, desc="Post-processing and combining audio chunks...")
silence_duration = 0.1
silence_samples = int(silence_duration * sr)
combined_audio = []
for i, wav in enumerate(wavs):
processed_wav = trim_and_fade(wav, i == 0, i == len(wavs) - 1, sr)
combined_audio.append(processed_wav)
if i < len(wavs) - 1:
combined_audio.append(np.zeros(silence_samples))
final_audio = np.concatenate(combined_audio)
sf.write(output_file, final_audio, sr)
print(f"Saved multi-chunk audio to {output_file}")
else:
progress_fn(0.5, desc="Generating audio for single chunk...")
print(f"Calling generate_voice_clone with single chunk: {chunks[0][:50]}...")
wavs, sr = model.generate_voice_clone(
text=chunks[0],
language=lang,
voice_clone_prompt=voice_clone_prompt,
)
sf.write(output_file, wavs[0], sr)
print(f"Saved single-chunk audio to {output_file}")
progress_fn(1.0, desc="Done!")
return output_file, cache_state, f"Success! Audio saved to {output_file} ({len(chunks)} chunk(s))"
except Exception as e:
import traceback
error_detail = traceback.format_exc()
print(f"Generation error detail:\n{error_detail}")
return None, cache_state, f"Generation error: {str(e)}\n\nPlease check the console logs for more details."
# --- GPU WRAPPER: Reduced duration to 100s to fit low quota ---
@spaces.GPU(duration=100)
def gpu_voice_pipeline(
model_path: str,
cache_state: Optional[Tuple],
ref_audio: str,
ref_txt: str,
synth_txt: str,
lang: str,
chk: bool,
size: int,
fast: bool,
strm: bool
):
"""
SINGLE GPU entry point. ALL CUDA operations happen inside this function.
This runs in a separate process with GPU access.
"""
# Simple progress that works in subprocess
class SubprocessProgress:
def __init__(self):
self.step = 0
def __call__(self, value, desc=""):
self.step += 1
print(f"[GPU {self.step}] {value:.2f}: {desc}")
progress = SubprocessProgress()
# ALL model loading and generation happens HERE, inside GPU context
return _generate_voice_gpu_only(
model_path, cache_state, ref_audio, ref_txt, synth_txt,
lang, chk, size, fast, strm, progress
)
# --- CPU HANDLERS: These run in main process, NO CUDA here ---
def handle_transcribe(audio_file):
"""Handler für Transkription (CPU)."""
if not audio_file:
return "", "Please upload or record reference audio first."
txt, msg = transcribe_audio_logic(audio_file)
return txt, msg
def handle_generate(model_path, cache, ref_audio, ref_txt, synth_txt, lang, chk, size, fast, strm, progress=gr.Progress()):
"""
Main handler that validates inputs and delegates to GPU function.
NO CUDA operations here!
"""
# Validate inputs before GPU call
if not ref_audio:
return None, cache, "Error: Please upload or record reference audio first."
if not ref_txt or not ref_txt.strip():
return None, cache, "Error: Please enter or transcribe reference text."
if not synth_txt or not synth_txt.strip():
return None, cache, "Error: Please enter text to synthesize."
progress(0, desc="Sending to GPU worker...")
try:
# This call transfers to GPU subprocess - NO CUDA in main process!
result = gpu_voice_pipeline(
model_path, cache, ref_audio, ref_txt, synth_txt,
lang, chk, int(size), fast, strm
)
progress(0.9, desc="Processing result...")
return result
except Exception as e:
import traceback
error_detail = traceback.format_exc()
print(f"Error in handle_generate:\n{error_detail}")
        # The specific "quota" error override was removed to avoid false positives;
        # returning the raw error lets the user see whether it is a timeout, a queue
        # issue, or an actual quota problem.
return None, cache, f"Error: {str(e)}\n\nDetails: {error_detail[:1000]}"
# --- Gradio Interface ---
def create_ui():
with gr.Blocks() as demo:
gr.Markdown(
"""
# 🎙️ Qwen3-TTS Voice Cloning Studio
<p style='text-align: center; font-size: 0.9em; color: #666;'>
Built with <a href="https://huggingface.co/spaces/akhaliq/anycoder" target="_blank">anycoder</a>
</p>
"""
)
# --- Status Box ---
status_box = gr.Textbox(
label="Status / Logs",
interactive=False,
lines=3,
value="Ready. Note: GPU duration is set to 100s. If you have low quota (<100s), please wait for it to refill."
)
# State for caching the voice prompt
prompt_cache = gr.State(None)
# --- Model Path Input (Hidden but needed for the function) ---
model_path_input = gr.Textbox(
label="Model Path",
value=DEFAULT_MODEL_PATH,
visible=False
)
with gr.Row():
with gr.Column(scale=1):
                # --- Section 1: Reference Audio ---
gr.Markdown("### 1. Reference Audio")
ref_audio_input = gr.Audio(
label="Upload or Record Reference Audio",
sources=["upload", "microphone"],
type="filepath"
)
with gr.Row():
transcribe_btn = gr.Button("📝 Transcribe Audio", variant="secondary")
ref_text_input = gr.Textbox(
label="Reference Text (Transcribed or Manual)",
placeholder="The text spoken in the reference audio...",
lines=3,
info="Required for cloning. Click 'Transcribe Audio' or type manually."
)
                # --- Section 2: Synthesis ---
gr.Markdown("### 2. Text to Synthesize")
synth_text_input = gr.Textbox(
label="Synthesis Text",
placeholder="Enter the text you want the cloned voice to speak...",
lines=5,
info="Required. The text that will be spoken in the cloned voice."
)
                # --- Advanced Settings ---
with gr.Accordion("🛠️ Advanced Settings", open=False):
language = gr.Radio(
choices=["German", "English", "Chinese", "French", "Spanish", "Japanese", "Korean"],
value="German",
label="Language",
info="Language of the synthesis text."
)
use_chunking = gr.Checkbox(
value=True,
label="Use Chunking",
info="Splits long text into smaller segments for better stability."
)
chunk_size = gr.Slider(
minimum=50,
maximum=500,
value=200,
step=10,
label="Max Characters per Chunk",
visible=True
)
fast_mode = gr.Checkbox(
value=True,
label="Fast Mode (x-vector only)",
info="Faster, but might lack prosody details. Recommended for quick tests."
)
stream_mode = gr.Checkbox(
value=False,
label="Stream Mode",
info="Stream output generation (Experimental)."
)
# Generate Button
generate_btn = gr.Button("🚀 Generate Voice Clone", variant="primary", size="lg")
with gr.Column(scale=1):
                # --- Section 3: Output ---
gr.Markdown("### 3. Output")
output_audio = gr.Audio(label="Generated Audio", autoplay=False)
gr.Markdown(
"""
<div style="margin-top: 30px; font-size: 0.85em; color: #555; background: #f5f5f5; padding: 15px; border-radius: 8px;">
<p><strong>📋 Instructions:</strong></p>
<ol>
<li>Upload or record reference audio (10-30 seconds recommended)</li>
<li>Click "Transcribe Audio" or type the reference text manually</li>
<li>Enter the text you want to synthesize</li>
<li>Click "Generate Voice Clone"</li>
</ol>
<p><strong>⏱️ Performance Note:</strong> GPU duration is currently set to 100s to fit lower quotas. If the model fails to load in time, please wait for your quota to reset.</p>
</div>
"""
)
# --- Event Listeners ---
# Transcribe Audio
transcribe_btn.click(
fn=handle_transcribe,
inputs=ref_audio_input,
outputs=[ref_text_input, status_box],
api_visibility="public"
)
# Generate Voice
generate_btn.click(
fn=handle_generate,
inputs=[
model_path_input,
prompt_cache,
ref_audio_input,
ref_text_input,
synth_text_input,
language,
use_chunking,
chunk_size,
fast_mode,
stream_mode
],
outputs=[output_audio, prompt_cache, status_box],
api_visibility="public"
)
return demo
# --- Main Entry Point ---
if __name__ == "__main__":
# SAFE startup diagnostics - NO CUDA calls here!
print("=" * 50)
print("Qwen3-TTS Voice Cloning Studio Starting...")
print(f"PyTorch version: {torch.__version__}")
# DEFERRED: Don't check CUDA availability here!
# It will be checked inside the GPU subprocess
print("Note: CUDA status will be checked inside GPU worker process")
print(f"WHISPER_AVAILABLE: {WHISPER_AVAILABLE}")
print(f"LIBROSA_AVAILABLE: {LIBROSA_AVAILABLE}")
print(f"HAS_SPACES: {HAS_SPACES}")
print("=" * 50)
demo = create_ui()
    # Gradio 6: app-level options such as theme and footer links are passed to launch()
demo.launch(
server_name="0.0.0.0",
server_port=7860,
theme=gr.themes.Soft(
primary_hue="indigo",
secondary_hue="blue",
),
footer_links=[{"label": "Built with anycoder", "url": "https://huggingface.co/spaces/akhaliq/anycoder"}]
)