Spaces:
Runtime error
Runtime error
File size: 8,941 Bytes
b7becdf |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
#!/usr/bin/env python3
import os
import numpy as np
import librosa
import soundfile as sf
from scipy import signal
import shutil
from signal_processing import SignalProcessor
class AudioAugmenter:
    """Generate augmented variants of audio signals and audio files.

    Signal-level methods take a 1-D sample array ``y`` and return a new array;
    file-level methods load audio with librosa at ``self.sr`` and write the
    augmented results with soundfile.
    """

    # Augmentations applied when a caller passes ``augmentation_types=None``.
    DEFAULT_AUGMENTATION_TYPES = (
        'time_stretch', 'pitch_shift', 'noise', 'shift', 'volume', 'highpass', 'lowpass',
    )

    def __init__(self, sr=22050):
        """Store the working sample rate and build the project signal processor.

        Args:
            sr: Sample rate (Hz) used for loading, filtering and pitch shifting.
        """
        self.sr = sr
        self.signal_processor = SignalProcessor(sr=sr)

    def time_stretch(self, y, rate=1.2):
        """Time stretch the audio signal by ``rate`` via librosa."""
        return librosa.effects.time_stretch(y, rate=rate)

    def pitch_shift(self, y, steps=2):
        """Pitch shift the audio signal by ``steps`` (passed as ``n_steps``)."""
        return librosa.effects.pitch_shift(y, sr=self.sr, n_steps=steps)

    def add_noise(self, y, noise_factor=0.005):
        """Add standard-normal noise scaled by ``noise_factor`` to the signal."""
        noise = np.random.randn(len(y))
        return y + noise_factor * noise

    def shift(self, y, shift_max=1000):
        """Circularly shift the signal by a random number of samples.

        The offset is drawn from ``[-shift_max, shift_max)``; samples pushed
        past one end wrap around to the other (``np.roll``). ``shift_max``
        must be positive, otherwise ``np.random.randint`` raises ValueError.
        """
        offset = np.random.randint(-shift_max, shift_max)
        return np.roll(y, offset)

    def change_volume(self, y, volume_factor=0.8):
        """Scale the signal amplitude by ``volume_factor``."""
        return y * volume_factor

    def apply_filter(self, y, filter_type='highpass', cutoff_freq=1000):
        """Apply a 4th-order Butterworth filter forward and backward.

        Args:
            y: Input samples.
            filter_type: ``'highpass'`` or ``'lowpass'``; any other value
                returns ``y`` unchanged.
            cutoff_freq: Cutoff in Hz; normalised by the Nyquist frequency
                (``self.sr / 2``) before being handed to ``signal.butter``.

        Returns:
            The filtered samples, or ``y`` itself for an unknown filter type.
        """
        nyquist = 0.5 * self.sr
        cutoff = cutoff_freq / nyquist
        if filter_type not in ('highpass', 'lowpass'):
            return y
        b, a = signal.butter(4, cutoff, btype=filter_type)
        return signal.filtfilt(b, a, y)

    def _valve_lash_enhance(self, y):
        """Run the valve-lash chain: band-pass, transient boost, then HPSS.

        All three steps are delegated to ``self.signal_processor``; their exact
        semantics live in the project's SignalProcessor (not visible here).
        """
        y_aug = self.signal_processor.bandpass_filter(y, low_freq=800, high_freq=5000)
        y_aug = self.signal_processor.enhance_transients(y_aug, threshold=0.05, boost_factor=2.5)
        return self.signal_processor.harmonic_percussive_separation(y_aug, margin=4.0)

    def augment_audio(self, y, augmentation_type):
        """Apply a specific augmentation to the audio signal.

        Args:
            y: Input samples.
            augmentation_type: One of ``'time_stretch'``, ``'pitch_shift'``,
                ``'noise'``, ``'shift'``, ``'volume'``, ``'highpass'``,
                ``'lowpass'`` or ``'valve_lash_enhance'``.

        Returns:
            The augmented samples; an unrecognised type returns ``y`` itself.
        """
        handlers = {
            'time_stretch': self.time_stretch,
            'pitch_shift': self.pitch_shift,
            'noise': self.add_noise,
            'shift': self.shift,
            'volume': self.change_volume,
            'highpass': lambda samples: self.apply_filter(samples, 'highpass'),
            'lowpass': lambda samples: self.apply_filter(samples, 'lowpass'),
            'valve_lash_enhance': self._valve_lash_enhance,
        }
        handler = handlers.get(augmentation_type)
        if handler is None:
            return y
        return handler(y)

    @classmethod
    def _resolve_augmentation_types(cls, path, augmentation_types):
        """Return a fresh list of augmentation types for ``path``.

        Always copies, so the caller's list is never mutated. Appends
        ``'valve_lash_enhance'`` when ``path`` contains ``'valve_lash'`` and
        the type is not already requested.
        """
        if augmentation_types is None:
            resolved = list(cls.DEFAULT_AUGMENTATION_TYPES)
        else:
            resolved = list(augmentation_types)
        if 'valve_lash' in path and 'valve_lash_enhance' not in resolved:
            resolved.append('valve_lash_enhance')
        return resolved

    @staticmethod
    def _plan_outputs(output_dir, base_name, augmentation_types, prefix='aug'):
        """Pair every requested augmentation with a unique output path.

        The first occurrence of a type keeps the historical name
        ``{prefix}_{type}_{base_name}``; later repeats get a ``_vN`` suffix on
        the type so they no longer overwrite the earlier file.
        """
        occurrences = {}
        plan = []
        for aug_type in augmentation_types:
            count = occurrences.get(aug_type, 0) + 1
            occurrences[aug_type] = count
            tag = aug_type if count == 1 else f"{aug_type}_v{count}"
            plan.append((aug_type, os.path.join(output_dir, f"{prefix}_{tag}_{base_name}")))
        return plan

    def augment_file(self, input_file, output_dir, augmentation_types=None, prefix='aug'):
        """Augment a single audio file with multiple augmentation types.

        Args:
            input_file: Path of the audio file to load.
            output_dir: Directory for the augmented files (created if missing).
            augmentation_types: Types to apply, in order; ``None`` selects the
                defaults. The list is not modified. ``'valve_lash_enhance'`` is
                added when ``input_file`` contains ``'valve_lash'``.
            prefix: Leading component of every output file name.
        """
        augmentation_types = self._resolve_augmentation_types(input_file, augmentation_types)
        # Load audio file (resampled to self.sr by librosa)
        y, sr = librosa.load(input_file, sr=self.sr)
        os.makedirs(output_dir, exist_ok=True)
        base_name = os.path.basename(input_file)
        # NOTE(review): a repeated deterministic type (e.g. 'time_stretch')
        # still uses the same default parameters, so its repeat carries the
        # same audio under a new name; only random types differ per repeat.
        for aug_type, output_file in self._plan_outputs(
                output_dir, base_name, augmentation_types, prefix):
            y_aug = self.augment_audio(y, aug_type)
            sf.write(output_file, y_aug, sr)

    def augment_directory(self, input_dir, output_dir, target_count=None, augmentation_types=None):
        """Augment all ``.wav`` files under a directory tree.

        The sub-directory layout of ``input_dir`` is reproduced below
        ``output_dir`` and each original file is copied next to its
        augmentations.

        Args:
            input_dir: Root directory searched recursively for ``.wav`` files.
            output_dir: Root directory for the results (created if missing).
            target_count: Optional desired total number of files; when given,
                only as many leading augmentation types as needed are used
                (at least one, at most all of them).
            augmentation_types: Types to apply; ``None`` selects the defaults.
                The list is not modified. ``'valve_lash_enhance'`` is added
                when ``input_dir`` contains ``'valve_lash'``.
        """
        augmentation_types = self._resolve_augmentation_types(input_dir, augmentation_types)
        os.makedirs(output_dir, exist_ok=True)

        # Collect every .wav file before writing anything.
        audio_files = [
            os.path.join(root, name)
            for root, _, names in os.walk(input_dir)
            for name in names
            if name.endswith('.wav')
        ]

        num_files = len(audio_files)
        if target_count is not None and num_files > 0:
            # Augmentations needed per file so originals + augmentations reach
            # target_count, clamped to [1, number of available types].
            augs_per_file = max(1, int(np.ceil((target_count - num_files) / num_files)))
            augs_per_file = min(augs_per_file, len(augmentation_types))
            print(f"Creating {augs_per_file} augmentations per file to reach target of {target_count}")
            augmentation_types = augmentation_types[:augs_per_file]

        for audio_file in audio_files:
            # Mirror the file's position relative to input_dir.
            rel_path = os.path.relpath(audio_file, input_dir)
            file_output_dir = os.path.join(output_dir, os.path.dirname(rel_path))
            os.makedirs(file_output_dir, exist_ok=True)
            self.augment_file(audio_file, file_output_dir, augmentation_types)
            # Also copy the original file to the output directory
            shutil.copy2(audio_file, os.path.join(file_output_dir, os.path.basename(audio_file)))
