# TREA_2.0_codebase/utils/audio_utils.py
"""
Audio processing utilities for temporal reasoning dataset generation.
"""
import os
import random
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
import numpy as np
from pydub import AudioSegment
try:
import pyloudnorm as pyln
PYLOUDNORM_AVAILABLE = True
except ImportError:
PYLOUDNORM_AVAILABLE = False
from .logger import setup_logger
logger = setup_logger(__name__)
def get_lufs_loudness(audio: AudioSegment) -> float:
"""
Calculate integrated LUFS loudness (perceived loudness) of an audio segment.
    LUFS (Loudness Units relative to Full Scale) is the broadcast standard for measuring
perceived loudness. It accounts for human hearing sensitivity to different
frequencies using K-weighting.
Args:
audio: Input audio segment (pydub AudioSegment)
Returns:
Loudness in LUFS (negative values, typically -70 to 0)
Returns dBFS if pyloudnorm is not available (fallback)
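    Example (illustrative sketch; assumes pyloudnorm is installed):
        >>> from pydub.generators import Sine
        >>> tone = Sine(440).to_audio_segment(duration=1000)  # 1 s test tone
        >>> lufs = get_lufs_loudness(tone)  # a negative float, e.g. around -4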
"""
if not PYLOUDNORM_AVAILABLE:
logger.warning("pyloudnorm not available, falling back to dBFS")
return audio.dBFS
# Convert pydub AudioSegment to numpy array
samples = np.array(audio.get_array_of_samples())
# Handle stereo by reshaping
if audio.channels == 2:
samples = samples.reshape((-1, 2))
# Normalize to float [-1, 1]
if audio.sample_width == 1:
samples = samples.astype(np.float64) / 128.0 - 1.0
elif audio.sample_width == 2:
samples = samples.astype(np.float64) / 32768.0
elif audio.sample_width == 4:
samples = samples.astype(np.float64) / 2147483648.0
else:
samples = samples.astype(np.float64) / 32768.0 # default to 16-bit
# Create meter with sample rate
meter = pyln.Meter(audio.frame_rate)
# Measure integrated loudness
try:
loudness = meter.integrated_loudness(samples)
# Handle -inf for silent audio
if np.isinf(loudness):
loudness = -70.0 # Return very quiet value instead of -inf
return loudness
except Exception as e:
logger.warning(f"LUFS measurement failed: {e}, falling back to dBFS")
return audio.dBFS
def normalize_to_lufs(audio: AudioSegment, target_lufs: float = -23.0) -> AudioSegment:
"""
Normalize audio to a target LUFS level (perceived loudness normalization).
This is superior to dBFS normalization for comparing different sound types
because it accounts for human hearing sensitivity.
Args:
audio: Input audio segment
target_lufs: Target loudness level in LUFS (default: -23 LUFS, EBU R128 standard)
Returns:
Loudness-normalized audio segment
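    Example (illustrative sketch; assumes pyloudnorm is installed):
        >>> from pydub.generators import Sine
        >>> tone = Sine(440).to_audio_segment(duration=1000)
        >>> levelled = normalize_to_lufs(tone, target_lufs=-23.0)
        >>> # get_lufs_loudness(levelled) is now approximately -23.0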
"""
if not PYLOUDNORM_AVAILABLE:
logger.warning("pyloudnorm not available, falling back to dBFS normalization")
change_db = target_lufs - audio.dBFS
return audio.apply_gain(change_db)
current_lufs = get_lufs_loudness(audio)
# Calculate required gain change
gain_db = target_lufs - current_lufs
# Apply gain
normalized = audio.apply_gain(gain_db)
logger.debug(f"Normalized LUFS: {current_lufs:.2f} -> {get_lufs_loudness(normalized):.2f} LUFS")
return normalized
class AudioProcessor:
"""Handles audio loading, processing, and concatenation."""
def __init__(
self,
crossfade_duration: int = 500,
silence_duration: int = 1000,
with_silence: bool = True,
normalize: bool = False,
normalize_target_dBFS: float = -20.0,
synthetic_silence_path: Optional[str] = None
):
"""
Initialize the audio processor.
Args:
crossfade_duration: Duration of crossfade in milliseconds
silence_duration: Duration of silence between clips in milliseconds
with_silence: Whether to add silence between clips
normalize: Whether to normalize audio levels
normalize_target_dBFS: Target dBFS level for normalization
synthetic_silence_path: Path to synthetic silence audio files
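        Example (illustrative sketch; the room-tone directory is hypothetical):
            >>> processor = AudioProcessor(
            ...     crossfade_duration=250,
            ...     silence_duration=500,
            ...     normalize=True,
            ...     normalize_target_dBFS=-20.0,
            ...     synthetic_silence_path="assets/room_tone",
            ... )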
"""
self.crossfade_duration = crossfade_duration
self.silence_duration = silence_duration
self.with_silence = with_silence
self.normalize = normalize
self.normalize_target_dBFS = normalize_target_dBFS
self.synthetic_silence_path = synthetic_silence_path
self._silence_cache = {}
def load_audio(self, audio_path: str) -> AudioSegment:
"""
        Load an audio file (read as WAV format).
Args:
audio_path: Path to the audio file
Returns:
Loaded audio segment
"""
try:
audio = AudioSegment.from_file(audio_path, format="wav")
logger.debug(f"Loaded audio: {audio_path}, duration: {len(audio)}ms")
return audio
except Exception as e:
logger.error(f"Error loading audio {audio_path}: {e}")
raise
def normalize_audio(self, audio: AudioSegment, target_dBFS: Optional[float] = None) -> AudioSegment:
"""
Normalize audio to a target dBFS level.
Args:
audio: Input audio segment
target_dBFS: Target dBFS level (uses default if None)
Returns:
Normalized audio segment
"""
if target_dBFS is None:
target_dBFS = self.normalize_target_dBFS
change_in_dBFS = target_dBFS - audio.dBFS
normalized = audio.apply_gain(change_in_dBFS)
logger.debug(f"Normalized audio: {audio.dBFS:.2f} dBFS -> {normalized.dBFS:.2f} dBFS")
return normalized
def adjust_volume(self, audio: AudioSegment, volume_db: float) -> AudioSegment:
"""
Adjust audio volume by a specific dB amount.
Args:
audio: Input audio segment
volume_db: Volume adjustment in dB (positive = louder, negative = quieter)
Returns:
Volume-adjusted audio segment
"""
adjusted = audio.apply_gain(volume_db)
logger.debug(f"Adjusted volume by {volume_db} dB: {audio.dBFS:.2f} -> {adjusted.dBFS:.2f} dBFS")
return adjusted
def get_silence(self, duration: Optional[int] = None) -> AudioSegment:
"""
Get a silence audio segment, using synthetic silence if available.
Args:
duration: Duration in milliseconds (uses default if None)
Returns:
Silence audio segment
"""
if duration is None:
duration = self.silence_duration
# Check cache first
if duration in self._silence_cache:
return self._silence_cache[duration]
# Try to load synthetic silence
if self.synthetic_silence_path and os.path.exists(self.synthetic_silence_path):
silence_files = list(Path(self.synthetic_silence_path).glob("*.wav"))
if silence_files:
silence = self.load_audio(str(random.choice(silence_files)))
# Adjust duration if needed
if len(silence) < duration:
# Repeat the silence
repetitions = (duration // len(silence)) + 1
silence = silence * repetitions
silence = silence[:duration]
self._silence_cache[duration] = silence
logger.debug(f"Using synthetic silence: {duration}ms")
return silence
# Fall back to pure silence
silence = AudioSegment.silent(duration=duration)
self._silence_cache[duration] = silence
logger.debug(f"Using pure silence: {duration}ms")
return silence
def concatenate_audios(
self,
audio_list: List[AudioSegment],
normalize_each: bool = False,
volume_adjustments: Optional[List[float]] = None
) -> AudioSegment:
"""
Concatenate multiple audio segments with crossfade and optional silence.
Args:
audio_list: List of audio segments to concatenate
normalize_each: Whether to normalize each audio before concatenation
volume_adjustments: Optional list of volume adjustments (in dB) for each audio
Returns:
Concatenated audio segment
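        Example (illustrative sketch using generated tones):
            >>> from pydub.generators import Sine
            >>> a = Sine(440).to_audio_segment(duration=2000)
            >>> b = Sine(880).to_audio_segment(duration=2000)
            >>> p = AudioProcessor(crossfade_duration=100, silence_duration=500)
            >>> merged = p.concatenate_audios([a, b], volume_adjustments=[0.0, -6.0])
            >>> len(merged)  # 2000 + (500 - 100) + 2000
            4400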
"""
if not audio_list:
raise ValueError("audio_list cannot be empty")
if len(audio_list) == 1:
audio = audio_list[0]
if normalize_each and self.normalize:
audio = self.normalize_audio(audio)
if volume_adjustments and len(volume_adjustments) > 0:
audio = self.adjust_volume(audio, volume_adjustments[0])
return audio
# Process first audio
merged = audio_list[0]
if normalize_each and self.normalize:
merged = self.normalize_audio(merged)
if volume_adjustments and len(volume_adjustments) > 0:
merged = self.adjust_volume(merged, volume_adjustments[0])
# Concatenate remaining audios
for i, audio in enumerate(audio_list[1:], start=1):
# Process current audio
current = audio
if normalize_each and self.normalize:
current = self.normalize_audio(current)
if volume_adjustments and len(volume_adjustments) > i:
current = self.adjust_volume(current, volume_adjustments[i])
# Add silence if configured
if self.with_silence:
silence = self.get_silence()
# Crossfade between audio and silence for smooth transition
merged = merged.append(silence, crossfade=self.crossfade_duration)
# Append current audio WITHOUT crossfade to avoid cutting it
# The crossfade with silence already provides smooth transition
merged = merged.append(current, crossfade=0)
logger.debug(f"Concatenated {len(audio_list)} audio segments, total duration: {len(merged)}ms")
return merged
def concatenate_audio_files(
self,
audio_paths: List[str],
output_path: str,
normalize_each: bool = False,
volume_adjustments: Optional[List[float]] = None,
target_durations: Optional[List[float]] = None
) -> Tuple[AudioSegment, dict]:
"""
Load, concatenate, and save multiple audio files.
Args:
audio_paths: List of paths to audio files
output_path: Path to save the concatenated audio
normalize_each: Whether to normalize each audio before concatenation
volume_adjustments: Optional list of volume adjustments (in dB) for each audio
target_durations: Optional list of target durations (in seconds) for each clip
Returns:
Tuple of (concatenated audio segment, metadata dict)
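        Example (illustrative sketch; the input/output paths are hypothetical):
            >>> p = AudioProcessor()
            >>> merged, meta = p.concatenate_audio_files(
            ...     ["clips/dog_bark.wav", "clips/rain.wav"],
            ...     "out/merged.wav",
            ...     target_durations=[5.0, 8.0],  # each clip trimmed/repeated first
            ... )
            >>> total_s = meta["total_duration_s"]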
"""
# Load all audio files
audio_segments = []
for i, path in enumerate(audio_paths):
audio = self.load_audio(path)
# Adjust duration if specified
if target_durations and i < len(target_durations):
target_ms = int(target_durations[i] * 1000)
                # Trim or repeat the clip to the requested length; the original
                # trim_or_repeat_audio() is undefined in this module, and
                # concatenate_to_target_duration() (which takes seconds) has the
                # same trim-or-repeat behavior
                audio = concatenate_to_target_duration(audio, target_durations[i])
logger.debug(f"Adjusted clip {i} to {len(audio)}ms (target: {target_ms}ms)")
audio_segments.append(audio)
# Concatenate
merged = self.concatenate_audios(audio_segments, normalize_each, volume_adjustments)
# Save
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
merged.export(str(output_path), format="wav")
logger.info(f"Saved concatenated audio: {output_path}")
# Create metadata
metadata = {
"output_path": str(output_path),
"source_files": audio_paths,
"num_sources": len(audio_paths),
"total_duration_ms": len(merged),
"total_duration_s": len(merged) / 1000.0,
"individual_durations_ms": [len(a) for a in audio_segments],
"individual_durations_s": [len(a) / 1000.0 for a in audio_segments],
"target_durations_s": target_durations if target_durations else [],
"volume_adjustments_db": volume_adjustments if volume_adjustments else []
}
return merged, metadata
def generate_sample_durations_for_task(
task_duration_hours: float,
min_clip_duration: float,
max_clip_duration: float
) -> list:
"""
    Generate sample durations that fill the target task duration to within one minimum clip length.
Algorithm:
1. Start with remaining = total_seconds
2. While remaining >= min_clip_duration:
- Sample d ~ Uniform(min, min(max, remaining))
- Append d to durations list
- Subtract d from remaining
3. Return shuffled list of durations
This ensures:
- Total of all durations ≈ task_duration (within min_clip_duration tolerance)
- Each duration is uniformly sampled within valid range
- No overshoot of target duration
Args:
task_duration_hours: Total duration for the task in hours
min_clip_duration: Minimum duration per clip in seconds
max_clip_duration: Maximum duration per clip in seconds
Returns:
List of sample durations in seconds (shuffled)
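    Example (illustrative; individual values vary with the random seed):
        >>> durations = generate_sample_durations_for_task(
        ...     task_duration_hours=0.01,  # 36 seconds
        ...     min_clip_duration=5.0,
        ...     max_clip_duration=10.0,
        ... )
        >>> 31.0 < sum(durations) <= 36.0  # leftover is always < min_clip_duration
        True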
"""
task_duration_seconds = task_duration_hours * 3600
remaining = task_duration_seconds
durations = []
while remaining >= min_clip_duration:
# Cap max at remaining to avoid overshoot
effective_max = min(max_clip_duration, remaining)
# If remaining is less than min, we can't fit another sample
if effective_max < min_clip_duration:
break
# Sample uniformly within valid range
d = random.uniform(min_clip_duration, effective_max)
durations.append(d)
remaining -= d
# Shuffle to randomize order (durations were generated sequentially)
random.shuffle(durations)
total_duration = sum(durations)
logger.info(f"Task duration target: {task_duration_hours}h ({task_duration_seconds:.1f}s)")
logger.info(f"Generated {len(durations)} sample durations, total: {total_duration:.1f}s")
logger.info(f"Duration range: [{min(durations):.1f}s, {max(durations):.1f}s], "
f"mean: {total_duration/len(durations):.1f}s")
logger.info(f"Unused remainder: {remaining:.1f}s ({remaining/task_duration_seconds*100:.2f}%)")
return durations
def calculate_num_samples_for_task(
task_duration_hours: float,
min_clip_duration: float,
max_clip_duration: float
) -> int:
"""
Calculate number of samples needed to fill the task duration.
    DEPRECATED: Use generate_sample_durations_for_task() instead for near-exact duration filling.
This function is kept for backward compatibility but uses average-based estimation.
Args:
task_duration_hours: Total duration for the task in hours
min_clip_duration: Minimum duration per clip in seconds
max_clip_duration: Maximum duration per clip in seconds
Returns:
Number of samples to generate (estimate)
"""
task_duration_seconds = task_duration_hours * 3600
avg_clip_duration = (min_clip_duration + max_clip_duration) / 2
num_samples = int(task_duration_seconds / avg_clip_duration)
logger.info(f"Task duration: {task_duration_hours}h ({task_duration_seconds}s)")
logger.info(f"Avg clip duration: {avg_clip_duration}s (min: {min_clip_duration}s, max: {max_clip_duration}s)")
logger.info(f"Calculated number of samples: {num_samples}")
return max(1, num_samples) # At least 1 sample
def generate_single_clip_duration(
min_duration: float,
max_duration: float
) -> float:
"""
Generate a random clip duration between min and max.
Args:
min_duration: Minimum duration in seconds
max_duration: Maximum duration in seconds
Returns:
Random duration in seconds
"""
return random.uniform(min_duration, max_duration)
def concatenate_to_target_duration(
base_audio: AudioSegment,
target_duration_seconds: float,
crossfade_ms: int = 0
) -> AudioSegment:
"""
Concatenate a base audio clip to reach target duration.
    This takes a short base clip (e.g., a 5-second ESC-50 clip) and repeats it to create a longer clip.
Args:
base_audio: Original 5s audio segment
target_duration_seconds: Target duration in seconds
crossfade_ms: Crossfade between repetitions in milliseconds
Returns:
Audio segment of target duration
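    Example (illustrative sketch; silence stands in for a real clip):
        >>> from pydub import AudioSegment
        >>> base = AudioSegment.silent(duration=5000)  # 5 s stand-in clip
        >>> longer = concatenate_to_target_duration(base, 12.0)
        >>> len(longer)
        12000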
"""
target_duration_ms = int(target_duration_seconds * 1000)
base_duration_ms = len(base_audio)
if target_duration_ms <= base_duration_ms:
# Just trim if target is shorter
return base_audio[:target_duration_ms]
# Calculate number of repetitions needed
num_repetitions = (target_duration_ms // base_duration_ms) + 1
# Concatenate with crossfade
result = base_audio
for i in range(1, num_repetitions):
if crossfade_ms > 0:
result = result.append(base_audio, crossfade=crossfade_ms)
else:
result = result + base_audio
# Stop if we've reached target
if len(result) >= target_duration_ms:
break
# Trim to exact duration
return result[:target_duration_ms]
def set_random_seed(seed: int):
"""Set random seed for reproducibility."""
random.seed(seed)
np.random.seed(seed)
logger.info(f"Random seed set to: {seed}")
def get_max_clip_num_to_be_joined(
target_duration_seconds: float,
source_clip_duration_seconds: float,
min_silence_ms: int = 100
) -> Tuple[int, float]:
"""
    Calculate the maximum number of source clips that fit within the target duration.
Pipeline: pick dataset -> pick class -> pick audio clip -> get duration ->
    concatenate clips to reach target duration -> floor-divide to get num clips ->
    insert silences randomly based on the remainder.
Args:
target_duration_seconds: Target total duration in seconds
source_clip_duration_seconds: Duration of each source clip (e.g., 5s for ESC-50)
min_silence_ms: Minimum silence between clips in milliseconds
Returns:
Tuple of (num_clips_needed, remainder_seconds_for_silences)
- num_clips_needed: How many source clips to concatenate
- remainder_seconds_for_silences: Extra time to distribute as random silences
    Example (with min_silence_ms=100):
        target=30s, source=5s -> (5, 4.6) - 5 clips + 4 minimum gaps, 4.6s spare
        target=32s, source=5s -> (6, 1.5) - 6 clips + 5 minimum gaps, 1.5s spare
"""
target_ms = target_duration_seconds * 1000
source_ms = source_clip_duration_seconds * 1000
# Account for minimum silence between each pair of clips
# If we have N clips, we have (N-1) gaps for silence
# Each gap needs at least min_silence_ms
# Start by computing raw number of clips (floor division)
num_clips = int(target_ms // source_ms)
num_clips = max(1, num_clips) # At least 1 clip
# Total audio content from clips
clips_duration_ms = num_clips * source_ms
# Minimum required silence for gaps
num_gaps = max(0, num_clips - 1)
min_total_silence_ms = num_gaps * min_silence_ms
# Check if we need to reduce clips to fit silences
while num_clips > 1 and (clips_duration_ms + min_total_silence_ms) > target_ms:
num_clips -= 1
clips_duration_ms = num_clips * source_ms
num_gaps = num_clips - 1
min_total_silence_ms = num_gaps * min_silence_ms
# Calculate remainder for extra silences
remainder_ms = target_ms - clips_duration_ms - min_total_silence_ms
remainder_seconds = max(0, remainder_ms / 1000.0)
logger.debug(
f"get_max_clip_num: target={target_duration_seconds}s, source={source_clip_duration_seconds}s "
f"-> {num_clips} clips, {remainder_seconds:.3f}s remainder for extra silences"
)
return num_clips, remainder_seconds
def build_clip_sequence_with_silences(
audio_segments: List[AudioSegment],
target_duration_seconds: float,
min_silence_ms: int = 100,
max_extra_silence_per_gap_ms: int = 500,
crossfade_ms: int = 0
) -> AudioSegment:
"""
Build a final audio clip by concatenating segments with guaranteed silences.
Ensures:
1. All clips are joined with at least min_silence_ms between them
2. Any remainder duration is distributed as random extra silences in gaps
3. Final duration matches target_duration_seconds exactly
Args:
audio_segments: List of audio segments to concatenate
target_duration_seconds: Target total duration in seconds
min_silence_ms: Minimum silence between each pair of clips (always inserted)
max_extra_silence_per_gap_ms: Maximum extra silence to add per gap
crossfade_ms: Crossfade duration in ms (applied when joining)
Returns:
Concatenated audio segment of exact target duration
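    Example (illustrative sketch; silence stands in for real clips):
        >>> from pydub import AudioSegment
        >>> clips = [AudioSegment.silent(duration=5000) for _ in range(3)]
        >>> seq = build_clip_sequence_with_silences(clips, 20.0, min_silence_ms=100)
        >>> len(seq)  # trimmed/padded to the exact target
        20000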
"""
if not audio_segments:
raise ValueError("audio_segments cannot be empty")
target_ms = int(target_duration_seconds * 1000)
if len(audio_segments) == 1:
# Single clip: just trim/repeat to target
audio = audio_segments[0]
if len(audio) >= target_ms:
return audio[:target_ms]
else:
# Repeat to reach target
return concatenate_to_target_duration(audio, target_duration_seconds, crossfade_ms)
# Calculate total audio content duration
total_audio_ms = sum(len(seg) for seg in audio_segments)
num_gaps = len(audio_segments) - 1
# Minimum silence needed
min_total_silence_ms = num_gaps * min_silence_ms
# Available time for extra silences
available_extra_ms = target_ms - total_audio_ms - min_total_silence_ms
if available_extra_ms < 0:
# Not enough room - need to trim clips
logger.warning(
f"Clips too long for target duration. Total audio: {total_audio_ms}ms, "
f"target: {target_ms}ms. Will trim final result."
)
available_extra_ms = 0
# Distribute extra silence randomly across gaps
extra_silences_ms = distribute_remainder_as_silences(
available_extra_ms,
num_gaps,
max_extra_silence_per_gap_ms
)
# Build the final audio
result = audio_segments[0]
for i, audio in enumerate(audio_segments[1:]):
# Calculate total silence for this gap
gap_silence_ms = min_silence_ms + extra_silences_ms[i]
# Add silence
silence = AudioSegment.silent(duration=gap_silence_ms)
if crossfade_ms > 0 and crossfade_ms < gap_silence_ms:
# Crossfade audio->silence for smooth transition, but NOT silence->audio
result = result.append(silence, crossfade=crossfade_ms)
result = result.append(audio, crossfade=0) # No crossfade to avoid cutting audio
else:
result = result + silence + audio
# Trim to exact target duration
if len(result) > target_ms:
result = result[:target_ms]
elif len(result) < target_ms:
# Pad with silence if slightly short
padding = AudioSegment.silent(duration=target_ms - len(result))
result = result + padding
logger.debug(
f"Built clip sequence: {len(audio_segments)} segments, "
f"final duration: {len(result)}ms (target: {target_ms}ms)"
)
return result
def distribute_remainder_as_silences(
remainder_ms: float,
num_gaps: int,
max_per_gap_ms: int = 500
) -> List[int]:
"""
Distribute remainder time as random silences across gaps.
Args:
remainder_ms: Total extra time to distribute (in ms)
num_gaps: Number of gaps between clips
max_per_gap_ms: Maximum extra silence per gap
Returns:
List of extra silence durations (in ms) for each gap
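    Example (illustrative; the per-gap split is random but the totals are fixed):
        >>> extras = distribute_remainder_as_silences(800, 3, max_per_gap_ms=500)
        >>> len(extras), sum(extras)
        (3, 800)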
"""
if num_gaps <= 0:
return []
remainder_ms = int(max(0, remainder_ms))
if remainder_ms == 0:
return [0] * num_gaps
# Generate random weights for distribution
weights = [random.random() for _ in range(num_gaps)]
total_weight = sum(weights)
if total_weight == 0:
# Fallback to uniform distribution
weights = [1.0] * num_gaps
total_weight = num_gaps
# Distribute proportionally, respecting max_per_gap
extra_silences = []
remaining = remainder_ms
for i, w in enumerate(weights):
if i == num_gaps - 1:
# Last gap gets whatever is left
extra = min(remaining, max_per_gap_ms)
else:
proportion = w / total_weight
extra = int(remainder_ms * proportion)
extra = min(extra, max_per_gap_ms, remaining)
extra_silences.append(extra)
remaining -= extra
total_weight -= w
# If there's still remainder (due to max_per_gap limits), do another pass
while remaining > 0:
for i in range(num_gaps):
if extra_silences[i] < max_per_gap_ms and remaining > 0:
add = min(remaining, max_per_gap_ms - extra_silences[i])
extra_silences[i] += add
remaining -= add
if remaining > 0:
# Can't distribute more (all gaps at max)
break
logger.debug(f"Distributed {remainder_ms}ms across {num_gaps} gaps: {extra_silences}")
return extra_silences
def repeat_clips_to_fill_duration(
source_audios: List[AudioSegment],
source_categories: List[str],
target_duration_seconds: float,
source_clip_duration_seconds: float = 5.0,
min_silence_ms: int = 100
) -> Tuple[List[AudioSegment], List[str], int]:
"""
Repeat source clips to fill target duration, cycling through all sources.
This ensures all unique sources appear and are repeated proportionally.
Args:
source_audios: List of unique source audio segments
source_categories: List of category names corresponding to source_audios
target_duration_seconds: Target total duration
source_clip_duration_seconds: Duration of each source clip
min_silence_ms: Minimum silence between clips
Returns:
Tuple of (expanded_audio_list, expanded_categories, num_clips)
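    Example (illustrative sketch; silence stands in for real 5 s clips):
        >>> from pydub import AudioSegment
        >>> srcs = [AudioSegment.silent(duration=5000) for _ in range(2)]
        >>> audios, cats, n = repeat_clips_to_fill_duration(
        ...     srcs, ["dog", "rain"], target_duration_seconds=30.0
        ... )
        >>> cats, n
        (['dog', 'rain', 'dog', 'rain', 'dog'], 5)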
"""
num_clips, remainder = get_max_clip_num_to_be_joined(
target_duration_seconds,
source_clip_duration_seconds,
min_silence_ms
)
num_sources = len(source_audios)
if num_sources == 0:
raise ValueError("source_audios cannot be empty")
# Build expanded lists by cycling through sources
expanded_audios = []
expanded_categories = []
for i in range(num_clips):
idx = i % num_sources
expanded_audios.append(source_audios[idx])
expanded_categories.append(source_categories[idx])
logger.debug(
f"Repeated {num_sources} sources to {num_clips} clips for "
f"{target_duration_seconds}s target duration"
)
return expanded_audios, expanded_categories, num_clips
def build_consecutive_sources_for_count_task(
source_audios: List[AudioSegment],
source_categories: List[str],
target_duration_seconds: float,
source_clip_duration_seconds: float = 5.0,
min_silence_between_sources_ms: int = 100,
max_extra_silence_per_gap_ms: int = 500,
crossfade_within_source_ms: int = 50
) -> Tuple[AudioSegment, List[str], dict]:
"""
Build audio for COUNT task with consecutive same-class clips.
For count task, same-class clips must be consecutive (AAA BBB CCC) so they
are perceived as ONE sound source. Silences are only inserted BETWEEN
different classes, not within same-class repetitions.
Pipeline: pick classes -> for each class concatenate clips consecutively ->
insert silences only between different classes -> distribute remainder
Args:
source_audios: List of unique source audio segments (one per class)
source_categories: List of category names
target_duration_seconds: Target total duration
source_clip_duration_seconds: Duration of each source clip
min_silence_between_sources_ms: Minimum silence between different sources
max_extra_silence_per_gap_ms: Max extra silence per gap for remainder distribution
crossfade_within_source_ms: Small crossfade within same-source repetitions
Returns:
Tuple of (final_audio, category_sequence, metadata_dict)
"""
target_ms = int(target_duration_seconds * 1000)
source_ms = int(source_clip_duration_seconds * 1000)
num_sources = len(source_audios)
if num_sources == 0:
raise ValueError("source_audios cannot be empty")
# Calculate total clips needed
num_clips, remainder_seconds = get_max_clip_num_to_be_joined(
target_duration_seconds,
source_clip_duration_seconds,
min_silence_between_sources_ms
)
# Safety check: if more sources than clips can fit, warn
if num_sources > num_clips:
logger.warning(
f"More sources ({num_sources}) than clips that fit ({num_clips}). "
f"Each source needs at least 1 clip, so output may exceed target duration. "
f"Consider capping n_unique_audios <= max_clips in task_count.py"
)
# Each source gets exactly 1 rep if there are more sources than clips
num_clips = num_sources # This will exceed target but ensures each source is included
# Distribute clips across sources as evenly as possible
# Each source gets at least 1 clip since num_sources <= num_clips
base_reps = num_clips // num_sources
extra_reps = num_clips % num_sources
repetitions_per_source = []
for i in range(num_sources):
reps = base_reps + (1 if i < extra_reps else 0)
repetitions_per_source.append(reps)
# Shuffle repetition assignment to add variety
random.shuffle(repetitions_per_source)
# Build each source's audio block (consecutive clips of same class)
source_blocks = []
category_sequence = []
for i, (audio, category, reps) in enumerate(zip(source_audios, source_categories, repetitions_per_source)):
if reps == 0:
continue
# Concatenate same-source clips with minimal/no gap (just small crossfade)
block = audio
for _ in range(reps - 1):
if crossfade_within_source_ms > 0:
block = block.append(audio, crossfade=crossfade_within_source_ms)
else:
block = block + audio
source_blocks.append(block)
category_sequence.append(category)
# Now we have N source blocks, need to join them with silences
# Number of gaps = num_source_blocks - 1
num_gaps = len(source_blocks) - 1
if num_gaps <= 0:
# Only one source block
final_audio = source_blocks[0]
else:
# Calculate total audio duration from blocks
total_blocks_ms = sum(len(block) for block in source_blocks)
min_total_silence_ms = num_gaps * min_silence_between_sources_ms
# Available for extra silences
available_extra_ms = target_ms - total_blocks_ms - min_total_silence_ms
available_extra_ms = max(0, available_extra_ms)
# Distribute extra silence across gaps
extra_silences = distribute_remainder_as_silences(
available_extra_ms,
num_gaps,
max_extra_silence_per_gap_ms
)
# Build final audio with silences between source blocks
final_audio = source_blocks[0]
for i, block in enumerate(source_blocks[1:]):
gap_silence_ms = min_silence_between_sources_ms + extra_silences[i]
silence = AudioSegment.silent(duration=gap_silence_ms)
final_audio = final_audio + silence + block
# Trim or pad to exact target duration
if len(final_audio) > target_ms:
final_audio = final_audio[:target_ms]
elif len(final_audio) < target_ms:
padding = AudioSegment.silent(duration=target_ms - len(final_audio))
final_audio = final_audio + padding
# Create metadata
metadata = {
'num_unique_sources': num_sources,
'total_clips': num_clips,
'ordering_mode': 'consecutive',
'repetitions_per_source': dict(zip(source_categories, repetitions_per_source)),
'target_duration_ms': target_ms,
'actual_duration_ms': len(final_audio),
'num_gaps_between_sources': num_gaps
}
logger.debug(
f"Count task (consecutive): {num_sources} sources, {num_clips} total clips, "
f"reps={repetitions_per_source}, duration={len(final_audio)}ms"
)
return final_audio, category_sequence, metadata
def build_random_order_for_count_task(
source_audios: List[AudioSegment],
source_categories: List[str],
target_duration_seconds: float,
source_clip_duration_seconds: float = 5.0,
min_silence_ms: int = 100,
max_extra_silence_per_gap_ms: int = 500
) -> Tuple[AudioSegment, List[str], dict]:
"""
Build audio for COUNT task with RANDOM ordering of clips.
Clips from different sources are shuffled randomly (A B A C B A C...).
This tests whether the model can recognize recurring sounds as the same source.
Silences are inserted between ALL clips (same or different source).
Pipeline:
1. Calculate total clips needed
2. Distribute clips across sources
3. Create expanded list with all clip instances
4. Shuffle randomly
5. Insert silences between ALL clips
6. Distribute remainder as extra random silences
Args:
source_audios: List of unique source audio segments (one per class)
source_categories: List of category names
target_duration_seconds: Target total duration
source_clip_duration_seconds: Duration of each source clip
min_silence_ms: Minimum silence between ALL clips
max_extra_silence_per_gap_ms: Max extra silence per gap
Returns:
Tuple of (final_audio, clip_sequence, metadata_dict)
"""
target_ms = int(target_duration_seconds * 1000)
source_ms = int(source_clip_duration_seconds * 1000)
num_sources = len(source_audios)
if num_sources == 0:
raise ValueError("source_audios cannot be empty")
# Calculate total clips needed
num_clips, remainder_seconds = get_max_clip_num_to_be_joined(
target_duration_seconds,
source_clip_duration_seconds,
min_silence_ms
)
# Safety check: if more sources than clips can fit, warn and cap sources
if num_sources > num_clips:
logger.warning(
f"More sources ({num_sources}) than clips that fit ({num_clips}). "
f"Each source needs at least 1 clip, so output may exceed target duration. "
f"Consider capping n_unique_audios <= max_clips in task_count.py"
)
# Each source gets exactly 1 rep if there are more sources than clips
num_clips = num_sources # This will exceed target but ensures each source is included
# Distribute clips across sources as evenly as possible
base_reps = num_clips // num_sources # At least 1 since num_sources <= num_clips (after cap)
extra_reps = num_clips % num_sources
repetitions_per_source = []
for i in range(num_sources):
reps = base_reps + (1 if i < extra_reps else 0)
repetitions_per_source.append(reps)
# Build expanded list of (audio, category) pairs
expanded_clips = []
for audio, category, reps in zip(source_audios, source_categories, repetitions_per_source):
for _ in range(reps):
expanded_clips.append((audio, category))
# Shuffle the clips randomly
random.shuffle(expanded_clips)
# Extract shuffled audios and categories
shuffled_audios = [clip[0] for clip in expanded_clips]
clip_sequence = [clip[1] for clip in expanded_clips]
# Build final audio with silences between ALL clips
final_audio = build_clip_sequence_with_silences(
shuffled_audios,
target_duration_seconds,
min_silence_ms=min_silence_ms,
max_extra_silence_per_gap_ms=max_extra_silence_per_gap_ms,
crossfade_ms=0 # No crossfade for random ordering
)
# Create metadata
metadata = {
'num_unique_sources': num_sources,
'total_clips': len(expanded_clips),
'ordering_mode': 'random',
'repetitions_per_source': dict(zip(source_categories, repetitions_per_source)),
'clip_sequence': clip_sequence,
'target_duration_ms': target_ms,
'actual_duration_ms': len(final_audio),
'num_gaps': len(expanded_clips) - 1
}
logger.debug(
f"Count task (random): {num_sources} sources, {len(expanded_clips)} clips, "
f"sequence={clip_sequence[:5]}..., duration={len(final_audio)}ms"
)
return final_audio, clip_sequence, metadata
def build_count_task_audio(
source_audios: List[AudioSegment],
source_categories: List[str],
target_duration_seconds: float,
ordering_mode: str = "random",
source_clip_duration_seconds: float = 5.0,
min_silence_ms: int = 100,
max_extra_silence_per_gap_ms: int = 500,
crossfade_within_source_ms: int = 50
) -> Tuple[AudioSegment, List[str], dict]:
"""
Build audio for COUNT task with configurable ordering mode.
Args:
source_audios: List of unique source audio segments (one per class)
source_categories: List of category names
target_duration_seconds: Target total duration
ordering_mode: "random" or "consecutive"
- "random": Clips shuffled (A B A C B A C) - tests sound recognition
- "consecutive": Same-source grouped (AAA BBB CCC) - easier
source_clip_duration_seconds: Duration of each source clip
min_silence_ms: Minimum silence between clips
max_extra_silence_per_gap_ms: Max extra silence per gap
crossfade_within_source_ms: Crossfade for consecutive mode only
Returns:
Tuple of (final_audio, clip_sequence, metadata_dict)
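    Example (illustrative sketch; silence stands in for real clips):
        >>> from pydub import AudioSegment
        >>> srcs = [AudioSegment.silent(duration=5000) for _ in range(2)]
        >>> audio, seq, meta = build_count_task_audio(
        ...     srcs, ["dog", "rain"], target_duration_seconds=30.0,
        ...     ordering_mode="random",
        ... )
        >>> meta["num_unique_sources"], sorted(set(seq))
        (2, ['dog', 'rain'])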
"""
if ordering_mode == "consecutive":
return build_consecutive_sources_for_count_task(
source_audios,
source_categories,
target_duration_seconds,
source_clip_duration_seconds,
min_silence_ms,
max_extra_silence_per_gap_ms,
crossfade_within_source_ms
)
else: # random (default)
return build_random_order_for_count_task(
source_audios,
source_categories,
target_duration_seconds,
source_clip_duration_seconds,
min_silence_ms,
max_extra_silence_per_gap_ms
)
# =============================================================================
# DURATION TASK FUNCTIONS
# =============================================================================
def calculate_duration_slot_distribution(
target_total_duration_s: float,
effective_durations: Dict[str, float],
target_category: str,
question_type: str,
multiplier_longest: float = 1.5,
multiplier_shortest: float = 0.5,
min_silence_between_sources_ms: int = 100
) -> Tuple[Dict[str, int], bool, Dict]:
"""
Calculate how many repetitions each source gets for duration task.
For LONGEST: target gets max repetitions, backgrounds get 1 each
For SHORTEST: target gets 1, backgrounds share remaining duration
Args:
target_total_duration_s: Target total audio duration
effective_durations: Dict mapping category -> effective duration in seconds
target_category: The category that should be longest/shortest
question_type: "longest" or "shortest"
multiplier_longest: target >= max_background * this
multiplier_shortest: target <= min_background * this
min_silence_between_sources_ms: Minimum silence between different sources
Returns:
Tuple of (slot_distribution, gap_satisfied, metadata)
slot_distribution: Dict mapping category -> number of repetitions
gap_satisfied: Whether the duration gap constraint is met
metadata: Additional info about the calculation
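    Example (illustrative "longest" case with three 5 s sources):
        >>> dist, ok, info = calculate_duration_slot_distribution(
        ...     target_total_duration_s=60.0,
        ...     effective_durations={"dog": 5.0, "rain": 5.0, "wind": 5.0},
        ...     target_category="dog",
        ...     question_type="longest",
        ... )
        >>> dist, ok  # backgrounds get one rep each; "dog" fills the rest
        ({'rain': 1, 'wind': 1, 'dog': 9}, True)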
"""
categories = list(effective_durations.keys())
n_sources = len(categories)
if n_sources < 2:
# Single source - always satisfies constraint
reps = max(1, int(target_total_duration_s / effective_durations[target_category]))
return {target_category: reps}, True, {'note': 'single_source'}
# Total silence between sources
total_silence_s = (n_sources - 1) * min_silence_between_sources_ms / 1000.0
available_for_audio_s = target_total_duration_s - total_silence_s
background_categories = [c for c in categories if c != target_category]
if question_type == "longest":
# Backgrounds get 1 rep each
background_duration_s = sum(effective_durations[c] for c in background_categories)
# Remaining for target
remaining_for_target_s = available_for_audio_s - background_duration_s
target_duration_per_rep = effective_durations[target_category]
# Calculate reps for target
target_reps = max(1, int(remaining_for_target_s / target_duration_per_rep))
actual_target_duration = target_reps * target_duration_per_rep
# Verify gap
max_background_duration = max(effective_durations[c] for c in background_categories)
required_target_duration = max_background_duration * multiplier_longest
gap_satisfied = actual_target_duration >= required_target_duration
slot_distribution = {c: 1 for c in background_categories}
slot_distribution[target_category] = target_reps
metadata = {
'available_for_audio_s': available_for_audio_s,
'background_duration_s': background_duration_s,
'remaining_for_target_s': remaining_for_target_s,
'target_reps': target_reps,
'actual_target_duration_s': actual_target_duration,
'max_background_duration_s': max_background_duration,
'required_target_duration_s': required_target_duration,
'multiplier_used': multiplier_longest
}
else: # shortest
# Target gets 1 rep
target_duration_s = effective_durations[target_category]
# Remaining for backgrounds
remaining_for_backgrounds_s = available_for_audio_s - target_duration_s
        # Distribute the remaining time across backgrounds as evenly as possible,
        # while keeping each background longer than target_duration_s / multiplier_shortest
slot_distribution = {target_category: 1}
# Calculate minimum required duration for each background
min_background_required = target_duration_s / multiplier_shortest
background_reps = {}
for cat in background_categories:
eff_dur = effective_durations[cat]
# How many reps needed to exceed min_background_required?
min_reps = max(1, int(min_background_required / eff_dur) + 1)
background_reps[cat] = min_reps
# Check if we have room for all backgrounds
total_background_needed = sum(
background_reps[c] * effective_durations[c]
for c in background_categories
)
if total_background_needed <= remaining_for_backgrounds_s:
# Distribute extra reps
extra_available = remaining_for_backgrounds_s - total_background_needed
# Add extra reps to backgrounds proportionally
while extra_available > 0:
added_any = False
for cat in background_categories:
eff_dur = effective_durations[cat]
if extra_available >= eff_dur:
background_reps[cat] += 1
extra_available -= eff_dur
added_any = True
if not added_any:
break
slot_distribution.update(background_reps)
gap_satisfied = True
else:
# Not enough room - use minimum reps anyway
slot_distribution.update(background_reps)
gap_satisfied = False
# Calculate actual durations
actual_durations = {
cat: slot_distribution[cat] * effective_durations[cat]
for cat in categories
}
min_background_actual = min(
actual_durations[c] for c in background_categories
)
# Re-verify gap
gap_satisfied = actual_durations[target_category] <= min_background_actual * multiplier_shortest
metadata = {
'available_for_audio_s': available_for_audio_s,
'target_duration_s': target_duration_s,
'remaining_for_backgrounds_s': remaining_for_backgrounds_s,
'min_background_required_s': min_background_required,
'actual_durations_s': actual_durations,
'min_background_actual_s': min_background_actual,
'multiplier_used': multiplier_shortest
}
return slot_distribution, gap_satisfied, metadata
def build_duration_task_audio(
source_audio_lists: Dict[str, List[AudioSegment]],
slot_distribution: Dict[str, int],
effective_durations: Dict[str, float],
target_total_duration_s: float,
min_silence_between_sources_ms: int = 100,
max_extra_silence_per_gap_ms: int = 500,
crossfade_within_source_ms: int = 50
) -> Tuple[AudioSegment, List[str], Dict]:
"""
Build audio for DURATION task with consecutive ordering per source.
Structure: [SourceA × n] + silence + [SourceB × m] + silence + ...
Order of sources is randomized to avoid patterns.
Args:
source_audio_lists: Dict mapping category -> list of audio segments
slot_distribution: Dict mapping category -> number of repetitions
effective_durations: Dict mapping category -> effective duration per clip
target_total_duration_s: Target total duration
min_silence_between_sources_ms: Min silence between different sources
max_extra_silence_per_gap_ms: Max extra silence per gap
crossfade_within_source_ms: Crossfade between same-source repetitions
Returns:
Tuple of (final_audio, category_sequence, metadata)
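    Example (illustrative sketch; silence stands in for real clips):
        >>> from pydub import AudioSegment
        >>> clips = {"dog": [AudioSegment.silent(duration=5000)],
        ...          "rain": [AudioSegment.silent(duration=5000)]}
        >>> audio, seq, meta = build_duration_task_audio(
        ...     clips, {"dog": 3, "rain": 1}, {"dog": 5.0, "rain": 5.0},
        ...     target_total_duration_s=30.0,
        ... )
        >>> len(audio)  # trimmed/padded to the exact target
        30000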
"""
categories = list(slot_distribution.keys())
# Randomize source order
random.shuffle(categories)
# Build audio blocks for each source
source_blocks = []
category_sequence = []
actual_durations = {}
block_durations_ms = [] # Track duration of each block for timestamp calculation
for category in categories:
reps = slot_distribution[category]
audio_list = source_audio_lists[category]
if reps == 0:
continue
# Build block for this source
block = audio_list[0]
for i in range(1, reps):
# Use same clip or cycle through available clips
next_clip = audio_list[i % len(audio_list)]
# Crossfade within same source
if crossfade_within_source_ms > 0:
if len(block) > crossfade_within_source_ms and len(next_clip) > crossfade_within_source_ms:
block = block.append(next_clip, crossfade=crossfade_within_source_ms)
else:
block = block + next_clip
else:
block = block + next_clip
source_blocks.append((category, block))
block_durations_ms.append(len(block))
category_sequence.extend([category] * reps)
actual_durations[category] = len(block) / 1000.0
# Calculate total audio duration and available extra silence
total_audio_ms = sum(len(block) for _, block in source_blocks)
num_gaps = len(source_blocks) - 1
min_total_silence_ms = num_gaps * min_silence_between_sources_ms
target_ms = int(target_total_duration_s * 1000)
available_extra_ms = target_ms - total_audio_ms - min_total_silence_ms
# Distribute extra silence
if available_extra_ms > 0 and num_gaps > 0:
extra_silences = distribute_remainder_as_silences(
available_extra_ms,
num_gaps,
max_extra_silence_per_gap_ms
)
else:
extra_silences = [0] * max(num_gaps, 1)
# Concatenate with silences and track timestamps
source_timestamps = [] # List of (category, start_ms, end_ms)
current_position_ms = 0
if len(source_blocks) == 1:
final_audio = source_blocks[0][1]
cat, block = source_blocks[0]
source_timestamps.append((cat, 0, len(block)))
else:
final_audio = source_blocks[0][1]
cat, block = source_blocks[0]
source_timestamps.append((cat, 0, len(block)))
current_position_ms = len(block)
for i, (cat, block) in enumerate(source_blocks[1:]):
gap_silence_ms = min_silence_between_sources_ms + extra_silences[i]
silence = AudioSegment.silent(duration=gap_silence_ms)
# Prefer crossfading from audio -> silence for a smooth transition,
# but avoid crossfading silence -> audio (it cuts the start of the next clip).
# Conditions for safe crossfade:
# - crossfade length should be less than gap silence
# - both segments must be longer than crossfade
crossfade_ms = min(500, gap_silence_ms)
if crossfade_ms > 0 and crossfade_ms < gap_silence_ms and len(final_audio) > crossfade_ms and len(block) > crossfade_ms:
final_audio = final_audio.append(silence, crossfade=crossfade_ms)
# Append next block without crossfade to avoid trimming its start
final_audio = final_audio.append(block, crossfade=0)
# Track timestamp after silence (start of block)
start_ms = current_position_ms + gap_silence_ms
end_ms = start_ms + len(block)
source_timestamps.append((cat, start_ms, end_ms))
current_position_ms = end_ms
else:
# Fall back to simple concatenation
final_audio = final_audio + silence + block
start_ms = current_position_ms + gap_silence_ms
end_ms = start_ms + len(block)
source_timestamps.append((cat, start_ms, end_ms))
current_position_ms = end_ms
# Adjust to target duration
if len(final_audio) > target_ms:
final_audio = final_audio[:target_ms]
elif len(final_audio) < target_ms:
padding = AudioSegment.silent(duration=target_ms - len(final_audio))
final_audio = final_audio + padding
# Build timestamp string: "category1 start-end, category2 start-end, ..."
timestamp_parts = []
for cat, start_ms, end_ms in source_timestamps:
start_s = round(start_ms / 1000.0, 2)
end_s = round(end_ms / 1000.0, 2)
duration_s = round((end_ms - start_ms) / 1000.0, 2)
timestamp_parts.append(f"{cat} {start_s}s-{end_s}s ({duration_s}s)")
timestamp_string = ", ".join(timestamp_parts)
metadata = {
'source_order': [cat for cat, _ in source_blocks],
'slot_distribution': slot_distribution,
'actual_durations_s': actual_durations,
'total_audio_ms': total_audio_ms,
'num_gaps': num_gaps,
'final_duration_ms': len(final_audio),
'source_timestamps': source_timestamps, # List of (category, start_ms, end_ms)
'timestamp_string': timestamp_string # Human-readable format
}
logger.debug(
f"Duration task audio: {len(source_blocks)} sources, "
f"order={metadata['source_order']}, duration={len(final_audio)}ms"
)
return final_audio, category_sequence, metadata