Spaces: Running on Zero
| import argparse | |
| import os | |
| import re | |
| import sys | |
| from pathlib import Path | |
| from typing import List | |
| import numpy as np | |
| import soundfile as sf | |
| import torch | |
| from vieneu_tts import VieNeuTTS | |
def split_text_into_chunks(text: str, max_chars: int = 256) -> List[str]:
    """
    Break *text* into chunks of at most *max_chars* characters.

    Sentence boundaries (., !, ?, …) are the preferred split points; a
    sentence that alone exceeds the limit falls back to whitespace-based
    word packing. A single word longer than *max_chars* is kept intact.
    """
    pieces: List[str] = []
    pending = ""

    def emit_pending() -> None:
        # Push the accumulated buffer (if any) as a finished chunk.
        nonlocal pending
        if pending:
            pieces.append(pending.strip())
            pending = ""

    for raw_sentence in re.split(r"(?<=[\.\!\?\…])\s+", text.strip()):
        sentence = raw_sentence.strip()
        if not sentence:
            continue

        if len(sentence) <= max_chars:
            # Sentence fits on its own: try to pack it with the buffer.
            merged = f"{pending} {sentence}".strip() if pending else sentence
            if len(merged) <= max_chars:
                pending = merged
            else:
                emit_pending()
                pending = sentence
            continue

        # Oversized sentence: flush what we have, then pack word by word.
        emit_pending()
        piece = ""
        for word in sentence.split():
            tentative = f"{piece} {word}".strip() if piece else word
            if len(tentative) > max_chars and piece:
                pieces.append(piece.strip())
                piece = word
            else:
                piece = tentative
        if piece:
            pieces.append(piece.strip())

    emit_pending()
    return [piece for piece in pieces if piece]
def infer_long_text(
    text: str,
    ref_audio_path: str,
    ref_text_path: str,
    output_path: str,
    chunk_dir: str | None = None,
    max_chars: int = 256,
    backbone_repo: str = "pnnbao-ump/VieNeu-TTS",
    codec_repo: str = "neuphonic/neucodec",
    device: str | None = None,
    sample_rate: int = 24_000,
) -> str:
    """
    Generate speech for long-form text by chunking into manageable segments.

    Args:
        text: Raw UTF-8 text to synthesize.
        ref_audio_path: Path to the reference speaker audio (.wav).
        ref_text_path: Path to the UTF-8 transcript of the reference audio.
        output_path: Where the combined waveform is written.
        chunk_dir: Optional directory; when set, each chunk is also saved
            there as ``chunk_NNN.wav``.
        max_chars: Maximum characters per chunk fed to the TTS model.
        backbone_repo: Backbone model repository ID or local path.
        codec_repo: Codec model repository ID or local path.
        device: "cuda" or "cpu"; defaults to CUDA when available.
        sample_rate: Output sample rate in Hz. Default 24_000 matches the
            previous hard-coded value; presumably the codec's native rate —
            TODO confirm before overriding.

    Returns:
        The path to the combined audio file.

    Raises:
        ValueError: If the device is invalid, the text is empty, or no
            chunks could be produced.
    """
    device = device or ("cuda" if torch.cuda.is_available() else "cpu")
    if device not in {"cuda", "cpu"}:
        raise ValueError("Device must be either 'cuda' or 'cpu'.")

    raw_text = text.strip()
    if not raw_text:
        raise ValueError("Input text is empty.")

    chunks = split_text_into_chunks(raw_text, max_chars=max_chars)
    if not chunks:
        raise ValueError("Text could not be segmented into valid chunks.")
    print(f"📄 Total chunks: {len(chunks)} (≤ {max_chars} chars each)")

    if chunk_dir:
        os.makedirs(chunk_dir, exist_ok=True)

    ref_text_raw = Path(ref_text_path).read_text(encoding="utf-8")

    tts = VieNeuTTS(
        backbone_repo=backbone_repo,
        backbone_device=device,
        codec_repo=codec_repo,
        codec_device=device,
    )
    print("🎧 Encoding reference audio...")
    ref_codes = tts.encode_reference(ref_audio_path)

    generated_segments: List[np.ndarray] = []
    for idx, chunk in enumerate(chunks, start=1):
        print(f"🎙️ Chunk {idx}/{len(chunks)} | {len(chunk)} chars")
        wav = tts.infer(chunk, ref_codes, ref_text_raw)
        generated_segments.append(wav)
        if chunk_dir:
            # Persist each segment individually for debugging/inspection.
            chunk_path = os.path.join(chunk_dir, f"chunk_{idx:03d}.wav")
            sf.write(chunk_path, wav, sample_rate)

    combined_audio = np.concatenate(generated_segments)
    # Ensure the target directory exists even for bare filenames ("." case).
    os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
    sf.write(output_path, combined_audio, sample_rate)
    print(f"✅ Saved combined audio to: {output_path}")
    return output_path
