"""Prepare a French TTS dataset: resample audio, phonemize text, write lists.

Reads a CSV of (audio_file, text, duration_seconds) rows, converts each
accepted wav to 24 kHz mono with ffmpeg, phonemizes the transcripts with
espeak (fr-fr), and writes ``path|phonemes|speaker_id`` train/val list files.
"""

import os
import csv
import argparse
import subprocess
import random
import unicodedata

from phonemizer import phonemize
from phonemizer.backend import EspeakBackend
from tqdm import tqdm

# --- Configuration ---
INPUT_CSV = "/root/src/data/elevenlabs_generations_simple_ember_french_v2/mappings.csv"
INPUT_DIR = "/root/src/data/elevenlabs_generations_simple_ember_french_v2"
OUTPUT_WAV_DIR = os.path.join(INPUT_DIR, "wavs")
TRAIN_LIST_OUTPUT = os.path.join(INPUT_DIR, "train_list.txt")
VAL_LIST_OUTPUT = os.path.join(INPUT_DIR, "val_list.txt")
DEFAULT_SPEAKER_ID = 3219

# Each key is a base vowel followed by the combining tilde (U+0303); each
# value is the replacement written with symbols that need no combining mark.
NASAL_VOWEL_MAP = {
    'ɑ̃': 'ɑŋ',
    'ɔ̃': 'ɔŋ',
    'ɛ̃': 'ɛŋ',
    'œ̃': 'œŋ',
}


def clean_phonemes(text):
    """Return *text* (a phoneme string) cleaned for the training lists.

    Operations, in order:
    1. Remove every "- " (hyphen plus trailing space), then any remaining
       hyphen. The two passes differ: "a- b" becomes "ab", not "a b".
    2. Normalize to Unicode NFC so the text is in one canonical form before
       the nasal vowels are looked up.
    3. Replace each nasal vowel listed in NASAL_VOWEL_MAP with its
       approximation.
    4. Collapse runs of whitespace to a single space and strip the ends.
    """
    # str.replace is already a no-op when the substring is absent, so no
    # membership test is needed first.
    text = text.replace('- ', '').replace('-', '')
    text = unicodedata.normalize('NFC', text)
    for nasal_vowel, approximation in NASAL_VOWEL_MAP.items():
        text = text.replace(nasal_vowel, approximation)
    return ' '.join(text.split())


def convert_to_24khz(input_path, output_path):
    """Convert a wav file to 24 kHz mono using ffmpeg.

    Args:
        input_path: Path of the source audio file.
        output_path: Path of the file to write (overwritten if present).

    Returns:
        True when ffmpeg exits successfully, False otherwise. Failures are
        reported on stdout and never raised, so one bad file does not abort
        the whole run.
    """
    cmd = [
        "ffmpeg",
        "-y",  # Overwrite output file without asking
        "-i", input_path,
        "-ar", "24000",
        "-ac", "1",  # Mono
        output_path,
    ]
    try:
        # Suppress ffmpeg's normal chatter; keep stderr for error reporting.
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        # ffmpeg may echo file metadata in arbitrary encodings; never let the
        # error report itself fail on undecodable bytes.
        stderr_text = e.stderr.decode(errors="replace") if e.stderr else ""
        print(f"Error converting {input_path}: {stderr_text}")
        return False
    except OSError as e:
        # Raised when the ffmpeg executable cannot be started at all.
        print(f"Error converting {input_path}: could not run ffmpeg ({e})")
        return False
    return True


def _load_rows(csv_path):
    """Read the mapping CSV and return its rows as a list of dicts."""
    # newline='' is what the csv module expects so that quoted fields
    # containing line breaks are parsed correctly.
    with open(csv_path, 'r', encoding='utf-8', newline='') as f:
        return list(csv.DictReader(f))


def _duration_in_range(row):
    """Return True/False for a valid duration, or None when it is unparsable."""
    try:
        duration = float(row['duration_seconds'])
    except (ValueError, KeyError, TypeError):
        # TypeError: DictReader fills missing trailing fields with None.
        return None
    # Written as a chained comparison so that NaN is rejected as well.
    return 1.5 <= duration <= 18.0


def _resolve_input_wav(input_dir, audio_file):
    """Locate the wav named by a CSV ``audio_file`` value.

    The value may use Windows separators and may carry a leading folder
    name, so the bare file name inside *input_dir* is tried first and the
    full relative path under the parent of *input_dir* second.

    Returns:
        (basename, path) where path is None when neither candidate exists.
    """
    normalized = audio_file.replace('\\', '/')
    basename = os.path.basename(normalized)

    primary = os.path.join(input_dir, basename)
    if os.path.isfile(primary):
        return basename, primary

    # normpath drops a trailing slash; without it dirname() would return
    # input_dir itself instead of its parent.
    parent_dir = os.path.dirname(os.path.normpath(input_dir))
    alternate = os.path.join(parent_dir, normalized)
    if os.path.isfile(alternate):
        return basename, alternate

    return basename, None


