| import os |
| import sys |
| import json |
| import argparse |
| from pathlib import Path |
| from multiprocessing import Pool |
| from datasets.arrow_writer import ArrowWriter |
| from f5_tts.model.utils import convert_char_to_pinyin |
| from tqdm import tqdm |
|
|
# Let modules located under the current working directory be imported.
# NOTE(review): this runs after the `f5_tts` import near the top of the file,
# so it cannot help resolve that import -- confirm the intended ordering.
sys.path.append(os.getcwd())

import csv

# Raise the csv module's per-field size cap so very long fields do not abort
# parsing. sys.maxsize does not fit in a C long on every platform (e.g.
# Windows), where field_size_limit raises OverflowError; fall back to the
# largest signed 32-bit value there instead of crashing at import time.
try:
    csv.field_size_limit(sys.maxsize)
except OverflowError:
    csv.field_size_limit(2**31 - 1)
|
|
|
|
| |
| |
| |
| |
|
|
| import subprocess |
|
|
def get_audio_duration(audio_path):
    """Return the duration that ffprobe reports for ``audio_path``.

    ffprobe is asked to print only the container-level ``duration`` value.
    Returns 0 when ffprobe prints nothing on stdout. If launching ffprobe or
    parsing its output raises, the error is printed and 0 is returned.
    """
    ffprobe_cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        audio_path,
    ]
    try:
        completed = subprocess.run(
            ffprobe_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        reported = completed.stdout.strip()
        if not reported:
            # ffprobe produced no duration (e.g. it could not read the file).
            return 0
        return float(reported)
    except Exception as e:
        print(f"Error processing {audio_path}: {e}")
        return 0
|
|
|
|
|
|
def read_audio_text_pairs(csv_file_path):
    """Read ``(audio_path, text)`` pairs from a pipe-delimited metadata file.

    The first line is treated as a header and skipped. In every following
    row the first field is an audio path relative to the metadata file's
    directory and the second field is its text; further fields are ignored.
    Rows with fewer than two fields, or with an empty audio path, are skipped.

    The file is parsed in-process with the ``csv`` module instead of being
    piped through a shell ``awk`` command, so paths and audio file names that
    contain spaces or shell metacharacters are handled correctly.

    Args:
        csv_file_path: Location of the metadata file (``str`` or ``Path``).

    Returns:
        A list of ``(audio_path, text)`` string tuples. ``audio_path`` is the
        first field joined onto the metadata file's parent directory; both
        fields have surrounding whitespace removed.

    Raises:
        OSError: If the metadata file cannot be opened.
    """
    csv_file_path = Path(csv_file_path)
    parent = csv_file_path.parent
    pairs = []

    # newline="" leaves line-ending handling to the csv module (so CRLF files
    # do not leak "\r" into the text); utf-8-sig drops a leading BOM if present.
    with open(csv_file_path, mode="r", newline="", encoding="utf-8-sig") as csv_file:
        # QUOTE_NONE keeps quote characters literal: fields are split on "|" only.
        rows = csv.reader(csv_file, delimiter="|", quoting=csv.QUOTE_NONE)
        next(rows, None)  # Skip the header row; tolerate an empty file.
        for row in rows:
            if len(row) < 2:
                continue
            audio_name = row[0].strip()
            text = row[1].strip()
            if not audio_name:
                continue
            pairs.append((str(parent / audio_name), text))

    return pairs
|
|
|
|
def process_audio(audio_path_text):
    """Validate one ``(audio_path, text)`` pair and build its dataset record.

    Returns ``None`` when the audio file does not exist, or when the duration
    obtained from ``get_audio_duration`` is below 0.1 or above 30. Otherwise
    the text is passed through ``convert_char_to_pinyin`` and the function
    returns ``(record, duration)``, where ``record`` is a dict with the keys
    ``audio_path``, ``text`` and ``duration``.
    """
    wav_path, transcript = audio_path_text

    # Missing files are dropped before any probing is attempted.
    if not Path(wav_path).exists():
        return None

    seconds = get_audio_duration(wav_path)
    too_short = seconds < 0.1
    too_long = seconds > 30
    if too_short or too_long:
        return None

    converted = convert_char_to_pinyin([transcript], polyphone=True)[0]
    record = {"audio_path": wav_path, "text": converted, "duration": seconds}
    return record, seconds
|
|
|
|
def prepare_csv_wavs_dir(input_dir, num_processes=32):
    """Build dataset records for the rows of ``input_dir/metadata.csv``.

    The pairs returned by ``read_audio_text_pairs`` are mapped through
    ``process_audio`` in a ``multiprocessing.Pool`` with ``num_processes``
    workers, with a tqdm progress bar. Pairs for which ``process_audio``
    returns a falsy value are dropped.

    Returns:
        A ``(records, durations, vocab)`` tuple: the list of record dicts,
        the list of their durations in the same order, and the set of all
        elements found in the records' ``text`` values.
    """
    metadata_file = Path(input_dir) / "metadata.csv"
    pairs = read_audio_text_pairs(metadata_file.as_posix())

    with Pool(num_processes) as workers:
        progress = tqdm(
            workers.imap(process_audio, pairs),
            total=len(pairs),
            desc="Processing audio files",
        )
        outcomes = list(progress)

    # Keep only the pairs that process_audio accepted.
    kept = [outcome for outcome in outcomes if outcome]
    sub_result = [record for record, _ in kept]
    durations = [duration for _, duration in kept]

    vocab_set = set()
    for record in sub_result:
        vocab_set.update(record["text"])

    return sub_result, durations, vocab_set
|
|
|
|
def save_prepped_dataset(out_dir, result, duration_list, text_vocab_set):
    """Write the prepared dataset files into ``out_dir``.

    ``out_dir`` (and any missing parents) is created, then three files are
    written into it:

    * ``raw.arrow`` -- every item of ``result``, written with ``ArrowWriter``.
    * ``duration.json`` -- ``{"duration": duration_list}``.
    * ``new_vocab.txt`` -- the sorted entries of ``text_vocab_set``, one per line.

    Finally the sample count and the total duration in hours are printed.

    Args:
        out_dir: Output directory (``str`` or ``Path``).
        result: Iterable-with-length of records to write to ``raw.arrow``.
        duration_list: Durations matching ``result``; summed and divided by
            3600 for the printed total.
        text_vocab_set: Vocabulary entries to write to ``new_vocab.txt``.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(exist_ok=True, parents=True)
    print(f"\nSaving to {out_dir} ...")

    raw_arrow_path = out_dir / "raw.arrow"
    with ArrowWriter(path=raw_arrow_path.as_posix(), writer_batch_size=1) as writer:
        for line in tqdm(result, desc="Writing to raw.arrow"):
            writer.write(line)

    dur_json_path = out_dir / "duration.json"
    with open(dur_json_path.as_posix(), "w", encoding="utf-8") as f:
        json.dump({"duration": duration_list}, f, ensure_ascii=False)

    # Write the vocabulary as UTF-8 explicitly, like duration.json: the
    # locale's default encoding may be unable to encode non-ASCII entries and
    # would make the file's encoding depend on the machine that produced it.
    voca_out_path = out_dir / "new_vocab.txt"
    with open(voca_out_path.as_posix(), "w", encoding="utf-8") as f:
        f.writelines(f"{vocab}\n" for vocab in sorted(text_vocab_set))

    # .name keeps the full directory name; .stem would cut off everything
    # after the last dot (e.g. "set_v1.5" -> "set_v1").
    dataset_name = out_dir.name
    print(f"\nFor {dataset_name}, sample count: {len(result)}")
    print(f"For {dataset_name}, total {sum(duration_list)/3600:.2f} hours")
|
|
|
|
def prepare_and_save_set(inp_dir, out_dir):
    """Prepare the dataset found in ``inp_dir`` and save it under ``out_dir``.

    Passes the records, durations and vocabulary returned by
    ``prepare_csv_wavs_dir`` straight on to ``save_prepped_dataset``.
    """
    prepared = prepare_csv_wavs_dir(inp_dir)
    save_prepped_dataset(out_dir, *prepared)
|
|
|
|
def cli():
    """Parse ``inp_dir`` and ``out_dir`` from the command line and run the pipeline."""
    arg_parser = argparse.ArgumentParser(description="Prepare and save dataset.")

    # Both arguments are required positionals, registered in this order.
    positional_args = (
        ("inp_dir", "Input directory containing the data."),
        ("out_dir", "Output directory to save the prepared data."),
    )
    for arg_name, help_text in positional_args:
        arg_parser.add_argument(arg_name, type=str, help=help_text)

    parsed = arg_parser.parse_args()
    prepare_and_save_set(parsed.inp_dir, parsed.out_dir)
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    cli()
|
|