""" Audio processing utilities for temporal reasoning dataset generation. """ import os import random from pathlib import Path from typing import Dict, List, Optional, Tuple, Union import numpy as np from pydub import AudioSegment try: import pyloudnorm as pyln PYLOUDNORM_AVAILABLE = True except ImportError: PYLOUDNORM_AVAILABLE = False from .logger import setup_logger logger = setup_logger(__name__) def get_lufs_loudness(audio: AudioSegment) -> float: """ Calculate integrated LUFS loudness (perceived loudness) of an audio segment. LUFS (Loudness Units Full Scale) is the broadcast standard for measuring perceived loudness. It accounts for human hearing sensitivity to different frequencies using K-weighting. Args: audio: Input audio segment (pydub AudioSegment) Returns: Loudness in LUFS (negative values, typically -70 to 0) Returns dBFS if pyloudnorm is not available (fallback) """ if not PYLOUDNORM_AVAILABLE: logger.warning("pyloudnorm not available, falling back to dBFS") return audio.dBFS # Convert pydub AudioSegment to numpy array samples = np.array(audio.get_array_of_samples()) # Handle stereo by reshaping if audio.channels == 2: samples = samples.reshape((-1, 2)) # Normalize to float [-1, 1] if audio.sample_width == 1: samples = samples.astype(np.float64) / 128.0 - 1.0 elif audio.sample_width == 2: samples = samples.astype(np.float64) / 32768.0 elif audio.sample_width == 4: samples = samples.astype(np.float64) / 2147483648.0 else: samples = samples.astype(np.float64) / 32768.0 # default to 16-bit # Create meter with sample rate meter = pyln.Meter(audio.frame_rate) # Measure integrated loudness try: loudness = meter.integrated_loudness(samples) # Handle -inf for silent audio if np.isinf(loudness): loudness = -70.0 # Return very quiet value instead of -inf return loudness except Exception as e: logger.warning(f"LUFS measurement failed: {e}, falling back to dBFS") return audio.dBFS def normalize_to_lufs(audio: AudioSegment, target_lufs: float = -23.0) -> AudioSegment: """ Normalize audio to a target LUFS level (perceived loudness normalization). This is superior to dBFS normalization for comparing different sound types because it accounts for human hearing sensitivity. Args: audio: Input audio segment target_lufs: Target loudness level in LUFS (default: -23 LUFS, EBU R128 standard) Returns: Loudness-normalized audio segment """ if not PYLOUDNORM_AVAILABLE: logger.warning("pyloudnorm not available, falling back to dBFS normalization") change_db = target_lufs - audio.dBFS return audio.apply_gain(change_db) current_lufs = get_lufs_loudness(audio) # Calculate required gain change gain_db = target_lufs - current_lufs # Apply gain normalized = audio.apply_gain(gain_db) logger.debug(f"Normalized LUFS: {current_lufs:.2f} -> {get_lufs_loudness(normalized):.2f} LUFS") return normalized class AudioProcessor: """Handles audio loading, processing, and concatenation.""" def __init__( self, crossfade_duration: int = 500, silence_duration: int = 1000, with_silence: bool = True, normalize: bool = False, normalize_target_dBFS: float = -20.0, synthetic_silence_path: Optional[str] = None ): """ Initialize the audio processor. 
class AudioProcessor:
    """Handles audio loading, processing, and concatenation."""

    def __init__(
        self,
        crossfade_duration: int = 500,
        silence_duration: int = 1000,
        with_silence: bool = True,
        normalize: bool = False,
        normalize_target_dBFS: float = -20.0,
        synthetic_silence_path: Optional[str] = None
    ):
        """
        Initialize the audio processor.

        Args:
            crossfade_duration: Duration of crossfade in milliseconds
            silence_duration: Duration of silence between clips in milliseconds
            with_silence: Whether to add silence between clips
            normalize: Whether to normalize audio levels
            normalize_target_dBFS: Target dBFS level for normalization
            synthetic_silence_path: Path to synthetic silence audio files
        """
        self.crossfade_duration = crossfade_duration
        self.silence_duration = silence_duration
        self.with_silence = with_silence
        self.normalize = normalize
        self.normalize_target_dBFS = normalize_target_dBFS
        self.synthetic_silence_path = synthetic_silence_path
        self._silence_cache = {}

    def load_audio(self, audio_path: str) -> AudioSegment:
        """
        Load an audio file (assumed to be WAV).

        Args:
            audio_path: Path to the audio file

        Returns:
            Loaded audio segment
        """
        try:
            audio = AudioSegment.from_file(audio_path, format="wav")
            logger.debug(f"Loaded audio: {audio_path}, duration: {len(audio)}ms")
            return audio
        except Exception as e:
            logger.error(f"Error loading audio {audio_path}: {e}")
            raise

    def normalize_audio(self, audio: AudioSegment, target_dBFS: Optional[float] = None) -> AudioSegment:
        """
        Normalize audio to a target dBFS level.

        Args:
            audio: Input audio segment
            target_dBFS: Target dBFS level (uses default if None)

        Returns:
            Normalized audio segment
        """
        if target_dBFS is None:
            target_dBFS = self.normalize_target_dBFS

        change_in_dBFS = target_dBFS - audio.dBFS
        normalized = audio.apply_gain(change_in_dBFS)
        logger.debug(f"Normalized audio: {audio.dBFS:.2f} dBFS -> {normalized.dBFS:.2f} dBFS")
        return normalized

    def adjust_volume(self, audio: AudioSegment, volume_db: float) -> AudioSegment:
        """
        Adjust audio volume by a specific dB amount.

        Args:
            audio: Input audio segment
            volume_db: Volume adjustment in dB (positive = louder, negative = quieter)

        Returns:
            Volume-adjusted audio segment
        """
        adjusted = audio.apply_gain(volume_db)
        logger.debug(f"Adjusted volume by {volume_db} dB: {audio.dBFS:.2f} -> {adjusted.dBFS:.2f} dBFS")
        return adjusted

    def get_silence(self, duration: Optional[int] = None) -> AudioSegment:
        """
        Get a silence audio segment, using synthetic silence if available.

        Args:
            duration: Duration in milliseconds (uses default if None)

        Returns:
            Silence audio segment
        """
        if duration is None:
            duration = self.silence_duration

        # Check cache first
        if duration in self._silence_cache:
            return self._silence_cache[duration]

        # Try to load synthetic silence
        if self.synthetic_silence_path and os.path.exists(self.synthetic_silence_path):
            silence_files = list(Path(self.synthetic_silence_path).glob("*.wav"))
            if silence_files:
                silence = self.load_audio(str(random.choice(silence_files)))
                # Repeat the silence if it is shorter than requested
                if len(silence) < duration:
                    repetitions = (duration // len(silence)) + 1
                    silence = silence * repetitions
                silence = silence[:duration]
                self._silence_cache[duration] = silence
                logger.debug(f"Using synthetic silence: {duration}ms")
                return silence

        # Fall back to pure silence
        silence = AudioSegment.silent(duration=duration)
        self._silence_cache[duration] = silence
        logger.debug(f"Using pure silence: {duration}ms")
        return silence
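
    # Example (hypothetical configuration): a processor that separates clips
    # with 1 s of room tone drawn from "data/silence" (a placeholder path),
    # crossfading 500 ms into each silence and normalizing clips to -20 dBFS.
    #
    #     processor = AudioProcessor(
    #         crossfade_duration=500,
    #         silence_duration=1000,
    #         with_silence=True,
    #         normalize=True,
    #         normalize_target_dBFS=-20.0,
    #         synthetic_silence_path="data/silence",  # hypothetical path
    #     )
    #     gap = processor.get_silence()           # 1000 ms, cached after first call
    #     short_gap = processor.get_silence(250)  # separate cache entry
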
    def concatenate_audios(
        self,
        audio_list: List[AudioSegment],
        normalize_each: bool = False,
        volume_adjustments: Optional[List[float]] = None
    ) -> AudioSegment:
        """
        Concatenate multiple audio segments with crossfade and optional silence.

        Args:
            audio_list: List of audio segments to concatenate
            normalize_each: Whether to normalize each audio before concatenation
            volume_adjustments: Optional list of volume adjustments (in dB) for each audio

        Returns:
            Concatenated audio segment
        """
        if not audio_list:
            raise ValueError("audio_list cannot be empty")

        if len(audio_list) == 1:
            audio = audio_list[0]
            if normalize_each and self.normalize:
                audio = self.normalize_audio(audio)
            if volume_adjustments and len(volume_adjustments) > 0:
                audio = self.adjust_volume(audio, volume_adjustments[0])
            return audio

        # Process first audio
        merged = audio_list[0]
        if normalize_each and self.normalize:
            merged = self.normalize_audio(merged)
        if volume_adjustments and len(volume_adjustments) > 0:
            merged = self.adjust_volume(merged, volume_adjustments[0])

        # Concatenate remaining audios
        for i, audio in enumerate(audio_list[1:], start=1):
            # Process current audio
            current = audio
            if normalize_each and self.normalize:
                current = self.normalize_audio(current)
            if volume_adjustments and len(volume_adjustments) > i:
                current = self.adjust_volume(current, volume_adjustments[i])

            # Add silence if configured
            if self.with_silence:
                silence = self.get_silence()
                # Crossfade between audio and silence for a smooth transition
                merged = merged.append(silence, crossfade=self.crossfade_duration)

            # Append current audio WITHOUT crossfade to avoid cutting it;
            # the crossfade into silence already provides a smooth transition
            merged = merged.append(current, crossfade=0)

        logger.debug(f"Concatenated {len(audio_list)} audio segments, total duration: {len(merged)}ms")
        return merged
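
    # Example (sketch, reusing the hypothetical `processor` above): joining
    # three loaded clips, boosting the second by 3 dB and cutting the third by 6 dB.
    #
    #     merged = processor.concatenate_audios(
    #         [clip_a, clip_b, clip_c],             # pydub AudioSegments
    #         normalize_each=True,                  # only applies if processor.normalize is True
    #         volume_adjustments=[0.0, 3.0, -6.0],  # dB per clip
    #     )
    #     # Total length = clips + one silence gap per joint, minus crossfade overlap.
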
    def concatenate_audio_files(
        self,
        audio_paths: List[str],
        output_path: str,
        normalize_each: bool = False,
        volume_adjustments: Optional[List[float]] = None,
        target_durations: Optional[List[float]] = None
    ) -> Tuple[AudioSegment, dict]:
        """
        Load, concatenate, and save multiple audio files.

        Args:
            audio_paths: List of paths to audio files
            output_path: Path to save the concatenated audio
            normalize_each: Whether to normalize each audio before concatenation
            volume_adjustments: Optional list of volume adjustments (in dB) for each audio
            target_durations: Optional list of target durations (in seconds) for each clip

        Returns:
            Tuple of (concatenated audio segment, metadata dict)
        """
        # Load all audio files
        audio_segments = []
        for i, path in enumerate(audio_paths):
            audio = self.load_audio(path)

            # Adjust duration if specified (trims if too long, repeats if too short)
            if target_durations and i < len(target_durations):
                target_ms = int(target_durations[i] * 1000)
                audio = concatenate_to_target_duration(audio, target_durations[i])
                logger.debug(f"Adjusted clip {i} to {len(audio)}ms (target: {target_ms}ms)")

            audio_segments.append(audio)

        # Concatenate
        merged = self.concatenate_audios(audio_segments, normalize_each, volume_adjustments)

        # Save
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        merged.export(str(output_path), format="wav")
        logger.info(f"Saved concatenated audio: {output_path}")

        # Create metadata
        metadata = {
            "output_path": str(output_path),
            "source_files": audio_paths,
            "num_sources": len(audio_paths),
            "total_duration_ms": len(merged),
            "total_duration_s": len(merged) / 1000.0,
            "individual_durations_ms": [len(a) for a in audio_segments],
            "individual_durations_s": [len(a) / 1000.0 for a in audio_segments],
            "target_durations_s": target_durations if target_durations else [],
            "volume_adjustments_db": volume_adjustments if volume_adjustments else []
        }

        return merged, metadata
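
# Example (sketch with hypothetical paths): build a 3-clip scene where each
# source is stretched/trimmed to a chosen duration, then save it with metadata.
#
#     merged, meta = processor.concatenate_audio_files(
#         audio_paths=["a.wav", "b.wav", "c.wav"],   # hypothetical files
#         output_path="out/scene_001.wav",
#         target_durations=[10.0, 5.0, 7.5],         # seconds per clip
#     )
#     print(meta["total_duration_s"], meta["individual_durations_s"])
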
def generate_sample_durations_for_task(
    task_duration_hours: float,
    min_clip_duration: float,
    max_clip_duration: float
) -> list:
    """
    Generate sample durations that exactly fill the target task duration.

    Algorithm:
    1. Start with remaining = total_seconds
    2. While remaining >= min_clip_duration:
       - Sample d ~ Uniform(min, min(max, remaining))
       - Append d to durations list
       - Subtract d from remaining
    3. Return shuffled list of durations

    This ensures:
    - Total of all durations ≈ task_duration (within min_clip_duration tolerance)
    - Each duration is uniformly sampled within the valid range
    - No overshoot of the target duration

    Args:
        task_duration_hours: Total duration for the task in hours
        min_clip_duration: Minimum duration per clip in seconds
        max_clip_duration: Maximum duration per clip in seconds

    Returns:
        List of sample durations in seconds (shuffled)
    """
    task_duration_seconds = task_duration_hours * 3600
    remaining = task_duration_seconds
    durations = []

    while remaining >= min_clip_duration:
        # Cap max at remaining to avoid overshoot
        effective_max = min(max_clip_duration, remaining)

        # Defensive: if the effective max dips below min, we can't fit another sample
        if effective_max < min_clip_duration:
            break

        # Sample uniformly within valid range
        d = random.uniform(min_clip_duration, effective_max)
        durations.append(d)
        remaining -= d

    if not durations:
        logger.warning(
            f"Task duration ({task_duration_seconds:.1f}s) is shorter than "
            f"min_clip_duration ({min_clip_duration}s); no durations generated"
        )
        return durations

    # Shuffle to randomize order (durations were generated sequentially)
    random.shuffle(durations)

    total_duration = sum(durations)
    logger.info(f"Task duration target: {task_duration_hours}h ({task_duration_seconds:.1f}s)")
    logger.info(f"Generated {len(durations)} sample durations, total: {total_duration:.1f}s")
    logger.info(f"Duration range: [{min(durations):.1f}s, {max(durations):.1f}s], "
                f"mean: {total_duration/len(durations):.1f}s")
    logger.info(f"Unused remainder: {remaining:.1f}s ({remaining/task_duration_seconds*100:.2f}%)")

    return durations


def calculate_num_samples_for_task(
    task_duration_hours: float,
    min_clip_duration: float,
    max_clip_duration: float
) -> int:
    """
    Calculate the number of samples needed to fill the task duration.

    DEPRECATED: Use generate_sample_durations_for_task() instead for exact
    duration filling. This function is kept for backward compatibility but
    uses average-based estimation.

    Args:
        task_duration_hours: Total duration for the task in hours
        min_clip_duration: Minimum duration per clip in seconds
        max_clip_duration: Maximum duration per clip in seconds

    Returns:
        Number of samples to generate (estimate)
    """
    task_duration_seconds = task_duration_hours * 3600
    avg_clip_duration = (min_clip_duration + max_clip_duration) / 2
    num_samples = int(task_duration_seconds / avg_clip_duration)

    logger.info(f"Task duration: {task_duration_hours}h ({task_duration_seconds}s)")
    logger.info(f"Avg clip duration: {avg_clip_duration}s (min: {min_clip_duration}s, max: {max_clip_duration}s)")
    logger.info(f"Calculated number of samples: {num_samples}")

    return max(1, num_samples)  # At least 1 sample


def generate_single_clip_duration(
    min_duration: float,
    max_duration: float
) -> float:
    """
    Generate a random clip duration between min and max.

    Args:
        min_duration: Minimum duration in seconds
        max_duration: Maximum duration in seconds

    Returns:
        Random duration in seconds
    """
    return random.uniform(min_duration, max_duration)
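
# Example: sampling clip durations for a 0.5 h task with clips of 20-60 s.
# The sampled values are random; the invariants below are what the algorithm
# guarantees.
#
#     set_random_seed(42)
#     durations = generate_sample_durations_for_task(
#         task_duration_hours=0.5,       # 1800 s budget
#         min_clip_duration=20.0,
#         max_clip_duration=60.0,
#     )
#     assert all(20.0 <= d <= 60.0 for d in durations)
#     assert sum(durations) <= 1800.0            # never overshoots
#     assert 1800.0 - sum(durations) < 20.0      # remainder below min_clip_duration
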
def concatenate_to_target_duration(
    base_audio: AudioSegment,
    target_duration_seconds: float,
    crossfade_ms: int = 0
) -> AudioSegment:
    """
    Concatenate a base audio clip with itself to reach a target duration.

    This takes a short clip (e.g., a 5-second ESC-50 clip) and repeats it to
    create a longer clip, or trims it if the target is shorter.

    Args:
        base_audio: Original audio segment
        target_duration_seconds: Target duration in seconds
        crossfade_ms: Crossfade between repetitions in milliseconds

    Returns:
        Audio segment of target duration
    """
    target_duration_ms = int(target_duration_seconds * 1000)
    base_duration_ms = len(base_audio)

    if target_duration_ms <= base_duration_ms:
        # Just trim if target is shorter
        return base_audio[:target_duration_ms]

    # Calculate number of repetitions needed
    num_repetitions = (target_duration_ms // base_duration_ms) + 1

    # Concatenate with crossfade
    result = base_audio
    for i in range(1, num_repetitions):
        if crossfade_ms > 0:
            result = result.append(base_audio, crossfade=crossfade_ms)
        else:
            result = result + base_audio
        # Stop if we've reached target
        if len(result) >= target_duration_ms:
            break

    # Trim to exact duration
    return result[:target_duration_ms]


def set_random_seed(seed: int):
    """Set random seed for reproducibility."""
    random.seed(seed)
    np.random.seed(seed)
    logger.info(f"Random seed set to: {seed}")


def get_max_clip_num_to_be_joined(
    target_duration_seconds: float,
    source_clip_duration_seconds: float,
    min_silence_ms: int = 100
) -> Tuple[int, float]:
    """
    Calculate the maximum number of source clips needed to reach target duration.

    Pipeline: pick dataset -> pick class -> pick audio clip -> get duration ->
    concatenate clips to reach target duration -> floor division to get num clips ->
    insert silences randomly based on the remainder.

    Args:
        target_duration_seconds: Target total duration in seconds
        source_clip_duration_seconds: Duration of each source clip (e.g., 5s for ESC-50)
        min_silence_ms: Minimum silence between clips in milliseconds

    Returns:
        Tuple of (num_clips_needed, remainder_seconds_for_silences)
        - num_clips_needed: How many source clips to concatenate
        - remainder_seconds_for_silences: Extra time to distribute as random silences

    Example (with min_silence_ms=100):
        target=30s, source=5s -> (5, 4.6): 6 clips plus their 5 mandatory gaps
            would exceed 30s, so drop to 5 clips; 30 - 25 - 4*0.1 = 4.6s is
            left over for extra silences
        target=32s, source=5s -> (6, 1.5): 6 clips fit with their 5 mandatory
            gaps; 32 - 30 - 5*0.1 = 1.5s is distributed as extra silences
    """
    target_ms = target_duration_seconds * 1000
    source_ms = source_clip_duration_seconds * 1000

    # Account for minimum silence between each pair of clips:
    # N clips have (N-1) gaps, each needing at least min_silence_ms.

    # Start by computing the raw number of clips (floor division)
    num_clips = int(target_ms // source_ms)
    num_clips = max(1, num_clips)  # At least 1 clip

    # Total audio content from clips
    clips_duration_ms = num_clips * source_ms

    # Minimum required silence for gaps
    num_gaps = max(0, num_clips - 1)
    min_total_silence_ms = num_gaps * min_silence_ms

    # Reduce clip count until the clips plus mandatory silences fit the target
    while num_clips > 1 and (clips_duration_ms + min_total_silence_ms) > target_ms:
        num_clips -= 1
        clips_duration_ms = num_clips * source_ms
        num_gaps = num_clips - 1
        min_total_silence_ms = num_gaps * min_silence_ms

    # Calculate remainder for extra silences
    remainder_ms = target_ms - clips_duration_ms - min_total_silence_ms
    remainder_seconds = max(0, remainder_ms / 1000.0)

    logger.debug(
        f"get_max_clip_num: target={target_duration_seconds}s, source={source_clip_duration_seconds}s "
        f"-> {num_clips} clips, {remainder_seconds:.3f}s remainder for extra silences"
    )

    return num_clips, remainder_seconds
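
# Worked example matching the docstring above (min_silence_ms defaults to 100):
#
#     >>> get_max_clip_num_to_be_joined(30.0, 5.0)
#     (5, 4.6)   # 6 clips would need 30s audio + 0.5s gaps > 30s target
#     >>> get_max_clip_num_to_be_joined(32.0, 5.0)
#     (6, 1.5)   # 30s of audio + 0.5s mandatory gaps, 1.5s left for extras
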
def build_clip_sequence_with_silences(
    audio_segments: List[AudioSegment],
    target_duration_seconds: float,
    min_silence_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_ms: int = 0
) -> AudioSegment:
    """
    Build a final audio clip by concatenating segments with guaranteed silences.

    Ensures:
    1. All clips are joined with at least min_silence_ms between them
    2. Any remainder duration is distributed as random extra silences in gaps
    3. Final duration matches target_duration_seconds exactly

    Args:
        audio_segments: List of audio segments to concatenate
        target_duration_seconds: Target total duration in seconds
        min_silence_ms: Minimum silence between each pair of clips (always inserted)
        max_extra_silence_per_gap_ms: Maximum extra silence to add per gap
        crossfade_ms: Crossfade duration in ms (applied when joining)

    Returns:
        Concatenated audio segment of exact target duration
    """
    if not audio_segments:
        raise ValueError("audio_segments cannot be empty")

    target_ms = int(target_duration_seconds * 1000)

    if len(audio_segments) == 1:
        # Single clip: just trim/repeat to target
        audio = audio_segments[0]
        if len(audio) >= target_ms:
            return audio[:target_ms]
        else:
            # Repeat to reach target
            return concatenate_to_target_duration(audio, target_duration_seconds, crossfade_ms)

    # Calculate total audio content duration
    total_audio_ms = sum(len(seg) for seg in audio_segments)
    num_gaps = len(audio_segments) - 1

    # Minimum silence needed
    min_total_silence_ms = num_gaps * min_silence_ms

    # Available time for extra silences
    available_extra_ms = target_ms - total_audio_ms - min_total_silence_ms

    if available_extra_ms < 0:
        # Not enough room - need to trim clips
        logger.warning(
            f"Clips too long for target duration. Total audio: {total_audio_ms}ms, "
            f"target: {target_ms}ms. Will trim final result."
        )
        available_extra_ms = 0

    # Distribute extra silence randomly across gaps
    extra_silences_ms = distribute_remainder_as_silences(
        available_extra_ms, num_gaps, max_extra_silence_per_gap_ms
    )

    # Build the final audio
    result = audio_segments[0]
    for i, audio in enumerate(audio_segments[1:]):
        # Calculate total silence for this gap
        gap_silence_ms = min_silence_ms + extra_silences_ms[i]

        # Add silence
        silence = AudioSegment.silent(duration=gap_silence_ms)
        if crossfade_ms > 0 and crossfade_ms < gap_silence_ms:
            # Crossfade audio->silence for a smooth transition, but NOT silence->audio
            result = result.append(silence, crossfade=crossfade_ms)
            result = result.append(audio, crossfade=0)  # No crossfade, to avoid cutting the clip's start
        else:
            result = result + silence + audio

    # Trim to exact target duration
    if len(result) > target_ms:
        result = result[:target_ms]
    elif len(result) < target_ms:
        # Pad with silence if slightly short
        padding = AudioSegment.silent(duration=target_ms - len(result))
        result = result + padding

    logger.debug(
        f"Built clip sequence: {len(audio_segments)} segments, "
        f"final duration: {len(result)}ms (target: {target_ms}ms)"
    )

    return result
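
# Example (sketch): assembling five shuffled 5 s clips into exactly 60 s of
# audio. Every joint gets at least 100 ms of silence; slack beyond the clips
# and mandatory gaps is scattered across joints (up to 500 ms extra per gap),
# and any leftover becomes trailing padding.
#
#     scene = build_clip_sequence_with_silences(
#         clips,                              # hypothetical list of AudioSegments
#         target_duration_seconds=60.0,
#         min_silence_ms=100,
#         max_extra_silence_per_gap_ms=500,
#     )
#     assert len(scene) == 60_000  # exact target, trimmed or padded as needed
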
def distribute_remainder_as_silences(
    remainder_ms: float,
    num_gaps: int,
    max_per_gap_ms: int = 500
) -> List[int]:
    """
    Distribute remainder time as random silences across gaps.

    Args:
        remainder_ms: Total extra time to distribute (in ms)
        num_gaps: Number of gaps between clips
        max_per_gap_ms: Maximum extra silence per gap

    Returns:
        List of extra silence durations (in ms) for each gap
    """
    if num_gaps <= 0:
        return []

    remainder_ms = int(max(0, remainder_ms))
    if remainder_ms == 0:
        return [0] * num_gaps

    # Generate random weights for the distribution
    weights = [random.random() for _ in range(num_gaps)]
    total_weight = sum(weights)

    if total_weight == 0:
        # Fallback to uniform distribution
        weights = [1.0] * num_gaps
        total_weight = num_gaps

    # Distribute proportionally, respecting max_per_gap
    extra_silences = []
    remaining = remainder_ms

    for i, w in enumerate(weights):
        if i == num_gaps - 1:
            # Last gap gets whatever is left
            extra = min(remaining, max_per_gap_ms)
        else:
            proportion = w / total_weight
            extra = int(remainder_ms * proportion)
            extra = min(extra, max_per_gap_ms, remaining)

        extra_silences.append(extra)
        remaining -= extra
        total_weight -= w

    # If there's still remainder (due to max_per_gap limits), do another pass
    while remaining > 0:
        for i in range(num_gaps):
            if extra_silences[i] < max_per_gap_ms and remaining > 0:
                add = min(remaining, max_per_gap_ms - extra_silences[i])
                extra_silences[i] += add
                remaining -= add
        if remaining > 0:
            # Can't distribute more (all gaps at max)
            break

    logger.debug(f"Distributed {remainder_ms}ms across {num_gaps} gaps: {extra_silences}")
    return extra_silences


def repeat_clips_to_fill_duration(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    source_clip_duration_seconds: float = 5.0,
    min_silence_ms: int = 100
) -> Tuple[List[AudioSegment], List[str], int]:
    """
    Repeat source clips to fill target duration, cycling through all sources.

    This ensures all unique sources appear and are repeated proportionally.

    Args:
        source_audios: List of unique source audio segments
        source_categories: List of category names corresponding to source_audios
        target_duration_seconds: Target total duration
        source_clip_duration_seconds: Duration of each source clip
        min_silence_ms: Minimum silence between clips

    Returns:
        Tuple of (expanded_audio_list, expanded_categories, num_clips)
    """
    num_clips, remainder = get_max_clip_num_to_be_joined(
        target_duration_seconds, source_clip_duration_seconds, min_silence_ms
    )

    num_sources = len(source_audios)
    if num_sources == 0:
        raise ValueError("source_audios cannot be empty")

    # Build expanded lists by cycling through sources
    expanded_audios = []
    expanded_categories = []
    for i in range(num_clips):
        idx = i % num_sources
        expanded_audios.append(source_audios[idx])
        expanded_categories.append(source_categories[idx])

    logger.debug(
        f"Repeated {num_sources} sources to {num_clips} clips for "
        f"{target_duration_seconds}s target duration"
    )

    return expanded_audios, expanded_categories, num_clips
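
# Example: spreading 1.2 s of slack across 4 gaps, capped at 500 ms per gap.
# The split is random, but the invariants below always hold.
#
#     extras = distribute_remainder_as_silences(1200, num_gaps=4, max_per_gap_ms=500)
#     assert len(extras) == 4
#     assert all(0 <= e <= 500 for e in extras)
#     assert sum(extras) == 1200   # fully distributed (4 * 500 ms >= 1200 ms)
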
def build_consecutive_sources_for_count_task(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    source_clip_duration_seconds: float = 5.0,
    min_silence_between_sources_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_within_source_ms: int = 50
) -> Tuple[AudioSegment, List[str], dict]:
    """
    Build audio for COUNT task with consecutive same-class clips.

    For the count task, same-class clips must be consecutive (AAA BBB CCC) so
    they are perceived as ONE sound source. Silences are only inserted BETWEEN
    different classes, not within same-class repetitions.

    Pipeline: pick classes -> for each class concatenate clips consecutively ->
    insert silences only between different classes -> distribute remainder

    Args:
        source_audios: List of unique source audio segments (one per class)
        source_categories: List of category names
        target_duration_seconds: Target total duration
        source_clip_duration_seconds: Duration of each source clip
        min_silence_between_sources_ms: Minimum silence between different sources
        max_extra_silence_per_gap_ms: Max extra silence per gap for remainder distribution
        crossfade_within_source_ms: Small crossfade within same-source repetitions

    Returns:
        Tuple of (final_audio, category_sequence, metadata_dict)
    """
    target_ms = int(target_duration_seconds * 1000)
    num_sources = len(source_audios)

    if num_sources == 0:
        raise ValueError("source_audios cannot be empty")

    # Calculate total clips needed
    num_clips, remainder_seconds = get_max_clip_num_to_be_joined(
        target_duration_seconds, source_clip_duration_seconds, min_silence_between_sources_ms
    )

    # Safety check: if more sources than clips can fit, warn
    if num_sources > num_clips:
        logger.warning(
            f"More sources ({num_sources}) than clips that fit ({num_clips}). "
            f"Each source needs at least 1 clip, so output may exceed target duration. "
            f"Consider capping n_unique_audios <= max_clips in task_count.py"
        )
        # Each source gets exactly 1 rep if there are more sources than clips
        num_clips = num_sources  # This will exceed the target but ensures each source is included

    # Distribute clips across sources as evenly as possible.
    # Each source gets at least 1 clip since num_sources <= num_clips.
    base_reps = num_clips // num_sources
    extra_reps = num_clips % num_sources

    repetitions_per_source = []
    for i in range(num_sources):
        reps = base_reps + (1 if i < extra_reps else 0)
        repetitions_per_source.append(reps)

    # Shuffle the repetition assignment to add variety
    random.shuffle(repetitions_per_source)

    # Build each source's audio block (consecutive clips of the same class)
    source_blocks = []
    category_sequence = []

    for audio, category, reps in zip(source_audios, source_categories, repetitions_per_source):
        if reps == 0:
            continue

        # Concatenate same-source clips with minimal/no gap (just a small crossfade)
        block = audio
        for _ in range(reps - 1):
            if crossfade_within_source_ms > 0:
                block = block.append(audio, crossfade=crossfade_within_source_ms)
            else:
                block = block + audio

        source_blocks.append(block)
        category_sequence.append(category)

    # Now we have N source blocks to join with silences.
    # Number of gaps = num_source_blocks - 1
    num_gaps = len(source_blocks) - 1

    if num_gaps <= 0:
        # Only one source block
        final_audio = source_blocks[0]
    else:
        # Calculate total audio duration from blocks
        total_blocks_ms = sum(len(block) for block in source_blocks)
        min_total_silence_ms = num_gaps * min_silence_between_sources_ms

        # Available for extra silences
        available_extra_ms = target_ms - total_blocks_ms - min_total_silence_ms
        available_extra_ms = max(0, available_extra_ms)

        # Distribute extra silence across gaps
        extra_silences = distribute_remainder_as_silences(
            available_extra_ms, num_gaps, max_extra_silence_per_gap_ms
        )

        # Build final audio with silences between source blocks
        final_audio = source_blocks[0]
        for i, block in enumerate(source_blocks[1:]):
            gap_silence_ms = min_silence_between_sources_ms + extra_silences[i]
            silence = AudioSegment.silent(duration=gap_silence_ms)
            final_audio = final_audio + silence + block

    # Trim or pad to exact target duration
    if len(final_audio) > target_ms:
        final_audio = final_audio[:target_ms]
    elif len(final_audio) < target_ms:
        padding = AudioSegment.silent(duration=target_ms - len(final_audio))
        final_audio = final_audio + padding

    # Create metadata
    metadata = {
        'num_unique_sources': num_sources,
        'total_clips': num_clips,
        'ordering_mode': 'consecutive',
        'repetitions_per_source': dict(zip(source_categories, repetitions_per_source)),
        'target_duration_ms': target_ms,
        'actual_duration_ms': len(final_audio),
        'num_gaps_between_sources': num_gaps
    }

    logger.debug(
        f"Count task (consecutive): {num_sources} sources, {num_clips} total clips, "
        f"reps={repetitions_per_source}, duration={len(final_audio)}ms"
    )

    return final_audio, category_sequence, metadata


def build_random_order_for_count_task(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    source_clip_duration_seconds: float = 5.0,
    min_silence_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500
) -> Tuple[AudioSegment, List[str], dict]:
    """
    Build audio for COUNT task with RANDOM ordering of clips.

    Clips from different sources are shuffled randomly (A B A C B A C...).
    This tests whether the model can recognize recurring sounds as the same
    source. Silences are inserted between ALL clips (same or different source).

    Pipeline:
    1. Calculate total clips needed
    2. Distribute clips across sources
    3. Create expanded list with all clip instances
    4. Shuffle randomly
    5. Insert silences between ALL clips
    6. Distribute remainder as extra random silences

    Args:
        source_audios: List of unique source audio segments (one per class)
        source_categories: List of category names
        target_duration_seconds: Target total duration
        source_clip_duration_seconds: Duration of each source clip
        min_silence_ms: Minimum silence between ALL clips
        max_extra_silence_per_gap_ms: Max extra silence per gap

    Returns:
        Tuple of (final_audio, clip_sequence, metadata_dict)
    """
    target_ms = int(target_duration_seconds * 1000)
    num_sources = len(source_audios)

    if num_sources == 0:
        raise ValueError("source_audios cannot be empty")

    # Calculate total clips needed
    num_clips, remainder_seconds = get_max_clip_num_to_be_joined(
        target_duration_seconds, source_clip_duration_seconds, min_silence_ms
    )

    # Safety check: if more sources than clips can fit, warn and cap
    if num_sources > num_clips:
        logger.warning(
            f"More sources ({num_sources}) than clips that fit ({num_clips}). "
            f"Each source needs at least 1 clip, so output may exceed target duration. "
            f"Consider capping n_unique_audios <= max_clips in task_count.py"
        )
        # Each source gets exactly 1 rep if there are more sources than clips
        num_clips = num_sources  # This will exceed the target but ensures each source is included

    # Distribute clips across sources as evenly as possible
    base_reps = num_clips // num_sources  # At least 1 since num_sources <= num_clips (after cap)
    extra_reps = num_clips % num_sources

    repetitions_per_source = []
    for i in range(num_sources):
        reps = base_reps + (1 if i < extra_reps else 0)
        repetitions_per_source.append(reps)

    # Build expanded list of (audio, category) pairs
    expanded_clips = []
    for audio, category, reps in zip(source_audios, source_categories, repetitions_per_source):
        for _ in range(reps):
            expanded_clips.append((audio, category))

    # Shuffle the clips randomly
    random.shuffle(expanded_clips)

    # Extract shuffled audios and categories
    shuffled_audios = [clip[0] for clip in expanded_clips]
    clip_sequence = [clip[1] for clip in expanded_clips]

    # Build final audio with silences between ALL clips
    final_audio = build_clip_sequence_with_silences(
        shuffled_audios,
        target_duration_seconds,
        min_silence_ms=min_silence_ms,
        max_extra_silence_per_gap_ms=max_extra_silence_per_gap_ms,
        crossfade_ms=0  # No crossfade for random ordering
    )

    # Create metadata
    metadata = {
        'num_unique_sources': num_sources,
        'total_clips': len(expanded_clips),
        'ordering_mode': 'random',
        'repetitions_per_source': dict(zip(source_categories, repetitions_per_source)),
        'clip_sequence': clip_sequence,
        'target_duration_ms': target_ms,
        'actual_duration_ms': len(final_audio),
        'num_gaps': len(expanded_clips) - 1
    }

    logger.debug(
        f"Count task (random): {num_sources} sources, {len(expanded_clips)} clips, "
        f"sequence={clip_sequence[:5]}..., duration={len(final_audio)}ms"
    )

    return final_audio, clip_sequence, metadata
def build_count_task_audio(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    ordering_mode: str = "random",
    source_clip_duration_seconds: float = 5.0,
    min_silence_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_within_source_ms: int = 50
) -> Tuple[AudioSegment, List[str], dict]:
    """
    Build audio for COUNT task with configurable ordering mode.

    Args:
        source_audios: List of unique source audio segments (one per class)
        source_categories: List of category names
        target_duration_seconds: Target total duration
        ordering_mode: "random" or "consecutive"
            - "random": Clips shuffled (A B A C B A C) - tests sound recognition
            - "consecutive": Same-source grouped (AAA BBB CCC) - easier
        source_clip_duration_seconds: Duration of each source clip
        min_silence_ms: Minimum silence between clips
        max_extra_silence_per_gap_ms: Max extra silence per gap
        crossfade_within_source_ms: Crossfade for consecutive mode only

    Returns:
        Tuple of (final_audio, clip_sequence, metadata_dict)
    """
    if ordering_mode == "consecutive":
        return build_consecutive_sources_for_count_task(
            source_audios, source_categories, target_duration_seconds,
            source_clip_duration_seconds, min_silence_ms,
            max_extra_silence_per_gap_ms, crossfade_within_source_ms
        )
    else:  # random (default)
        return build_random_order_for_count_task(
            source_audios, source_categories, target_duration_seconds,
            source_clip_duration_seconds, min_silence_ms,
            max_extra_silence_per_gap_ms
        )


# =============================================================================
# DURATION TASK FUNCTIONS
# =============================================================================

def calculate_duration_slot_distribution(
    target_total_duration_s: float,
    effective_durations: Dict[str, float],
    target_category: str,
    question_type: str,
    multiplier_longest: float = 1.5,
    multiplier_shortest: float = 0.5,
    min_silence_between_sources_ms: int = 100
) -> Tuple[Dict[str, int], bool, Dict]:
    """
    Calculate how many repetitions each source gets for the duration task.

    For LONGEST: the target gets max repetitions, backgrounds get 1 each.
    For SHORTEST: the target gets 1, backgrounds share the remaining duration.

    Args:
        target_total_duration_s: Target total audio duration
        effective_durations: Dict mapping category -> effective duration in seconds
        target_category: The category that should be longest/shortest
        question_type: "longest" or "shortest"
        multiplier_longest: target >= max_background * this
        multiplier_shortest: target <= min_background * this
        min_silence_between_sources_ms: Minimum silence between different sources

    Returns:
        Tuple of (slot_distribution, gap_satisfied, metadata)
        - slot_distribution: Dict mapping category -> number of repetitions
        - gap_satisfied: Whether the duration gap constraint is met
        - metadata: Additional info about the calculation
    """
    categories = list(effective_durations.keys())
    n_sources = len(categories)

    if n_sources < 2:
        # Single source - always satisfies the constraint
        reps = max(1, int(target_total_duration_s / effective_durations[target_category]))
        return {target_category: reps}, True, {'note': 'single_source'}

    # Total silence between sources
    total_silence_s = (n_sources - 1) * min_silence_between_sources_ms / 1000.0
    available_for_audio_s = target_total_duration_s - total_silence_s

    background_categories = [c for c in categories if c != target_category]

    if question_type == "longest":
        # Backgrounds get 1 rep each
        background_duration_s = sum(effective_durations[c] for c in background_categories)

        # Remaining time goes to the target
        remaining_for_target_s = available_for_audio_s - background_duration_s
        target_duration_per_rep = effective_durations[target_category]

        # Calculate reps for target
        target_reps = max(1, int(remaining_for_target_s / target_duration_per_rep))
        actual_target_duration = target_reps * target_duration_per_rep

        # Verify the gap constraint
        max_background_duration = max(effective_durations[c] for c in background_categories)
        required_target_duration = max_background_duration * multiplier_longest
        gap_satisfied = actual_target_duration >= required_target_duration

        slot_distribution = {c: 1 for c in background_categories}
        slot_distribution[target_category] = target_reps

        metadata = {
            'available_for_audio_s': available_for_audio_s,
            'background_duration_s': background_duration_s,
            'remaining_for_target_s': remaining_for_target_s,
            'target_reps': target_reps,
            'actual_target_duration_s': actual_target_duration,
            'max_background_duration_s': max_background_duration,
            'required_target_duration_s': required_target_duration,
            'multiplier_used': multiplier_longest
        }

    else:  # shortest
        # Target gets 1 rep
        target_duration_s = effective_durations[target_category]

        # Remaining time goes to the backgrounds
        remaining_for_backgrounds_s = available_for_audio_s - target_duration_s

        # Distribute the remainder to backgrounds as evenly as possible,
        # while ensuring each background is longer than target / multiplier
        slot_distribution = {target_category: 1}

        # Calculate the minimum required duration for each background
        min_background_required = target_duration_s / multiplier_shortest

        background_reps = {}
        for cat in background_categories:
            eff_dur = effective_durations[cat]
            # How many reps are needed to exceed min_background_required?
            min_reps = max(1, int(min_background_required / eff_dur) + 1)
            background_reps[cat] = min_reps

        # Check if we have room for all backgrounds
        total_background_needed = sum(
            background_reps[c] * effective_durations[c] for c in background_categories
        )

        if total_background_needed <= remaining_for_backgrounds_s:
            # Distribute extra reps
            extra_available = remaining_for_backgrounds_s - total_background_needed

            # Add extra reps to backgrounds proportionally
            while extra_available > 0:
                added_any = False
                for cat in background_categories:
                    eff_dur = effective_durations[cat]
                    if extra_available >= eff_dur:
                        background_reps[cat] += 1
                        extra_available -= eff_dur
                        added_any = True
                if not added_any:
                    break

            slot_distribution.update(background_reps)
        else:
            # Not enough room - use minimum reps anyway
            slot_distribution.update(background_reps)

        # Calculate actual durations
        actual_durations = {
            cat: slot_distribution[cat] * effective_durations[cat] for cat in categories
        }
        min_background_actual = min(
            actual_durations[c] for c in background_categories
        )

        # Verify the gap constraint against the actual durations
        gap_satisfied = actual_durations[target_category] <= min_background_actual * multiplier_shortest

        metadata = {
            'available_for_audio_s': available_for_audio_s,
            'target_duration_s': target_duration_s,
            'remaining_for_backgrounds_s': remaining_for_backgrounds_s,
            'min_background_required_s': min_background_required,
            'actual_durations_s': actual_durations,
            'min_background_actual_s': min_background_actual,
            'multiplier_used': multiplier_shortest
        }

    return slot_distribution, gap_satisfied, metadata
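
# Worked example for the "longest" branch: three categories with 5 s effective
# clips, a 60 s budget, target "dog". Gaps: 2 * 0.1 s silence -> 59.8 s for
# audio; backgrounds take 10 s, leaving 49.8 s -> 9 reps of 5 s for the target.
# 45 s >= 1.5 * 5 s, so the gap constraint holds.
#
#     dist, ok, info = calculate_duration_slot_distribution(
#         target_total_duration_s=60.0,
#         effective_durations={"dog": 5.0, "rain": 5.0, "wind": 5.0},
#         target_category="dog",
#         question_type="longest",
#     )
#     assert dist == {"rain": 1, "wind": 1, "dog": 9} and ok
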
def build_duration_task_audio(
    source_audio_lists: Dict[str, List[AudioSegment]],
    slot_distribution: Dict[str, int],
    effective_durations: Dict[str, float],
    target_total_duration_s: float,
    min_silence_between_sources_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_within_source_ms: int = 50
) -> Tuple[AudioSegment, List[str], Dict]:
    """
    Build audio for DURATION task with consecutive ordering per source.

    Structure: [SourceA × n] + silence + [SourceB × m] + silence + ...
    The order of sources is randomized to avoid patterns.

    Args:
        source_audio_lists: Dict mapping category -> list of audio segments
        slot_distribution: Dict mapping category -> number of repetitions
        effective_durations: Dict mapping category -> effective duration per clip
        target_total_duration_s: Target total duration
        min_silence_between_sources_ms: Min silence between different sources
        max_extra_silence_per_gap_ms: Max extra silence per gap
        crossfade_within_source_ms: Crossfade between same-source repetitions

    Returns:
        Tuple of (final_audio, category_sequence, metadata)
    """
    categories = list(slot_distribution.keys())

    # Randomize source order
    random.shuffle(categories)

    # Build audio blocks for each source
    source_blocks = []
    category_sequence = []
    actual_durations = {}
    block_durations_ms = []  # Track duration of each block for timestamp calculation

    for category in categories:
        reps = slot_distribution[category]
        audio_list = source_audio_lists[category]

        if reps == 0:
            continue

        # Build the block for this source
        block = audio_list[0]
        for i in range(1, reps):
            # Use the same clip or cycle through the available clips
            next_clip = audio_list[i % len(audio_list)]
            # Crossfade within the same source when both sides are long enough
            if crossfade_within_source_ms > 0:
                if len(block) > crossfade_within_source_ms and len(next_clip) > crossfade_within_source_ms:
                    block = block.append(next_clip, crossfade=crossfade_within_source_ms)
                else:
                    block = block + next_clip
            else:
                block = block + next_clip

        source_blocks.append((category, block))
        block_durations_ms.append(len(block))
        category_sequence.extend([category] * reps)
        actual_durations[category] = len(block) / 1000.0

    # Calculate total audio duration and available extra silence
    total_audio_ms = sum(len(block) for _, block in source_blocks)
    num_gaps = len(source_blocks) - 1
    min_total_silence_ms = num_gaps * min_silence_between_sources_ms

    target_ms = int(target_total_duration_s * 1000)
    available_extra_ms = target_ms - total_audio_ms - min_total_silence_ms

    # Distribute extra silence
    if available_extra_ms > 0 and num_gaps > 0:
        extra_silences = distribute_remainder_as_silences(
            available_extra_ms, num_gaps, max_extra_silence_per_gap_ms
        )
    else:
        extra_silences = [0] * max(num_gaps, 1)

    # Concatenate with silences and track timestamps
    source_timestamps = []  # List of (category, start_ms, end_ms)

    cat, block = source_blocks[0]
    final_audio = block
    source_timestamps.append((cat, 0, len(block)))
    current_position_ms = len(block)

    for i, (cat, block) in enumerate(source_blocks[1:]):
        gap_silence_ms = min_silence_between_sources_ms + extra_silences[i]
        silence = AudioSegment.silent(duration=gap_silence_ms)

        # Prefer crossfading audio -> silence for a smooth transition, but avoid
        # crossfading silence -> audio (it cuts the start of the next clip).
        # Conditions for a safe crossfade:
        # - the crossfade must be shorter than the gap silence
        # - both segments must be longer than the crossfade
        crossfade_ms = min(500, gap_silence_ms)
        if crossfade_ms > 0 and crossfade_ms < gap_silence_ms and len(final_audio) > crossfade_ms and len(block) > crossfade_ms:
            final_audio = final_audio.append(silence, crossfade=crossfade_ms)
            # Append the next block without crossfade to avoid trimming its start
            final_audio = final_audio.append(block, crossfade=0)
            # The crossfade overlaps audio and silence, so the gap effectively
            # shortens by crossfade_ms; account for that in the timestamps
            start_ms = current_position_ms + gap_silence_ms - crossfade_ms
        else:
            # Fall back to simple concatenation
            final_audio = final_audio + silence + block
            start_ms = current_position_ms + gap_silence_ms

        end_ms = start_ms + len(block)
        source_timestamps.append((cat, start_ms, end_ms))
        current_position_ms = end_ms

    # Adjust to target duration
    if len(final_audio) > target_ms:
        final_audio = final_audio[:target_ms]
    elif len(final_audio) < target_ms:
        padding = AudioSegment.silent(duration=target_ms - len(final_audio))
        final_audio = final_audio + padding

    # Build timestamp string: "category1 start-end, category2 start-end, ..."
    timestamp_parts = []
    for cat, start_ms, end_ms in source_timestamps:
        start_s = round(start_ms / 1000.0, 2)
        end_s = round(end_ms / 1000.0, 2)
        duration_s = round((end_ms - start_ms) / 1000.0, 2)
        timestamp_parts.append(f"{cat} {start_s}s-{end_s}s ({duration_s}s)")
    timestamp_string = ", ".join(timestamp_parts)

    metadata = {
        'source_order': [cat for cat, _ in source_blocks],
        'slot_distribution': slot_distribution,
        'actual_durations_s': actual_durations,
        'total_audio_ms': total_audio_ms,
        'num_gaps': num_gaps,
        'final_duration_ms': len(final_audio),
        'source_timestamps': source_timestamps,  # List of (category, start_ms, end_ms)
        'timestamp_string': timestamp_string  # Human-readable format
    }

    logger.debug(
        f"Duration task audio: {len(source_blocks)} sources, "
        f"order={metadata['source_order']}, duration={len(final_audio)}ms"
    )

    return final_audio, category_sequence, metadata
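
# Example (end-to-end sketch for the duration task, with hypothetical inputs
# `dog_clip` and `rain_clip`): decide repetitions per category, then render
# the audio and read back the per-source timestamps.
#
#     dist, ok, _ = calculate_duration_slot_distribution(
#         target_total_duration_s=60.0,
#         effective_durations={"dog": 5.0, "rain": 5.0},
#         target_category="dog",
#         question_type="longest",
#     )
#     audio, seq, meta = build_duration_task_audio(
#         source_audio_lists={"dog": [dog_clip], "rain": [rain_clip]},
#         slot_distribution=dist,
#         effective_durations={"dog": 5.0, "rain": 5.0},
#         target_total_duration_s=60.0,
#     )
#     print(meta["timestamp_string"])   # e.g. "rain 0.0s-5.0s (5.0s), dog 5.1s-..."
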