TREA_2.0_codebase / preprocess_esc50.py
malay-36's picture
Upload folder using huggingface_hub
fec9168 verified
#!/usr/bin/env python3
"""
ESC-50 Preprocessing Script for Duration Task
This script processes all ESC-50 audio clips to:
1. Apply amplitude-based filtering to detect actual sound regions
2. Calculate effective duration (portion containing actual sound)
3. Save trimmed audio files (with silence removed)
4. Generate a CSV with all metadata including effective durations
Usage:
python preprocess_esc50.py --config config.yaml
python preprocess_esc50.py --config config.yaml --threshold-db -40 --min-sound-ms 50
"""
import argparse
import os
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from pydub import AudioSegment
from tqdm import tqdm
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from utils.logger import setup_logger
logger = setup_logger(__name__)
def get_amplitude_array(audio: AudioSegment) -> np.ndarray:
    """
    Convert an AudioSegment into a mono array of normalized amplitudes.

    Multi-channel audio is down-mixed by averaging the channels of each frame,
    so the result always holds exactly one value per frame regardless of the
    channel count (mono is passed through unchanged).

    Args:
        audio: Input audio segment

    Returns:
        Numpy array of amplitude values (normalized to -1 to 1)
    """
    samples = np.array(audio.get_array_of_samples())

    # The sample array is treated as frame-interleaved (ch0, ch1, ..., ch0, ...).
    # Average over however many channels there are instead of special-casing
    # stereo, otherwise 3+ channel audio would be returned un-mixed.
    if audio.channels > 1:
        samples = samples.reshape((-1, audio.channels)).mean(axis=1)

    # Full-scale magnitude of a signed integer sample of this byte width
    # (e.g. 32768 for 16-bit audio).
    max_val = float(2 ** (audio.sample_width * 8 - 1))
    return samples / max_val
def compute_rms_envelope(samples: np.ndarray, frame_size_ms: int, hop_size_ms: int,
                         sample_rate: int) -> Tuple[np.ndarray, np.ndarray]:
    """
    Compute RMS envelope of audio signal.

    The signal is cut into fixed-length frames that advance by one hop at a
    time; only frames that fit completely inside the signal are evaluated, so
    a signal shorter than one frame yields two empty arrays.

    Args:
        samples: Audio samples as numpy array
        frame_size_ms: Frame size in milliseconds
        hop_size_ms: Hop size in milliseconds
        sample_rate: Audio sample rate

    Returns:
        Tuple of (rms_values, time_stamps_ms), where each time stamp is the
        start of the corresponding frame in milliseconds
    """
    # A frame/hop shorter than one sample truncates to 0, which would make
    # range() raise (zero step) or produce NaN (mean of an empty frame).
    # One sample is the smallest representable size, so clamp to it.
    frame_size = max(1, int(sample_rate * frame_size_ms / 1000))
    hop_size = max(1, int(sample_rate * hop_size_ms / 1000))

    frame_starts = range(0, len(samples) - frame_size + 1, hop_size)

    rms_values = np.array([
        np.sqrt(np.mean(samples[start:start + frame_size] ** 2))
        for start in frame_starts
    ])
    # Sample index -> milliseconds
    time_stamps = np.array(frame_starts, dtype=float) / sample_rate * 1000

    return rms_values, time_stamps
def rms_to_db(rms: np.ndarray, reference: float = 1.0) -> np.ndarray:
    """
    Express RMS levels in decibels relative to a reference level.

    Args:
        rms: RMS values
        reference: Level that maps to 0 dB (default 1.0 for normalized audio)

    Returns:
        dB values; inputs at or below 1e-10 are floored to that value before
        taking the logarithm, so silence maps to a finite number
    """
    # Floor the input so that log10 never sees zero
    epsilon = 1e-10
    floored = np.maximum(rms, epsilon)
    ratio = floored / reference
    return 20 * np.log10(ratio)
