#!/usr/bin/env python3
"""
ESC-50 Preprocessing Script for Duration Task

This script processes all ESC-50 audio clips to:
1. Apply amplitude-based filtering to detect actual sound regions
2. Calculate effective duration (portion containing actual sound)
3. Save trimmed audio files (with leading/trailing silence removed)
4. Generate a CSV with all metadata including effective durations

Usage:
    python preprocess_esc50.py --config config.yaml
    python preprocess_esc50.py --config config.yaml --threshold-db -40 --min-sound-ms 50
"""

import argparse
import os
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from pydub import AudioSegment
from tqdm import tqdm

# Add this script's own directory to sys.path so the `utils` package resolves
sys.path.insert(0, str(Path(__file__).parent))

from utils.logger import setup_logger

logger = setup_logger(__name__)


def get_amplitude_array(audio: AudioSegment) -> np.ndarray:
    """
    Convert AudioSegment to a mono numpy array of amplitudes.

    Args:
        audio: Input audio segment

    Returns:
        Numpy array of amplitude values (normalized to -1 to 1)
    """
    samples = np.array(audio.get_array_of_samples())

    # Samples are interleaved per channel; down-mix any multi-channel layout
    # (not just stereo) by averaging across channels.
    if audio.channels > 1:
        samples = samples.reshape((-1, audio.channels)).mean(axis=1)

    # Normalize to -1 to 1 range using the full-scale value of the sample width
    max_val = float(2 ** (audio.sample_width * 8 - 1))
    samples = samples / max_val

    return samples


def compute_rms_envelope(samples: np.ndarray, frame_size_ms: int,
                         hop_size_ms: int, sample_rate: int) -> Tuple[np.ndarray, np.ndarray]:
    """
    Compute RMS envelope of audio signal.

    Args:
        samples: Audio samples as numpy array
        frame_size_ms: Frame size in milliseconds
        hop_size_ms: Hop size in milliseconds
        sample_rate: Audio sample rate

    Returns:
        Tuple of (rms_values, time_stamps_ms). Both arrays are empty when the
        signal is shorter than one frame.
    """
    # Clamp to at least one sample: a hop of 0 would make range() raise and a
    # frame of 0 would produce NaN means.
    frame_size = max(1, int(sample_rate * frame_size_ms / 1000))
    hop_size = max(1, int(sample_rate * hop_size_ms / 1000))

    rms_values = []
    time_stamps = []

    for i in range(0, len(samples) - frame_size + 1, hop_size):
        frame = samples[i:i + frame_size]
        rms = np.sqrt(np.mean(frame ** 2))
        rms_values.append(rms)
        time_stamps.append(i / sample_rate * 1000)  # Convert to ms

    return np.array(rms_values), np.array(time_stamps)


def rms_to_db(rms: np.ndarray, reference: float = 1.0) -> np.ndarray:
    """
    Convert RMS values to decibels.

    Args:
        rms: RMS values
        reference: Reference value (default 1.0 for normalized audio)

    Returns:
        dB values (floored at -200 dB for reference=1.0 because of the epsilon)
    """
    # Avoid log(0) by using a small epsilon
    epsilon = 1e-10
    return 20 * np.log10(np.maximum(rms, epsilon) / reference)


