Spaces:
Sleeping
Sleeping
# Audio augmentation / log-mel preprocessing module.
import tqdm
import librosa
import numpy as np
import os
import soundfile as sf
from typing import Optional  # NOTE(review): currently unused in this module
from src.config.config import ProcessingConfig

# Module-level default configuration, used as the default for AudioAugment.__init__.
config = ProcessingConfig()
class AudioAugment:
    """Audio augmentation and log-mel preprocessing pipeline.

    Reads audio files, optionally writes augmented copies (time stretch,
    pitch shift, dynamic-range compression), and converts each file into
    a peak-normalized log-mel spectrogram saved as ``.npy`` arrays.
    """

    # DRC presets: name -> (threshold_db, ratio, attack_ms, release_ms).
    _DRC_PRESETS = {
        "musicstandard": (-20, 2.0, 5, 50),
        "filmstandard": (-25, 4.0, 10, 100),
        "speech": (-18, 3.0, 2, 40),
        "radio": (-15, 3.5, 1, 200),
    }

    def __init__(self, config: ProcessingConfig = config) -> None:
        self.config = config

    def _mel_spectrogram(self, audio: np.ndarray) -> np.ndarray:
        """Return a (frames, n_bands) log-mel spectrogram in dB.

        The spectrogram is shifted so its maximum is 0 dB; every other
        value is negative.
        """
        mel_spec = librosa.feature.melspectrogram(
            y=audio,
            sr=self.config.sample_rate,
            n_fft=self.config.fft_size,
            hop_length=self.config.hop_size,
            win_length=self.config.frame_size,
            n_mels=self.config.n_bands,
            fmin=0,
            fmax=self.config.sample_rate / 2,
            window='hann',
        )
        # Power -> dB; the epsilon guards against log(0) on silent frames.
        mel_spectrogram_db = 10 * np.log10(mel_spec.T + 1e-10)
        # Peak-normalize so the loudest bin sits at 0 dB.
        return mel_spectrogram_db - mel_spectrogram_db.max()

    def _data_treatment_training(self, audio_path: str) -> tuple[list[np.ndarray], np.ndarray]:
        """Load every file in ``audio_path`` and return (log-mel list, label array).

        The integer label is parsed from the filename: the text after the
        last '-', before the first '.', truncated at the first '_'
        (e.g. "a-b-3_x.wav" -> 3). Raises ValueError if that text is not
        an integer.
        """
        labels: list[int] = []
        log_mel_spectrograms: list[np.ndarray] = []
        for filename in tqdm.tqdm(os.listdir(audio_path), desc="Processing audio files"):
            label_token = filename.split("-")[-1].split(".")[0].split("_")[0]
            labels.append(int(label_token))
            audio, _ = librosa.load(os.path.join(audio_path, filename), sr=self.config.sample_rate)
            log_mel_spectrograms.append(self._mel_spectrogram(audio))
        return log_mel_spectrograms, np.array(labels)

    def _data_treatment_testing(self, file_path: str) -> list[np.ndarray]:
        """Return a one-element list holding the log-mel spectrogram of ``file_path``."""
        audio, _ = librosa.load(file_path, sr=self.config.sample_rate)
        return [self._mel_spectrogram(audio)]

    def _pad(self, audio: np.ndarray) -> np.ndarray:
        """Zero-pad ``audio`` at the end up to ``target_seconds``; longer audio is returned unchanged."""
        target_len = int(self.config.sample_rate * self.config.target_seconds)
        if len(audio) < target_len:
            audio = np.pad(audio, (0, target_len - len(audio)), mode="constant")
        return audio

    def _apply_time_stretch(self, audio: np.ndarray, rate: float) -> np.ndarray:
        """Time-stretch in-memory audio by ``rate`` and pad back to the target length."""
        stretched = librosa.effects.time_stretch(audio.astype(np.float32), rate=rate)
        return self._pad(stretched)

    def _time_stretch_augmentation(self, file_path: str, rate: float) -> np.ndarray:
        """Load ``file_path`` and return it time-stretched by ``rate`` (padded)."""
        audio, _ = librosa.load(file_path, sr=self.config.sample_rate)
        return self._apply_time_stretch(audio, rate)

    def _apply_pitch_shift(self, audio: np.ndarray, semitones: float) -> np.ndarray:
        """Pitch-shift in-memory audio by ``semitones``; length is preserved."""
        return librosa.effects.pitch_shift(
            audio.astype(np.float32), sr=self.config.sample_rate, n_steps=semitones
        )

    def _pitch_shift_augmentation(self, file_path: str, semitones: float) -> np.ndarray:
        """Load ``file_path`` and return it pitch-shifted by ``semitones``."""
        audio, _ = librosa.load(file_path, sr=self.config.sample_rate)
        return self._apply_pitch_shift(audio, semitones)

    def _apply_drc(self, audio: np.ndarray, compression: str) -> np.ndarray:
        """Apply dynamic-range compression with the named preset to in-memory audio.

        Raises ValueError for an unknown preset name (the original code
        crashed with NameError on unbound ``threshold_db`` instead).
        """
        try:
            threshold_db, ratio, attack_ms, release_ms = self._DRC_PRESETS[compression]
        except KeyError:
            raise ValueError(f"Unknown compression preset: {compression!r}") from None
        threshold = 10 ** (threshold_db / 20)
        # One-pole smoothing coefficients for the gain envelope.
        attack_coeff = np.exp(-1.0 / (0.001 * attack_ms * self.config.sample_rate))
        release_coeff = np.exp(-1.0 / (0.001 * release_ms * self.config.sample_rate))
        audio_filtered = np.zeros_like(audio)
        gain = 1.0
        for n, sample in enumerate(audio):
            level = abs(sample)
            # Static gain curve above the threshold; unity gain below it.
            # NOTE(review): exponent (ratio - 1) is unusual for a compressor
            # (typically 1/ratio - 1); preserved as-is — confirm intent.
            desired_gain = (threshold / level) ** (ratio - 1) if level > threshold else 1.0
            # Fast attack when gain must drop, slow release when it recovers.
            coeff = attack_coeff if desired_gain < gain else release_coeff
            gain = coeff * (gain - desired_gain) + desired_gain
            audio_filtered[n] = sample * gain
        return audio_filtered

    def _drc_augmentation(self, file_path: str, compression: str) -> np.ndarray:
        """Load ``file_path`` and return it with the named DRC preset applied."""
        audio, _ = librosa.load(file_path, sr=self.config.sample_rate)
        return self._apply_drc(audio, compression)

    def _augment_dataset(self, audio_path: str, output_path: str, probability_list: list[float]) -> None:
        """Write one augmented copy of each file in ``audio_path`` into ``output_path``.

        ``probability_list`` is [p1, p2, p3]; each transform fires with
        probability (1 - p_i). BUGFIX: transforms are now chained on the
        same in-memory buffer — the original reloaded the file for each
        transform, silently discarding any transform applied earlier.
        """
        p1, p2, p3 = probability_list
        os.makedirs(output_path, exist_ok=True)
        for filename in tqdm.tqdm(os.listdir(audio_path), desc="Augmenting audio files"):
            audio, _ = librosa.load(os.path.join(audio_path, filename), sr=self.config.sample_rate)
            # TS
            if np.random.rand() > p1:
                stretch_rate = np.random.choice([0.81, 0.93, 1.07, 1.23])
                audio = self._apply_time_stretch(audio, stretch_rate)
            # PS
            if np.random.rand() > p2:
                semitone = np.random.choice([-3.5, -2.5, -2, -1, 1, 2.5, 3, 3.5])
                audio = self._apply_pitch_shift(audio, semitone)
            # DRC
            if np.random.rand() > p3:
                compression = np.random.choice(["radio", "filmstandard", "musicstandard", "speech"])
                audio = self._apply_drc(audio, compression)
            sf.write(os.path.join(output_path, filename), audio, self.config.sample_rate)

    def _create_augmented_datasets(self, input_path: str, output_path: str) -> None:
        """Create one augmented dataset per configured probability list, in subfolders "1", "2", ..."""
        for i, probability_list in enumerate(self.config.augmentation_probability_lists):
            augmented_path = os.path.join(output_path, f"{i + 1}")
            os.makedirs(augmented_path, exist_ok=True)
            self._augment_dataset(input_path, augmented_path, probability_list)

    def _create_log_mel(self, input_path: str, output_path: str) -> tuple[list[np.ndarray], np.ndarray]:
        """Compute log-mels for every subdirectory of ``input_path`` and save X.npy / y.npy.

        X is saved as a 1-D object array because spectrograms may have
        different frame counts; loading it requires ``allow_pickle=True``.
        """
        X: list[np.ndarray] = []
        y: list[int] = []
        for directory in os.listdir(input_path):
            log_mels, labels = self._data_treatment_training(os.path.join(input_path, directory))
            X.extend(log_mels)
            y.extend(labels)
        X_array = np.empty(len(X), dtype=object)
        for i, spec in enumerate(X):
            X_array[i] = spec
        y_array = np.array(y)
        os.makedirs(output_path, exist_ok=True)
        np.save(os.path.join(output_path, "X.npy"), X_array, allow_pickle=True)
        np.save(os.path.join(output_path, 'y.npy'), y_array)
        return X, y_array

    def run(self, augment: bool = True, preprocess: bool = True) -> None:
        """Run the pipeline: dataset augmentation and/or log-mel preprocessing, using config paths."""
        if augment:
            self._create_augmented_datasets(self.config.audio_path, self.config.augmented_path)
        if preprocess:
            self._create_log_mel(self.config.augmented_path, self.config.log_mel_path)