def detect_sound_regions(
    audio: AudioSegment,
    threshold_db: float = -40.0,
    min_sound_duration_ms: int = 50,
    frame_size_ms: int = 20,
    hop_size_ms: int = 10,
    merge_gap_ms: int = 100,
    threshold_strategy: str = 'noise_floor',
    noise_floor_percentile: float = 10.0,
    noise_floor_delta_db: float = 15.0
) -> List[Tuple[int, int]]:
    """
    Locate the stretches of a clip whose RMS level rises above a per-clip threshold.

    Two ways of choosing the threshold are available:
    - 'noise_floor': percentile(db_values, p) + delta_db, capped at 1 dB below
      the loudest frame so that at least something can be detected
    - any other value (intended: 'peak_relative'): peak_db + threshold_db

    Runs shorter than min_sound_duration_ms are discarded first; the surviving
    runs are then merged when the gap between them is at most merge_gap_ms.

    Args:
        audio: Input audio segment
        threshold_db: dB offset from peak, normally negative (peak-relative strategy only)
        min_sound_duration_ms: Minimum duration of sound region to keep
        frame_size_ms: Frame size for RMS computation
        hop_size_ms: Hop size for RMS computation
        merge_gap_ms: Merge regions separated by no more than this gap
        threshold_strategy: 'peak_relative' or 'noise_floor'
        noise_floor_percentile: Percentile for noise floor estimation (default 10)
        noise_floor_delta_db: dB above noise floor to set threshold (default 15)

    Returns:
        List of (start_ms, end_ms) tuples for sound regions, in time order
    """
    envelope, frame_starts_ms = compute_rms_envelope(
        get_amplitude_array(audio), frame_size_ms, hop_size_ms, audio.frame_rate
    )
    if len(envelope) == 0:
        # Clip is shorter than a single analysis frame
        return []

    levels_db = rms_to_db(envelope)
    loudest_db = np.max(levels_db)

    if threshold_strategy == 'noise_floor':
        # Adaptive: a low percentile of the frame levels stands in for the
        # clip's background noise; anything delta dB above it counts as sound.
        floor_db = np.percentile(levels_db, noise_floor_percentile)
        # Never let the cutoff reach the peak, or nothing would be detected
        cutoff_db = min(floor_db + noise_floor_delta_db, loudest_db - 1.0)
        logger.debug(
            f"Noise-floor threshold: floor={floor_db:.1f}dB (p{noise_floor_percentile}), "
            f"delta={noise_floor_delta_db}dB, threshold={cutoff_db:.1f}dB, peak={loudest_db:.1f}dB"
        )
    else:
        # Legacy: fixed (negative) offset from the loudest frame
        cutoff_db = loudest_db + threshold_db
        logger.debug(
            f"Peak-relative threshold: peak={loudest_db:.1f}dB, offset={threshold_db}dB, "
            f"threshold={cutoff_db:.1f}dB"
        )

    # Walk the frames, tracking the start of the currently open loud run
    candidates = []
    open_start = None
    for is_loud, frame_ms in zip(levels_db > cutoff_db, frame_starts_ms):
        if is_loud:
            if open_start is None:
                open_start = frame_ms
        elif open_start is not None:
            # First quiet frame closes the run at its own start time
            if frame_ms - open_start >= min_sound_duration_ms:
                candidates.append((int(open_start), int(frame_ms)))
            open_start = None

    # A run still open at the end of the clip is closed one hop past the last frame start
    if open_start is not None:
        tail_end = frame_starts_ms[-1] + hop_size_ms
        if tail_end - open_start >= min_sound_duration_ms:
            candidates.append((int(open_start), int(tail_end)))

    if len(candidates) <= 1:
        return candidates

    # Fuse neighbours whose separating gap is small enough
    merged = candidates[:1]
    for start, end in candidates[1:]:
        last_start, last_end = merged[-1]
        if start - last_end > merge_gap_ms:
            merged.append((start, end))
        else:
            merged[-1] = (last_start, end)
    return merged