def detect_sound_regions(
    audio: AudioSegment,
    threshold_db: float = -40.0,
    min_sound_duration_ms: int = 50,
    frame_size_ms: int = 20,
    hop_size_ms: int = 10,
    merge_gap_ms: int = 100,
    threshold_strategy: str = 'noise_floor',
    noise_floor_percentile: float = 10.0,
    noise_floor_delta_db: float = 15.0
) -> List[Tuple[int, int]]:
    """
    Detect regions in audio that contain actual sound (above threshold).

    Supports two threshold strategies:
    - 'peak_relative': threshold = peak_db + threshold_db (old behavior)
    - 'noise_floor': threshold = percentile(db_values, p) + delta_db (adaptive per-clip)

    The 'noise_floor' strategy is recommended as it adapts to each clip's actual
    background noise level rather than using a fixed offset from peak. Any
    strategy value other than 'noise_floor' falls back to 'peak_relative'.

    Args:
        audio: Input audio segment
        threshold_db: dB threshold below peak (used if strategy='peak_relative')
        min_sound_duration_ms: Minimum duration of sound region to keep
        frame_size_ms: Frame size for RMS computation
        hop_size_ms: Hop size for RMS computation
        merge_gap_ms: Merge regions separated by less than this gap
        threshold_strategy: 'peak_relative' or 'noise_floor'
        noise_floor_percentile: Percentile for noise floor estimation (default 10)
        noise_floor_delta_db: dB above noise floor to set threshold (default 15)

    Returns:
        List of (start_ms, end_ms) tuples for sound regions
    """
    samples = get_amplitude_array(audio)
    sample_rate = audio.frame_rate

    # Compute RMS envelope
    rms_values, time_stamps = compute_rms_envelope(
        samples, frame_size_ms, hop_size_ms, sample_rate
    )

    if len(rms_values) == 0:
        return []

    # Convert to dB
    db_values = rms_to_db(rms_values)

    # Compute threshold based on strategy
    peak_db = np.max(db_values)

    if threshold_strategy == 'noise_floor':
        # ADAPTIVE: Use noise floor (low percentile) + delta
        # This adapts to each clip's actual background noise level
        noise_floor_db = np.percentile(db_values, noise_floor_percentile)
        absolute_threshold = noise_floor_db + noise_floor_delta_db

        # Safeguard: don't exceed peak (would detect nothing)
        # Leave at least 1 dB below peak
        absolute_threshold = min(absolute_threshold, peak_db - 1.0)

        logger.debug(
            f"Noise-floor threshold: floor={noise_floor_db:.1f}dB (p{noise_floor_percentile}), "
            f"delta={noise_floor_delta_db}dB, threshold={absolute_threshold:.1f}dB, peak={peak_db:.1f}dB"
        )
    else:
        # OLD: peak-relative threshold
        absolute_threshold = peak_db + threshold_db  # threshold_db is negative
        logger.debug(
            f"Peak-relative threshold: peak={peak_db:.1f}dB, offset={threshold_db}dB, "
            f"threshold={absolute_threshold:.1f}dB"
        )

    # Find frames above threshold
    above_threshold = db_values > absolute_threshold

    # Find contiguous regions; a region ends at the start time of the first
    # frame that drops back below the threshold.
    regions = []
    in_region = False
    region_start = 0

    for is_above, time_ms in zip(above_threshold, time_stamps):
        if is_above and not in_region:
            # Start of new region
            in_region = True
            region_start = time_ms
        elif not is_above and in_region:
            # End of region
            in_region = False
            region_end = time_ms
            if region_end - region_start >= min_sound_duration_ms:
                regions.append((int(region_start), int(region_end)))

    # Handle case where audio ends while still in a region
    if in_region:
        region_end = time_stamps[-1] + hop_size_ms
        if region_end - region_start >= min_sound_duration_ms:
            regions.append((int(region_start), int(region_end)))

    # Merge regions that are close together (applied after the minimum-duration
    # filter, so regions dropped above never take part in merging)
    if len(regions) > 1:
        merged_regions = [regions[0]]
        for start, end in regions[1:]:
            prev_start, prev_end = merged_regions[-1]
            if start - prev_end <= merge_gap_ms:
                # Merge with previous region
                merged_regions[-1] = (prev_start, end)
            else:
                merged_regions.append((start, end))
        regions = merged_regions

    return regions


