| | |
| | |
| | |
| | |
| |
|
| | import math |
| | import random |
| | import os |
| | import json |
| |
|
| | import numpy as np |
| | import parselmouth |
| | import torch |
| | import torchaudio |
| | from tqdm import tqdm |
| |
|
| | from audiomentations import TimeStretch |
| |
|
| | from pedalboard import ( |
| | Pedalboard, |
| | HighShelfFilter, |
| | LowShelfFilter, |
| | PeakFilter, |
| | PitchShift, |
| | ) |
| |
|
| | from utils.util import has_existed |
| |
|
# Neutral default values for the parameters handed to Praat's "Change gender"
# command by the functions in this file.

# New pitch median. 0.0 is the value used in this file whenever the median
# should be left alone (NOTE(review): relies on Praat treating 0.0 as "keep
# the original median" - confirm against the Praat manual).
PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT = 0.0
# Formant shift ratio; 1.0 means no change.
PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT = 1.0
# Pitch shift ratio; a value of exactly 1.0 skips the pitch analysis entirely.
PRAAT_CHANGEGENDER_PITCHSHIFTRATIO_DEFAULT = 1.0
# Pitch range ratio; 1.0 means no change.
PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT = 1.0
# Duration factor; 1.0 means no change.
PRAAT_CHANGEGENDER_DURATIONFACTOR_DEFAULT = 1.0
| |
|
| |
|
def wav_to_Sound(wav, sr: int) -> parselmouth.Sound:
    """Convert a single-channel waveform to a parselmouth.Sound object.

    Args:
        wav (np.ndarray/torch.Tensor): waveform of shape (1, n_samples).
            Only one channel is supported.
        sr (int): sampling rate of ``wav``.

    Returns:
        parselmouth.Sound: a Sound built from the samples of the only channel.

    Raises:
        AssertionError: if ``wav`` is neither a np.ndarray nor a torch.Tensor,
            or if it is not of shape (1, n_samples).
    """
    # Check the type first so that unsupported inputs fail with the intended
    # message rather than with an AttributeError on ``.shape``.
    assert isinstance(
        wav, (np.ndarray, torch.Tensor)
    ), "wav must be either np.ndarray or torch.Tensor"
    # Testing ``ndim`` explicitly lets 1-D input fail with the intended
    # message; ``len(wav[0])`` on 1-D input raises an unrelated TypeError.
    assert (
        wav.ndim == 2 and wav.shape[0] == 1
    ), "wav must be of shape (1, n_samples)"

    samples = wav[0]
    if isinstance(samples, torch.Tensor):
        # Tensor.numpy() refuses tensors that require grad or that do not
        # live on the CPU, so detach and move the data first.
        samples = samples.detach().cpu().numpy()
    return parselmouth.Sound(samples, sampling_frequency=sr)
| |
|
| |
|
def get_pitch_median(wav, sr: int):
    """Extract the pitch contour of a waveform and return it with its median.

    Args:
        wav (np.ndarray/torch.Tensor/parselmouth.Sound): a waveform accepted by
            ``wav_to_Sound``, or an already constructed Sound object (used as is).
        sr (int): sampling rate; only used when ``wav`` has to be converted.

    Returns:
        parselmouth.Pitch, float: the Praat Pitch object and its 0.5 quantile
        queried in Hertz.
    """
    sound = wav if isinstance(wav, parselmouth.Sound) else wav_to_Sound(wav, sr)

    # Praat "To Pitch" arguments: time step, pitch floor, pitch ceiling
    time_step = 0.8 / 75
    pitch = parselmouth.praat.call(sound, "To Pitch", time_step, 75, 600)

    # "Get quantile" arguments: time range start, time range end, quantile, unit
    median = parselmouth.praat.call(pitch, "Get quantile", 0.0, 0.0, 0.5, "Hertz")
    return pitch, median
| |
|
| |
|
def change_gender(
    sound,
    pitch=None,
    formant_shift_ratio: float = PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT,
    new_pitch_median: float = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT,
    pitch_range_ratio: float = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT,
    duration_factor: float = PRAAT_CHANGEGENDER_DURATIONFACTOR_DEFAULT,
) -> parselmouth.Sound:
    """Run Praat's "Change gender" command on a sound.

    Args:
        sound (parselmouth.Sound): the sound to transform.
        pitch (parselmouth.Pitch, optional): a pitch object already extracted
            from ``sound``. When omitted, the command is called on the sound
            alone with 75 and 600 passed as the pitch analysis bounds.
        formant_shift_ratio (float, optional): formant shift ratio. 1.0 means no change.
        new_pitch_median (float, optional): new pitch median.
        pitch_range_ratio (float, optional): pitch range ratio. 1.0 means no change.
        duration_factor (float, optional): duration factor. 1.0 means no
            change, larger values give a longer sound, smaller values a shorter one.

    Returns:
        parselmouth.Sound: the sound produced by Praat.
    """
    # These four trailing arguments are common to both call forms
    manipulation_args = (
        formant_shift_ratio,
        new_pitch_median,
        pitch_range_ratio,
        duration_factor,
    )

    if pitch is not None:
        # Reuse the supplied pitch analysis instead of letting Praat redo it
        return parselmouth.praat.call(
            (sound, pitch), "Change gender", *manipulation_args
        )

    return parselmouth.praat.call(
        sound, "Change gender", 75, 600, *manipulation_args
    )