def _generation_number(line):
    """Sort key: the trailing integer of the file name in a list line.

    A line looks like ``wavs/french_generation_123.wav|phonemes|id``; lines
    whose file name has no trailing integer sort as 0.
    """
    filename = os.path.basename(line.split('|')[0])
    try:
        return int(filename.split('_')[-1].split('.')[0])
    except ValueError:
        return 0


def main():
    """Command-line entry point: convert audio, phonemize, write list files."""
    parser = argparse.ArgumentParser(description="Process French dataset.")
    parser.add_argument("--input_csv", type=str, default=INPUT_CSV)
    parser.add_argument("--input_dir", type=str, default=INPUT_DIR)
    parser.add_argument("--output_wav_dir", type=str, default=OUTPUT_WAV_DIR)
    parser.add_argument("--train_list", type=str, default=TRAIN_LIST_OUTPUT)
    parser.add_argument("--val_list", type=str, default=VAL_LIST_OUTPUT)
    parser.add_argument("--speaker_id", type=int, default=DEFAULT_SPEAKER_ID)
    parser.add_argument("--split_ratio", type=float, default=0.9)
    args = parser.parse_args()

    print(f"Input CSV: {args.input_csv}")
    print(f"Input Dir: {args.input_dir}")
    print(f"Output Wav Dir: {args.output_wav_dir}")
    print(f"Speaker ID: {args.speaker_id}")

    os.makedirs(args.output_wav_dir, exist_ok=True)

    entries = _load_rows(args.input_csv)
    print(f"Found {len(entries)} entries.")

    processed_entries = []
    rejected_count = 0

    # First pass: filter rows, convert audio and collect the transcripts.
    print("Converting audio to 24kHz...")
    for row in tqdm(entries):
        in_range = _duration_in_range(row)
        if in_range is None:
            print(f"Warning: Invalid duration for {row.get('audio_file', 'unknown')}")
        if not in_range:
            rejected_count += 1
            continue

        audio_file = row.get('audio_file')
        text = row.get('text')
        if not audio_file or not text or not text.strip():
            # A row without a file or transcript cannot yield a training line.
            print(f"Warning: Missing audio_file or text for {audio_file or 'unknown'}")
            continue

        basename, input_wav_path = _resolve_input_wav(args.input_dir, audio_file)
        if input_wav_path is None:
            print(f"Warning: File not found: {os.path.join(args.input_dir, basename)}")
            continue

        output_wav_path = os.path.join(args.output_wav_dir, basename)
        if convert_to_24khz(input_wav_path, output_wav_path):
            processed_entries.append({
                # The list files reference audio as wavs/<basename>.
                "path": os.path.join("wavs", basename),
                "text": text,
                "speaker_id": args.speaker_id,
            })

    if not processed_entries:
        print("No entries processed successfully.")
        print(f"Rejected {rejected_count} files due to duration constraints (1.5s - 18s).")
        return

    print(f"Rejected {rejected_count} files due to duration constraints (1.5s - 18s).")
    print(f"Processing {len(processed_entries)} files.")

    # Phonemize all transcripts in one batch. The backend class is used
    # directly (rather than the phonemize() wrapper) so the espeak options
    # below are passed exactly as given.
    print("Phonemizing text...")
    texts_to_phonemize = [entry["text"] for entry in processed_entries]
    try:
        backend = EspeakBackend(
            language='fr-fr',
            preserve_punctuation=True,
            with_stress=True,
            language_switch='remove-flags',
        )
        phonemized_texts = backend.phonemize(
            texts_to_phonemize,
            strip=True,
            # cpu_count() may return None when it cannot be determined.
            njobs=max(1, (os.cpu_count() or 1) // 2),
        )
    except Exception as e:
        # Top-level boundary: report and stop without writing partial lists.
        print(f"Phonemization failed: {e}")
        return

    if len(phonemized_texts) != len(processed_entries):
        # Pairing by position would attach phonemes to the wrong audio.
        print(
            f"Phonemization failed: expected {len(processed_entries)} results, "
            f"got {len(phonemized_texts)}"
        )
        return

    # Clean phonemes and build the "path|phonemes|speaker_id" lines.
    final_lines = [
        f"{entry['path']}|{clean_phonemes(raw_ph)}|{entry['speaker_id']}\n"
        for entry, raw_ph in zip(processed_entries, phonemized_texts)
    ]

    # Order by the number embedded in the file name (french_generation_X.wav)
    # so the split below is deterministic.
    final_lines.sort(key=_generation_number)

    # Split: the first split_ratio share goes to training, the rest to validation.
    split_idx = int(len(final_lines) * args.split_ratio)
    train_data = final_lines[:split_idx]
    val_data = final_lines[split_idx:]

    print(f"Writing {len(train_data)} training lines to {args.train_list}")
    with open(args.train_list, 'w', encoding='utf-8') as f:
        f.writelines(train_data)

    print(f"Writing {len(val_data)} validation lines to {args.val_list}")
    with open(args.val_list, 'w', encoding='utf-8') as f:
        f.writelines(val_data)

    print("Done.")


if __name__ == "__main__":
    main()