def get_sound_regions(
    audio: AudioSegment,
    threshold_db: float = -40.0,
    min_sound_duration_ms: int = 50,
    threshold_strategy: str = 'noise_floor',
    noise_floor_percentile: float = 10.0,
    noise_floor_delta_db: float = 15.0
) -> List[Tuple[int, int]]:
    """
    Convenience front-end to detect_sound_regions().

    Forwards the threshold-related options and leaves the framing parameters
    (frame size, hop size, merge gap) at detect_sound_regions()' defaults.

    Args:
        audio: Input audio segment
        threshold_db: dB threshold below peak (used if strategy='peak_relative')
        min_sound_duration_ms: Minimum duration of sound region to keep
        threshold_strategy: 'peak_relative' or 'noise_floor'
        noise_floor_percentile: Percentile for noise floor estimation
        noise_floor_delta_db: dB above noise floor to set threshold

    Returns:
        List of (start_ms, end_ms) tuples for sound regions
    """
    detection_options = dict(
        threshold_db=threshold_db,
        min_sound_duration_ms=min_sound_duration_ms,
        threshold_strategy=threshold_strategy,
        noise_floor_percentile=noise_floor_percentile,
        noise_floor_delta_db=noise_floor_delta_db,
    )
    return detect_sound_regions(audio, **detection_options)
def extract_sound_with_edges_trimmed(
    audio: AudioSegment,
    regions: List[Tuple[int, int]],
    min_silence_to_trim_ms: int = 100,
    buffer_ratio: float = 0.1,
    min_buffer_ms: int = 200
) -> AudioSegment:
    """
    Extract audio with ONLY leftmost and rightmost silence removed IF present.

    Trimming is ADAPTIVE:
    - An edge is only trimmed if its silence is >= min_silence_to_trim_ms
    - A buffer of silence is kept next to the sound to preserve transients:
      buffer = max(min_buffer_ms, buffer_ratio * edge_silence)
    - The kept buffer is clamped to the clip, so an edge whose silence is not
      longer than the buffer is left untouched

    Preserves all internal structure and silence between sounds.
    Perfect for periodic sounds (clock ticks, footsteps, typing).

    Args:
        audio: Input audio segment
        regions: List of (start_ms, end_ms) tuples for sound regions, in time order
        min_silence_to_trim_ms: Only trim if edge silence is at least this long (default 100ms)
        buffer_ratio: Fraction of the edge silence to keep as buffer (default 0.1 = 10%)
        min_buffer_ms: Lower bound for the kept buffer (default 200ms)
            Examples with defaults: 500ms silence -> keep 200ms buffer,
            3000ms silence -> keep 300ms buffer

    Returns:
        Audio segment with edges trimmed (or original if no regions were given)
    """
    if not regions:
        # No sound detected - return original
        return audio

    # Overall sound boundaries (first sound start, last sound end)
    first_sound_start_ms = regions[0][0]
    last_sound_end_ms = regions[-1][1]
    audio_duration_ms = len(audio)

    # Silence on each side of the detected sound
    leading_silence_ms = first_sound_start_ms
    trailing_silence_ms = audio_duration_ms - last_sound_end_ms

    def buffer_for(silence_ms):
        # Proportional buffer, but never shorter than the configured floor
        return max(min_buffer_ms, int(silence_ms * buffer_ratio))

    # Default to keeping the full clip; move an edge inwards only when that
    # side has enough silence to be worth trimming.
    trim_start_ms = 0
    if leading_silence_ms >= min_silence_to_trim_ms:
        trim_start_ms = max(0, first_sound_start_ms - buffer_for(leading_silence_ms))

    trim_end_ms = audio_duration_ms
    if trailing_silence_ms >= min_silence_to_trim_ms:
        trim_end_ms = min(audio_duration_ms, last_sound_end_ms + buffer_for(trailing_silence_ms))

    # Extract the edge-trimmed portion (internal structure preserved)
    trimmed_audio = audio[trim_start_ms:trim_end_ms]
    logger.debug(
        f"Edge trim: {audio_duration_ms}ms -> {len(trimmed_audio)}ms "
        f"(leading_silence={leading_silence_ms}ms, trailing_silence={trailing_silence_ms}ms, "
        f"trim_start={trim_start_ms}ms, trim_end={trim_end_ms}ms)"
    )
    return trimmed_audio