def get_sound_regions(
    audio: AudioSegment,
    threshold_db: float = -40.0,
    min_sound_duration_ms: int = 50,
    threshold_strategy: str = 'noise_floor',
    noise_floor_percentile: float = 10.0,
    noise_floor_delta_db: float = 15.0
) -> List[Tuple[int, int]]:
    """
    Detect sound regions in audio using adaptive threshold.

    Thin wrapper around detect_sound_regions() that leaves the frame, hop and
    merge-gap settings at their defaults.

    Args:
        audio: Input audio segment
        threshold_db: dB threshold below peak (used if strategy='peak_relative')
        min_sound_duration_ms: Minimum duration of sound region to keep
        threshold_strategy: 'peak_relative' or 'noise_floor'
        noise_floor_percentile: Percentile for noise floor estimation
        noise_floor_delta_db: dB above noise floor to set threshold

    Returns:
        List of (start_ms, end_ms) tuples for sound regions
    """
    return detect_sound_regions(
        audio,
        threshold_db=threshold_db,
        min_sound_duration_ms=min_sound_duration_ms,
        threshold_strategy=threshold_strategy,
        noise_floor_percentile=noise_floor_percentile,
        noise_floor_delta_db=noise_floor_delta_db
    )


def extract_sound_with_edges_trimmed(
    audio: AudioSegment,
    regions: List[Tuple[int, int]],
    min_silence_to_trim_ms: int = 100,
    buffer_ratio: float = 0.1,
    min_buffer_ms: int = 200
) -> AudioSegment:
    """
    Extract audio with ONLY leftmost and rightmost silence removed IF present.

    Trimming is ADAPTIVE:
    - Only trims if edge silence >= min_silence_to_trim_ms
    - Keeps max(min_buffer_ms, buffer_ratio * silence) of the silence as a
      buffer to preserve transients
    - With the defaults, edge silence of 200ms or less is therefore left
      untouched because the buffer covers it entirely

    Preserves all internal structure and silence between sounds.
    Perfect for periodic sounds (clock ticks, footsteps, typing).

    Args:
        audio: Input audio segment
        regions: List of (start_ms, end_ms) tuples for sound regions
        min_silence_to_trim_ms: Only trim if edge silence is at least this long (default 100ms)
        buffer_ratio: Fraction of the edge silence to keep as buffer (default 0.1 = 10%)
        min_buffer_ms: Lower bound on the kept buffer (default 200ms)
            Example: 500ms silence -> keep 200ms; 3000ms silence -> keep 300ms

    Returns:
        Audio segment with edges trimmed (or original if no sound regions)
    """
    if not regions:
        # No sound detected - return original
        return audio

    # Find the overall sound boundaries (first sound start, last sound end)
    first_sound_start_ms = regions[0][0]
    last_sound_end_ms = regions[-1][1]
    audio_duration_ms = len(audio)

    # Calculate actual silence durations at edges
    leading_silence_ms = first_sound_start_ms
    trailing_silence_ms = audio_duration_ms - last_sound_end_ms

    # Adaptive trimming: only trim if there's significant silence
    # Keep a buffer to avoid cutting transients
    if leading_silence_ms >= min_silence_to_trim_ms:
        buffer_ms = max(min_buffer_ms, int(leading_silence_ms * buffer_ratio))
        trim_start_ms = max(0, first_sound_start_ms - buffer_ms)
    else:
        # Not enough silence to trim - keep from start
        trim_start_ms = 0

    if trailing_silence_ms >= min_silence_to_trim_ms:
        buffer_ms = max(min_buffer_ms, int(trailing_silence_ms * buffer_ratio))
        trim_end_ms = min(audio_duration_ms, last_sound_end_ms + buffer_ms)
    else:
        # Not enough silence to trim - keep to end
        trim_end_ms = audio_duration_ms

    # Extract the edge-trimmed portion (internal structure preserved)
    trimmed_audio = audio[trim_start_ms:trim_end_ms]

    logger.debug(
        f"Edge trim: {audio_duration_ms}ms -> {len(trimmed_audio)}ms "
        f"(leading_silence={leading_silence_ms}ms, trailing_silence={trailing_silence_ms}ms, "
        f"trim_start={trim_start_ms}ms, trim_end={trim_end_ms}ms)"
    )

    return trimmed_audio


