|
|
|
|
|
import os |
|
|
import csv |
|
|
import argparse |
|
|
import subprocess |
|
|
import random |
|
|
import unicodedata |
|
|
from phonemizer import phonemize |
|
|
from phonemizer.backend import EspeakBackend |
|
|
from tqdm import tqdm |
|
|
|
|
|
|
|
|
# Default path to the mappings CSV (columns used: audio_file, text, duration_seconds).
INPUT_CSV = "/root/src/data/elevenlabs_generations_simple_ember_french_v2/mappings.csv"


# Dataset root; source wavs referenced by the CSV live here (or relative to its parent).
INPUT_DIR = "/root/src/data/elevenlabs_generations_simple_ember_french_v2"


# Converted 24 kHz mono wavs are written into this subdirectory.
OUTPUT_WAV_DIR = os.path.join(INPUT_DIR, "wavs")


# Output manifests: one "wavs/<file>|<phonemes>|<speaker_id>" line per clip.
TRAIN_LIST_OUTPUT = os.path.join(INPUT_DIR, "train_list.txt")


VAL_LIST_OUTPUT = os.path.join(INPUT_DIR, "val_list.txt")


# Speaker ID written into every manifest line (single-speaker dataset).
DEFAULT_SPEAKER_ID = 3219
|
|
|
|
|
# Map French nasal vowels (NFC combining sequences: vowel + U+0303) to
# approximations built from symbols already present in the target phoneme set
# (oral vowel + velar nasal). Applied after NFC normalization below so the
# keys match regardless of how the input was composed.
NASAL_VOWEL_MAP = {
    'ɑ̃': 'ɑŋ',
    'ɔ̃': 'ɔŋ',
    'ɛ̃': 'ɛŋ',
    'œ̃': 'œŋ'
}


def clean_phonemes(text):
    """Clean a phonemized string for training-list output.

    Clean phonemes the same way we cleaned train_list and val_list for emma
    French Voices.

    Operations:
    1. Remove hyphens with trailing space (word separators)
    2. Normalize Unicode (NFC) to merge combining tilde with vowels
    3. Replace nasal vowels with approximations using existing symbols
    4. Collapse runs of whitespace into single spaces

    Args:
        text: Raw phoneme string (e.g. phonemizer/eSpeak output).

    Returns:
        The cleaned phoneme string.
    """
    # Order matters: strip "- " first so a separator hyphen does not leave a
    # stray space behind ("a- b" -> "ab"), then drop any remaining hyphens.
    # str.replace is a no-op when the pattern is absent, so no guards needed.
    text = text.replace('- ', '')
    text = text.replace('-', '')

    # Canonicalize vowel + combining-tilde sequences so NASAL_VOWEL_MAP keys
    # match the text's normalization form.
    text = unicodedata.normalize('NFC', text)

    for nasal_vowel, approximation in NASAL_VOWEL_MAP.items():
        text = text.replace(nasal_vowel, approximation)

    # Collapse internal whitespace runs and trim the ends.
    return ' '.join(text.split())
|
|
|
|
|
def convert_to_24khz(input_path, output_path):
    """Convert an audio file to 24 kHz mono wav using ffmpeg.

    Args:
        input_path: Path to the source audio file.
        output_path: Destination path; silently overwritten if present ("-y").

    Returns:
        True on success, False if ffmpeg fails or is not installed.
    """
    cmd = [
        "ffmpeg",
        "-y",                  # overwrite existing output without prompting
        "-i", input_path,
        "-ar", "24000",        # resample to 24 kHz
        "-ac", "1",            # downmix to mono
        output_path
    ]

    try:
        # Discard ffmpeg's stdout; capture stderr so failures can be reported.
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error converting {input_path}: {e.stderr.decode()}")
        return False
    except FileNotFoundError:
        # ffmpeg binary not on PATH — previously this crashed the whole run.
        print(f"Error converting {input_path}: ffmpeg not found on PATH")
        return False