def extract_all_sound_regions(
    audio: AudioSegment,
    regions: List[Tuple[int, int]],
    crossfade_ms: int = 10,
    padding_ms: int = 20
) -> AudioSegment:
    """
    Cut out every sound region and splice the pieces together, dropping ALL silence.

    WARNING: This destroys natural periodicity! Prefer
    extract_sound_with_edges_trimmed() for most use cases. This function is
    kept for backward compatibility.

    Args:
        audio: Input audio segment
        regions: List of (start_ms, end_ms) tuples for sound regions
        crossfade_ms: Crossfade duration when joining regions
        padding_ms: Padding around each region to avoid cutting transients

    Returns:
        Audio segment containing only sound portions (internal silence removed),
        or the original audio when no regions are given
    """
    if not regions:
        return audio

    total_ms = len(audio)
    # Widen each region by the padding, clamped to the clip boundaries
    pieces = [
        audio[max(0, start_ms - padding_ms):min(total_ms, end_ms + padding_ms)]
        for start_ms, end_ms in regions
    ]

    joined = pieces[0]
    for piece in pieces[1:]:
        # Crossfade only when both sides are longer than the fade itself;
        # otherwise fall back to plain concatenation.
        can_crossfade = len(joined) > crossfade_ms and len(piece) > crossfade_ms
        if can_crossfade:
            joined = joined.append(piece, crossfade=crossfade_ms)
        else:
            joined = joined + piece
    return joined