| |
|
| |
|
def apply_formant_and_pitch_shift(
    sound: parselmouth.Sound,
    formant_shift_ratio: float = PRAAT_CHANGEGENDER_FORMANTSHIFTRATIO_DEFAULT,
    pitch_shift_ratio: float = PRAAT_CHANGEGENDER_PITCHSHIFTRATIO_DEFAULT,
    pitch_range_ratio: float = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT,
    duration_factor: float = PRAAT_CHANGEGENDER_DURATIONFACTOR_DEFAULT,
) -> parselmouth.Sound:
    """Manipulate pitch and formants through the Praat "Change gender" command.

    In Praat: Sound Object -> Convert -> Change gender (see the Praat Help).
    https://github.com/YannickJadoul/Parselmouth/issues/25#issuecomment-608632887 might help

    Args:
        sound (parselmouth.Sound): the sound to transform.
        formant_shift_ratio (float, optional): forwarded to ``change_gender``.
        pitch_shift_ratio (float, optional): factor applied to the measured
            pitch median. With exactly 1.0 no pitch analysis is performed.
        pitch_range_ratio (float, optional): forwarded to ``change_gender``
            unless the pitch manipulation is abandoned (see below).
        duration_factor (float, optional): forwarded to ``change_gender``.

    Returns:
        parselmouth.Sound: the transformed sound.
    """
    if pitch_shift_ratio == 1.0:
        # No pitch shift requested: skip the pitch analysis altogether
        return change_gender(
            sound,
            None,
            formant_shift_ratio,
            PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT,
            pitch_range_ratio,
            duration_factor,
        )

    pitch, measured_median = get_pitch_median(sound, sound.sampling_frequency)
    target_median = measured_median * pitch_shift_ratio

    # Work out where the lowest pitch would end up after shifting and
    # rescaling the range around the new median
    lowest_pitch = parselmouth.praat.call(
        pitch, "Get minimum", 0.0, 0.0, "Hertz", "Parabolic"
    )
    shifted_lowest = lowest_pitch * pitch_shift_ratio
    projected_lowest = (
        target_median + (shifted_lowest - target_median) * pitch_range_ratio
    )

    # Fall back to the neutral pitch settings when the request would produce
    # a negative pitch or when no usable median was measured (NaN)
    if projected_lowest < 0 or math.isnan(target_median):
        target_median = PRAAT_CHANGEGENDER_PITCHMEDIAN_DEFAULT
        pitch_range_ratio = PRAAT_CHANGEGENDER_PITCHRANGERATIO_DEFAULT

    return change_gender(
        sound,
        pitch,
        formant_shift_ratio,
        target_median,
        pitch_range_ratio,
        duration_factor,
    )
| |
|
| |
|
| | |
def pedalboard_equalizer(wav: np.ndarray, sr: int) -> np.ndarray:
    """Run a waveform through a randomly parameterised pedalboard equalizer.

    The chain consists of one low shelf filter, eight peak filters and one high
    shelf filter. Every filter gets a random gain drawn from [-12, 12] and a Q
    value interpolated between 2 and 5 by ``power_ratio``.

    Args:
        wav (np.ndarray): waveform to process.
        sr (int): sampling rate of ``wav``.

    Returns:
        np.ndarray: the output of the pedalboard chain.
    """
    num_filters = 10
    # When True every centre/cutoff frequency is drawn uniformly at random;
    # otherwise the frequencies are spread between the two cutoffs below.
    random_all_freq = True

    cutoff_low_freq = 60
    cutoff_high_freq = 10000
    q_min = 2
    q_max = 5

    # NOTE: the draw order (frequencies, then Q values, then gains) determines
    # which random numbers each parameter receives - keep it.
    if random_all_freq:
        key_freqs = [random.uniform(1, 12000) for _ in range(num_filters)]
    else:
        key_freqs = [
            power_ratio(float(z) / (num_filters - 1), cutoff_low_freq, cutoff_high_freq)
            for z in range(num_filters)
        ]
    q_values = [
        power_ratio(random.uniform(0, 1), q_min, q_max) for _ in range(num_filters)
    ]
    gains = [random.uniform(-12, 12) for _ in range(num_filters)]

    def make_band(filter_cls, idx):
        # Instantiate one filter from the idx-th set of random parameters
        return filter_cls(
            cutoff_frequency_hz=key_freqs[idx], gain_db=gains[idx], q=q_values[idx]
        )

    last = num_filters - 1
    board = Pedalboard()
    # Low shelf first, peak filters in between, high shelf last
    board.append(make_band(LowShelfFilter, 0))
    for idx in range(1, last):
        board.append(make_band(PeakFilter, idx))
    board.append(make_band(HighShelfFilter, last))

    return board(wav, sr)
