|
|
|
|
|
""" |
|
|
ESC-50 Preprocessing Script for Duration Task |
|
|
|
|
|
This script processes all ESC-50 audio clips to: |
|
|
1. Apply amplitude-based filtering to detect actual sound regions |
|
|
2. Calculate effective duration (portion containing actual sound) |
|
|
3. Save trimmed audio files (with silence removed) |
|
|
4. Generate a CSV with all metadata including effective durations |
|
|
|
|
|
Usage: |
|
|
python preprocess_esc50.py --config config.yaml |
|
|
python preprocess_esc50.py --config config.yaml --threshold-db -40 --min-sound-ms 50 |
|
|
""" |
|
|
|
|
|
import argparse |
|
|
import os |
|
|
import sys |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Optional, Tuple |
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from pydub import AudioSegment |
|
|
from tqdm import tqdm |
|
|
|
|
|
|
|
|
# Make sibling packages (e.g. utils/) importable when this file is run as a script.
sys.path.insert(0, str(Path(__file__).parent))

from utils.logger import setup_logger

# Module-level logger shared by every function below.
logger = setup_logger(__name__)
|
|
|
|
|
|
|
|
def get_amplitude_array(audio: AudioSegment) -> np.ndarray:
    """
    Convert an AudioSegment into a normalized numpy amplitude array.

    Stereo input is downmixed to mono by averaging the two channels.

    Args:
        audio: Input audio segment

    Returns:
        Numpy array of amplitude values scaled into the range -1 to 1
    """
    raw = np.array(audio.get_array_of_samples())

    # Downmix stereo: samples are interleaved L/R pairs, so average each pair.
    if audio.channels == 2:
        raw = raw.reshape((-1, 2)).mean(axis=1)

    # Full-scale magnitude for this sample width (e.g. 32768 for 16-bit PCM).
    full_scale = float(2 ** (audio.sample_width * 8 - 1))
    return raw / full_scale
|
|
|
|
|
|
|
|
def compute_rms_envelope(samples: np.ndarray, frame_size_ms: int, hop_size_ms: int,
                         sample_rate: int) -> Tuple[np.ndarray, np.ndarray]:
    """
    Compute a frame-wise RMS envelope of an audio signal.

    Only complete frames are analyzed; a trailing partial frame is dropped.

    Args:
        samples: Audio samples as numpy array
        frame_size_ms: Analysis frame length in milliseconds
        hop_size_ms: Step between consecutive frame starts in milliseconds
        sample_rate: Audio sample rate in Hz

    Returns:
        Tuple of (rms_values, time_stamps_ms); timestamps are frame starts
    """
    frame_len = int(sample_rate * frame_size_ms / 1000)
    hop_len = int(sample_rate * hop_size_ms / 1000)

    starts = range(0, len(samples) - frame_len + 1, hop_len)

    # One RMS value per full frame, stamped with the frame start time in ms.
    envelope = [np.sqrt(np.mean(samples[s:s + frame_len] ** 2)) for s in starts]
    times_ms = [s / sample_rate * 1000 for s in starts]

    return np.array(envelope), np.array(times_ms)
|
|
|
|
|
|
|
|
def rms_to_db(rms: np.ndarray, reference: float = 1.0) -> np.ndarray:
    """
    Convert RMS amplitude values to decibels relative to a reference level.

    Args:
        rms: RMS values
        reference: Reference amplitude (default 1.0 for normalized audio)

    Returns:
        dB values; silent frames are floored at -200 dB instead of -inf
    """
    # Clamp to a tiny positive value so log10 never receives zero.
    floor = 1e-10
    return 20 * np.log10(np.maximum(rms, floor) / reference)