def parse_args() -> argparse.Namespace:
    """Build and parse the command-line interface for long-text inference."""
    parser = argparse.ArgumentParser(description="Infer long text with VieNeu-TTS")

    # Exactly one text source is required: inline text or a text file.
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument("--text", help="Raw UTF-8 text content to synthesize.")
    source.add_argument("--text-file", help="Path to a UTF-8 text file to synthesize.")

    parser.add_argument(
        "--ref-audio",
        default="./sample/Vĩnh (nam miền Nam).wav",
        help="Path to reference audio (.wav). Default: ./sample/Vĩnh (nam miền Nam).wav",
    )
    parser.add_argument(
        "--ref-text",
        default="./sample/Vĩnh (nam miền Nam).txt",
        help="Path to reference text (UTF-8). Default: ./sample/Vĩnh (nam miền Nam).txt",
    )
    parser.add_argument(
        "--output",
        default="./output_audio/long_text.wav",
        help="Path to save the combined audio output.",
    )
    parser.add_argument(
        "--chunk-output-dir",
        default=None,
        help="Optional directory to save individual chunk audio files.",
    )
    parser.add_argument(
        "--max-chars",
        type=int,
        default=256,
        help="Maximum characters per chunk before TTS inference.",
    )
    parser.add_argument(
        "--device",
        choices=["auto", "cuda", "cpu"],
        default="auto",
        help="Device to run inference on (auto=CUDA if available).",
    )
    parser.add_argument(
        "--backbone",
        default="pnnbao-ump/VieNeu-TTS",
        help="Backbone repository ID or local path.",
    )
    parser.add_argument(
        "--codec",
        default="neuphonic/neucodec",
        help="Codec repository ID or local path.",
    )
    return parser.parse_args()
def main():
    """CLI entry point: validate inputs, resolve the device, then synthesize."""
    args = parse_args()

    ref_audio = Path(args.ref_audio)
    if not ref_audio.exists():
        raise FileNotFoundError(f"Reference audio not found: {ref_audio}")

    ref_text = Path(args.ref_text)
    if not ref_text.exists():
        raise FileNotFoundError(f"Reference text not found: {ref_text}")

    # Load the text either from a file or directly from the CLI flag.
    if args.text_file:
        text_source = Path(args.text_file)
        if not text_source.exists():
            raise FileNotFoundError(f"Text file not found: {text_source}")
        raw_text = text_source.read_text(encoding="utf-8")
    else:
        raw_text = args.text.strip()
    if not raw_text:
        raise ValueError("Provided text is empty.")

    # "auto" resolves to CUDA when available, otherwise CPU.
    if args.device == "auto":
        device = "cuda" if torch.cuda.is_available() else "cpu"
    else:
        device = args.device

    infer_long_text(
        text=raw_text,
        ref_audio_path=str(ref_audio),
        ref_text_path=str(ref_text),
        output_path=args.output,
        chunk_dir=args.chunk_output_dir,
        max_chars=args.max_chars,
        backbone_repo=args.backbone,
        codec_repo=args.codec,
        device=device,
    )


if __name__ == "__main__":
    main()