def extract_all_sound_regions(
    audio: AudioSegment,
    regions: List[Tuple[int, int]],
    crossfade_ms: int = 10,
    padding_ms: int = 20
) -> AudioSegment:
    """
    Extract ALL sound portions and join them, removing ALL silence.

    WARNING: This destroys natural periodicity! Use
    extract_sound_with_edges_trimmed() instead for most use cases.
    This function is kept for backward compatibility.

    Args:
        audio: Input audio segment
        regions: List of (start_ms, end_ms) tuples for sound regions
        crossfade_ms: Crossfade duration when joining regions
        padding_ms: Padding around each region to avoid cutting transients

    Returns:
        Audio segment containing only sound portions (internal silence removed),
        or the original audio when no regions are given
    """
    if not regions:
        return audio

    # Extract each region
    extracted_parts = []
    for start_ms, end_ms in regions:
        # Add padding to avoid cutting off transients
        padded_start = max(0, start_ms - padding_ms)
        padded_end = min(len(audio), end_ms + padding_ms)
        part = audio[padded_start:padded_end]
        extracted_parts.append(part)

    # Concatenate with crossfade
    if len(extracted_parts) == 1:
        return extracted_parts[0]

    result = extracted_parts[0]
    for part in extracted_parts[1:]:
        # Crossfading needs both sides to be longer than the fade itself
        if len(result) > crossfade_ms and len(part) > crossfade_ms:
            result = result.append(part, crossfade=crossfade_ms)
        else:
            result = result + part

    return result