def process_esc50_dataset(
    audio_dir: str,
    metadata_path: str,
    output_dir: str,
    threshold_db: float = -40.0,
    min_sound_duration_ms: int = 50,
    save_trimmed_audio: bool = True,
    threshold_strategy: str = 'noise_floor',
    noise_floor_percentile: float = 10.0,
    noise_floor_delta_db: float = 15.0
) -> pd.DataFrame:
    """
    Process entire ESC-50 dataset and compute effective durations.

    Uses ADAPTIVE EDGE-ONLY trimming (extract_sound_with_edges_trimmed with its
    default settings) to preserve natural periodicity of sounds: only leading
    and trailing silence is removed, all internal structure is preserved.

    Supports two threshold strategies for adaptive per-clip thresholding:
    - 'peak_relative': threshold = peak_db + threshold_db (fixed offset from peak)
    - 'noise_floor': threshold = percentile(db, p) + delta (adapts to noise floor)

    Side effects: creates output_dir, writes "effective_durations.csv" into it,
    optionally writes trimmed WAV files to "<output_dir>/trimmed_audio", and
    prints summary statistics. A clip that fails to process is logged and
    recorded as a row with empty measurements plus an 'error' message; it does
    not abort the run.

    Args:
        audio_dir: Path to ESC-50 audio directory
        metadata_path: Path to ESC-50 metadata CSV
        output_dir: Output directory for processed files
        threshold_db: dB threshold for silence detection (peak_relative mode)
        min_sound_duration_ms: Minimum sound duration to keep
        save_trimmed_audio: Whether to save trimmed audio files
        threshold_strategy: 'peak_relative' or 'noise_floor' (recommended)
        noise_floor_percentile: Percentile for noise floor estimation (default 10)
        noise_floor_delta_db: dB above noise floor to set threshold (default 15)

    Returns:
        DataFrame with processed metadata (one row per clip in the metadata CSV)
    """
    # Load original metadata
    original_metadata = pd.read_csv(metadata_path)
    logger.info(f"Loaded metadata for {len(original_metadata)} clips")

    # Create output directories
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    if save_trimmed_audio:
        trimmed_audio_dir = output_path / "trimmed_audio"
        trimmed_audio_dir.mkdir(exist_ok=True)

    audio_root = Path(audio_dir)

    # Settings columns are identical for every row; only the parameters that
    # belong to the active strategy are recorded, the others stay empty.
    uses_noise_floor = threshold_strategy == 'noise_floor'
    settings_columns = {
        'threshold_strategy': threshold_strategy,
        'threshold_db_used': threshold_db if threshold_strategy == 'peak_relative' else None,
        'noise_floor_percentile': noise_floor_percentile if uses_noise_floor else None,
        'noise_floor_delta_db': noise_floor_delta_db if uses_noise_floor else None,
        'min_sound_duration_ms_used': min_sound_duration_ms,
    }

    # Process each audio file
    results = []
    for _, row in tqdm(original_metadata.iterrows(), total=len(original_metadata),
                       desc="Processing ESC-50 clips"):
        filename = row['filename']
        # Columns copied verbatim from the source metadata
        identity_columns = {
            'filename': filename,
            'category': row['category'],
            'fold': row['fold'],
            'target': row['target'],
            'esc10': row['esc10'],
        }
        audio_path = audio_root / filename

        # NOTE: key order below fixes the CSV column order
        # (identity, measurements, settings[, error]) - keep it stable.
        try:
            # Load audio
            audio = AudioSegment.from_file(str(audio_path), format="wav")
            raw_duration_s = len(audio) / 1000.0

            # Detect sound regions (using adaptive threshold)
            regions = get_sound_regions(
                audio,
                threshold_db=threshold_db,
                min_sound_duration_ms=min_sound_duration_ms,
                threshold_strategy=threshold_strategy,
                noise_floor_percentile=noise_floor_percentile,
                noise_floor_delta_db=noise_floor_delta_db
            )

            # Trim edges only (leftmost and rightmost silence)
            trimmed_audio = extract_sound_with_edges_trimmed(audio, regions)
            final_duration_s = len(trimmed_audio) / 1000.0

            # Peak amplitude and RMS of the trimmed audio, in dB relative to
            # full scale (the small epsilon keeps log10 finite for silence)
            samples = get_amplitude_array(trimmed_audio)
            peak_amplitude = np.max(np.abs(samples))
            peak_amplitude_db = 20 * np.log10(peak_amplitude + 1e-10)
            rms = np.sqrt(np.mean(samples ** 2))
            avg_rms_db = 20 * np.log10(rms + 1e-10)

            # Effective duration = total length of the detected sound regions;
            # with no regions the clip was left untrimmed, so use its length.
            if regions:
                effective_duration_s = sum(end - start for start, end in regions) / 1000.0
            else:
                effective_duration_s = final_duration_s

            # Save trimmed audio under the original file name
            trimmed_filename = None
            if save_trimmed_audio:
                trimmed_filename = filename
                trimmed_path = trimmed_audio_dir / trimmed_filename
                trimmed_audio.export(str(trimmed_path), format="wav")

            results.append({
                **identity_columns,
                'raw_duration_s': round(raw_duration_s, 4),
                'final_duration_s': round(final_duration_s, 4),
                'effective_duration_s': round(effective_duration_s, 4),
                'num_sound_regions': len(regions),
                'peak_amplitude_db': round(peak_amplitude_db, 2),
                'avg_rms_db': round(avg_rms_db, 2),
                'trimmed_filename': trimmed_filename,
                **settings_columns,
            })
        except Exception as e:
            # Deliberate best-effort boundary: one unreadable/corrupt clip must
            # not abort the whole dataset run. Record the failure and move on.
            logger.error(f"Error processing {filename}: {e}")
            results.append({
                **identity_columns,
                'raw_duration_s': None,
                'final_duration_s': None,
                'effective_duration_s': None,
                'num_sound_regions': 0,
                'peak_amplitude_db': None,
                'avg_rms_db': None,
                'trimmed_filename': None,
                **settings_columns,
                'error': str(e),
            })

    # Create DataFrame
    results_df = pd.DataFrame(results)

    # Save CSV
    csv_path = output_path / "effective_durations.csv"
    results_df.to_csv(csv_path, index=False)
    logger.info(f"Saved effective durations to {csv_path}")

    # Print summary statistics
    print_summary_statistics(results_df)
    return results_df
