| |
| |
| |
| |
|
|
"""This code is adapted from the Montreal Forced Aligner performance guide: https://montreal-forced-aligner.readthedocs.io/en/latest/user_guide/performance.html"""
|
|
| import os |
| import subprocess |
| from multiprocessing import Pool |
| from tqdm import tqdm |
| import torchaudio |
| from pathlib import Path |
|
|
|
|
def remove_empty_dirs(path):
    """Delete every empty sub-directory found below ``path``.

    The tree is walked bottom-up, so a directory that only contained empty
    directories is itself empty by the time it is inspected and is removed
    as well. ``path`` itself is never removed. If ``path`` is not a
    directory, a message is printed and nothing else happens.
    """
    if not os.path.isdir(path):
        print(f"{path} is not a directory")
        return

    # topdown=False yields children before their parents.
    for parent, child_names, _files in os.walk(path, topdown=False):
        for child_name in child_names:
            candidate = os.path.join(parent, child_name)
            if os.listdir(candidate):
                # Still has content: keep it.
                continue
            os.rmdir(candidate)
|
|
|
|
def process_single_wav_file(task):
    """Convert one wav file with ffmpeg and delete the source on success.

    Args:
        task: A ``(wav_file, output_dir)`` pair. The last three components
            of ``wav_file`` are read as ``speaker_id/book_name/filename``.

    The converted file is written to
    ``output_dir/speaker_id/{speaker_id}_{book_name}_{filename}`` using the
    ``pcm_s16le`` codec at a 16000 Hz sample rate.

    Raises:
        ValueError: If ``wav_file`` has fewer than three path components.
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero
            status. The source file is left in place in that case.
    """
    wav_file, output_dir = task
    speaker_id, book_name, filename = Path(wav_file).parts[-3:]

    output_speaker_dir = Path(output_dir, speaker_id)
    output_speaker_dir.mkdir(parents=True, exist_ok=True)
    new_wav_file = output_speaker_dir / f"{speaker_id}_{book_name}_{filename}"

    command = [
        "ffmpeg",
        "-nostdin",
        "-hide_banner",
        "-loglevel",
        "error",
        "-nostats",
        # The source is deleted only after a successful conversion, so an
        # existing output whose source is still present is a leftover from
        # an interrupted run. Overwrite it; without -y (and with -nostdin)
        # ffmpeg refuses and every re-run fails on that file.
        "-y",
        "-i",
        str(wav_file),
        "-acodec",
        "pcm_s16le",
        "-ar",
        "16000",
        str(new_wav_file),
    ]
    try:
        subprocess.check_call(command)
    except subprocess.CalledProcessError:
        # Do not leave a truncated output next to the intact source.
        if new_wav_file.exists():
            new_wav_file.unlink()
        raise

    # Conversion succeeded: the original is no longer needed.
    os.remove(wav_file)
|
|
|
|
def process_wav_files(wav_files, output_dir, n_process):
    """Convert all ``wav_files`` using a pool of ``n_process`` workers.

    Each file is paired with ``output_dir`` and handed to
    ``process_single_wav_file``; a progress bar tracks completed files.
    Afterwards, empty directories under ``output_dir`` are removed.
    """
    jobs = [(source, output_dir) for source in wav_files]
    print(f"Processing {len(jobs)} files")

    with Pool(processes=n_process) as workers:
        finished = workers.imap_unordered(process_single_wav_file, jobs)
        # Results carry no data; iterating only drives the pool and the bar.
        for _ in tqdm(finished, total=len(jobs)):
            pass

    print("Removing empty directories...")
    remove_empty_dirs(output_dir)
    print("Done!")
|
|
|
|
def get_wav_files(dataset_path):
    """Collect the wav files stored as ``dataset_path/<speaker>/<book>/*.wav``.

    Only files exactly two directory levels below ``dataset_path`` whose
    name ends in ``.wav`` are returned. Paths are returned as strings, in
    the order ``os.listdir`` reports them.
    """

    def subdirectories(parent):
        # Yield the full path of each directory directly inside ``parent``.
        for name in os.listdir(parent):
            candidate = os.path.join(parent, name)
            if os.path.isdir(candidate):
                yield candidate

    found = []
    for speaker_dir in subdirectories(dataset_path):
        for book_dir in subdirectories(speaker_dir):
            found.extend(
                os.path.join(book_dir, entry)
                for entry in os.listdir(book_dir)
                if entry.endswith(".wav")
            )

    print("Found {} wav files".format(len(found)))
    return found
|
|
|
|
def filter_wav_files_by_length(wav_files, max_len_sec=15):
    """Keep files no longer than ``max_len_sec`` and delete the rest.

    The duration of each file is computed as ``num_frames / sample_rate``
    from the metadata returned by ``torchaudio.info``. Files exceeding the
    limit are removed from disk, not merely skipped.

    Returns:
        The list of paths that were kept, in input order.
    """
    print("original wav files: {}".format(len(wav_files)))

    kept = []
    for path in wav_files:
        info = torchaudio.info(str(path))
        duration_sec = info.num_frames / info.sample_rate
        if duration_sec <= max_len_sec:
            kept.append(path)
            continue
        # Too long: drop the file from the dataset entirely.
        os.remove(path)

    print("filtered wav files: {}".format(len(kept)))
    return kept
|
|
|
|
if __name__ == "__main__":
    # Settings: dataset root (also used as the output root), number of
    # worker processes, and the maximum clip duration to keep, in seconds.
    dataset_path = "/path/to/output/directory"
    n_process = 16
    max_len_sec = 15

    # Discover the files, drop (and delete) the over-long ones, then convert
    # the remainder in parallel.
    process_wav_files(
        filter_wav_files_by_length(get_wav_files(dataset_path), max_len_sec),
        dataset_path,
        n_process,
    )
|
|