def process_esc50_dataset(
    audio_dir: str,
    metadata_path: str,
    output_dir: str,
    threshold_db: float = -40.0,
    min_sound_duration_ms: int = 50,
    save_trimmed_audio: bool = True,
    threshold_strategy: str = 'noise_floor',
    noise_floor_percentile: float = 10.0,
    noise_floor_delta_db: float = 15.0
) -> pd.DataFrame:
    """
    Process entire ESC-50 dataset and compute effective durations.

    Uses ADAPTIVE EDGE-ONLY trimming to preserve natural periodicity of sounds.
    Only leading and trailing silence is removed IF significant (>=100ms), and a
    buffer of max(200ms, 10% of the silence) is kept for transients.
    All internal structure is preserved.

    Supports two threshold strategies for adaptive per-clip thresholding:
    - 'peak_relative': threshold = peak_db + threshold_db (fixed offset from peak)
    - 'noise_floor': threshold = percentile(db, p) + delta (adapts to noise floor)

    Clips that fail to process are logged and recorded with empty measurements
    plus an 'error' column instead of aborting the run.

    Args:
        audio_dir: Path to ESC-50 audio directory
        metadata_path: Path to ESC-50 metadata CSV
        output_dir: Output directory for processed files
        threshold_db: dB threshold for silence detection (peak_relative mode)
        min_sound_duration_ms: Minimum sound duration to keep
        save_trimmed_audio: Whether to save trimmed audio files
        threshold_strategy: 'peak_relative' or 'noise_floor' (recommended)
        noise_floor_percentile: Percentile for noise floor estimation (default 10)
        noise_floor_delta_db: dB above noise floor to set threshold (default 15)

    Returns:
        DataFrame with processed metadata (also written to
        <output_dir>/effective_durations.csv)
    """
    # Load original metadata
    original_metadata = pd.read_csv(metadata_path)
    logger.info(f"Loaded metadata for {len(original_metadata)} clips")

    # Create output directories
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    if save_trimmed_audio:
        trimmed_audio_dir = output_path / "trimmed_audio"
        trimmed_audio_dir.mkdir(exist_ok=True)

    # Settings columns are identical for every row, so build them once.
    # Only the parameters relevant to the active strategy are recorded.
    run_settings = {
        'threshold_strategy': threshold_strategy,
        'threshold_db_used': threshold_db if threshold_strategy == 'peak_relative' else None,
        'noise_floor_percentile': noise_floor_percentile if threshold_strategy == 'noise_floor' else None,
        'noise_floor_delta_db': noise_floor_delta_db if threshold_strategy == 'noise_floor' else None,
        'min_sound_duration_ms_used': min_sound_duration_ms,
    }

    # Process each audio file
    results = []

    for _, row in tqdm(original_metadata.iterrows(), total=len(original_metadata),
                       desc="Processing ESC-50 clips"):
        filename = row['filename']
        category = row['category']
        audio_path = Path(audio_dir) / filename

        try:
            # Load audio
            audio = AudioSegment.from_file(str(audio_path), format="wav")
            raw_duration_s = len(audio) / 1000.0

            # Detect sound regions (using adaptive threshold)
            regions = get_sound_regions(
                audio,
                threshold_db=threshold_db,
                min_sound_duration_ms=min_sound_duration_ms,
                threshold_strategy=threshold_strategy,
                noise_floor_percentile=noise_floor_percentile,
                noise_floor_delta_db=noise_floor_delta_db
            )

            # Trim edges only (leftmost and rightmost silence); internal
            # silences between regions are kept
            trimmed_audio = extract_sound_with_edges_trimmed(audio, regions)
            final_duration_s = len(trimmed_audio) / 1000.0

            # Calculate peak amplitude and RMS from trimmed audio
            samples = get_amplitude_array(trimmed_audio)
            peak_amplitude = np.max(np.abs(samples))
            peak_amplitude_db = 20 * np.log10(peak_amplitude + 1e-10)
            rms = np.sqrt(np.mean(samples ** 2))
            avg_rms_db = 20 * np.log10(rms + 1e-10)

            # Calculate effective duration (sum of sound regions); when nothing
            # was detected fall back to the (untrimmed) clip duration
            effective_duration_s = (
                sum(end - start for start, end in regions) / 1000.0
                if regions else final_duration_s
            )

            # Save trimmed audio
            trimmed_filename = None
            if save_trimmed_audio:
                trimmed_filename = filename
                trimmed_path = trimmed_audio_dir / trimmed_filename
                trimmed_audio.export(str(trimmed_path), format="wav")

            # Store results
            results.append({
                'filename': filename,
                'category': category,
                'fold': row['fold'],
                'target': row['target'],
                'esc10': row['esc10'],
                'raw_duration_s': round(raw_duration_s, 4),
                'final_duration_s': round(final_duration_s, 4),
                'effective_duration_s': round(effective_duration_s, 4),
                'num_sound_regions': len(regions),
                'peak_amplitude_db': round(peak_amplitude_db, 2),
                'avg_rms_db': round(avg_rms_db, 2),
                'trimmed_filename': trimmed_filename,
                **run_settings,
            })

        except Exception as e:
            # Per-clip boundary: record the failure and keep processing the rest
            logger.error(f"Error processing {filename}: {e}")
            results.append({
                'filename': filename,
                'category': category,
                'fold': row['fold'],
                'target': row['target'],
                'esc10': row['esc10'],
                'raw_duration_s': None,
                'final_duration_s': None,
                'effective_duration_s': None,
                'num_sound_regions': 0,
                'peak_amplitude_db': None,
                'avg_rms_db': None,
                'trimmed_filename': None,
                **run_settings,
                'error': str(e)
            })

    # Create DataFrame
    results_df = pd.DataFrame(results)

    # Save CSV
    csv_path = output_path / "effective_durations.csv"
    results_df.to_csv(csv_path, index=False)
    logger.info(f"Saved effective durations to {csv_path}")

    # Print summary statistics
    print_summary_statistics(results_df)

    return results_df


