import gradio as gr
import torch
import soundfile as sf
import numpy as np
import os
import re
import warnings
from datetime import datetime
from typing import Tuple, Optional, Any
from huggingface_hub import snapshot_download, login


# Log in to the Hugging Face Hub if a token is provided (needed for gated or
# private model repos).
HF_TOKEN = os.environ.get('HF_TOKEN')
if HF_TOKEN:
    login(token=HF_TOKEN)

# The `spaces` package provides the @spaces.GPU decorator on Hugging Face
# ZeroGPU Spaces; fall back to a no-op stub when it is not installed.
try:
    import spaces
    HAS_SPACES = True
    print(f"spaces package found, version: {getattr(spaces, '__version__', 'unknown')}")
except ImportError:
    print("Warning: spaces package not found. Install: pip install spaces")
    HAS_SPACES = False

    class spaces:
        @staticmethod
        def GPU(func=None, duration=None):
            # Support both @spaces.GPU and @spaces.GPU(duration=...).
            if func is None:
                return lambda f: f
            return func
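
# The stub mirrors the two calling conventions of the real decorator:
#   @spaces.GPU                -> the function is passed in and returned as-is
#   @spaces.GPU(duration=100)  -> returns an identity decorator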


# Optional dependency: Whisper transcribes the reference audio.
try:
    import whisper
    WHISPER_AVAILABLE = True
    print("Whisper available")
except ImportError as e:
    print(f"Whisper not available: {e}")
    WHISPER_AVAILABLE = False

# Optional dependency: librosa resamples audio to the 16 kHz Whisper expects.
try:
    import librosa
    LIBROSA_AVAILABLE = True
    print("Librosa available")
except ImportError as e:
    print(f"Librosa not available: {e}")
    LIBROSA_AVAILABLE = False


# Model handle cached inside the GPU worker process; never touch these from
# the main process.
_GPU_MODEL = None
_GPU_MODEL_PATH = None
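
# Note: under ZeroGPU each @spaces.GPU call may run in a freshly forked
# worker process, in which case these globals start out as None again and the
# model is reloaded. The cache is a best-effort optimization, not a guarantee.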

DEFAULT_MODEL_PATH = "Qwen/Qwen3-TTS-12Hz-1.7B-Base"


def chunk_text(text: str, max_chars: int = 200) -> list[str]:
    """Split text into chunks, preferring paragraph breaks, then sentence ends."""
    if not text or not text.strip():
        return [""]

    paragraphs = re.split(r"\n\s*\n", text.strip())
    chunks = []
    current_chunk = ""

    for para in paragraphs:
        para = para.strip()
        if not para:
            continue

        # Split the paragraph into sentences at ., ! or ? followed by
        # whitespace or the end of the string.
        sentences = []
        remaining = para
        while remaining:
            match = re.search(r"[.!?]+(?:\s+|$)", remaining)
            if match:
                sentence = remaining[: match.end()].strip()
                if sentence:
                    sentences.append(sentence)
                remaining = remaining[match.end():]
            else:
                if remaining.strip():
                    sentences.append(remaining.strip())
                break

        # Greedily pack sentences into chunks of at most max_chars.
        for s in sentences:
            s = s.strip()
            if not s:
                continue

            if not current_chunk:
                current_chunk = s
            elif len(current_chunk) + 1 + len(s) <= max_chars:
                current_chunk += " " + s
            else:
                # Current chunk is full; flush it and start a new one.
                chunks.append(current_chunk.strip())
                current_chunk = s

        # Flush at paragraph boundaries so no chunk spans two paragraphs.
        if current_chunk:
            chunks.append(current_chunk.strip())
            current_chunk = ""

    chunks = [c for c in chunks if c]
    return chunks if chunks else [text]
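
# Illustrative behavior of the chunker:
#   chunk_text("One. Two. Three.", max_chars=10)
#   -> ["One. Two.", "Three."]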


def trim_and_fade(audio: np.ndarray, is_first: bool, is_last: bool, sr: int) -> np.ndarray:
    """Cut 30 ms from the end of a chunk, then append 80 ms of silence."""
    # is_first/is_last are accepted for API symmetry but currently unused.
    result = audio.copy()
    cut_samples = int(0.03 * sr)
    if len(result) > cut_samples:
        result = result[:-cut_samples]

    silence_samples = int(0.08 * sr)
    silence = np.zeros(silence_samples, dtype=result.dtype)
    result = np.concatenate([result, silence])
    return result
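
# At a 24 kHz output rate (typical for TTS models; the actual rate comes from
# the model), this trims 720 samples and appends 1920 samples of silence.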


def transcribe_audio_logic(audio_path: str, progress=None) -> Tuple[str, str]:
    """Transcribe the reference audio using Whisper (CPU only)."""
    if not WHISPER_AVAILABLE:
        return "", "Error: Whisper not installed. Run: pip install openai-whisper"

    if not audio_path:
        return "", "Error: No audio file provided."

    if not os.path.exists(audio_path):
        return "", f"Error: Audio file not found: {audio_path}"

    p = progress if progress is not None else gr.Progress()

    try:
        p(0.2, desc="Loading Whisper model...")
        whisper_model = whisper.load_model("base", device="cpu")

        p(0.5, desc="Reading audio file...")
        audio_data, sample_rate = sf.read(audio_path, dtype="float32")

        # Downmix multi-channel audio to mono.
        if len(audio_data.shape) > 1:
            audio_data = audio_data.mean(axis=1)
        audio_data = audio_data.astype(np.float32)

        # Whisper expects 16 kHz input.
        if sample_rate != 16000:
            if not LIBROSA_AVAILABLE:
                return "", "Error: librosa not installed for resampling. Run: pip install librosa"
            audio_data = librosa.resample(audio_data, orig_sr=sample_rate, target_sr=16000)

        p(0.8, desc="Transcribing...")
        result = whisper_model.transcribe(audio_data, fp16=False)
        transcript = result["text"].strip()

        del whisper_model

        return transcript, "Transcription completed successfully!"
    except Exception as e:
        import traceback
        error_detail = traceback.format_exc()
        print(f"Transcription error:\n{error_detail}")
        return "", f"Transcription error: {str(e)}"


def _load_model_gpu_only(model_path: str, progress_fn) -> Tuple[Any, str]:
    """
    Load the model inside the GPU context only, using snapshot_download.
    NEVER call this from the main process!
    """
    global _GPU_MODEL, _GPU_MODEL_PATH

    try:
        from qwen_tts import Qwen3TTSModel
    except ImportError as e:
        return None, f"Error: qwen_tts library not found: {e}"

    p = progress_fn

    p(0.1, desc="Initializing Torch settings for CUDA stability...")
    # Disable TF32 and non-deterministic cuDNN kernels: slightly slower, but
    # more numerically stable and reproducible.
    torch.backends.cuda.matmul.allow_tf32 = False
    torch.backends.cudnn.allow_tf32 = False
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
    torch.set_float32_matmul_precision("highest")
    os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

    warnings.filterwarnings("ignore", message=".*flash_attention.*")
    warnings.filterwarnings("ignore", message=".*FlashAttention.*")

    try:
        p(0.2, desc="Locating model files...")
        local_model_path = snapshot_download(repo_id=model_path)

        p(0.3, desc="Loading model weights (this may take a while on first run)...")

        device = "cuda"
        print(f"Using device: {device}")

        capability = torch.cuda.get_device_capability()
        major, minor = capability
        print(f"GPU compute capability: {major}.{minor}")
        print(f"GPU name: {torch.cuda.get_device_name(0)}")

        dtype = torch.bfloat16
        print(f"Using {dtype} for stability and speed")

        p(0.4, desc=f"Loading with dtype={dtype}...")

        model = Qwen3TTSModel.from_pretrained(
            local_model_path,
            device_map=device,
            dtype=dtype,
            token=HF_TOKEN,
            attn_implementation="eager",
        )

        p(0.9, desc="Model loaded, skipping warmup to save GPU quota...")

        _GPU_MODEL = model
        _GPU_MODEL_PATH = model_path

        p(1.0, desc="Model loaded successfully!")
        return model, f"Status: Model Loaded Successfully ✓ (using {dtype} on {device})"

    except Exception as e:
        import traceback
        error_detail = traceback.format_exc()
        print(f"Detailed error during model loading:\n{error_detail}")
        return None, f"Error loading model: {str(e)}\n\nDetails: {error_detail[:500]}"


def _ensure_model_loaded_gpu(model_path: str, progress_fn):
    """Ensure model is loaded, inside GPU context only."""
    global _GPU_MODEL, _GPU_MODEL_PATH

    if _GPU_MODEL is None or _GPU_MODEL_PATH != model_path:
        progress_fn(0.05, desc="Loading model (first run)...")
        model, msg = _load_model_gpu_only(model_path, progress_fn)
        if model is None:
            return None, msg
        return model, msg
    return _GPU_MODEL, "Model already loaded"


def _generate_voice_gpu_only(
    model_path: str,
    cache_state: Optional[Tuple],
    ref_audio: str,
    ref_txt: str,
    synth_txt: str,
    lang: str,
    chk: bool,
    size: int,
    fast: bool,
    strm: bool,
    progress_fn,
) -> Tuple[str, Optional[Tuple], str]:
    """
    Generate voice inside the GPU context only.
    NEVER call this from the main process!
    """

    def move_to_device(obj, device):
        # Best-effort move: tensors and anything exposing .to() are moved;
        # everything else is returned unchanged.
        if isinstance(obj, torch.Tensor):
            return obj.to(device)
        elif hasattr(obj, 'to'):
            try:
                return obj.to(device)
            except Exception:
                return obj
        return obj

    progress_fn(0, desc="Checking/loading model...")
    model, status_msg = _ensure_model_loaded_gpu(model_path, progress_fn)
    if model is None:
        return None, cache_state, f"Model loading failed: {status_msg}"

    # Validate inputs before touching the GPU.
    if not ref_audio or not os.path.exists(ref_audio):
        return None, cache_state, f"Error: Reference audio file not found: {ref_audio}"
    if not ref_txt or not ref_txt.strip():
        return None, cache_state, "Error: Reference text is empty."
    if not synth_txt or not synth_txt.strip():
        return None, cache_state, "Error: Synthesis text is empty."

    try:
        # Reuse the cached voice-clone prompt when the reference inputs are
        # unchanged. cache_state layout:
        #   (voice_clone_prompt_on_cpu, ref_audio_path, ref_text, fast_flag)
        current_cache_valid = False
        cached_prompt = None
        if cache_state:
            cached_prompt, c_audio, c_text, c_fast = cache_state
            if (c_audio == ref_audio and
                    c_text == ref_txt and
                    c_fast == fast):
                current_cache_valid = True

        voice_clone_prompt = None
        if current_cache_valid:
            progress_fn(0.1, desc="Using cached voice prompt...")
            voice_clone_prompt = move_to_device(cached_prompt, "cuda")
        else:
            progress_fn(0.1, desc="Creating voice clone prompt from reference audio...")
            print(f"Creating voice clone with ref_audio={ref_audio}, fast_mode={fast}")
            try:
                voice_clone_prompt = model.create_voice_clone_prompt(
                    ref_audio=ref_audio,
                    ref_text=ref_txt if not fast else None,
                    x_vector_only_mode=fast,
                )
                # Cache a CPU copy so the prompt can be handed back to the
                # main process without holding GPU memory.
                voice_clone_prompt_cpu = move_to_device(voice_clone_prompt, "cpu")
                cache_state = (voice_clone_prompt_cpu, ref_audio, ref_txt, fast)
                print("Voice clone prompt created successfully and moved to CPU for caching")
            except Exception as e:
                print(f"Error creating voice clone prompt: {e}")
                import traceback
                print(traceback.format_exc())
                return None, cache_state, f"Error creating voice clone prompt: {str(e)}"

        progress_fn(0.2, desc="Processing text...")
        if chk:
            chunks = chunk_text(synth_txt, max_chars=min(max(size, 50), 1000))
        else:
            chunks = [synth_txt]

        print(f"Text split into {len(chunks)} chunks")

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = f"voice_clone_{timestamp}.wav"

        progress_fn(0.4, desc="Generating audio with model...")

        with torch.inference_mode():
            if len(chunks) > 1:
                progress_fn(0.45, desc=f"Generating batch audio for {len(chunks)} chunks...")
                languages = [lang] * len(chunks)

                print(f"Calling generate_voice_clone with {len(chunks)} chunks")
                wavs, sr = model.generate_voice_clone(
                    text=chunks,
                    language=languages,
                    voice_clone_prompt=voice_clone_prompt,
                )

                progress_fn(0.8, desc="Post-processing and combining audio chunks...")
                # 100 ms of extra silence between chunks, on top of the 80 ms
                # already appended by trim_and_fade.
                silence_duration = 0.1
                silence_samples = int(silence_duration * sr)
                combined_audio = []

                for i, wav in enumerate(wavs):
                    processed_wav = trim_and_fade(wav, i == 0, i == len(wavs) - 1, sr)
                    combined_audio.append(processed_wav)
                    if i < len(wavs) - 1:
                        combined_audio.append(np.zeros(silence_samples, dtype=processed_wav.dtype))

                final_audio = np.concatenate(combined_audio)
                sf.write(output_file, final_audio, sr)
                print(f"Saved multi-chunk audio to {output_file}")

            else:
                progress_fn(0.5, desc="Generating audio for single chunk...")
                print(f"Calling generate_voice_clone with single chunk: {chunks[0][:50]}...")
                wavs, sr = model.generate_voice_clone(
                    text=chunks[0],
                    language=lang,
                    voice_clone_prompt=voice_clone_prompt,
                )
                sf.write(output_file, wavs[0], sr)
                print(f"Saved single-chunk audio to {output_file}")

        progress_fn(1.0, desc="Done!")
        return output_file, cache_state, f"Success! Audio saved to {output_file} ({len(chunks)} chunk(s))"

    except Exception as e:
        import traceback
        error_detail = traceback.format_exc()
        print(f"Generation error detail:\n{error_detail}")
        return None, cache_state, f"Generation error: {str(e)}\n\nPlease check the console logs for more details."


@spaces.GPU(duration=100)
def gpu_voice_pipeline(
    model_path: str,
    cache_state: Optional[Tuple],
    ref_audio: str,
    ref_txt: str,
    synth_txt: str,
    lang: str,
    chk: bool,
    size: int,
    fast: bool,
    strm: bool,
):
    """
    SINGLE GPU entry point. ALL CUDA operations happen inside this function.
    This runs in a separate process with GPU access.
    """

    class SubprocessProgress:
        """Minimal stdout progress logger for the GPU worker."""

        def __init__(self):
            self.step = 0

        def __call__(self, value, desc=""):
            self.step += 1
            print(f"[GPU {self.step}] {value:.2f}: {desc}")

    progress = SubprocessProgress()

    return _generate_voice_gpu_only(
        model_path, cache_state, ref_audio, ref_txt, synth_txt,
        lang, chk, size, fast, strm, progress,
    )
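
# Illustrative direct invocation for debugging (paths and texts are
# hypothetical; on Spaces this call runs inside the GPU worker):
#   audio_path, cache, status = gpu_voice_pipeline(
#       DEFAULT_MODEL_PATH, None, "ref.wav", "Text spoken in ref.wav",
#       "Text to synthesize.", "English", True, 200, True, False,
#   )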


def handle_transcribe(audio_file):
    """Handler for transcription (runs on CPU)."""
    if not audio_file:
        return "", "Please upload or record reference audio first."
    txt, msg = transcribe_audio_logic(audio_file)
    return txt, msg


def handle_generate(model_path, cache, ref_audio, ref_txt, synth_txt, lang, chk, size, fast, strm, progress=gr.Progress()):
    """
    Main handler that validates inputs and delegates to the GPU function.
    NO CUDA operations here!
    """
    if not ref_audio:
        return None, cache, "Error: Please upload or record reference audio first."
    if not ref_txt or not ref_txt.strip():
        return None, cache, "Error: Please enter or transcribe reference text."
    if not synth_txt or not synth_txt.strip():
        return None, cache, "Error: Please enter text to synthesize."

    progress(0, desc="Sending to GPU worker...")

    try:
        result = gpu_voice_pipeline(
            model_path, cache, ref_audio, ref_txt, synth_txt,
            lang, chk, int(size), fast, strm,
        )
        progress(0.9, desc="Processing result...")
        return result
    except Exception as e:
        import traceback
        error_detail = traceback.format_exc()
        print(f"Error in handle_generate:\n{error_detail}")
        return None, cache, f"Error: {str(e)}\n\nDetails: {error_detail[:1000]}"


def create_ui():
    # The theme is configured on gr.Blocks; launch() does not accept one.
    with gr.Blocks(
        theme=gr.themes.Soft(
            primary_hue="indigo",
            secondary_hue="blue",
        )
    ) as demo:
        gr.Markdown(
            """
            # 🎙️ Qwen3-TTS Voice Cloning Studio
            <p style='text-align: center; font-size: 0.9em; color: #666;'>
            Built with <a href="https://huggingface.co/spaces/akhaliq/anycoder" target="_blank">anycoder</a>
            </p>
            """
        )

        status_box = gr.Textbox(
            label="Status / Logs",
            interactive=False,
            lines=3,
            value="Ready. Note: GPU duration is set to 100s. If you have low quota (<100s), please wait for it to refill."
        )

        # Holds the cached voice-clone prompt between generations.
        prompt_cache = gr.State(None)

        model_path_input = gr.Textbox(
            label="Model Path",
            value=DEFAULT_MODEL_PATH,
            visible=False
        )
| |
| with gr.Row(): |
| with gr.Column(scale=1): |
| |
| gr.Markdown("### 1. Reference Audio") |
| ref_audio_input = gr.Audio( |
| label="Upload or Record Reference Audio", |
| sources=["upload", "microphone"], |
| type="filepath" |
| ) |
| with gr.Row(): |
| transcribe_btn = gr.Button("📝 Transcribe Audio", variant="secondary") |
| |
| ref_text_input = gr.Textbox( |
| label="Reference Text (Transcribed or Manual)", |
| placeholder="The text spoken in the reference audio...", |
| lines=3, |
| info="Required for cloning. Click 'Transcribe Audio' or type manually." |
| ) |
|
|
| |
| gr.Markdown("### 2. Text to Synthesize") |
| synth_text_input = gr.Textbox( |
| label="Synthesis Text", |
| placeholder="Enter the text you want the cloned voice to speak...", |
| lines=5, |
| info="Required. The text that will be spoken in the cloned voice." |
| ) |
| |
| |
| with gr.Accordion("🛠️ Advanced Settings", open=False): |
| language = gr.Radio( |
| choices=["German", "English", "Chinese", "French", "Spanish", "Japanese", "Korean"], |
| value="German", |
| label="Language", |
| info="Language of the synthesis text." |
| ) |
| use_chunking = gr.Checkbox( |
| value=True, |
| label="Use Chunking", |
| info="Splits long text into smaller segments for better stability." |
| ) |
| chunk_size = gr.Slider( |
| minimum=50, |
| maximum=500, |
| value=200, |
| step=10, |
| label="Max Characters per Chunk", |
| visible=True |
| ) |
| fast_mode = gr.Checkbox( |
| value=True, |
| label="Fast Mode (x-vector only)", |
| info="Faster, but might lack prosody details. Recommended for quick tests." |
| ) |
| stream_mode = gr.Checkbox( |
| value=False, |
| label="Stream Mode", |
| info="Stream output generation (Experimental)." |
| ) |
| |
| |
| generate_btn = gr.Button("🚀 Generate Voice Clone", variant="primary", size="lg") |
|

            with gr.Column(scale=1):
                gr.Markdown("### 3. Output")
                output_audio = gr.Audio(label="Generated Audio", autoplay=False)

                gr.Markdown(
                    """
                    <div style="margin-top: 30px; font-size: 0.85em; color: #555; background: #f5f5f5; padding: 15px; border-radius: 8px;">
                    <p><strong>📋 Instructions:</strong></p>
                    <ol>
                    <li>Upload or record reference audio (10-30 seconds recommended)</li>
                    <li>Click "Transcribe Audio" or type the reference text manually</li>
                    <li>Enter the text you want to synthesize</li>
                    <li>Click "Generate Voice Clone"</li>
                    </ol>
                    <p><strong>⏱️ Performance Note:</strong> GPU duration is currently set to 100s to fit lower quotas. If the model fails to load in time, please wait for your quota to reset.</p>
                    </div>
                    """
                )

        transcribe_btn.click(
            fn=handle_transcribe,
            inputs=ref_audio_input,
            outputs=[ref_text_input, status_box],
            api_visibility="public"
        )

        generate_btn.click(
            fn=handle_generate,
            inputs=[
                model_path_input,
                prompt_cache,
                ref_audio_input,
                ref_text_input,
                synth_text_input,
                language,
                use_chunking,
                chunk_size,
                fast_mode,
                stream_mode
            ],
            outputs=[output_audio, prompt_cache, status_box],
            api_visibility="public"
        )

    return demo
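
# A UI-free smoke test of the CPU-side helpers (module name `app` is an
# assumption):
#   python -c "from app import chunk_text; print(chunk_text('Hi there. Bye now.', 10))"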


if __name__ == "__main__":
    print("=" * 50)
    print("Qwen3-TTS Voice Cloning Studio Starting...")
    print(f"PyTorch version: {torch.__version__}")
    print("Note: CUDA status will be checked inside GPU worker process")
    print(f"WHISPER_AVAILABLE: {WHISPER_AVAILABLE}")
    print(f"LIBROSA_AVAILABLE: {LIBROSA_AVAILABLE}")
    print(f"HAS_SPACES: {HAS_SPACES}")
    print("=" * 50)

    demo = create_ui()

    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        footer_links=[{"label": "Built with anycoder", "url": "https://huggingface.co/spaces/akhaliq/anycoder"}]
    )