def augment_dataset(input_dir, output_dir, target_count=None, sr=22050):
    """
    Augment every class sub-directory of a dataset.

    Each immediate sub-directory of ``input_dir`` is treated as one class and
    is augmented into a same-named sub-directory of ``output_dir``.

    Args:
        input_dir (str): Directory containing one sub-directory per class
        output_dir (str): Directory to save augmented files
        target_count (int, optional): Target number of samples per class
        sr (int): Sample rate
    """
    augmenter = AudioAugmenter(sr=sr)

    # Only directories count as classes; plain files in input_dir are ignored.
    class_names = [
        entry for entry in os.listdir(input_dir)
        if os.path.isdir(os.path.join(input_dir, entry))
    ]

    common_types = ['time_stretch', 'pitch_shift', 'noise', 'shift', 'volume', 'highpass', 'lowpass']

    for class_name in class_names:
        if class_name == 'fan_belt_issue':
            # Heavier augmentation: the first three types are requested twice.
            augmentation_types = common_types + ['time_stretch', 'pitch_shift', 'noise']
        elif class_name == 'valve_lash':
            # Specialised processing chain for this class.
            augmentation_types = common_types + ['valve_lash_enhance']
        else:
            augmentation_types = list(common_types)

        print(f"Augmenting {class_name} with {augmentation_types}")
        augmenter.augment_directory(
            os.path.join(input_dir, class_name),
            os.path.join(output_dir, class_name),
            target_count,
            augmentation_types,
        )

    print(f"Augmentation complete. Augmented files saved to {output_dir}")
if __name__ == "__main__":
    # Command-line entry point: parse the options and augment the dataset.
    import argparse

    cli = argparse.ArgumentParser(description='Augment audio files for engine sound classification')
    cli.add_argument(
        '--input-dir', type=str, default='data',
        help='Directory containing audio files to augment',
    )
    cli.add_argument(
        '--output-dir', type=str, default='augmented_data',
        help='Directory to save augmented files',
    )
    cli.add_argument(
        '--target-count', type=int, default=None,
        help='Target number of samples per class',
    )
    cli.add_argument('--sr', type=int, default=22050, help='Sample rate')

    options = cli.parse_args()
    augment_dataset(options.input_dir, options.output_dir, options.target_count, options.sr)
|