|
|
import argparse |
|
|
import os |
|
|
import re |
|
|
import sys |
|
|
from pathlib import Path |
|
|
from typing import List |
|
|
import numpy as np |
|
|
import soundfile as sf |
|
|
import torch |
|
|
from vieneu_tts import VieNeuTTS |
|
|
|
|
|
|
|
|
def split_text_into_chunks(text: str, max_chars: int = 256) -> List[str]:
    """Split raw text into chunks no longer than ``max_chars``.

    Preference is given to sentence boundaries (., !, ?, …). A sentence that
    is still too long falls back to word-based splitting, and a single word
    longer than ``max_chars`` is hard-split so the length guarantee holds
    (previously such a word leaked through as an oversized chunk).

    Args:
        text: Raw input text; surrounding whitespace is ignored.
        max_chars: Upper bound on the length of each returned chunk.

    Returns:
        A list of non-empty chunks, each at most ``max_chars`` characters.
    """
    sentences = re.split(r"(?<=[\.\!\?\…])\s+", text.strip())
    chunks: List[str] = []
    buffer = ""

    def flush_buffer() -> None:
        # Emit the accumulated buffer as a finished chunk, if any.
        nonlocal buffer
        if buffer:
            chunks.append(buffer.strip())
            buffer = ""

    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue

        # Fast path: the whole sentence fits — try to pack it with the buffer.
        if len(sentence) <= max_chars:
            candidate = f"{buffer} {sentence}".strip() if buffer else sentence
            if len(candidate) <= max_chars:
                buffer = candidate
            else:
                flush_buffer()
                buffer = sentence
            continue

        # Sentence alone exceeds the limit: flush, then split on words.
        flush_buffer()
        current = ""
        for word in sentence.split():
            # Hard-split pathological words longer than max_chars so the
            # documented length guarantee always holds.
            while len(word) > max_chars:
                if current:
                    chunks.append(current.strip())
                    current = ""
                chunks.append(word[:max_chars])
                word = word[max_chars:]
            if not word:
                continue
            candidate = f"{current} {word}".strip() if current else word
            if len(candidate) > max_chars and current:
                chunks.append(current.strip())
                current = word
            else:
                current = candidate
        if current:
            chunks.append(current.strip())

    flush_buffer()
    return [chunk for chunk in chunks if chunk]