| |
|
| |
|
def power_ratio(r: float, a: float, b: float):
    """Interpolate geometrically between ``a`` and ``b``.

    Computes ``a * (b / a) ** r``, which yields ``a`` for ``r == 0`` and ``b``
    for ``r == 1``.

    Args:
        r (float): interpolation position.
        a (float): value returned at ``r == 0``; must be non-zero.
        b (float): value returned at ``r == 1``.

    Returns:
        float: the interpolated value.
    """
    span = b / a
    return a * math.pow(span, r)
| |
|
| |
|
def audiomentations_time_stretch(wav: np.ndarray, sr: int) -> np.ndarray:
    """Time-stretch a waveform with audiomentations.

    The transform is always applied (p=1.0) with a rate between 0.8 and 1.25,
    and the output is allowed to differ in length from the input.

    Args:
        wav (np.ndarray): waveform to stretch.
        sr (int): sampling rate of ``wav``.

    Returns:
        np.ndarray: the stretched waveform.
    """
    stretch_options = dict(
        min_rate=0.8,
        max_rate=1.25,
        leave_length_unchanged=False,
        p=1.0,
    )
    stretcher = TimeStretch(**stretch_options)
    return stretcher(wav, sample_rate=sr)
| |
|
| |
|
def formant_and_pitch_shift(
    sound: parselmouth.Sound, fs: bool, ps: bool
) -> parselmouth.Sound:
    """Apply either a random formant shift or a random pitch shift to a sound.

    Args:
        sound (parselmouth.Sound): the sound to transform.
        fs (bool): apply a formant shift (through Praat).
        ps (bool): apply a pitch shift (through pedalboard).
            ``fs`` and ``ps`` must differ.

    Returns:
        parselmouth.Sound: the transformed sound.
    """
    assert fs != ps, "fs, ps are mutually exclusive"

    if fs:
        # Draw a ratio in [1.0, 1.4] and invert it about half of the time so
        # that formants are shifted in either direction
        ratio = random.uniform(1.0, 1.4)
        if random.uniform(-1, 1) > 0:
            ratio = 1.0 / ratio
        # Only the formants are changed through Praat here
        return apply_formant_and_pitch_shift(sound, formant_shift_ratio=ratio)

    if not ps:
        return None

    # Pitch is shifted with pedalboard by a random amount in [-12, 12]
    sample_rate = sound.sampling_frequency
    board = Pedalboard()
    board.append(PitchShift(random.uniform(-12, 12)))
    shifted = board(sound.values, sample_rate)
    return parselmouth.Sound(shifted, sampling_frequency=sample_rate)
| |
|
| |
|
def wav_manipulation(
    wav: torch.Tensor,
    sr: int,
    aug_type: str = "None",
    formant_shift: bool = False,
    pitch_shift: bool = False,
    time_stretch: bool = False,
    equalizer: bool = False,
) -> torch.Tensor:
    """Apply the requested augmentation(s) to a waveform.

    The augmentation is selected either by name through ``aug_type`` or through
    the individual boolean flags, but not both at once.

    Args:
        wav (torch.Tensor): waveform of shape (1, n_samples).
        sr (int): sampling rate of ``wav``.
        aug_type (str, optional): "None", "formant_shift", "pitch_shift",
            "time_stretch" or "equalizer".
        formant_shift (bool, optional): apply a random formant shift.
        pitch_shift (bool, optional): apply a random pitch shift.
        time_stretch (bool, optional): apply a random time stretch.
        equalizer (bool, optional): apply a random equalizer.

    Returns:
        torch.Tensor: the augmented waveform as a float tensor.
    """
    assert aug_type in (
        "None",
        "formant_shift",
        "pitch_shift",
        "time_stretch",
        "equalizer",
    ), "aug_type must be one of formant_shift, pitch_shift, time_stretch, equalizer"

    explicit_flags = (formant_shift, pitch_shift, time_stretch, equalizer)
    assert aug_type == "None" or explicit_flags == (
        False,
        False,
        False,
        False,
    ), "if aug_type is specified, other argument must be False"

    # Translate a named augmentation into the matching flag
    if aug_type == "formant_shift":
        formant_shift = True
    elif aug_type == "pitch_shift":
        pitch_shift = True
    elif aug_type == "equalizer":
        equalizer = True
    elif aug_type == "time_stretch":
        time_stretch = True

    samples = wav.numpy()

    # Array-based augmentations run first, in this order
    if equalizer:
        samples = pedalboard_equalizer(samples, sr)
    if time_stretch:
        samples = audiomentations_time_stretch(samples, sr)

    # The remaining augmentations operate on a parselmouth Sound
    sound = wav_to_Sound(samples, sr)
    if formant_shift or pitch_shift:
        sound = formant_and_pitch_shift(sound, formant_shift, pitch_shift)

    return torch.from_numpy(sound.values).float()
