|
|
|
|
|
""" |
|
|
F5-TTS Voice Cloning Script (Portuguese/Multi-lingual) |
|
|
Wraps AgentF5TTSChunk for convenient CLI usage. |
|
|
|
|
|
Usage: |
|
|
Single mode: python voice_clone.py --text "Olá mundo" --ref-audio voice.wav --checkpoint models/model.safetensors |
|
|
Batch mode: python voice_clone.py --srt subtitles.srt --ref-dir ./speakers --checkpoint models/model.safetensors |
|
|
""" |
|
|
|
|
|
import argparse |
|
|
import os |
|
|
import re |
|
|
import sys |
|
|
import logging |
|
|
import torch |
|
|
from typing import List, Dict, Optional, Tuple |
|
|
|
|
|
|
|
|
# Configure root logging once at import time; every log line in this script
# (and in AgentF5TTSChunk, if it uses the root config) shares this format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
try:
    from tqdm import tqdm
except ImportError:
    # Fallback when tqdm is not installed. A plain pass-through function is
    # NOT enough: the batch loop calls set_description() on the object that
    # tqdm() returns, which would raise AttributeError on a bare list.
    # Provide a minimal shim that supports iteration and a no-op
    # set_description(), mirroring the subset of the tqdm API this script uses.
    class tqdm:
        def __init__(self, iterable=None, **kwargs):
            # Extra keyword args (desc, unit, ...) are accepted and ignored.
            self._iterable = iterable

        def __iter__(self):
            return iter(self._iterable)

        def set_description(self, desc=None, refresh=True):
            # Progress text is simply dropped in the fallback.
            pass
|
|
|
|
|
# Import the TTS agent wrapper. If it is not importable from the normal
# module search path, retry after adding the current working directory so
# the script also works when run next to AgentF5TTSChunk.py.
try:
    from AgentF5TTSChunk import AgentF5TTS
except ImportError:

    sys.path.append(os.getcwd())
    try:
        from AgentF5TTSChunk import AgentF5TTS
    except ImportError:
        # Hard dependency: nothing in this script works without the agent.
        logger.error("Error: AgentF5TTSChunk.py not found.")
        sys.exit(1)
|
|
|
|
|
|
|
|
def parse_srt(srt_file: str) -> List[Dict]:
    """
    Parse an SRT subtitle file and extract subtitle entries.

    Args:
        srt_file: Path to the .srt file (read as UTF-8).

    Returns:
        List of dicts with keys 'id' (int), 'timestamp' (the raw
        "HH:MM:SS,mmm --> HH:MM:SS,mmm" line) and 'text' (subtitle lines
        joined with spaces). Malformed blocks are logged and skipped.

    Raises:
        OSError: If the file cannot be opened.
    """
    logger.info(f"Parsing SRT file: {srt_file}")

    with open(srt_file, 'r', encoding='utf-8') as f:
        content = f.read()

    # Normalize Windows line endings before splitting into blocks.
    content = content.replace('\r\n', '\n')

    # SRT blocks are separated by one or more blank lines.
    blocks = re.split(r'\n{2,}', content.strip())

    subtitles = []
    for block in blocks:
        lines = [l.strip() for l in block.split('\n') if l.strip()]
        # A usable block needs at least a timestamp line plus one more line.
        if len(lines) < 2:
            continue
        try:
            # A block optionally starts with a numeric index; when absent,
            # the timestamp must be the first line and IDs are synthesized
            # sequentially.
            if lines[0].isdigit():
                subtitle_id = int(lines[0])
                timestamp_line_idx = 1
            else:
                subtitle_id = len(subtitles) + 1
                timestamp_line_idx = 0

            # Validate the timestamp line in BOTH branches. (Previously only
            # the index-less branch was checked, so a block like
            # "1\nhello" stored "hello" as its timestamp.)
            if '-->' not in lines[timestamp_line_idx]:
                logger.warning(f"Skipping malformed block (no timestamp): {block[:50]}...")
                continue

            timestamp = lines[timestamp_line_idx]

            # Everything after the timestamp is subtitle text.
            text = ' '.join(lines[timestamp_line_idx + 1:]).strip()

            # Blocks with no text are useless for TTS; drop them silently.
            if text:
                subtitles.append({
                    'id': subtitle_id,
                    'timestamp': timestamp,
                    'text': text
                })
        except (ValueError, IndexError) as e:
            logger.warning(f"Skipping malformed block: {block[:50]}... Error: {e}")
            continue

    logger.info(f"Parsed {len(subtitles)} subtitle entries")
    return subtitles