|
|
|
|
|
|
|
|
def detect_sound_regions(
    audio: AudioSegment,
    threshold_db: float = -40.0,
    min_sound_duration_ms: int = 50,
    frame_size_ms: int = 20,
    hop_size_ms: int = 10,
    merge_gap_ms: int = 100,
    threshold_strategy: str = 'noise_floor',
    noise_floor_percentile: float = 10.0,
    noise_floor_delta_db: float = 15.0
) -> List[Tuple[int, int]]:
    """
    Detect regions of the clip whose RMS level rises above a threshold.

    Two threshold strategies are supported:
    - 'peak_relative': threshold = peak_db + threshold_db (legacy behavior)
    - 'noise_floor': threshold = percentile(db_values, p) + delta_db, i.e.
      an adaptive per-clip threshold a fixed delta above the clip's own
      background-noise level. Recommended, since it tracks each clip's
      actual noise floor rather than a fixed offset from the loudest frame.

    Args:
        audio: Input audio segment
        threshold_db: dB offset below peak (used if strategy='peak_relative')
        min_sound_duration_ms: Minimum duration of a sound region to keep
        frame_size_ms: Frame size for RMS computation
        hop_size_ms: Hop size for RMS computation
        merge_gap_ms: Merge regions separated by less than this gap
        threshold_strategy: 'peak_relative' or 'noise_floor'
        noise_floor_percentile: Percentile for noise floor estimation (default 10)
        noise_floor_delta_db: dB above noise floor to set threshold (default 15)

    Returns:
        List of (start_ms, end_ms) tuples for sound regions
    """
    sample_rate = audio.frame_rate
    envelope, frame_times = compute_rms_envelope(
        get_amplitude_array(audio), frame_size_ms, hop_size_ms, sample_rate
    )

    if len(envelope) == 0:
        return []

    db_values = rms_to_db(envelope)
    peak_db = np.max(db_values)

    if threshold_strategy == 'noise_floor':
        # Adaptive threshold: a fixed delta above this clip's noise floor.
        noise_floor_db = np.percentile(db_values, noise_floor_percentile)
        absolute_threshold = noise_floor_db + noise_floor_delta_db

        # Never let the threshold reach the peak itself, otherwise no frame
        # could ever qualify as "sound".
        absolute_threshold = min(absolute_threshold, peak_db - 1.0)

        logger.debug(
            f"Noise-floor threshold: floor={noise_floor_db:.1f}dB (p{noise_floor_percentile}), "
            f"delta={noise_floor_delta_db}dB, threshold={absolute_threshold:.1f}dB, peak={peak_db:.1f}dB"
        )
    else:
        # Legacy behavior: fixed offset below the clip's peak level.
        absolute_threshold = peak_db + threshold_db
        logger.debug(
            f"Peak-relative threshold: peak={peak_db:.1f}dB, offset={threshold_db}dB, "
            f"threshold={absolute_threshold:.1f}dB"
        )

    loud = db_values > absolute_threshold

    # State machine over frames: open a region at the first loud frame, close
    # it at the next quiet frame, and keep it only if it is long enough.
    regions: List[Tuple[int, int]] = []
    open_start = None
    for is_loud, t_ms in zip(loud, frame_times):
        if is_loud:
            if open_start is None:
                open_start = t_ms
        elif open_start is not None:
            if t_ms - open_start >= min_sound_duration_ms:
                regions.append((int(open_start), int(t_ms)))
            open_start = None

    # A region still open at the last frame is closed at the end of the clip.
    if open_start is not None:
        tail_end = frame_times[-1] + hop_size_ms
        if tail_end - open_start >= min_sound_duration_ms:
            regions.append((int(open_start), int(tail_end)))

    # Fuse neighbouring regions whose gap is no wider than merge_gap_ms.
    if len(regions) > 1:
        fused = [regions[0]]
        for start, end in regions[1:]:
            prev_start, prev_end = fused[-1]
            if start - prev_end <= merge_gap_ms:
                fused[-1] = (prev_start, end)
            else:
                fused.append((start, end))
        regions = fused

    return regions
|
|
|
|
|
|
|
|
def get_sound_regions(
    audio: AudioSegment,
    threshold_db: float = -40.0,
    min_sound_duration_ms: int = 50,
    threshold_strategy: str = 'noise_floor',
    noise_floor_percentile: float = 10.0,
    noise_floor_delta_db: float = 15.0
) -> List[Tuple[int, int]]:
    """
    Convenience wrapper around detect_sound_regions().

    Forwards only the threshold-related knobs and relies on that function's
    defaults for frame size, hop size, and region merging.

    Args:
        audio: Input audio segment
        threshold_db: dB offset below peak (used if strategy='peak_relative')
        min_sound_duration_ms: Minimum duration of a sound region to keep
        threshold_strategy: 'peak_relative' or 'noise_floor'
        noise_floor_percentile: Percentile for noise floor estimation
        noise_floor_delta_db: dB above noise floor to set threshold

    Returns:
        List of (start_ms, end_ms) tuples for sound regions
    """
    forwarded = dict(
        threshold_db=threshold_db,
        min_sound_duration_ms=min_sound_duration_ms,
        threshold_strategy=threshold_strategy,
        noise_floor_percentile=noise_floor_percentile,
        noise_floor_delta_db=noise_floor_delta_db,
    )
    return detect_sound_regions(audio, **forwarded)