def print_summary_statistics(df: pd.DataFrame):
    """
    Print a human-readable report about a processed dataset to stdout.

    Rows whose 'effective_duration_s' is missing are counted as errors and
    excluded from all statistics.
    """
    rule = "=" * 60
    # Keep only rows that were processed successfully
    ok = df[df['effective_duration_s'].notna()]
    n_total = len(df)
    n_ok = len(ok)

    print("\n" + rule)
    print("ESC-50 Preprocessing Summary")
    print(rule)

    print(f"\nTotal clips processed: {n_total}")
    print(f"Successfully processed: {n_ok}")
    print(f"Errors: {n_total - n_ok}")

    # One mean/std/min/max section per duration column
    sections = (
        ("\nRaw duration statistics:", 'raw_duration_s'),
        ("\nFinal duration statistics (edges trimmed, internal structure preserved):",
         'final_duration_s'),
        ("\nEffective duration statistics (sum of sound regions only):",
         'effective_duration_s'),
    )
    for heading, column in sections:
        values = ok[column]
        print(heading)
        print(f" Mean: {values.mean():.3f}s")
        print(f" Std: {values.std():.3f}s")
        print(f" Min: {values.min():.3f}s")
        print(f" Max: {values.max():.3f}s")

    mean_raw = ok['raw_duration_s'].mean()
    mean_final = ok['final_duration_s'].mean()
    mean_effective = ok['effective_duration_s'].mean()

    # Effective vs final: the gap is the silence left inside the trimmed clips
    print(f"\nComparison (final includes internal silences):")
    print(f" Avg effective: {mean_effective:.3f}s")
    print(f" Avg final: {mean_final:.3f}s")
    print(f" Difference: {mean_final - mean_effective:.3f}s (internal silences)")

    # Share of the raw duration removed by edge trimming
    reduction = (1 - mean_final / mean_raw) * 100
    print(f"\nAverage edge trimming reduction: {reduction:.1f}%")

    # Per-category table, ordered from longest to shortest mean
    per_category = (
        ok.groupby('category')['effective_duration_s']
        .agg(['mean', 'std', 'min', 'max'])
        .sort_values('mean', ascending=False)
    )
    print("\nEffective duration by category (top 10 longest):")
    print(per_category.head(10).to_string())
    print("\nEffective duration by category (top 10 shortest):")
    print(per_category.tail(10).to_string())

    print("\n" + rule)
def load_config(config_path: str) -> dict:
    """
    Load configuration from YAML file.

    Args:
        config_path: Path to the YAML configuration file

    Returns:
        The parsed document, or an empty dict when the file contains no
        document (so callers can always use .get() on the result)

    Raises:
        OSError: If the file cannot be opened
        yaml.YAMLError: If the file is not valid YAML
    """
    # Imported lazily so the rest of the module works without PyYAML installed
    import yaml

    # Explicit encoding: the platform default is not UTF-8 everywhere
    with open(config_path, 'r', encoding='utf-8') as f:
        loaded = yaml.safe_load(f)

    # safe_load returns None for an empty file; honour the declared dict return
    return loaded if loaded is not None else {}