|
|
|
|
|
|
|
|
def find_reference_audio(reference_dir: str, subtitle_id: int, audio_prefix: str = 'segment') -> Optional[str]:
    """
    Fallback lookup: locate a reference audio file by subtitle ID.

    Tries, in priority order, zero-padded and plain ID spellings with and
    without an underscore separator (e.g. segment_001.wav, segment_1.wav,
    segment001.wav) across the accepted extensions.

    Returns the first existing path, or None when nothing matches or no
    directory was given.
    """
    if not reference_dir:
        return None

    # Filename stems in priority order, crossed with accepted extensions.
    stems = (
        f"{audio_prefix}_{subtitle_id:03d}",
        f"{audio_prefix}_{subtitle_id}",
        f"{audio_prefix}{subtitle_id:03d}",
    )
    extensions = ('.wav', '.mp3', '.MP4')

    for stem in stems:
        for ext in extensions:
            candidate = os.path.join(reference_dir, stem + ext)
            if os.path.exists(candidate):
                return candidate

    return None
|
|
|
|
|
|
|
|
def resolve_speaker_ref(agent: AgentF5TTS, text: str, reference_dir: str, default_ref: Optional[str] = None) -> Tuple[str, Optional[str]]:
    """
    Resolve the reference audio for a line of text via speaker/emotion tags.

    Delegates tag parsing to the agent, strips the `[speaker:...]` tag from
    the text, and searches `reference_dir` for a matching reference file in
    priority order: speaker_emotion (wav/mp3), speaker (wav/mp3), then the
    lowercase variants. Falls back to `default_ref` when nothing matches.

    Returns:
        (clean_text, ref_audio_path_or_default)
    """
    speaker, emotion = agent._determine_speaker_emotion(text)

    # Remove the inline speaker tag so it is not synthesized aloud.
    clean_text = re.sub(r'\[speaker:.*?\]\s*', '', text).strip()

    # Without a speaker or a directory to search, the default wins outright.
    if not (speaker and reference_dir):
        return clean_text, default_ref

    tagged = bool(emotion) and emotion != "neutral"

    names = []
    if tagged:
        names += [f"{speaker}_{emotion}.wav", f"{speaker}_{emotion}.mp3"]
    names += [f"{speaker}.wav", f"{speaker}.mp3"]
    if tagged:
        # Lowercase spellings as a last resort.
        names += [f"{speaker.lower()}_{emotion.lower()}.wav", f"{speaker.lower()}.wav"]

    match = next(
        (os.path.join(reference_dir, name)
         for name in names
         if os.path.exists(os.path.join(reference_dir, name))),
        None,
    )

    if match is None:
        return clean_text, default_ref

    logger.debug(f"Role matched: {os.path.basename(match)} (Speaker: {speaker}, Emotion: {emotion})")
    return clean_text, match