|
|
|
|
|
|
|
|
def extract_sound_with_edges_trimmed(
    audio: AudioSegment,
    regions: List[Tuple[int, int]],
    min_silence_to_trim_ms: int = 100,
    buffer_ratio: float = 0.1
) -> AudioSegment:
    """
    Extract audio with ONLY leftmost and rightmost silence removed IF present.

    Trimming is ADAPTIVE:
    - Only trims if edge silence >= min_silence_to_trim_ms
    - Keeps a small percentage (buffer_ratio) of the silence to preserve transients
    - Buffer size adapts to actual silence duration (not fixed)

    Preserves all internal structure and silence between sounds.
    Perfect for periodic sounds (clock ticks, footsteps, typing).

    Args:
        audio: Input audio segment
        regions: List of (start_ms, end_ms) tuples for sound regions
        min_silence_to_trim_ms: Only trim if edge silence is at least this long (default 100ms)
        buffer_ratio: Keep this fraction of the silence as buffer (default 0.1 = 10%)
            Example: 500ms silence -> keep 50ms buffer

    Returns:
        Audio segment with edges trimmed (or original if no significant silence)
    """
    if not regions:
        # Nothing detected: return the clip untouched rather than guessing.
        return audio

    first_sound_start_ms = regions[0][0]
    last_sound_end_ms = regions[-1][1]
    audio_duration_ms = len(audio)

    leading_silence_ms = first_sound_start_ms
    trailing_silence_ms = audio_duration_ms - last_sound_end_ms

    # Trim the leading edge only when the silence is significant; keep a
    # buffer proportional to the silence so onset transients survive.
    # Bug fix: previously buffer_ms = max(200, ...), which forced a fixed
    # >=200ms buffer — contradicting the documented adaptive behavior and
    # turning trims of 100-200ms edge silence into no-ops.
    if leading_silence_ms >= min_silence_to_trim_ms:
        buffer_ms = int(leading_silence_ms * buffer_ratio)
        trim_start_ms = max(0, first_sound_start_ms - buffer_ms)
    else:
        # Edge silence too short to matter; keep it all.
        trim_start_ms = 0

    # Same adaptive rule for the trailing edge.
    if trailing_silence_ms >= min_silence_to_trim_ms:
        buffer_ms = int(trailing_silence_ms * buffer_ratio)
        trim_end_ms = min(audio_duration_ms, last_sound_end_ms + buffer_ms)
    else:
        trim_end_ms = audio_duration_ms

    trimmed_audio = audio[trim_start_ms:trim_end_ms]

    logger.debug(
        f"Edge trim: {audio_duration_ms}ms -> {len(trimmed_audio)}ms "
        f"(leading_silence={leading_silence_ms}ms, trailing_silence={trailing_silence_ms}ms, "
        f"trim_start={trim_start_ms}ms, trim_end={trim_end_ms}ms)"
    )

    return trimmed_audio
|
|
|
|
|
|
|
|
def extract_all_sound_regions(
    audio: AudioSegment,
    regions: List[Tuple[int, int]],
    crossfade_ms: int = 10,
    padding_ms: int = 20
) -> AudioSegment:
    """
    Concatenate every detected sound region, removing ALL silence between them.

    WARNING: This destroys natural periodicity! Prefer the edge-only trimming
    path for most use cases; this function is kept for backward compatibility.

    Args:
        audio: Input audio segment
        regions: List of (start_ms, end_ms) tuples for sound regions
        crossfade_ms: Crossfade duration when joining regions
        padding_ms: Padding around each region to avoid cutting transients

    Returns:
        Audio segment containing only sound portions (internal silence removed)
    """
    if not regions:
        return audio

    total_ms = len(audio)

    # Cut out each region with a little padding so transients are preserved.
    pieces = [
        audio[max(0, start_ms - padding_ms):min(total_ms, end_ms + padding_ms)]
        for start_ms, end_ms in regions
    ]

    if len(pieces) == 1:
        return pieces[0]

    # Stitch the pieces together, crossfading only when both sides are
    # longer than the crossfade itself.
    joined = pieces[0]
    for piece in pieces[1:]:
        if len(joined) > crossfade_ms and len(piece) > crossfade_ms:
            joined = joined.append(piece, crossfade=crossfade_ms)
        else:
            joined = joined + piece

    return joined
