|
|
from pathlib import Path |
|
|
import numpy as np |
|
|
from math import ceil |
|
|
|
|
|
from fairseq.data.audio import rand_uniform |
|
|
from fairseq.data.audio.waveform_transforms import ( |
|
|
AudioWaveformTransform, |
|
|
register_audio_waveform_transform, |
|
|
) |
|
|
|
|
|
SNR_MIN = 5.0 |
|
|
SNR_MAX = 15.0 |
|
|
RATE = 0.25 |
|
|
|
|
|
NOISE_RATE = 1.0 |
|
|
NOISE_LEN_MEAN = 0.2 |
|
|
NOISE_LEN_STD = 0.05 |
|
|
|
|
|
|
|
|
class NoiseAugmentTransform(AudioWaveformTransform): |
|
|
@classmethod |
|
|
def from_config_dict(cls, config=None): |
|
|
_config = {} if config is None else config |
|
|
return cls( |
|
|
_config.get("samples_path", None), |
|
|
_config.get("snr_min", SNR_MIN), |
|
|
_config.get("snr_max", SNR_MAX), |
|
|
_config.get("rate", RATE), |
|
|
) |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
samples_path: str, |
|
|
snr_min: float = SNR_MIN, |
|
|
snr_max: float = SNR_MAX, |
|
|
rate: float = RATE, |
|
|
): |
|
|
|
|
|
assert ( |
|
|
samples_path |
|
|
), "need to provide path to audio samples for noise augmentation" |
|
|
assert snr_max >= snr_min, f"empty signal-to-noise range ({snr_min}, {snr_max})" |
|
|
assert rate >= 0 and rate <= 1, "rate should be a float between 0 to 1" |
|
|
|
|
|
self.paths = list(Path(samples_path).glob("**/*.wav")) |
|
|
self.n_samples = len(self.paths) |
|
|
assert self.n_samples > 0, f"no audio files found in {samples_path}" |
|
|
|
|
|
self.snr_min = snr_min |
|
|
self.snr_max = snr_max |
|
|
self.rate = rate |
|
|
|
|
|
def __repr__(self): |
|
|
return ( |
|
|
self.__class__.__name__ |
|
|
+ "(" |
|
|
+ ", ".join( |
|
|
[ |
|
|
f"n_samples={self.n_samples}", |
|
|
f"snr={self.snr_min}-{self.snr_max}dB", |
|
|
f"rate={self.rate}", |
|
|
] |
|
|
) |
|
|
+ ")" |
|
|
) |
|
|
|
|
|
def pick_sample(self, goal_shape, always_2d=False, use_sample_rate=None): |
|
|
from fairseq.data.audio.audio_utils import get_waveform |
|
|
|
|
|
path = self.paths[np.random.randint(0, self.n_samples)] |
|
|
sample = get_waveform( |
|
|
path, always_2d=always_2d, output_sample_rate=use_sample_rate |
|
|
)[0] |
|
|
|
|
|
|
|
|
|
|
|
is_2d = len(goal_shape) == 2 |
|
|
if len(goal_shape) != sample.ndim or ( |
|
|
is_2d and goal_shape[0] != sample.shape[0] |
|
|
): |
|
|
return np.zeros(goal_shape) |
|
|
|
|
|
|
|
|
len_dim = len(goal_shape) - 1 |
|
|
n_repeat = ceil(goal_shape[len_dim] / sample.shape[len_dim]) |
|
|
repeated = np.tile(sample, [1, n_repeat] if is_2d else n_repeat) |
|
|
start = np.random.randint(0, repeated.shape[len_dim] - goal_shape[len_dim] + 1) |
|
|
return ( |
|
|
repeated[:, start : start + goal_shape[len_dim]] |
|
|
if is_2d |
|
|
else repeated[start : start + goal_shape[len_dim]] |
|
|
) |
|
|
|
|
|
def _mix(self, source, noise, snr): |
|
|
get_power = lambda x: np.mean(x**2) |
|
|
if get_power(noise): |
|
|
scl = np.sqrt( |
|
|
get_power(source) / (np.power(10, snr / 10) * get_power(noise)) |
|
|
) |
|
|
else: |
|
|
scl = 0 |
|
|
return 1 * source + scl * noise |
|
|
|
|
|
def _get_noise(self, goal_shape, always_2d=False, use_sample_rate=None): |
|
|
return self.pick_sample(goal_shape, always_2d, use_sample_rate) |
|
|
|
|
|
def __call__(self, source, sample_rate): |
|
|
if np.random.random() > self.rate: |
|
|
return source, sample_rate |
|
|
|
|
|
noise = self._get_noise( |
|
|
source.shape, always_2d=True, use_sample_rate=sample_rate |
|
|
) |
|
|
|
|
|
return ( |
|
|
self._mix(source, noise, rand_uniform(self.snr_min, self.snr_max)), |
|
|
sample_rate, |
|
|
) |
|
|
|
|
|
|
|
|
@register_audio_waveform_transform("musicaugment") |
|
|
class MusicAugmentTransform(NoiseAugmentTransform): |
|
|
pass |
|
|
|
|
|
|
|
|
@register_audio_waveform_transform("backgroundnoiseaugment") |
|
|
class BackgroundNoiseAugmentTransform(NoiseAugmentTransform): |
|
|
pass |
|
|
|
|
|
|
|
|
@register_audio_waveform_transform("babbleaugment") |
|
|
class BabbleAugmentTransform(NoiseAugmentTransform): |
|
|
def _get_noise(self, goal_shape, always_2d=False, use_sample_rate=None): |
|
|
for i in range(np.random.randint(3, 8)): |
|
|
speech = self.pick_sample(goal_shape, always_2d, use_sample_rate) |
|
|
if i == 0: |
|
|
agg_noise = speech |
|
|
else: |
|
|
agg_noise = self._mix(agg_noise, speech, i) |
|
|
return agg_noise |
|
|
|
|
|
|
|
|
@register_audio_waveform_transform("sporadicnoiseaugment") |
|
|
class SporadicNoiseAugmentTransform(NoiseAugmentTransform): |
|
|
@classmethod |
|
|
def from_config_dict(cls, config=None): |
|
|
_config = {} if config is None else config |
|
|
return cls( |
|
|
_config.get("samples_path", None), |
|
|
_config.get("snr_min", SNR_MIN), |
|
|
_config.get("snr_max", SNR_MAX), |
|
|
_config.get("rate", RATE), |
|
|
_config.get("noise_rate", NOISE_RATE), |
|
|
_config.get("noise_len_mean", NOISE_LEN_MEAN), |
|
|
_config.get("noise_len_std", NOISE_LEN_STD), |
|
|
) |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
samples_path: str, |
|
|
snr_min: float = SNR_MIN, |
|
|
snr_max: float = SNR_MAX, |
|
|
rate: float = RATE, |
|
|
noise_rate: float = NOISE_RATE, |
|
|
noise_len_mean: float = NOISE_LEN_MEAN, |
|
|
noise_len_std: float = NOISE_LEN_STD, |
|
|
): |
|
|
super().__init__(samples_path, snr_min, snr_max, rate) |
|
|
self.noise_rate = noise_rate |
|
|
self.noise_len_mean = noise_len_mean |
|
|
self.noise_len_std = noise_len_std |
|
|
|
|
|
def _get_noise(self, goal_shape, always_2d=False, use_sample_rate=None): |
|
|
agg_noise = np.zeros(goal_shape) |
|
|
len_dim = len(goal_shape) - 1 |
|
|
is_2d = len(goal_shape) == 2 |
|
|
|
|
|
n_noises = round(self.noise_rate * goal_shape[len_dim] / use_sample_rate) |
|
|
start_pointers = [ |
|
|
round(rand_uniform(0, goal_shape[len_dim])) for _ in range(n_noises) |
|
|
] |
|
|
|
|
|
for start_pointer in start_pointers: |
|
|
noise_shape = list(goal_shape) |
|
|
len_seconds = np.random.normal(self.noise_len_mean, self.noise_len_std) |
|
|
noise_shape[len_dim] = round(max(0, len_seconds) * use_sample_rate) |
|
|
end_pointer = start_pointer + noise_shape[len_dim] |
|
|
if end_pointer >= goal_shape[len_dim]: |
|
|
continue |
|
|
|
|
|
noise = self.pick_sample(noise_shape, always_2d, use_sample_rate) |
|
|
if is_2d: |
|
|
agg_noise[:, start_pointer:end_pointer] = ( |
|
|
agg_noise[:, start_pointer:end_pointer] + noise |
|
|
) |
|
|
else: |
|
|
agg_noise[start_pointer:end_pointer] = ( |
|
|
agg_noise[start_pointer:end_pointer] + noise |
|
|
) |
|
|
|
|
|
return agg_noise |
|
|
|