# esc50-model/src/data/augment.py
# "OOP complete and functional rewrite" (commit a3ea780, mateo496)
import tqdm
import librosa
import numpy as np
import os
import soundfile as sf
from typing import Optional
from src.config.config import ProcessingConfig
# Module-level default configuration, shared by AudioAugment instances
# that are constructed without an explicit config.
config = ProcessingConfig()
class AudioAugment:
    """Audio augmentation and log-mel preprocessing pipeline.

    Works on ESC-50-style directories of .wav files: optionally writes
    augmented copies (time stretch, pitch shift, dynamic range compression)
    and converts audio into peak-normalized log-mel spectrograms saved as
    .npy arrays.
    """

    # Preset name -> (threshold_db, ratio, attack_ms, release_ms) for DRC.
    _DRC_PRESETS = {
        "musicstandard": (-20, 2.0, 5, 50),
        "filmstandard": (-25, 4.0, 10, 100),
        "speech": (-18, 3.0, 2, 40),
        "radio": (-15, 3.5, 1, 200),
    }

    def __init__(self, config: ProcessingConfig = config) -> None:
        # Defaults to the module-level ProcessingConfig singleton.
        self.config = config

    def _mel_spectrogram(self, audio: np.ndarray) -> np.ndarray:
        """Return a peak-normalized log-mel spectrogram, shape (frames, n_bands).

        Values are power dB (10*log10) shifted so the loudest bin is 0 dB.
        """
        mel_spec = librosa.feature.melspectrogram(
            y=audio,
            sr=self.config.sample_rate,
            n_fft=self.config.fft_size,
            hop_length=self.config.hop_size,
            win_length=self.config.frame_size,
            n_mels=self.config.n_bands,
            fmin=0,
            fmax=self.config.sample_rate / 2,
            window='hann'
        )
        # 1e-10 floor avoids log10(0); transpose to (time, mel) layout.
        mel_spectrogram_db = 10 * np.log10(mel_spec.T + 1e-10)
        # Per-clip peak normalization: loudest bin sits at 0 dB.
        return mel_spectrogram_db - mel_spectrogram_db.max()

    def _data_treatment_training(self, audio_path: str) -> tuple[list[np.ndarray], np.ndarray]:
        """Compute log-mel spectrograms and integer labels for a directory.

        Labels are parsed from ESC-50-style names: the last '-' field before
        the extension, with any '_suffix' (from augmentation) stripped.

        Returns:
            (list of (frames, n_bands) spectrograms, int label array).
        """
        labels: list[int] = []
        log_mel_spectrograms: list[np.ndarray] = []
        for filename in tqdm.tqdm(os.listdir(audio_path), desc="Processing audio files"):
            label = filename.split("-")[-1].split(".")[0].split("_")[0]
            labels.append(int(label))
            audio, _ = librosa.load(os.path.join(audio_path, filename), sr=self.config.sample_rate)
            log_mel_spectrograms.append(self._mel_spectrogram(audio))
        return log_mel_spectrograms, np.array(labels)

    def _data_treatment_testing(self, file_path: str) -> list[np.ndarray]:
        """Return the log-mel spectrogram of a single file, wrapped in a list."""
        audio, _ = librosa.load(file_path, sr=self.config.sample_rate)
        return [self._mel_spectrogram(audio)]

    def _pad(self, audio: np.ndarray) -> np.ndarray:
        """Zero-pad or truncate `audio` to exactly sample_rate * target_seconds samples."""
        target_len = int(self.config.sample_rate * self.config.target_seconds)
        if len(audio) < target_len:
            return np.pad(audio, (0, target_len - len(audio)), mode="constant")
        # Truncate overly long clips (e.g. after time stretch with rate < 1)
        # so every output has a fixed length; the original only padded.
        return audio[:target_len]

    def _time_stretch_augmentation(self, file_path: str, rate: float,
                                   audio: Optional[np.ndarray] = None) -> np.ndarray:
        """Time-stretch by `rate` and pad/trim to the target length.

        If `audio` is given it is transformed directly; otherwise it is loaded
        from `file_path` (backward-compatible with the old file-only call).
        """
        if audio is None:
            audio, _ = librosa.load(file_path, sr=self.config.sample_rate)
        stretched = librosa.effects.time_stretch(audio.astype(np.float32), rate=rate)
        return self._pad(stretched)

    def _pitch_shift_augmentation(self, file_path: str, semitones: float,
                                  audio: Optional[np.ndarray] = None) -> np.ndarray:
        """Pitch-shift by `semitones` (length-preserving).

        If `audio` is given it is transformed directly; otherwise it is loaded
        from `file_path` (backward-compatible with the old file-only call).
        """
        if audio is None:
            audio, _ = librosa.load(file_path, sr=self.config.sample_rate)
        return librosa.effects.pitch_shift(
            audio.astype(np.float32), sr=self.config.sample_rate, n_steps=semitones
        )

    def _drc_augmentation(self, file_path: str, compression: str,
                          audio: Optional[np.ndarray] = None) -> np.ndarray:
        """Apply a feed-forward dynamic range compressor with the named preset.

        Args:
            file_path: audio file to load when `audio` is not supplied.
            compression: one of the keys of `_DRC_PRESETS`
                ("musicstandard", "filmstandard", "speech", "radio").
            audio: optional signal to compress in place of loading the file.

        Raises:
            ValueError: for an unknown preset name (previously this fell
                through and crashed with NameError on unbound locals).
        """
        try:
            threshold_db, ratio, attack_ms, release_ms = self._DRC_PRESETS[compression]
        except KeyError:
            raise ValueError(f"Unknown compression preset: {compression!r}") from None
        if audio is None:
            audio, _ = librosa.load(file_path, sr=self.config.sample_rate)
        threshold = 10 ** (threshold_db / 20)
        # One-pole smoothing coefficients for attack/release gain motion.
        attack_coeff = np.exp(-1.0 / (0.001 * attack_ms * self.config.sample_rate))
        release_coeff = np.exp(-1.0 / (0.001 * release_ms * self.config.sample_rate))
        audio_filtered = np.zeros_like(audio)
        gain = 1.0
        for n in range(len(audio)):
            abs_audio = abs(audio[n])
            if abs_audio > threshold:
                # Standard ratio law: gain_db = (1/ratio - 1) * (level_db - threshold_db),
                # i.e. gain = (threshold/level) ** (1 - 1/ratio).  The original used
                # exponent (ratio - 1), which over-attenuates and inverts the
                # transfer curve for ratio > 2.
                desired_gain = (threshold / abs_audio) ** (1.0 - 1.0 / ratio)
            else:
                desired_gain = 1.0
            # Attack when gain must fall, release when it may recover.
            coeff = attack_coeff if desired_gain < gain else release_coeff
            gain = coeff * (gain - desired_gain) + desired_gain
            audio_filtered[n] = audio[n] * gain
        return audio_filtered

    def _augment_dataset(self, audio_path: str, output_path: str,
                         probability_list: list[float]) -> None:
        """Write an augmented copy of every file in `audio_path` to `output_path`.

        `probability_list` = (p_ts, p_ps, p_drc).  NOTE: each augmentation is
        SKIPPED with probability p (applied with probability 1 - p) — kept as
        in the original; confirm this is the intended semantics.

        Augmentations now compose: each stage transforms the output of the
        previous one.  (The original reloaded the raw file inside every
        helper, so only the last applied augmentation survived.)
        """
        p_ts, p_ps, p_drc = probability_list
        os.makedirs(output_path, exist_ok=True)
        for filename in tqdm.tqdm(os.listdir(audio_path), desc="Augmenting audio files"):
            file_path = os.path.join(audio_path, filename)
            audio, _ = librosa.load(file_path, sr=self.config.sample_rate)
            # Time stretch
            if np.random.rand() > p_ts:
                stretch_rate = np.random.choice([0.81, 0.93, 1.07, 1.23])
                audio = self._time_stretch_augmentation(file_path, stretch_rate, audio=audio)
            # Pitch shift
            if np.random.rand() > p_ps:
                semitone = np.random.choice([-3.5, -2.5, -2, -1, 1, 2.5, 3, 3.5])
                audio = self._pitch_shift_augmentation(file_path, semitone, audio=audio)
            # Dynamic range compression
            if np.random.rand() > p_drc:
                compression = np.random.choice(["radio", "filmstandard", "musicstandard", "speech"])
                audio = self._drc_augmentation(file_path, compression, audio=audio)
            sf.write(os.path.join(output_path, filename), audio, self.config.sample_rate)

    def _create_augmented_datasets(self, input_path: str, output_path: str) -> None:
        """Create one augmented dataset copy per configured probability list.

        Copies are written to numbered subdirectories "1", "2", ... of
        `output_path`.
        """
        for i, probability_list in enumerate(self.config.augmentation_probability_lists):
            augmented_path = os.path.join(output_path, f"{i+1}")
            os.makedirs(augmented_path, exist_ok=True)
            self._augment_dataset(input_path, augmented_path, probability_list)

    def _create_log_mel(self, input_path: str, output_path: str) -> tuple[list[np.ndarray], np.ndarray]:
        """Convert every dataset directory under `input_path` to log-mels.

        Saves X.npy (object array of variable-length spectrograms) and y.npy
        (int labels) under `output_path`; returns the in-memory (X, y) pair.
        """
        X: list[np.ndarray] = []
        y: list[int] = []
        for directory in os.listdir(input_path):
            log_mels, labels = self._data_treatment_training(os.path.join(input_path, directory))
            X.extend(log_mels)
            y.extend(labels)
        # Spectrograms may differ in frame count, so store them as an object
        # array instead of stacking into one rectangular array.
        X_array = np.empty(len(X), dtype=object)
        for i, spec in enumerate(X):
            X_array[i] = spec
        y_array = np.array(y)
        os.makedirs(output_path, exist_ok=True)
        np.save(os.path.join(output_path, "X.npy"), X_array, allow_pickle=True)
        np.save(os.path.join(output_path, 'y.npy'), y_array)
        return X, y_array

    def run(self, augment: bool = True, preprocess: bool = True) -> None:
        """Execute the pipeline: raw audio -> augmented wavs -> saved log-mels.

        Args:
            augment: create the augmented wav datasets.
            preprocess: convert the augmented datasets to log-mel .npy files.
        """
        if augment:
            self._create_augmented_datasets(self.config.audio_path, self.config.augmented_path)
        if preprocess:
            self._create_log_mel(self.config.augmented_path, self.config.log_mel_path)