def _print_duration_stats(title: str, series: pd.Series) -> None:
    """Print a titled mean/std/min/max summary for one duration column."""
    print(title)
    print(f" Mean: {series.mean():.3f}s")
    print(f" Std: {series.std():.3f}s")
    print(f" Min: {series.min():.3f}s")
    print(f" Max: {series.max():.3f}s")


def print_summary_statistics(df: pd.DataFrame):
    """
    Print summary statistics of the processed dataset.

    Rows whose 'effective_duration_s' is missing are counted as errors and
    excluded from all statistics.

    Args:
        df: DataFrame produced by process_esc50_dataset()
    """
    print("\n" + "=" * 60)
    print("ESC-50 Preprocessing Summary")
    print("=" * 60)

    # Filter out errors
    valid_df = df[df['effective_duration_s'].notna()]

    print(f"\nTotal clips processed: {len(df)}")
    print(f"Successfully processed: {len(valid_df)}")
    print(f"Errors: {len(df) - len(valid_df)}")

    _print_duration_stats(
        "\nRaw duration statistics:",
        valid_df['raw_duration_s']
    )
    _print_duration_stats(
        "\nFinal duration statistics (edges trimmed, internal structure preserved):",
        valid_df['final_duration_s']
    )
    _print_duration_stats(
        "\nEffective duration statistics (sum of sound regions only):",
        valid_df['effective_duration_s']
    )

    # Compare effective vs final
    avg_effective = valid_df['effective_duration_s'].mean()
    avg_final = valid_df['final_duration_s'].mean()
    print("\nComparison (final includes internal silences):")
    print(f" Avg effective: {avg_effective:.3f}s")
    print(f" Avg final: {avg_final:.3f}s")
    print(f" Difference: {avg_final - avg_effective:.3f}s (internal silences)")

    # Duration reduction
    reduction = (1 - avg_final / valid_df['raw_duration_s'].mean()) * 100
    print(f"\nAverage edge trimming reduction: {reduction:.1f}%")

    # Per-category statistics
    print("\nEffective duration by category (top 10 longest):")
    category_stats = valid_df.groupby('category')['effective_duration_s'].agg(['mean', 'std', 'min', 'max'])
    category_stats = category_stats.sort_values('mean', ascending=False)
    print(category_stats.head(10).to_string())

    print("\nEffective duration by category (top 10 shortest):")
    print(category_stats.tail(10).to_string())

    print("\n" + "=" * 60)


def load_config(config_path: str) -> dict:
    """
    Load configuration from YAML file.

    Args:
        config_path: Path to the YAML configuration file

    Returns:
        Parsed configuration; an empty dict when the file is empty
    """
    import yaml

    with open(config_path, 'r', encoding='utf-8') as f:
        # safe_load returns None for an empty document; normalize so callers
        # can always use .get()
        return yaml.safe_load(f) or {}


def _resolve_option(cli_value, section: dict, key: str, default):
    """Return the CLI value when given, else the config value, else *default*."""
    if cli_value is not None:
        return cli_value
    return section.get(key, default)