|
|
|
|
|
|
|
|
def process_esc50_dataset(
    audio_dir: str,
    metadata_path: str,
    output_dir: str,
    threshold_db: float = -40.0,
    min_sound_duration_ms: int = 50,
    save_trimmed_audio: bool = True,
    threshold_strategy: str = 'noise_floor',
    noise_floor_percentile: float = 10.0,
    noise_floor_delta_db: float = 15.0
) -> pd.DataFrame:
    """
    Process entire ESC-50 dataset and compute effective durations.

    Uses ADAPTIVE EDGE-ONLY trimming to preserve natural periodicity of sounds.
    Only leading and trailing silence is removed IF significant (>=100ms).
    Trimming is adaptive: keeps a small percentage of silence as buffer for transients.
    All internal structure is preserved.

    Supports two threshold strategies for adaptive per-clip thresholding:
    - 'peak_relative': threshold = peak_db + threshold_db (fixed offset from peak)
    - 'noise_floor': threshold = percentile(db, p) + delta (adapts to noise floor)

    Per-clip failures are logged and recorded as rows with null duration
    columns and an 'error' field, so one bad file does not abort the run.

    Args:
        audio_dir: Path to ESC-50 audio directory
        metadata_path: Path to ESC-50 metadata CSV
        output_dir: Output directory for processed files
        threshold_db: dB threshold for silence detection (peak_relative mode)
        min_sound_duration_ms: Minimum sound duration to keep
        save_trimmed_audio: Whether to save trimmed audio files
        threshold_strategy: 'peak_relative' or 'noise_floor' (recommended)
        noise_floor_percentile: Percentile for noise floor estimation (default 10.0)
        noise_floor_delta_db: dB above noise floor to set threshold (default 15.0)

    Returns:
        DataFrame with processed metadata (also written to
        <output_dir>/effective_durations.csv)
    """
    original_metadata = pd.read_csv(metadata_path)
    logger.info(f"Loaded metadata for {len(original_metadata)} clips")

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    if save_trimmed_audio:
        trimmed_audio_dir = output_path / "trimmed_audio"
        trimmed_audio_dir.mkdir(exist_ok=True)

    results = []

    for _, row in tqdm(original_metadata.iterrows(), total=len(original_metadata),
                       desc="Processing ESC-50 clips"):
        filename = row['filename']
        category = row['category']
        audio_path = Path(audio_dir) / filename

        try:
            audio = AudioSegment.from_file(str(audio_path), format="wav")
            raw_duration_s = len(audio) / 1000.0

            # Locate sound regions with the configured threshold strategy.
            regions = get_sound_regions(
                audio,
                threshold_db=threshold_db,
                min_sound_duration_ms=min_sound_duration_ms,
                threshold_strategy=threshold_strategy,
                noise_floor_percentile=noise_floor_percentile,
                noise_floor_delta_db=noise_floor_delta_db
            )

            # Edge-only trim: internal silences are deliberately preserved.
            trimmed_audio = extract_sound_with_edges_trimmed(audio, regions)
            final_duration_s = len(trimmed_audio) / 1000.0

            # Level statistics of the trimmed clip.
            samples = get_amplitude_array(trimmed_audio)
            peak_amplitude = np.max(np.abs(samples))
            peak_amplitude_db = 20 * np.log10(peak_amplitude + 1e-10)
            rms = np.sqrt(np.mean(samples ** 2))
            avg_rms_db = 20 * np.log10(rms + 1e-10)

            # Effective duration = sum of sound regions only (no silences).
            # Fall back to the trimmed length if no regions were detected.
            effective_duration_s = sum(end - start for start, end in regions) / 1000.0 if regions else final_duration_s

            trimmed_filename = None
            if save_trimmed_audio:
                trimmed_filename = filename
                trimmed_path = trimmed_audio_dir / trimmed_filename
                trimmed_audio.export(str(trimmed_path), format="wav")

            results.append({
                'filename': filename,
                'category': category,
                'fold': row['fold'],
                'target': row['target'],
                'esc10': row['esc10'],
                'raw_duration_s': round(raw_duration_s, 4),
                'final_duration_s': round(final_duration_s, 4),
                'effective_duration_s': round(effective_duration_s, 4),
                'num_sound_regions': len(regions),
                'peak_amplitude_db': round(peak_amplitude_db, 2),
                'avg_rms_db': round(avg_rms_db, 2),
                'trimmed_filename': trimmed_filename if save_trimmed_audio else None,
                'threshold_strategy': threshold_strategy,
                'threshold_db_used': threshold_db if threshold_strategy == 'peak_relative' else None,
                'noise_floor_percentile': noise_floor_percentile if threshold_strategy == 'noise_floor' else None,
                'noise_floor_delta_db': noise_floor_delta_db if threshold_strategy == 'noise_floor' else None,
                'min_sound_duration_ms_used': min_sound_duration_ms
            })

        except Exception as e:
            # Bug fix: the log previously printed "(unknown)" instead of the
            # failing file, making failures untraceable.
            logger.error(f"Error processing {filename}: {e}")
            results.append({
                'filename': filename,
                'category': category,
                'fold': row['fold'],
                'target': row['target'],
                'esc10': row['esc10'],
                'raw_duration_s': None,
                'final_duration_s': None,
                'effective_duration_s': None,
                'num_sound_regions': 0,
                'peak_amplitude_db': None,
                'avg_rms_db': None,
                'trimmed_filename': None,
                'threshold_strategy': threshold_strategy,
                'threshold_db_used': threshold_db if threshold_strategy == 'peak_relative' else None,
                'noise_floor_percentile': noise_floor_percentile if threshold_strategy == 'noise_floor' else None,
                'noise_floor_delta_db': noise_floor_delta_db if threshold_strategy == 'noise_floor' else None,
                'min_sound_duration_ms_used': min_sound_duration_ms,
                'error': str(e)
            })

    results_df = pd.DataFrame(results)

    # Persist the per-clip metadata CSV next to the trimmed audio.
    csv_path = output_path / "effective_durations.csv"
    results_df.to_csv(csv_path, index=False)
    logger.info(f"Saved effective durations to {csv_path}")

    print_summary_statistics(results_df)

    return results_df