def main():
    """
    Command-line entry point: resolve settings and run the preprocessing.

    Every tunable setting is taken from the command line when given there,
    otherwise from the 'tasks.duration' section of the config file, otherwise
    from a built-in default.

    Returns:
        The DataFrame produced by process_esc50_dataset()
    """
    parser = argparse.ArgumentParser(
        description="Preprocess ESC-50 dataset for duration task"
    )
    parser.add_argument('--config', '-c', type=str, default='config.yaml',
                        help='Path to configuration file')
    # The tunables below default to None so that "not given on the command
    # line" can be told apart from an explicit value.
    parser.add_argument('--threshold-db', type=float, default=None,
                        help='dB threshold below peak for silence detection (default: -40)')
    parser.add_argument('--min-sound-ms', type=int, default=None,
                        help='Minimum sound duration in ms to keep (default: 50)')
    parser.add_argument('--output-dir', type=str, default=None,
                        help='Output directory (default: from config or ESC-50_preprocessed)')
    parser.add_argument('--no-trimmed-audio', action='store_true',
                        help='Do not save trimmed audio files (only save CSV)')
    parser.add_argument('--threshold-strategy', type=str,
                        choices=['peak_relative', 'noise_floor'], default=None,
                        help='Threshold strategy: peak_relative (old) or noise_floor (adaptive, recommended)')
    parser.add_argument('--noise-floor-percentile', type=float, default=None,
                        help='Percentile for noise floor estimation (default: 10)')
    parser.add_argument('--noise-floor-delta-db', type=float, default=None,
                        help='dB above noise floor to set threshold (default: 15)')
    args = parser.parse_args()

    config = load_config(args.config)

    # Dataset locations come from the 'esc50' section of the config
    esc50_config = config.get('esc50', {})
    audio_dir = esc50_config.get('audio_path', '/home/debarpanb1/ESC-50_github/audio')
    metadata_path = esc50_config.get('metadata_path', '/home/debarpanb1/ESC-50_github/meta/esc50.csv')

    # Preprocessing parameters come from the duration task section
    duration_config = config.get('tasks', {}).get('duration', {})

    def cli_or_config(cli_value, config_key, fallback):
        # Precedence: command line > config file > built-in default
        if cli_value is not None:
            return cli_value
        return duration_config.get(config_key, fallback)

    threshold_db = cli_or_config(args.threshold_db, 'amplitude_threshold_db', -40.0)
    min_sound_ms = cli_or_config(args.min_sound_ms, 'min_sound_duration_ms', 50)
    output_dir = cli_or_config(
        args.output_dir,
        'preprocessed_data_path',
        '/home/debarpanb1/TREA_2.0/ESC-50_preprocessed'
    )
    threshold_strategy = cli_or_config(args.threshold_strategy, 'threshold_strategy', 'noise_floor')
    noise_floor_percentile = cli_or_config(
        args.noise_floor_percentile, 'noise_floor_percentile', 10.0
    )
    noise_floor_delta_db = cli_or_config(
        args.noise_floor_delta_db, 'noise_floor_delta_db', 15.0
    )
    keep_trimmed_audio = not args.no_trimmed_audio

    # Log the effective configuration
    banner = "=" * 60
    logger.info(banner)
    logger.info("ESC-50 Preprocessing Configuration")
    logger.info(banner)
    logger.info(f"Audio directory: {audio_dir}")
    logger.info(f"Metadata path: {metadata_path}")
    logger.info(f"Output directory: {output_dir}")
    logger.info(f"Threshold strategy: {threshold_strategy}")
    if threshold_strategy == 'peak_relative':
        logger.info(f" Peak-relative threshold dB: {threshold_db}")
    else:
        logger.info(f" Noise floor percentile: {noise_floor_percentile}")
        logger.info(f" Noise floor delta dB: {noise_floor_delta_db}")
    logger.info(f"Min sound duration (ms): {min_sound_ms}")
    logger.info("Adaptive edge trimming: only if silence >= 100ms, keep 10% buffer")
    logger.info(f"Save trimmed audio: {keep_trimmed_audio}")
    logger.info(banner)

    # Process dataset
    results_df = process_esc50_dataset(
        audio_dir=audio_dir,
        metadata_path=metadata_path,
        output_dir=output_dir,
        threshold_db=threshold_db,
        min_sound_duration_ms=min_sound_ms,
        save_trimmed_audio=keep_trimmed_audio,
        threshold_strategy=threshold_strategy,
        noise_floor_percentile=noise_floor_percentile,
        noise_floor_delta_db=noise_floor_delta_db
    )

    logger.info("\nPreprocessing complete!")
    logger.info(f"Results saved to: {output_dir}")
    return results_df
# Run the preprocessing pipeline only when executed as a script, not on import.
if __name__ == "__main__":
    main()