def main():
    """
    Command-line entry point.

    Each preprocessing parameter is resolved in priority order: command-line
    flag, then the 'tasks.duration' section of the config file, then a
    built-in default.

    Returns:
        DataFrame returned by process_esc50_dataset()
    """
    parser = argparse.ArgumentParser(
        description="Preprocess ESC-50 dataset for duration task"
    )
    parser.add_argument(
        '--config', '-c',
        type=str,
        default='config.yaml',
        help='Path to configuration file'
    )
    parser.add_argument(
        '--threshold-db',
        type=float,
        default=None,
        help='dB threshold below peak for silence detection (default: -40)'
    )
    parser.add_argument(
        '--min-sound-ms',
        type=int,
        default=None,
        help='Minimum sound duration in ms to keep (default: 50)'
    )
    parser.add_argument(
        '--output-dir',
        type=str,
        default=None,
        help='Output directory (default: from config or ESC-50_preprocessed)'
    )
    parser.add_argument(
        '--no-trimmed-audio',
        action='store_true',
        help='Do not save trimmed audio files (only save CSV)'
    )
    parser.add_argument(
        '--threshold-strategy',
        type=str,
        choices=['peak_relative', 'noise_floor'],
        default=None,
        help='Threshold strategy: peak_relative (old) or noise_floor (adaptive, recommended)'
    )
    parser.add_argument(
        '--noise-floor-percentile',
        type=float,
        default=None,
        help='Percentile for noise floor estimation (default: 10)'
    )
    parser.add_argument(
        '--noise-floor-delta-db',
        type=float,
        default=None,
        help='dB above noise floor to set threshold (default: 15)'
    )

    args = parser.parse_args()

    # Load config
    config = load_config(args.config)

    # Get ESC-50 paths from config (`or {}` tolerates sections present but null)
    esc50_config = config.get('esc50') or {}
    audio_dir = esc50_config.get('audio_path', '/home/debarpanb1/ESC-50_github/audio')
    metadata_path = esc50_config.get('metadata_path', '/home/debarpanb1/ESC-50_github/meta/esc50.csv')

    # Get duration task config for preprocessing parameters
    duration_config = (config.get('tasks') or {}).get('duration') or {}

    # Determine threshold and min sound duration
    threshold_db = _resolve_option(
        args.threshold_db, duration_config, 'amplitude_threshold_db', -40.0
    )
    min_sound_ms = _resolve_option(
        args.min_sound_ms, duration_config, 'min_sound_duration_ms', 50
    )

    # Determine output directory
    output_dir = _resolve_option(
        args.output_dir, duration_config, 'preprocessed_data_path',
        '/home/debarpanb1/TREA_2.0/ESC-50_preprocessed'
    )

    # Determine threshold strategy (noise_floor is recommended/default)
    threshold_strategy = _resolve_option(
        args.threshold_strategy, duration_config, 'threshold_strategy', 'noise_floor'
    )

    # Determine noise floor percentile
    noise_floor_percentile = _resolve_option(
        args.noise_floor_percentile, duration_config, 'noise_floor_percentile', 10.0
    )

    # Determine noise floor delta dB
    noise_floor_delta_db = _resolve_option(
        args.noise_floor_delta_db, duration_config, 'noise_floor_delta_db', 15.0
    )

    # Log configuration
    logger.info("=" * 60)
    logger.info("ESC-50 Preprocessing Configuration")
    logger.info("=" * 60)
    logger.info(f"Audio directory: {audio_dir}")
    logger.info(f"Metadata path: {metadata_path}")
    logger.info(f"Output directory: {output_dir}")
    logger.info(f"Threshold strategy: {threshold_strategy}")
    if threshold_strategy == 'peak_relative':
        logger.info(f" Peak-relative threshold dB: {threshold_db}")
    else:
        logger.info(f" Noise floor percentile: {noise_floor_percentile}")
        logger.info(f" Noise floor delta dB: {noise_floor_delta_db}")
    logger.info(f"Min sound duration (ms): {min_sound_ms}")
    # Message reflects the actual trimming rule in extract_sound_with_edges_trimmed()
    logger.info("Adaptive edge trimming: only if silence >= 100ms, keep max(200ms, 10%) buffer")
    logger.info(f"Save trimmed audio: {not args.no_trimmed_audio}")
    logger.info("=" * 60)

    # Process dataset
    results_df = process_esc50_dataset(
        audio_dir=audio_dir,
        metadata_path=metadata_path,
        output_dir=output_dir,
        threshold_db=threshold_db,
        min_sound_duration_ms=min_sound_ms,
        save_trimmed_audio=not args.no_trimmed_audio,
        threshold_strategy=threshold_strategy,
        noise_floor_percentile=noise_floor_percentile,
        noise_floor_delta_db=noise_floor_delta_db
    )

    logger.info("\nPreprocessing complete!")
    logger.info(f"Results saved to: {output_dir}")

    return results_df


if __name__ == "__main__":
    main()