|
|
|
|
|
def main():
    """Build train/val phoneme manifests from an ElevenLabs mappings CSV.

    Pipeline:
      1. Read the CSV and reject clips outside the 1.5s-18s duration range.
      2. Convert each kept wav to 24 kHz mono under --output_wav_dir.
      3. Batch-phonemize the transcripts with eSpeak (French) and clean them.
      4. Sort lines by the trailing number in each filename, then split into
         train/val lists of "wavs/<name>|<phonemes>|<speaker_id>" lines.
    """
    parser = argparse.ArgumentParser(description="Process French dataset.")
    parser.add_argument("--input_csv", type=str, default=INPUT_CSV)
    parser.add_argument("--input_dir", type=str, default=INPUT_DIR)
    parser.add_argument("--output_wav_dir", type=str, default=OUTPUT_WAV_DIR)
    parser.add_argument("--train_list", type=str, default=TRAIN_LIST_OUTPUT)
    parser.add_argument("--val_list", type=str, default=VAL_LIST_OUTPUT)
    parser.add_argument("--speaker_id", type=int, default=DEFAULT_SPEAKER_ID)
    parser.add_argument("--split_ratio", type=float, default=0.9)
    args = parser.parse_args()

    print(f"Input CSV: {args.input_csv}")
    print(f"Input Dir: {args.input_dir}")
    print(f"Output Wav Dir: {args.output_wav_dir}")
    print(f"Speaker ID: {args.speaker_id}")

    os.makedirs(args.output_wav_dir, exist_ok=True)

    entries = []
    with open(args.input_csv, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        for row in reader:
            entries.append(row)

    print(f"Found {len(entries)} entries.")

    processed_entries = []
    texts_to_phonemize = []  # kept in lockstep with processed_entries
    rejected_count = 0

    print("Converting audio to 24kHz...")
    for row in tqdm(entries):
        # Duration filter: too-short/too-long clips hurt training.
        try:
            duration = float(row['duration_seconds'])
            if duration < 1.5 or duration > 18.0:
                rejected_count += 1
                continue
        except (ValueError, KeyError):
            print(f"Warning: Invalid duration for {row.get('audio_file', 'unknown')}")
            rejected_count += 1
            continue

        # CSV paths may use Windows separators; normalize before basename.
        orig_filename = row['audio_file'].replace('\\', '/')
        basename = os.path.basename(orig_filename)
        input_wav_path = os.path.join(args.input_dir, basename)

        if not os.path.exists(input_wav_path):
            # Fallback: treat the CSV path as relative to the input dir's parent.
            input_wav_path_alt = os.path.join(os.path.dirname(args.input_dir), orig_filename)
            if os.path.exists(input_wav_path_alt):
                input_wav_path = input_wav_path_alt
            else:
                print(f"Warning: File not found: {input_wav_path}")
                continue

        output_wav_path = os.path.join(args.output_wav_dir, basename)

        if convert_to_24khz(input_wav_path, output_wav_path):
            # Manifest paths are relative to the dataset root.
            relative_path = os.path.join("wavs", basename)
            processed_entries.append({
                "path": relative_path,
                "text": row['text'],
                "speaker_id": args.speaker_id
            })
            texts_to_phonemize.append(row['text'])

    if not processed_entries:
        print("No entries processed successfully.")
        print(f"Rejected {rejected_count} files due to duration constraints (1.5s - 18s).")
        return

    print(f"Rejected {rejected_count} files due to duration constraints (1.5s - 18s).")
    print(f"Processing {len(processed_entries)} files.")

    print("Phonemizing text...")
    try:
        backend = EspeakBackend(
            language='fr-fr',
            preserve_punctuation=True,
            with_stress=True,
            language_switch='remove-flags'
        )
        # os.cpu_count() may return None (per the docs); fall back to 1 job
        # instead of crashing on None // 2.
        phonemized_texts = backend.phonemize(
            texts_to_phonemize,
            strip=True,
            njobs=max(1, (os.cpu_count() or 1) // 2)
        )
    except Exception as e:
        # Broad catch is deliberate: phonemizer/espeak failures should abort
        # the run with a message rather than a traceback.
        print(f"Phonemization failed: {e}")
        return

    final_lines = []
    for i, entry in enumerate(processed_entries):
        raw_ph = phonemized_texts[i]
        clean_ph = clean_phonemes(raw_ph)
        line = f"{entry['path']}|{clean_ph}|{entry['speaker_id']}\n"
        final_lines.append(line)

    def extract_number(line):
        """Sort key: trailing integer of the wav filename (clip_0042.wav -> 42)."""
        path = line.split('|')[0]
        filename = os.path.basename(path)
        try:
            return int(filename.split('_')[-1].split('.')[0])
        except (ValueError, IndexError):
            # Filenames without a numeric suffix sort first.
            return 0

    final_lines.sort(key=extract_number)

    # Deterministic head/tail split (lines are sorted, not shuffled).
    split_idx = int(len(final_lines) * args.split_ratio)
    train_data = final_lines[:split_idx]
    val_data = final_lines[split_idx:]

    print(f"Writing {len(train_data)} training lines to {args.train_list}")
    with open(args.train_list, 'w', encoding='utf-8') as f:
        f.writelines(train_data)

    print(f"Writing {len(val_data)} validation lines to {args.val_list}")
    with open(args.val_list, 'w', encoding='utf-8') as f:
        f.writelines(val_data)

    print("Done.")
|
|
|
|
|
# Script entry point: run the full dataset-preparation pipeline.
if __name__ == "__main__":

    main()
|
|
|
|
|
|