|
|
|
|
|
|
|
|
def print_summary_statistics(df: pd.DataFrame):
    """Print summary statistics of the processed dataset to stdout."""
    banner = "=" * 60
    print("\n" + banner)
    print("ESC-50 Preprocessing Summary")
    print(banner)

    # Rows that failed processing carry a null effective duration.
    valid_df = df[df['effective_duration_s'].notna()]

    print(f"\nTotal clips processed: {len(df)}")
    print(f"Successfully processed: {len(valid_df)}")
    print(f"Errors: {len(df) - len(valid_df)}")

    def _describe(series):
        # Four-line stats block shared by every duration section below.
        print(f"  Mean: {series.mean():.3f}s")
        print(f"  Std: {series.std():.3f}s")
        print(f"  Min: {series.min():.3f}s")
        print(f"  Max: {series.max():.3f}s")

    print("\nRaw duration statistics:")
    _describe(valid_df['raw_duration_s'])

    print("\nFinal duration statistics (edges trimmed, internal structure preserved):")
    _describe(valid_df['final_duration_s'])

    print("\nEffective duration statistics (sum of sound regions only):")
    _describe(valid_df['effective_duration_s'])

    avg_effective = valid_df['effective_duration_s'].mean()
    avg_final = valid_df['final_duration_s'].mean()
    print("\nComparison (final includes internal silences):")
    print(f"  Avg effective: {avg_effective:.3f}s")
    print(f"  Avg final: {avg_final:.3f}s")
    print(f"  Difference: {avg_final - avg_effective:.3f}s (internal silences)")

    # How much, on average, edge trimming shortened the clips.
    reduction = (1 - avg_final / valid_df['raw_duration_s'].mean()) * 100
    print(f"\nAverage edge trimming reduction: {reduction:.1f}%")

    print("\nEffective duration by category (top 10 longest):")
    category_stats = valid_df.groupby('category')['effective_duration_s'].agg(['mean', 'std', 'min', 'max'])
    category_stats = category_stats.sort_values('mean', ascending=False)
    print(category_stats.head(10).to_string())

    print("\nEffective duration by category (top 10 shortest):")
    print(category_stats.tail(10).to_string())

    print("\n" + banner)
|
|
|
|
|
|
|
|
def load_config(config_path: str) -> dict:
    """Read a YAML configuration file and return its contents as a dict."""
    # Imported lazily so merely importing this module does not require PyYAML.
    import yaml

    with open(config_path, 'r') as handle:
        return yaml.safe_load(handle)