|
|
|
|
|
|
|
|
def parse_args():
    """Build the CLI and parse sys.argv; returns the argparse namespace."""
    cli = argparse.ArgumentParser(
        description='F5-TTS Voice Cloning Script (Wraps AgentF5TTS)',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
EXAMPLES:
# Single Mode
python voice_clone.py --text "Olá, tudo bem?" --ref-audio ref.wav --checkpoint models/model.safetensors

# Batch Mode (SRT)
python voice_clone.py --srt subs.srt --ref-dir ./speakers --checkpoint models/model.safetensors
"""
    )

    # Exactly one processing mode must be chosen.
    mode = cli.add_mutually_exclusive_group(required=True)
    mode.add_argument('--text', type=str, help='Text to synthesize')
    mode.add_argument('--srt', type=str, help='Path to SRT subtitle file')

    # Reference source: a single file OR a directory of speakers/segments.
    refs = cli.add_mutually_exclusive_group()
    refs.add_argument('--ref-audio', type=str, help='[Single] Reference audio path')
    refs.add_argument('--ref-dir', type=str, help='[Batch] Directory with reference audios (speakers or segments)')
    # Hidden legacy alias kept for backwards compatibility.
    refs.add_argument('--reference-dir', dest='ref_dir', help=argparse.SUPPRESS)

    cli.add_argument('--ref-text', type=str, default="", help='Reference text for the reference audio (optional)')

    # Model / inference configuration.
    cli.add_argument('--checkpoint', type=str, required=True, help='Path to F5-TTS safetensors checkpoint')
    cli.add_argument('--vocoder', type=str, default='vocos', choices=['vocos', 'bigvgan'], help='Vocoder type')
    cli.add_argument('--device', type=str, default=None, help='Device (cuda:0, cpu, mps)')
    cli.add_argument('--speed', type=float, default=1.0, help='Speed factor for speech generation (default: 1.0)')

    # Output configuration.
    cli.add_argument('--output', type=str, default='outputs', help='Output directory')
    cli.add_argument('--output-prefix', type=str, default='clone', help='Output filename prefix')
    cli.add_argument('--skip-existing', action='store_true', help='Skip existing output files')

    cli.add_argument('--audio-prefix', type=str, default='segment', help='Prefix for ID-based reference lookup')

    return cli.parse_args()
|
|
|
|
|
|
|
|
def main():
    """
    CLI entry point: select device, initialize the agent, then run either
    single-text or batch (SRT) synthesis based on the parsed arguments.
    """
    args = parse_args()

    # Device selection: explicit flag wins; otherwise prefer CUDA, then
    # Apple MPS, then CPU.
    if args.device:
        device = args.device
    else:
        device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"

    logger.info(f"Using device: {device}")

    os.makedirs(args.output, exist_ok=True)

    # Load the model once; both modes share this agent instance.
    logger.info(f"Initializing AgentF5TTS with checkpoint: {args.checkpoint}")
    try:
        agent = AgentF5TTS(
            ckpt_file=args.checkpoint,
            vocoder_name=args.vocoder,
            device=device
        )
    except Exception as e:
        logger.error(f"Failed to initialize agent: {e}")
        return

    # ---- Single mode: one text, one reference audio ----
    if args.text:
        logger.info("-" * 40)
        logger.info("SINGLE MODE PROCESSING")
        logger.info("-" * 40)

        if not args.ref_audio or not os.path.exists(args.ref_audio):
            logger.error(f"Reference audio not found: {args.ref_audio}")
            return

        # Allow [speaker:...] tags even in single mode; siblings of the
        # given reference file are searched for a speaker match.
        clean_text, effective_ref = resolve_speaker_ref(
            agent,
            args.text,
            os.path.dirname(args.ref_audio),
            default_ref=args.ref_audio
        )

        output_path = os.path.join(args.output, "output_single.wav")
        logger.info(f"Text: {clean_text}")
        logger.info(f"Ref: {effective_ref}")

        try:
            agent.infer(
                ref_file=effective_ref,
                ref_text=args.ref_text,
                gen_text=clean_text,
                file_wave=output_path,
                remove_silence=True,
                speed=args.speed
            )
            logger.info(f"✓ Saved: {output_path}")
        except Exception as e:
            logger.error(f"✗ Error: {e}")

    # ---- Batch mode: one output file per SRT entry ----
    elif args.srt:
        logger.info("-" * 40)
        logger.info("BATCH MODE PROCESSING")
        logger.info("-" * 40)

        subtitles = parse_srt(args.srt)
        if not subtitles:
            logger.error("No subtitles found.")
            return

        logger.info(f"Processing {len(subtitles)} entries...")
        success = 0
        errors = 0
        skipped = 0

        # NOTE(review): set_description below requires a tqdm-like object;
        # the import-time fallback must provide it when tqdm is missing.
        pbar = tqdm(subtitles, desc="Synthesizing", unit="line")

        for sub in pbar:
            sid = sub['id']
            raw_text = sub['text']

            pbar.set_description(f"Processing ID {sid}")

            # Output file named after the subtitle ID, e.g. clone_001.wav.
            out_name = f"{args.output_prefix}_{sid:03d}.wav"
            out_path = os.path.join(args.output, out_name)

            if args.skip_existing and os.path.exists(out_path):
                skipped += 1
                continue

            # Reference priority: a single --ref-audio for all lines,
            # otherwise an ID-based lookup in --ref-dir.
            if args.ref_audio:
                default_ref = args.ref_audio
            else:
                default_ref = find_reference_audio(args.ref_dir, sid, args.audio_prefix)

            # Speaker/emotion tags in the subtitle text can override the default.
            clean_text, ref_audio = resolve_speaker_ref(agent, raw_text, args.ref_dir, default_ref)

            if not ref_audio or not os.path.exists(ref_audio):
                logger.warning(f"ID {sid}: No reference audio found. Skipping.")
                errors += 1
                continue

            try:
                agent.infer(
                    ref_file=ref_audio,
                    # The provided --ref-text only matches the fixed
                    # --ref-audio; per-ID references transcribe on the fly.
                    ref_text=args.ref_text if args.ref_audio else "",
                    gen_text=clean_text,
                    file_wave=out_path,
                    remove_silence=True,
                    speed=args.speed
                )
                success += 1
            except Exception as e:
                logger.error(f"ID {sid}: Generation failed: {e}")
                errors += 1

        logger.info("-" * 40)
        logger.info(f"Done. Success: {success}, Skipped: {skipped}, Errors: {errors}")
|
|
|
|
|
# Entry point when executed as a script (not on import).
if __name__ == "__main__":
    main()
|
|
|