| |
|
| |
|
def augment_dataset(cfg, dataset) -> list:
    """Augment dataset with formant_shift, pitch_shift, time_stretch, equalizer

    For every augmentation enabled in ``cfg.preprocess`` a sibling dataset
    named ``<dataset>_<aug_type>`` is created under
    ``cfg.preprocess.processed_dir``. It holds the augmented wav files in a
    ``wav`` sub-directory and one metadata JSON file per split. A split whose
    metadata file is reported as existing by ``has_existed`` is skipped.

    Args:
        cfg: configuration exposing ``preprocess.processed_dir`` and the
            ``preprocess.use_formant_shift`` / ``use_pitch_shift`` /
            ``use_time_stretch`` / ``use_equalizer`` switches.
        dataset (str): dataset name

    Returns:
        list: augmented dataset names
    """
    processed_dir = cfg.preprocess.processed_dir
    dataset_path = os.path.join(processed_dir, dataset)
    split = ["train", "test"] if "eval" not in dataset else ["test"]

    # Keep only the augmentations switched on in the configuration
    aug_switches = [
        ("formant_shift", cfg.preprocess.use_formant_shift),
        ("pitch_shift", cfg.preprocess.use_pitch_shift),
        ("time_stretch", cfg.preprocess.use_time_stretch),
        ("equalizer", cfg.preprocess.use_equalizer),
    ]
    aug_types = [name for name, enabled in aug_switches if enabled]

    augment_datasets = []
    for aug_type in aug_types:
        print("Augmenting {} with {}...".format(dataset, aug_type))
        new_dataset = dataset + "_" + aug_type
        augment_datasets.append(new_dataset)
        new_dataset_path = os.path.join(processed_dir, new_dataset)
        new_dataset_wav_dir = os.path.join(new_dataset_path, "wav")
        # Creating the wav directory creates the dataset directory as well
        os.makedirs(new_dataset_wav_dir, exist_ok=True)

        for dataset_type in split:
            metadata_name = "{}.json".format(dataset_type)
            metadata_path = os.path.join(dataset_path, metadata_name)
            new_metadata_path = os.path.join(new_dataset_path, metadata_name)

            if has_existed(new_metadata_path):
                continue

            # JSON is exchanged as UTF-8; do not depend on the locale encoding
            with open(metadata_path, "r", encoding="utf-8") as f:
                metadata = json.load(f)

            augmented_metadata = []
            for utt in tqdm(metadata):
                original_wav, sr = torchaudio.load(utt["Path"])
                new_wav = wav_manipulation(original_wav, sr, aug_type=aug_type)
                new_wav_path = os.path.join(new_dataset_wav_dir, utt["Uid"] + ".wav")
                torchaudio.save(new_wav_path, new_wav, sr)

                # Augmentations such as time stretch change the number of
                # samples; rescale the recorded duration by the length ratio
                # so the metadata matches the saved audio whatever its unit.
                duration = utt["Duration"]
                original_length = original_wav.shape[-1]
                new_length = new_wav.shape[-1]
                if original_length > 0 and new_length != original_length:
                    duration = duration * new_length / original_length

                augmented_metadata.append(
                    {
                        "Dataset": utt["Dataset"] + "_" + aug_type,
                        "index": utt["index"],
                        "Singer": utt["Singer"],
                        "Uid": utt["Uid"],
                        "Path": new_wav_path,
                        "Duration": duration,
                    }
                )

            # ensure_ascii=False emits raw non-ASCII characters, so the file
            # has to be opened with an encoding able to represent them
            with open(new_metadata_path, "w", encoding="utf-8") as f:
                json.dump(augmented_metadata, f, indent=4, ensure_ascii=False)
    return augment_datasets
| |
|