import os
import sys

sys.path.append(os.getcwd())

import argparse
import csv
import json
import subprocess
from multiprocessing import Pool
from pathlib import Path

from datasets.arrow_writer import ArrowWriter
from tqdm import tqdm

from f5_tts.model.utils import convert_char_to_pinyin

# Increase the CSV field size limit so very long transcription fields do not raise csv.Error
csv.field_size_limit(sys.maxsize)

# def get_audio_duration(audio_path):
#     """Use SoX for instant audio duration retrieval"""
#     result = os.popen(f"soxi -D {audio_path}").read().strip()
#     return float(result) if result else 0


def get_audio_duration(audio_path):
    """Use ffprobe for accurate duration retrieval without header issues."""
    try:
        result = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of",
             "default=noprint_wrappers=1:nokey=1", audio_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        return float(result.stdout.strip()) if result.stdout.strip() else 0
    except Exception as e:
        print(f"Error processing {audio_path}: {e}")
        return 0


def read_audio_text_pairs(csv_file_path):
    """Use AWK to quickly extract (audio_path, text) pairs from the pipe-delimited metadata CSV."""
    awk_cmd = f"awk -F '|' 'NR > 1 {{ print $1, $2 }}' {csv_file_path}"
    output = os.popen(awk_cmd).read().strip().split("\n")
    # Audio paths in the CSV are resolved relative to the directory containing it
    parent = Path(csv_file_path).parent
    return [
        (str(parent / line.split(" ")[0]), " ".join(line.split(" ")[1:]))
        for line in output
        if len(line.split(" ")) >= 2
    ]
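

# The AWK parser above assumes a pipe-delimited metadata.csv with a header row
# (skipped via NR > 1) and audio paths relative to the CSV's directory. The
# filenames and transcripts below are hypothetical, for illustration only:
#
#   audio_file|text
#   wavs/utt_0001.wav|今天天气很好
#   wavs/utt_0002.wav|hello world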


def process_audio(audio_path_text):
    """Process an audio file: check existence, compute duration, and convert the text to pinyin"""
    audio_path, text = audio_path_text
    if not Path(audio_path).exists():
        return None
    duration = get_audio_duration(audio_path)
    # Drop clips that are implausibly short or longer than 30 seconds
    if duration < 0.1 or duration > 30:
        return None
    text = convert_char_to_pinyin([text], polyphone=True)[0]
    return {"audio_path": audio_path, "text": text, "duration": duration}, duration


def prepare_csv_wavs_dir(input_dir, num_processes=32):
    """Process the audio-text pairs in parallel using multiprocessing"""
    input_dir = Path(input_dir)
    metadata_path = input_dir / "metadata.csv"
    audio_path_text_pairs = read_audio_text_pairs(metadata_path.as_posix())

    with Pool(num_processes) as pool:
        results = list(
            tqdm(
                pool.imap(process_audio, audio_path_text_pairs),
                total=len(audio_path_text_pairs),
                desc="Processing audio files",
            )
        )

    sub_result, durations, vocab_set = [], [], set()
    for result in results:
        if result:
            sub_result.append(result[0])
            durations.append(result[1])
            vocab_set.update(list(result[0]["text"]))

    return sub_result, durations, vocab_set


def save_prepped_dataset(out_dir, result, duration_list, text_vocab_set):
    """Write the processed dataset to disk"""
    out_dir = Path(out_dir)
    out_dir.mkdir(exist_ok=True, parents=True)
    print(f"\nSaving to {out_dir} ...")

    raw_arrow_path = out_dir / "raw.arrow"
    with ArrowWriter(path=raw_arrow_path.as_posix(), writer_batch_size=1) as writer:
        for line in tqdm(result, desc="Writing to raw.arrow"):
            writer.write(line)  # Stream each sample directly into the Arrow file

    dur_json_path = out_dir / "duration.json"
    with open(dur_json_path.as_posix(), "w", encoding="utf-8") as f:
        json.dump({"duration": duration_list}, f, ensure_ascii=False)

    voca_out_path = out_dir / "new_vocab.txt"
    with open(voca_out_path.as_posix(), "w", encoding="utf-8") as f:
        f.writelines(f"{vocab}\n" for vocab in sorted(text_vocab_set))

    dataset_name = out_dir.stem
    print(f"\nFor {dataset_name}, sample count: {len(result)}")
    print(f"For {dataset_name}, total {sum(duration_list) / 3600:.2f} hours")


def prepare_and_save_set(inp_dir, out_dir):
    """Run the full dataset preparation pipeline"""
    sub_result, durations, vocab_set = prepare_csv_wavs_dir(inp_dir)
    save_prepped_dataset(out_dir, sub_result, durations, vocab_set)


def cli():
    """Command-line interface for the script"""
    parser = argparse.ArgumentParser(description="Prepare and save dataset.")
    parser.add_argument("inp_dir", type=str, help="Input directory containing the data.")
    parser.add_argument("out_dir", type=str, help="Output directory to save the prepared data.")
    args = parser.parse_args()
    prepare_and_save_set(args.inp_dir, args.out_dir)


if __name__ == "__main__":
    cli()
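

# Usage sketch (script filename and paths are assumed, not confirmed by this file):
# given an input directory containing metadata.csv plus the audio files it references,
# the script writes raw.arrow, duration.json, and new_vocab.txt into the output directory.
#
#   python prepare_csv_wavs.py /data/my_tts_corpus /data/my_tts_corpus_prepared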