|
|
|
|
|
|
|
|
def main():
    """CLI entry point: parse arguments, merge with config, run preprocessing."""
    parser = argparse.ArgumentParser(
        description="Preprocess ESC-50 dataset for duration task"
    )
    parser.add_argument('--config', '-c', type=str, default='config.yaml',
                        help='Path to configuration file')
    parser.add_argument('--threshold-db', type=float, default=None,
                        help='dB threshold below peak for silence detection (default: -40)')
    parser.add_argument('--min-sound-ms', type=int, default=None,
                        help='Minimum sound duration in ms to keep (default: 50)')
    parser.add_argument('--output-dir', type=str, default=None,
                        help='Output directory (default: from config or ESC-50_preprocessed)')
    parser.add_argument('--no-trimmed-audio', action='store_true',
                        help='Do not save trimmed audio files (only save CSV)')
    parser.add_argument('--threshold-strategy', type=str,
                        choices=['peak_relative', 'noise_floor'], default=None,
                        help='Threshold strategy: peak_relative (old) or noise_floor (adaptive, recommended)')
    parser.add_argument('--noise-floor-percentile', type=float, default=None,
                        help='Percentile for noise floor estimation (default: 10)')
    parser.add_argument('--noise-floor-delta-db', type=float, default=None,
                        help='dB above noise floor to set threshold (default: 15)')

    args = parser.parse_args()

    config = load_config(args.config)

    # Dataset locations come from the 'esc50' section of the config.
    esc50_config = config.get('esc50', {})
    audio_dir = esc50_config.get('audio_path', '/home/debarpanb1/ESC-50_github/audio')
    metadata_path = esc50_config.get('metadata_path', '/home/debarpanb1/ESC-50_github/meta/esc50.csv')

    duration_config = config.get('tasks', {}).get('duration', {})

    def _resolve(cli_value, config_key, fallback):
        # Precedence: explicit CLI flag > config file entry > hard-coded default.
        if cli_value is not None:
            return cli_value
        return duration_config.get(config_key, fallback)

    threshold_db = _resolve(args.threshold_db, 'amplitude_threshold_db', -40.0)
    min_sound_ms = _resolve(args.min_sound_ms, 'min_sound_duration_ms', 50)
    output_dir = _resolve(args.output_dir, 'preprocessed_data_path',
                          '/home/debarpanb1/TREA_2.0/ESC-50_preprocessed')
    threshold_strategy = _resolve(args.threshold_strategy, 'threshold_strategy', 'noise_floor')
    noise_floor_percentile = _resolve(args.noise_floor_percentile, 'noise_floor_percentile', 10.0)
    noise_floor_delta_db = _resolve(args.noise_floor_delta_db, 'noise_floor_delta_db', 15.0)

    banner = "=" * 60
    logger.info(banner)
    logger.info("ESC-50 Preprocessing Configuration")
    logger.info(banner)
    logger.info(f"Audio directory: {audio_dir}")
    logger.info(f"Metadata path: {metadata_path}")
    logger.info(f"Output directory: {output_dir}")
    logger.info(f"Threshold strategy: {threshold_strategy}")
    if threshold_strategy == 'peak_relative':
        logger.info(f"  Peak-relative threshold dB: {threshold_db}")
    else:
        logger.info(f"  Noise floor percentile: {noise_floor_percentile}")
        logger.info(f"  Noise floor delta dB: {noise_floor_delta_db}")
    logger.info(f"Min sound duration (ms): {min_sound_ms}")
    logger.info(f"Adaptive edge trimming: only if silence >= 100ms, keep 10% buffer")
    logger.info(f"Save trimmed audio: {not args.no_trimmed_audio}")
    logger.info(banner)

    results_df = process_esc50_dataset(
        audio_dir=audio_dir,
        metadata_path=metadata_path,
        output_dir=output_dir,
        threshold_db=threshold_db,
        min_sound_duration_ms=min_sound_ms,
        save_trimmed_audio=not args.no_trimmed_audio,
        threshold_strategy=threshold_strategy,
        noise_floor_percentile=noise_floor_percentile,
        noise_floor_delta_db=noise_floor_delta_db
    )

    logger.info(f"\nPreprocessing complete!")
    logger.info(f"Results saved to: {output_dir}")

    return results_df
|
|
|
|
|
|
|
|
# Allow running this module directly as a script.
if __name__ == "__main__":
    main()
|
|
|