|
|
|
|
|
|
|
|
def infer_long_text( |
|
|
text: str, |
|
|
ref_audio_path: str, |
|
|
ref_text_path: str, |
|
|
output_path: str, |
|
|
chunk_dir: str | None = None, |
|
|
max_chars: int = 256, |
|
|
backbone_repo: str = "pnnbao-ump/VieNeu-TTS", |
|
|
codec_repo: str = "neuphonic/neucodec", |
|
|
device: str | None = None, |
|
|
) -> str: |
|
|
""" |
|
|
Generate speech for long-form text by chunking into manageable segments. |
|
|
|
|
|
Returns: |
|
|
The path to the combined audio file. |
|
|
""" |
|
|
|
|
|
device = device or ("cuda" if torch.cuda.is_available() else "cpu") |
|
|
if device not in {"cuda", "cpu"}: |
|
|
raise ValueError("Device must be either 'cuda' or 'cpu'.") |
|
|
|
|
|
raw_text = text.strip() |
|
|
if not raw_text: |
|
|
raise ValueError("Input text is empty.") |
|
|
|
|
|
chunks = split_text_into_chunks(raw_text, max_chars=max_chars) |
|
|
if not chunks: |
|
|
raise ValueError("Text could not be segmented into valid chunks.") |
|
|
|
|
|
print(f"📄 Total chunks: {len(chunks)} (≤ {max_chars} chars each)") |
|
|
|
|
|
if chunk_dir: |
|
|
os.makedirs(chunk_dir, exist_ok=True) |
|
|
|
|
|
ref_text_raw = Path(ref_text_path).read_text(encoding="utf-8") |
|
|
|
|
|
tts = VieNeuTTS( |
|
|
backbone_repo=backbone_repo, |
|
|
backbone_device=device, |
|
|
codec_repo=codec_repo, |
|
|
codec_device=device, |
|
|
) |
|
|
|
|
|
print("🎧 Encoding reference audio...") |
|
|
ref_codes = tts.encode_reference(ref_audio_path) |
|
|
|
|
|
generated_segments: List[np.ndarray] = [] |
|
|
|
|
|
for idx, chunk in enumerate(chunks, start=1): |
|
|
print(f"🎙️ Chunk {idx}/{len(chunks)} | {len(chunk)} chars") |
|
|
wav = tts.infer(chunk, ref_codes, ref_text_raw) |
|
|
generated_segments.append(wav) |
|
|
|
|
|
if chunk_dir: |
|
|
chunk_path = os.path.join(chunk_dir, f"chunk_{idx:03d}.wav") |
|
|
sf.write(chunk_path, wav, 24_000) |
|
|
|
|
|
combined_audio = np.concatenate(generated_segments) |
|
|
os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) |
|
|
sf.write(output_path, combined_audio, 24_000) |
|
|
|
|
|
print(f"✅ Saved combined audio to: {output_path}") |
|
|
return output_path |
|
|
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Build and parse the command-line interface for long-text inference.

    Exactly one of ``--text`` / ``--text-file`` is required; everything else
    has a sensible default.
    """
    parser = argparse.ArgumentParser(description="Infer long text with VieNeu-TTS")

    # The text to synthesize comes either inline or from a file, never both.
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument("--text", help="Raw UTF-8 text content to synthesize.")
    source.add_argument("--text-file", help="Path to a UTF-8 text file to synthesize.")

    parser.add_argument(
        "--ref-audio",
        default="./sample/Vĩnh (nam miền Nam).wav",
        help="Path to reference audio (.wav). Default: ./sample/Vĩnh (nam miền Nam).wav",
    )
    parser.add_argument(
        "--ref-text",
        default="./sample/Vĩnh (nam miền Nam).txt",
        help="Path to reference text (UTF-8). Default: ./sample/Vĩnh (nam miền Nam).txt",
    )
    parser.add_argument(
        "--output",
        default="./output_audio/long_text.wav",
        help="Path to save the combined audio output.",
    )
    parser.add_argument(
        "--chunk-output-dir",
        default=None,
        help="Optional directory to save individual chunk audio files.",
    )
    parser.add_argument(
        "--max-chars",
        type=int,
        default=256,
        help="Maximum characters per chunk before TTS inference.",
    )
    parser.add_argument(
        "--device",
        choices=["auto", "cuda", "cpu"],
        default="auto",
        help="Device to run inference on (auto=CUDA if available).",
    )
    parser.add_argument(
        "--backbone",
        default="pnnbao-ump/VieNeu-TTS",
        help="Backbone repository ID or local path.",
    )
    parser.add_argument(
        "--codec",
        default="neuphonic/neucodec",
        help="Codec repository ID or local path.",
    )

    return parser.parse_args()
|
|
|
|
|
|
|
|
def main():
    """CLI entry point: validate inputs, resolve the device, run inference.

    Raises:
        FileNotFoundError: If the reference audio/text or input text file is missing.
        ValueError: If the provided text is empty.
    """
    args = parse_args()

    # Validate the reference pair before loading anything expensive.
    ref_audio_path = Path(args.ref_audio)
    if not ref_audio_path.exists():
        raise FileNotFoundError(f"Reference audio not found: {ref_audio_path}")

    ref_text_path = Path(args.ref_text)
    if not ref_text_path.exists():
        raise FileNotFoundError(f"Reference text not found: {ref_text_path}")

    # Pull the text either from a file or directly from the CLI argument.
    if args.text_file:
        text_path = Path(args.text_file)
        if not text_path.exists():
            raise FileNotFoundError(f"Text file not found: {text_path}")
        raw_text = text_path.read_text(encoding="utf-8")
    else:
        raw_text = args.text.strip()
    if not raw_text:
        raise ValueError("Provided text is empty.")

    # "auto" picks CUDA when available; explicit choices pass through as-is.
    if args.device == "auto":
        device = "cuda" if torch.cuda.is_available() else "cpu"
    else:
        device = args.device

    infer_long_text(
        text=raw_text,
        ref_audio_path=str(ref_audio_path),
        ref_text_path=str(ref_text_path),
        output_path=args.output,
        chunk_dir=args.chunk_output_dir,
        max_chars=args.max_chars,
        backbone_repo=args.backbone,
        codec_repo=args.codec,
        device=device,
    )
|
|
|
|
|
|
|
|
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()
|
|
|
|
|
|