"""
Audio processing utilities for temporal reasoning dataset generation.
"""

import os
import random
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
from pydub import AudioSegment

try:
    import pyloudnorm as pyln

    PYLOUDNORM_AVAILABLE = True
except ImportError:
    PYLOUDNORM_AVAILABLE = False

from .logger import setup_logger

logger = setup_logger(__name__)


def get_lufs_loudness(audio: AudioSegment) -> float:
    """
    Calculate integrated LUFS loudness (perceived loudness) of an audio segment.

    LUFS (Loudness Units Full Scale) is the broadcast standard for measuring
    perceived loudness. It accounts for human hearing sensitivity to different
    frequencies using K-weighting.

    Args:
        audio: Input audio segment (pydub AudioSegment)

    Returns:
        Loudness in LUFS (negative values, typically -70 to 0).
        Returns dBFS instead if pyloudnorm is not available (fallback).
    """
    if not PYLOUDNORM_AVAILABLE:
        logger.warning("pyloudnorm not available, falling back to dBFS")
        return audio.dBFS

    samples = np.array(audio.get_array_of_samples())

    # Interleaved stereo samples -> (n_frames, 2) array expected by pyloudnorm.
    if audio.channels == 2:
        samples = samples.reshape((-1, 2))

    # Scale integer PCM to float in [-1.0, 1.0] according to sample width.
    if audio.sample_width == 1:
        # 8-bit PCM is unsigned (0..255): shift to signed before scaling.
        samples = samples.astype(np.float64) / 128.0 - 1.0
    elif audio.sample_width == 2:
        samples = samples.astype(np.float64) / 32768.0
    elif audio.sample_width == 4:
        samples = samples.astype(np.float64) / 2147483648.0
    else:
        samples = samples.astype(np.float64) / 32768.0

    meter = pyln.Meter(audio.frame_rate)

    try:
        loudness = meter.integrated_loudness(samples)
        # Silence measures as -inf; clamp to the practical floor of -70 LUFS.
        if np.isinf(loudness):
            loudness = -70.0
        return loudness
    except Exception as e:
        logger.warning(f"LUFS measurement failed: {e}, falling back to dBFS")
        return audio.dBFS
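
# Usage sketch (illustrative only; "clip.wav" is a hypothetical file, and the
# dBFS fallback value is returned when pyloudnorm is not installed):
#
#   seg = AudioSegment.from_file("clip.wav", format="wav")
#   print(f"Integrated loudness: {get_lufs_loudness(seg):.2f} LUFS")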


def normalize_to_lufs(audio: AudioSegment, target_lufs: float = -23.0) -> AudioSegment:
    """
    Normalize audio to a target LUFS level (perceived loudness normalization).

    This is superior to dBFS normalization for comparing different sound types
    because it accounts for human hearing sensitivity.

    Args:
        audio: Input audio segment
        target_lufs: Target loudness level in LUFS (default: -23 LUFS, EBU R128 standard)

    Returns:
        Loudness-normalized audio segment
    """
    if not PYLOUDNORM_AVAILABLE:
        logger.warning("pyloudnorm not available, falling back to dBFS normalization")
        change_db = target_lufs - audio.dBFS
        return audio.apply_gain(change_db)

    current_lufs = get_lufs_loudness(audio)

    # The required gain is simply the difference between target and measured loudness.
    gain_db = target_lufs - current_lufs

    normalized = audio.apply_gain(gain_db)

    logger.debug(f"Normalized LUFS: {current_lufs:.2f} -> {get_lufs_loudness(normalized):.2f} LUFS")

    return normalized
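
# Usage sketch (illustrative; "voice.wav" is a hypothetical input file):
#
#   seg = AudioSegment.from_file("voice.wav", format="wav")
#   broadcast_ready = normalize_to_lufs(seg, target_lufs=-23.0)
#   broadcast_ready.export("voice_norm.wav", format="wav")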


class AudioProcessor:
    """Handles audio loading, processing, and concatenation."""

    def __init__(
        self,
        crossfade_duration: int = 500,
        silence_duration: int = 1000,
        with_silence: bool = True,
        normalize: bool = False,
        normalize_target_dBFS: float = -20.0,
        synthetic_silence_path: Optional[str] = None
    ):
        """
        Initialize the audio processor.

        Args:
            crossfade_duration: Duration of crossfade in milliseconds
            silence_duration: Duration of silence between clips in milliseconds
            with_silence: Whether to add silence between clips
            normalize: Whether to normalize audio levels
            normalize_target_dBFS: Target dBFS level for normalization
            synthetic_silence_path: Path to synthetic silence audio files
        """
        self.crossfade_duration = crossfade_duration
        self.silence_duration = silence_duration
        self.with_silence = with_silence
        self.normalize = normalize
        self.normalize_target_dBFS = normalize_target_dBFS
        self.synthetic_silence_path = synthetic_silence_path
        self._silence_cache = {}

    def load_audio(self, audio_path: str) -> AudioSegment:
        """
        Load an audio file.

        Args:
            audio_path: Path to the audio file

        Returns:
            Loaded audio segment
        """
        try:
            audio = AudioSegment.from_file(audio_path, format="wav")
            logger.debug(f"Loaded audio: {audio_path}, duration: {len(audio)}ms")
            return audio
        except Exception as e:
            logger.error(f"Error loading audio {audio_path}: {e}")
            raise

    def normalize_audio(self, audio: AudioSegment, target_dBFS: Optional[float] = None) -> AudioSegment:
        """
        Normalize audio to a target dBFS level.

        Args:
            audio: Input audio segment
            target_dBFS: Target dBFS level (uses default if None)

        Returns:
            Normalized audio segment
        """
        if target_dBFS is None:
            target_dBFS = self.normalize_target_dBFS

        change_in_dBFS = target_dBFS - audio.dBFS
        normalized = audio.apply_gain(change_in_dBFS)
        logger.debug(f"Normalized audio: {audio.dBFS:.2f} dBFS -> {normalized.dBFS:.2f} dBFS")
        return normalized

    def adjust_volume(self, audio: AudioSegment, volume_db: float) -> AudioSegment:
        """
        Adjust audio volume by a specific dB amount.

        Args:
            audio: Input audio segment
            volume_db: Volume adjustment in dB (positive = louder, negative = quieter)

        Returns:
            Volume-adjusted audio segment
        """
        adjusted = audio.apply_gain(volume_db)
        logger.debug(f"Adjusted volume by {volume_db} dB: {audio.dBFS:.2f} -> {adjusted.dBFS:.2f} dBFS")
        return adjusted

    def get_silence(self, duration: Optional[int] = None) -> AudioSegment:
        """
        Get a silence audio segment, using synthetic silence if available.

        Args:
            duration: Duration in milliseconds (uses default if None)

        Returns:
            Silence audio segment
        """
        if duration is None:
            duration = self.silence_duration

        # Reuse previously built silence segments of the same duration.
        if duration in self._silence_cache:
            return self._silence_cache[duration]

        # Prefer synthetic silence files when a path is configured.
        if self.synthetic_silence_path and os.path.exists(self.synthetic_silence_path):
            silence_files = list(Path(self.synthetic_silence_path).glob("*.wav"))
            if silence_files:
                silence = self.load_audio(str(random.choice(silence_files)))

                # Loop the file if it is shorter than the requested duration.
                if len(silence) < duration:
                    repetitions = (duration // len(silence)) + 1
                    silence = silence * repetitions
                silence = silence[:duration]
                self._silence_cache[duration] = silence
                logger.debug(f"Using synthetic silence: {duration}ms")
                return silence

        # Fall back to digitally pure silence.
        silence = AudioSegment.silent(duration=duration)
        self._silence_cache[duration] = silence
        logger.debug(f"Using pure silence: {duration}ms")
        return silence

    def concatenate_audios(
        self,
        audio_list: List[AudioSegment],
        normalize_each: bool = False,
        volume_adjustments: Optional[List[float]] = None
    ) -> AudioSegment:
        """
        Concatenate multiple audio segments with crossfade and optional silence.

        Args:
            audio_list: List of audio segments to concatenate
            normalize_each: Whether to normalize each audio before concatenation
            volume_adjustments: Optional list of volume adjustments (in dB) for each audio

        Returns:
            Concatenated audio segment
        """
        if not audio_list:
            raise ValueError("audio_list cannot be empty")

        if len(audio_list) == 1:
            audio = audio_list[0]
            if normalize_each and self.normalize:
                audio = self.normalize_audio(audio)
            if volume_adjustments and len(volume_adjustments) > 0:
                audio = self.adjust_volume(audio, volume_adjustments[0])
            return audio

        # Process the first segment, then append the rest one by one.
        merged = audio_list[0]
        if normalize_each and self.normalize:
            merged = self.normalize_audio(merged)
        if volume_adjustments and len(volume_adjustments) > 0:
            merged = self.adjust_volume(merged, volume_adjustments[0])

        for i, audio in enumerate(audio_list[1:], start=1):
            current = audio
            if normalize_each and self.normalize:
                current = self.normalize_audio(current)
            if volume_adjustments and len(volume_adjustments) > i:
                current = self.adjust_volume(current, volume_adjustments[i])

            if self.with_silence:
                # Fade into the silence gap, then hard-cut into the next clip.
                silence = self.get_silence()
                merged = merged.append(silence, crossfade=self.crossfade_duration)

            merged = merged.append(current, crossfade=0)

        logger.debug(f"Concatenated {len(audio_list)} audio segments, total duration: {len(merged)}ms")
        return merged

    def concatenate_audio_files(
        self,
        audio_paths: List[str],
        output_path: str,
        normalize_each: bool = False,
        volume_adjustments: Optional[List[float]] = None,
        target_durations: Optional[List[float]] = None
    ) -> Tuple[AudioSegment, dict]:
        """
        Load, concatenate, and save multiple audio files.

        Args:
            audio_paths: List of paths to audio files
            output_path: Path to save the concatenated audio
            normalize_each: Whether to normalize each audio before concatenation
            volume_adjustments: Optional list of volume adjustments (in dB) for each audio
            target_durations: Optional list of target durations (in seconds) for each clip

        Returns:
            Tuple of (concatenated audio segment, metadata dict)
        """
        audio_segments = []
        for i, path in enumerate(audio_paths):
            audio = self.load_audio(path)

            # Trim or repeat each clip to its requested duration using the
            # module-level helper defined below.
            if target_durations and i < len(target_durations):
                target_ms = int(target_durations[i] * 1000)
                audio = concatenate_to_target_duration(audio, target_durations[i])
                logger.debug(f"Adjusted clip {i} to {len(audio)}ms (target: {target_ms}ms)")

            audio_segments.append(audio)

        merged = self.concatenate_audios(audio_segments, normalize_each, volume_adjustments)

        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        merged.export(str(output_path), format="wav")
        logger.info(f"Saved concatenated audio: {output_path}")

        metadata = {
            "output_path": str(output_path),
            "source_files": audio_paths,
            "num_sources": len(audio_paths),
            "total_duration_ms": len(merged),
            "total_duration_s": len(merged) / 1000.0,
            "individual_durations_ms": [len(a) for a in audio_segments],
            "individual_durations_s": [len(a) / 1000.0 for a in audio_segments],
            "target_durations_s": target_durations if target_durations else [],
            "volume_adjustments_db": volume_adjustments if volume_adjustments else []
        }

        return merged, metadata
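
# Usage sketch (illustrative; the WAV paths are hypothetical):
#
#   processor = AudioProcessor(crossfade_duration=500, silence_duration=1000)
#   merged, meta = processor.concatenate_audio_files(
#       ["dog.wav", "rain.wav"],
#       "out/merged.wav",
#       target_durations=[10.0, 7.5],
#   )
#   print(meta["total_duration_s"])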


def generate_sample_durations_for_task(
    task_duration_hours: float,
    min_clip_duration: float,
    max_clip_duration: float
) -> list:
    """
    Generate sample durations that exactly fill the target task duration.

    Algorithm:
    1. Start with remaining = total_seconds
    2. While remaining >= min_clip_duration:
       - Sample d ~ Uniform(min, min(max, remaining))
       - Append d to durations list
       - Subtract d from remaining
    3. Return shuffled list of durations

    This ensures:
    - Total of all durations ≈ task_duration (within min_clip_duration tolerance)
    - Each duration is uniformly sampled within valid range
    - No overshoot of target duration

    Args:
        task_duration_hours: Total duration for the task in hours
        min_clip_duration: Minimum duration per clip in seconds
        max_clip_duration: Maximum duration per clip in seconds

    Returns:
        List of sample durations in seconds (shuffled)
    """
    task_duration_seconds = task_duration_hours * 3600
    remaining = task_duration_seconds
    durations = []

    while remaining >= min_clip_duration:
        effective_max = min(max_clip_duration, remaining)

        # Defensive check; cannot trigger while max_clip_duration >= min_clip_duration.
        if effective_max < min_clip_duration:
            break

        d = random.uniform(min_clip_duration, effective_max)
        durations.append(d)
        remaining -= d

    random.shuffle(durations)

    if not durations:
        # Target shorter than one minimum-length clip: nothing fits.
        logger.warning(
            f"Task duration {task_duration_seconds:.1f}s is shorter than "
            f"min_clip_duration {min_clip_duration}s; no durations generated"
        )
        return durations

    total_duration = sum(durations)
    logger.info(f"Task duration target: {task_duration_hours}h ({task_duration_seconds:.1f}s)")
    logger.info(f"Generated {len(durations)} sample durations, total: {total_duration:.1f}s")
    logger.info(f"Duration range: [{min(durations):.1f}s, {max(durations):.1f}s], "
                f"mean: {total_duration/len(durations):.1f}s")
    logger.info(f"Unused remainder: {remaining:.1f}s ({remaining/task_duration_seconds*100:.2f}%)")

    return durations
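
# Usage sketch (illustrative): fill a 1-hour task with clips of 20-60 s each.
# The generated durations always land within min_clip_duration of the target.
#
#   set_random_seed(42)
#   durations = generate_sample_durations_for_task(1.0, 20.0, 60.0)
#   assert abs(sum(durations) - 3600) < 20.0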


def calculate_num_samples_for_task(
    task_duration_hours: float,
    min_clip_duration: float,
    max_clip_duration: float
) -> int:
    """
    Calculate the number of samples needed to fill the task duration.

    DEPRECATED: Use generate_sample_durations_for_task() instead for exact duration filling.
    This function is kept for backward compatibility but uses average-based estimation.

    Args:
        task_duration_hours: Total duration for the task in hours
        min_clip_duration: Minimum duration per clip in seconds
        max_clip_duration: Maximum duration per clip in seconds

    Returns:
        Number of samples to generate (estimate)
    """
    task_duration_seconds = task_duration_hours * 3600
    avg_clip_duration = (min_clip_duration + max_clip_duration) / 2
    num_samples = int(task_duration_seconds / avg_clip_duration)

    logger.info(f"Task duration: {task_duration_hours}h ({task_duration_seconds}s)")
    logger.info(f"Avg clip duration: {avg_clip_duration}s (min: {min_clip_duration}s, max: {max_clip_duration}s)")
    logger.info(f"Calculated number of samples: {num_samples}")

    return max(1, num_samples)


def generate_single_clip_duration(
    min_duration: float,
    max_duration: float
) -> float:
    """
    Generate a random clip duration between min and max.

    Args:
        min_duration: Minimum duration in seconds
        max_duration: Maximum duration in seconds

    Returns:
        Random duration in seconds
    """
    return random.uniform(min_duration, max_duration)


def concatenate_to_target_duration(
    base_audio: AudioSegment,
    target_duration_seconds: float,
    crossfade_ms: int = 0
) -> AudioSegment:
    """
    Concatenate a base audio clip to reach a target duration.

    This takes a short clip (e.g., a 5-second ESC-50 clip) and repeats it to
    create a longer clip, trimming any overshoot.

    Args:
        base_audio: Original audio segment
        target_duration_seconds: Target duration in seconds
        crossfade_ms: Crossfade between repetitions in milliseconds

    Returns:
        Audio segment of target duration
    """
    target_duration_ms = int(target_duration_seconds * 1000)
    base_duration_ms = len(base_audio)

    if target_duration_ms <= base_duration_ms:
        # Already long enough: just trim.
        return base_audio[:target_duration_ms]

    # Append repetitions until the target is reached. Checking the actual
    # length (rather than a precomputed repetition count) stays correct even
    # when crossfades shorten each join by crossfade_ms.
    result = base_audio
    while len(result) < target_duration_ms:
        if crossfade_ms > 0:
            result = result.append(base_audio, crossfade=crossfade_ms)
        else:
            result = result + base_audio

    return result[:target_duration_ms]
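
# Usage sketch (illustrative): stretch a clip to exactly 30 s with a small
# crossfade at each seam ("rain.wav" is a hypothetical file).
#
#   clip = AudioSegment.from_file("rain.wav", format="wav")
#   long_clip = concatenate_to_target_duration(clip, 30.0, crossfade_ms=50)
#   assert len(long_clip) == 30000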


def set_random_seed(seed: int):
    """Set random seed for reproducibility."""
    random.seed(seed)
    np.random.seed(seed)
    logger.info(f"Random seed set to: {seed}")


def get_max_clip_num_to_be_joined(
    target_duration_seconds: float,
    source_clip_duration_seconds: float,
    min_silence_ms: int = 100
) -> Tuple[int, float]:
    """
    Calculate the maximum number of source clips needed to reach target duration.

    Pipeline: pick dataset -> pick class -> pick audio clip -> get duration ->
    compute how many clips fit in the target duration (with minimum silences
    between them) -> distribute the remainder as randomly placed silences.

    Args:
        target_duration_seconds: Target total duration in seconds
        source_clip_duration_seconds: Duration of each source clip (e.g., 5s for ESC-50)
        min_silence_ms: Minimum silence between clips in milliseconds

    Returns:
        Tuple of (num_clips_needed, remainder_seconds_for_silences)
        - num_clips_needed: How many source clips to concatenate
        - remainder_seconds_for_silences: Extra time to distribute as random silences

    Example (with min_silence_ms=100):
        target=30s, source=5s -> (5, 4.6) - 5 clips + 4 gaps of 0.1s, 4.6s spare
        target=32s, source=5s -> (6, 1.5) - 6 clips + 5 gaps of 0.1s, 1.5s spare
    """
    target_ms = target_duration_seconds * 1000
    source_ms = source_clip_duration_seconds * 1000

    # Upper bound: how many whole clips fit, ignoring silences.
    num_clips = int(target_ms // source_ms)
    num_clips = max(1, num_clips)

    clips_duration_ms = num_clips * source_ms

    # Each pair of adjacent clips needs at least min_silence_ms between them.
    num_gaps = max(0, num_clips - 1)
    min_total_silence_ms = num_gaps * min_silence_ms

    # Drop clips until clips + mandatory silences fit within the target.
    while num_clips > 1 and (clips_duration_ms + min_total_silence_ms) > target_ms:
        num_clips -= 1
        clips_duration_ms = num_clips * source_ms
        num_gaps = num_clips - 1
        min_total_silence_ms = num_gaps * min_silence_ms

    remainder_ms = target_ms - clips_duration_ms - min_total_silence_ms
    remainder_seconds = max(0, remainder_ms / 1000.0)

    logger.debug(
        f"get_max_clip_num: target={target_duration_seconds}s, source={source_clip_duration_seconds}s "
        f"-> {num_clips} clips, {remainder_seconds:.3f}s remainder for extra silences"
    )

    return num_clips, remainder_seconds


def build_clip_sequence_with_silences(
    audio_segments: List[AudioSegment],
    target_duration_seconds: float,
    min_silence_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_ms: int = 0
) -> AudioSegment:
    """
    Build a final audio clip by concatenating segments with guaranteed silences.

    Ensures:
    1. All clips are joined with at least min_silence_ms between them
    2. Any remainder duration is distributed as random extra silences in gaps
    3. Final duration matches target_duration_seconds exactly

    Args:
        audio_segments: List of audio segments to concatenate
        target_duration_seconds: Target total duration in seconds
        min_silence_ms: Minimum silence between each pair of clips (always inserted)
        max_extra_silence_per_gap_ms: Maximum extra silence to add per gap
        crossfade_ms: Crossfade duration in ms (applied when joining)

    Returns:
        Concatenated audio segment of exact target duration
    """
    if not audio_segments:
        raise ValueError("audio_segments cannot be empty")

    target_ms = int(target_duration_seconds * 1000)

    if len(audio_segments) == 1:
        # Single clip: trim if too long, otherwise repeat to fill the target.
        audio = audio_segments[0]
        if len(audio) >= target_ms:
            return audio[:target_ms]
        else:
            return concatenate_to_target_duration(audio, target_duration_seconds, crossfade_ms)

    total_audio_ms = sum(len(seg) for seg in audio_segments)
    num_gaps = len(audio_segments) - 1

    # Mandatory silence budget: one minimum gap between each pair of clips.
    min_total_silence_ms = num_gaps * min_silence_ms

    available_extra_ms = target_ms - total_audio_ms - min_total_silence_ms

    if available_extra_ms < 0:
        logger.warning(
            f"Clips too long for target duration. Total audio: {total_audio_ms}ms, "
            f"target: {target_ms}ms. Will trim final result."
        )
        available_extra_ms = 0

    # Spread the spare time as random extra silence across the gaps.
    extra_silences_ms = distribute_remainder_as_silences(
        available_extra_ms,
        num_gaps,
        max_extra_silence_per_gap_ms
    )

    result = audio_segments[0]

    for i, audio in enumerate(audio_segments[1:]):
        gap_silence_ms = min_silence_ms + extra_silences_ms[i]

        silence = AudioSegment.silent(duration=gap_silence_ms)

        if crossfade_ms > 0 and crossfade_ms < gap_silence_ms:
            # Fade into the silence, then hard-cut into the next clip.
            result = result.append(silence, crossfade=crossfade_ms)
            result = result.append(audio, crossfade=0)
        else:
            result = result + silence + audio

    # Force the exact target length: trim overshoot or pad with silence.
    if len(result) > target_ms:
        result = result[:target_ms]
    elif len(result) < target_ms:
        padding = AudioSegment.silent(duration=target_ms - len(result))
        result = result + padding

    logger.debug(
        f"Built clip sequence: {len(audio_segments)} segments, "
        f"final duration: {len(result)}ms (target: {target_ms}ms)"
    )

    return result
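
# Usage sketch (illustrative): join three 5 s clips into exactly 20 s of audio
# with at least 100 ms of silence between neighbours.
#
#   clips = [AudioSegment.silent(duration=5000) for _ in range(3)]
#   mix = build_clip_sequence_with_silences(clips, 20.0, min_silence_ms=100)
#   assert len(mix) == 20000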


def distribute_remainder_as_silences(
    remainder_ms: float,
    num_gaps: int,
    max_per_gap_ms: int = 500
) -> List[int]:
    """
    Distribute remainder time as random silences across gaps.

    Args:
        remainder_ms: Total extra time to distribute (in ms)
        num_gaps: Number of gaps between clips
        max_per_gap_ms: Maximum extra silence per gap

    Returns:
        List of extra silence durations (in ms) for each gap
    """
    if num_gaps <= 0:
        return []

    remainder_ms = int(max(0, remainder_ms))

    if remainder_ms == 0:
        return [0] * num_gaps

    # Random weights decide what share of the remainder each gap receives.
    weights = [random.random() for _ in range(num_gaps)]
    total_weight = sum(weights)

    if total_weight == 0:
        # Degenerate case: fall back to an even split.
        weights = [1.0] * num_gaps
        total_weight = num_gaps

    extra_silences = []
    remaining = remainder_ms

    for i, w in enumerate(weights):
        if i == num_gaps - 1:
            # Last gap absorbs whatever is left (capped at the per-gap max).
            extra = min(remaining, max_per_gap_ms)
        else:
            proportion = w / total_weight
            extra = int(remainder_ms * proportion)
            extra = min(extra, max_per_gap_ms, remaining)

        extra_silences.append(extra)
        remaining -= extra
        # Shrink the weight pool so later gaps take a larger share of what is left.
        total_weight -= w

    # Rounding and per-gap caps can leave time over; top gaps up greedily
    # until everything is placed or every gap is at max_per_gap_ms.
    while remaining > 0:
        for i in range(num_gaps):
            if extra_silences[i] < max_per_gap_ms and remaining > 0:
                add = min(remaining, max_per_gap_ms - extra_silences[i])
                extra_silences[i] += add
                remaining -= add
        if remaining > 0:
            # All gaps are already at the cap; drop the rest.
            break

    logger.debug(f"Distributed {remainder_ms}ms across {num_gaps} gaps: {extra_silences}")

    return extra_silences
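
# Worked sketch: 1200 ms over 3 gaps with a 500 ms cap might come out as
# e.g. [350, 500, 350] -- the exact split is random, but the result always
# sums to min(remainder_ms, num_gaps * max_per_gap_ms).
#
#   extras = distribute_remainder_as_silences(1200, 3, max_per_gap_ms=500)
#   assert sum(extras) == 1200 and all(e <= 500 for e in extras)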


def repeat_clips_to_fill_duration(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    source_clip_duration_seconds: float = 5.0,
    min_silence_ms: int = 100
) -> Tuple[List[AudioSegment], List[str], int]:
    """
    Repeat source clips to fill target duration, cycling through all sources.

    This ensures all unique sources appear and are repeated proportionally.

    Args:
        source_audios: List of unique source audio segments
        source_categories: List of category names corresponding to source_audios
        target_duration_seconds: Target total duration
        source_clip_duration_seconds: Duration of each source clip
        min_silence_ms: Minimum silence between clips

    Returns:
        Tuple of (expanded_audio_list, expanded_categories, num_clips)
    """
    num_clips, _remainder = get_max_clip_num_to_be_joined(
        target_duration_seconds,
        source_clip_duration_seconds,
        min_silence_ms
    )

    num_sources = len(source_audios)

    if num_sources == 0:
        raise ValueError("source_audios cannot be empty")

    # Round-robin through the sources until all clip slots are filled.
    expanded_audios = []
    expanded_categories = []

    for i in range(num_clips):
        idx = i % num_sources
        expanded_audios.append(source_audios[idx])
        expanded_categories.append(source_categories[idx])

    logger.debug(
        f"Repeated {num_sources} sources to {num_clips} clips for "
        f"{target_duration_seconds}s target duration"
    )

    return expanded_audios, expanded_categories, num_clips
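
# Usage sketch (illustrative): two 5 s sources cycled to fill a 30 s target.
#
#   a, b = AudioSegment.silent(duration=5000), AudioSegment.silent(duration=5000)
#   audios, cats, n = repeat_clips_to_fill_duration([a, b], ["dog", "rain"], 30.0)
#   # cats alternates ["dog", "rain", "dog", ...] with n entries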


def build_consecutive_sources_for_count_task(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    source_clip_duration_seconds: float = 5.0,
    min_silence_between_sources_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_within_source_ms: int = 50
) -> Tuple[AudioSegment, List[str], dict]:
    """
    Build audio for COUNT task with consecutive same-class clips.

    For the count task, same-class clips must be consecutive (AAA BBB CCC) so they
    are perceived as ONE sound source. Silences are only inserted BETWEEN
    different classes, not within same-class repetitions.

    Pipeline: pick classes -> for each class concatenate clips consecutively ->
    insert silences only between different classes -> distribute remainder

    Args:
        source_audios: List of unique source audio segments (one per class)
        source_categories: List of category names
        target_duration_seconds: Target total duration
        source_clip_duration_seconds: Duration of each source clip
        min_silence_between_sources_ms: Minimum silence between different sources
        max_extra_silence_per_gap_ms: Max extra silence per gap for remainder distribution
        crossfade_within_source_ms: Small crossfade within same-source repetitions

    Returns:
        Tuple of (final_audio, category_sequence, metadata_dict)
    """
    target_ms = int(target_duration_seconds * 1000)
    num_sources = len(source_audios)

    if num_sources == 0:
        raise ValueError("source_audios cannot be empty")

    # How many clip slots fit in the target duration.
    num_clips, _remainder_seconds = get_max_clip_num_to_be_joined(
        target_duration_seconds,
        source_clip_duration_seconds,
        min_silence_between_sources_ms
    )

    # Every source must appear at least once, even if that overshoots.
    if num_sources > num_clips:
        logger.warning(
            f"More sources ({num_sources}) than clips that fit ({num_clips}). "
            f"Each source needs at least 1 clip, so output may exceed target duration. "
            f"Consider capping n_unique_audios <= max_clips in task_count.py"
        )
        num_clips = num_sources

    # Split the clip slots as evenly as possible across sources.
    base_reps = num_clips // num_sources
    extra_reps = num_clips % num_sources

    repetitions_per_source = []
    for i in range(num_sources):
        reps = base_reps + (1 if i < extra_reps else 0)
        repetitions_per_source.append(reps)

    # Shuffle so the sources with extra repetitions are not always the first ones.
    random.shuffle(repetitions_per_source)

    # Build one contiguous block per source (AAA, BBB, ...).
    source_blocks = []
    category_sequence = []

    for audio, category, reps in zip(source_audios, source_categories, repetitions_per_source):
        if reps == 0:
            continue

        block = audio
        for _ in range(reps - 1):
            if crossfade_within_source_ms > 0:
                block = block.append(audio, crossfade=crossfade_within_source_ms)
            else:
                block = block + audio

        source_blocks.append(block)
        category_sequence.append(category)

    # Join blocks with silences only between different sources.
    num_gaps = len(source_blocks) - 1

    if num_gaps <= 0:
        final_audio = source_blocks[0]
    else:
        total_blocks_ms = sum(len(block) for block in source_blocks)
        min_total_silence_ms = num_gaps * min_silence_between_sources_ms

        available_extra_ms = target_ms - total_blocks_ms - min_total_silence_ms
        available_extra_ms = max(0, available_extra_ms)

        extra_silences = distribute_remainder_as_silences(
            available_extra_ms,
            num_gaps,
            max_extra_silence_per_gap_ms
        )

        final_audio = source_blocks[0]
        for i, block in enumerate(source_blocks[1:]):
            gap_silence_ms = min_silence_between_sources_ms + extra_silences[i]
            silence = AudioSegment.silent(duration=gap_silence_ms)
            final_audio = final_audio + silence + block

    # Force the exact target length.
    if len(final_audio) > target_ms:
        final_audio = final_audio[:target_ms]
    elif len(final_audio) < target_ms:
        padding = AudioSegment.silent(duration=target_ms - len(final_audio))
        final_audio = final_audio + padding

    metadata = {
        'num_unique_sources': num_sources,
        'total_clips': num_clips,
        'ordering_mode': 'consecutive',
        'repetitions_per_source': dict(zip(source_categories, repetitions_per_source)),
        'target_duration_ms': target_ms,
        'actual_duration_ms': len(final_audio),
        'num_gaps_between_sources': num_gaps
    }

    logger.debug(
        f"Count task (consecutive): {num_sources} sources, {num_clips} total clips, "
        f"reps={repetitions_per_source}, duration={len(final_audio)}ms"
    )

    return final_audio, category_sequence, metadata


def build_random_order_for_count_task(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    source_clip_duration_seconds: float = 5.0,
    min_silence_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500
) -> Tuple[AudioSegment, List[str], dict]:
    """
    Build audio for COUNT task with RANDOM ordering of clips.

    Clips from different sources are shuffled randomly (A B A C B A C...).
    This tests whether the model can recognize recurring sounds as the same source.
    Silences are inserted between ALL clips (same or different source).

    Pipeline:
    1. Calculate total clips needed
    2. Distribute clips across sources
    3. Create expanded list with all clip instances
    4. Shuffle randomly
    5. Insert silences between ALL clips
    6. Distribute remainder as extra random silences

    Args:
        source_audios: List of unique source audio segments (one per class)
        source_categories: List of category names
        target_duration_seconds: Target total duration
        source_clip_duration_seconds: Duration of each source clip
        min_silence_ms: Minimum silence between ALL clips
        max_extra_silence_per_gap_ms: Max extra silence per gap

    Returns:
        Tuple of (final_audio, clip_sequence, metadata_dict)
    """
    target_ms = int(target_duration_seconds * 1000)
    num_sources = len(source_audios)

    if num_sources == 0:
        raise ValueError("source_audios cannot be empty")

    # How many clip slots fit in the target duration.
    num_clips, _remainder_seconds = get_max_clip_num_to_be_joined(
        target_duration_seconds,
        source_clip_duration_seconds,
        min_silence_ms
    )

    # Every source must appear at least once, even if that overshoots.
    if num_sources > num_clips:
        logger.warning(
            f"More sources ({num_sources}) than clips that fit ({num_clips}). "
            f"Each source needs at least 1 clip, so output may exceed target duration. "
            f"Consider capping n_unique_audios <= max_clips in task_count.py"
        )
        num_clips = num_sources

    # Split the clip slots as evenly as possible across sources.
    base_reps = num_clips // num_sources
    extra_reps = num_clips % num_sources

    repetitions_per_source = []
    for i in range(num_sources):
        reps = base_reps + (1 if i < extra_reps else 0)
        repetitions_per_source.append(reps)

    # Expand into one (audio, category) entry per clip instance.
    expanded_clips = []
    for audio, category, reps in zip(source_audios, source_categories, repetitions_per_source):
        for _ in range(reps):
            expanded_clips.append((audio, category))

    random.shuffle(expanded_clips)

    shuffled_audios = [clip[0] for clip in expanded_clips]
    clip_sequence = [clip[1] for clip in expanded_clips]

    # Join everything with mandatory silences and distribute the remainder.
    final_audio = build_clip_sequence_with_silences(
        shuffled_audios,
        target_duration_seconds,
        min_silence_ms=min_silence_ms,
        max_extra_silence_per_gap_ms=max_extra_silence_per_gap_ms,
        crossfade_ms=0
    )

    metadata = {
        'num_unique_sources': num_sources,
        'total_clips': len(expanded_clips),
        'ordering_mode': 'random',
        'repetitions_per_source': dict(zip(source_categories, repetitions_per_source)),
        'clip_sequence': clip_sequence,
        'target_duration_ms': target_ms,
        'actual_duration_ms': len(final_audio),
        'num_gaps': len(expanded_clips) - 1
    }

    logger.debug(
        f"Count task (random): {num_sources} sources, {len(expanded_clips)} clips, "
        f"sequence={clip_sequence[:5]}..., duration={len(final_audio)}ms"
    )

    return final_audio, clip_sequence, metadata


def build_count_task_audio(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    ordering_mode: str = "random",
    source_clip_duration_seconds: float = 5.0,
    min_silence_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_within_source_ms: int = 50
) -> Tuple[AudioSegment, List[str], dict]:
    """
    Build audio for COUNT task with configurable ordering mode.

    Args:
        source_audios: List of unique source audio segments (one per class)
        source_categories: List of category names
        target_duration_seconds: Target total duration
        ordering_mode: "random" or "consecutive"
            - "random": Clips shuffled (A B A C B A C) - tests sound recognition
            - "consecutive": Same-source grouped (AAA BBB CCC) - easier
        source_clip_duration_seconds: Duration of each source clip
        min_silence_ms: Minimum silence between clips
        max_extra_silence_per_gap_ms: Max extra silence per gap
        crossfade_within_source_ms: Crossfade for consecutive mode only

    Returns:
        Tuple of (final_audio, clip_sequence, metadata_dict)
    """
    if ordering_mode == "consecutive":
        return build_consecutive_sources_for_count_task(
            source_audios,
            source_categories,
            target_duration_seconds,
            source_clip_duration_seconds,
            min_silence_ms,
            max_extra_silence_per_gap_ms,
            crossfade_within_source_ms
        )
    else:
        return build_random_order_for_count_task(
            source_audios,
            source_categories,
            target_duration_seconds,
            source_clip_duration_seconds,
            min_silence_ms,
            max_extra_silence_per_gap_ms
        )
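
# Usage sketch (illustrative; the two silent segments stand in for real clips):
#
#   srcs = [AudioSegment.silent(duration=5000), AudioSegment.silent(duration=5000)]
#   audio, seq, meta = build_count_task_audio(
#       srcs, ["dog", "rain"], 60.0, ordering_mode="random"
#   )
#   print(meta["repetitions_per_source"], meta["actual_duration_ms"])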


def calculate_duration_slot_distribution(
    target_total_duration_s: float,
    effective_durations: Dict[str, float],
    target_category: str,
    question_type: str,
    multiplier_longest: float = 1.5,
    multiplier_shortest: float = 0.5,
    min_silence_between_sources_ms: int = 100
) -> Tuple[Dict[str, int], bool, Dict]:
    """
    Calculate how many repetitions each source gets for the duration task.

    For LONGEST: the target gets max repetitions, backgrounds get 1 each.
    For SHORTEST: the target gets 1, backgrounds share the remaining duration.

    Args:
        target_total_duration_s: Target total audio duration
        effective_durations: Dict mapping category -> effective duration in seconds
        target_category: The category that should be longest/shortest
        question_type: "longest" or "shortest"
        multiplier_longest: target >= max_background * this
        multiplier_shortest: target <= min_background * this
        min_silence_between_sources_ms: Minimum silence between different sources

    Returns:
        Tuple of (slot_distribution, gap_satisfied, metadata)
        slot_distribution: Dict mapping category -> number of repetitions
        gap_satisfied: Whether the duration gap constraint is met
        metadata: Additional info about the calculation
    """
    categories = list(effective_durations.keys())
    n_sources = len(categories)

    if n_sources < 2:
        # Degenerate case: with a single source there is nothing to compare.
        reps = max(1, int(target_total_duration_s / effective_durations[target_category]))
        return {target_category: reps}, True, {'note': 'single_source'}

    # Budget the mandatory inter-source silences, then split the rest as audio.
    total_silence_s = (n_sources - 1) * min_silence_between_sources_ms / 1000.0
    available_for_audio_s = target_total_duration_s - total_silence_s

    background_categories = [c for c in categories if c != target_category]

    if question_type == "longest":
        # Backgrounds get one repetition each ...
        background_duration_s = sum(effective_durations[c] for c in background_categories)

        # ... and the target fills whatever time is left.
        remaining_for_target_s = available_for_audio_s - background_duration_s
        target_duration_per_rep = effective_durations[target_category]

        target_reps = max(1, int(remaining_for_target_s / target_duration_per_rep))
        actual_target_duration = target_reps * target_duration_per_rep

        # Gap constraint: the target must be clearly longer than every background.
        max_background_duration = max(effective_durations[c] for c in background_categories)
        required_target_duration = max_background_duration * multiplier_longest
        gap_satisfied = actual_target_duration >= required_target_duration

        slot_distribution = {c: 1 for c in background_categories}
        slot_distribution[target_category] = target_reps

        metadata = {
            'available_for_audio_s': available_for_audio_s,
            'background_duration_s': background_duration_s,
            'remaining_for_target_s': remaining_for_target_s,
            'target_reps': target_reps,
            'actual_target_duration_s': actual_target_duration,
            'max_background_duration_s': max_background_duration,
            'required_target_duration_s': required_target_duration,
            'multiplier_used': multiplier_longest
        }

    else:  # "shortest"
        # The target plays exactly once ...
        target_duration_s = effective_durations[target_category]

        # ... and the backgrounds share the remaining audio budget.
        remaining_for_backgrounds_s = available_for_audio_s - target_duration_s

        slot_distribution = {target_category: 1}

        # Each background must be clearly longer than the target.
        min_background_required = target_duration_s / multiplier_shortest

        background_reps = {}
        for cat in background_categories:
            eff_dur = effective_durations[cat]
            min_reps = max(1, int(min_background_required / eff_dur) + 1)
            background_reps[cat] = min_reps

        total_background_needed = sum(
            background_reps[c] * effective_durations[c]
            for c in background_categories
        )

        if total_background_needed <= remaining_for_backgrounds_s:
            # Spend any leftover budget on extra background repetitions.
            extra_available = remaining_for_backgrounds_s - total_background_needed

            while extra_available > 0:
                added_any = False
                for cat in background_categories:
                    eff_dur = effective_durations[cat]
                    if extra_available >= eff_dur:
                        background_reps[cat] += 1
                        extra_available -= eff_dur
                        added_any = True
                if not added_any:
                    break

        slot_distribution.update(background_reps)

        # Verify the gap constraint against the actual scheduled durations.
        actual_durations = {
            cat: slot_distribution[cat] * effective_durations[cat]
            for cat in categories
        }
        min_background_actual = min(
            actual_durations[c] for c in background_categories
        )

        gap_satisfied = actual_durations[target_category] <= min_background_actual * multiplier_shortest

        metadata = {
            'available_for_audio_s': available_for_audio_s,
            'target_duration_s': target_duration_s,
            'remaining_for_backgrounds_s': remaining_for_backgrounds_s,
            'min_background_required_s': min_background_required,
            'actual_durations_s': actual_durations,
            'min_background_actual_s': min_background_actual,
            'multiplier_used': multiplier_shortest
        }

    return slot_distribution, gap_satisfied, metadata
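
# Usage sketch (illustrative): make "dog" the longest source in a 60 s mix.
#
#   slots, ok, info = calculate_duration_slot_distribution(
#       60.0,
#       {"dog": 5.0, "rain": 5.0, "wind": 5.0},
#       target_category="dog",
#       question_type="longest",
#   )
#   # e.g. slots == {"rain": 1, "wind": 1, "dog": 9}; ok is True because
#   # 9 * 5.0 >= 5.0 * 1.5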


def build_duration_task_audio(
    source_audio_lists: Dict[str, List[AudioSegment]],
    slot_distribution: Dict[str, int],
    effective_durations: Dict[str, float],
    target_total_duration_s: float,
    min_silence_between_sources_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_within_source_ms: int = 50
) -> Tuple[AudioSegment, List[str], Dict]:
    """
    Build audio for DURATION task with consecutive ordering per source.

    Structure: [SourceA × n] + silence + [SourceB × m] + silence + ...
    Order of sources is randomized to avoid patterns.

    Args:
        source_audio_lists: Dict mapping category -> list of audio segments
        slot_distribution: Dict mapping category -> number of repetitions
        effective_durations: Dict mapping category -> effective duration per clip
        target_total_duration_s: Target total duration
        min_silence_between_sources_ms: Min silence between different sources
        max_extra_silence_per_gap_ms: Max extra silence per gap
        crossfade_within_source_ms: Crossfade between same-source repetitions

    Returns:
        Tuple of (final_audio, category_sequence, metadata)
    """
    categories = list(slot_distribution.keys())

    # Randomize source order so the answer position carries no signal.
    random.shuffle(categories)

    # Build one contiguous block per source.
    source_blocks = []
    category_sequence = []
    actual_durations = {}
    block_durations_ms = []

    for category in categories:
        reps = slot_distribution[category]
        audio_list = source_audio_lists[category]

        if reps == 0:
            continue

        block = audio_list[0]
        for i in range(1, reps):
            # Cycle through the available clips for this category.
            next_clip = audio_list[i % len(audio_list)]

            # Crossfade only when both sides are long enough for the overlap.
            if crossfade_within_source_ms > 0:
                if len(block) > crossfade_within_source_ms and len(next_clip) > crossfade_within_source_ms:
                    block = block.append(next_clip, crossfade=crossfade_within_source_ms)
                else:
                    block = block + next_clip
            else:
                block = block + next_clip

        source_blocks.append((category, block))
        block_durations_ms.append(len(block))
        category_sequence.extend([category] * reps)
        actual_durations[category] = len(block) / 1000.0

    # Silence budget between blocks.
    total_audio_ms = sum(len(block) for _, block in source_blocks)
    num_gaps = len(source_blocks) - 1
    min_total_silence_ms = num_gaps * min_silence_between_sources_ms

    target_ms = int(target_total_duration_s * 1000)
    available_extra_ms = target_ms - total_audio_ms - min_total_silence_ms

    if available_extra_ms > 0 and num_gaps > 0:
        extra_silences = distribute_remainder_as_silences(
            available_extra_ms,
            num_gaps,
            max_extra_silence_per_gap_ms
        )
    else:
        extra_silences = [0] * max(num_gaps, 1)

    # Join the blocks, tracking (category, start_ms, end_ms) timestamps.
    source_timestamps = []
    current_position_ms = 0

    if len(source_blocks) == 1:
        cat, block = source_blocks[0]
        final_audio = block
        source_timestamps.append((cat, 0, len(block)))
    else:
        cat, block = source_blocks[0]
        final_audio = block
        source_timestamps.append((cat, 0, len(block)))
        current_position_ms = len(block)

        for i, (cat, block) in enumerate(source_blocks[1:]):
            gap_silence_ms = min_silence_between_sources_ms + extra_silences[i]
            silence = AudioSegment.silent(duration=gap_silence_ms)

            # Fade into the gap silence when it is long enough. Note that a
            # crossfade overlaps the two segments, shortening the result by
            # crossfade_ms, so the timestamps must account for the overlap.
            crossfade_ms = min(500, gap_silence_ms)
            if crossfade_ms > 0 and crossfade_ms < gap_silence_ms and len(final_audio) > crossfade_ms and len(block) > crossfade_ms:
                final_audio = final_audio.append(silence, crossfade=crossfade_ms)
                final_audio = final_audio.append(block, crossfade=0)

                start_ms = current_position_ms + gap_silence_ms - crossfade_ms
                end_ms = start_ms + len(block)
                source_timestamps.append((cat, start_ms, end_ms))
                current_position_ms = end_ms
            else:
                # Gap too short to crossfade: simple concatenation.
                final_audio = final_audio + silence + block
                start_ms = current_position_ms + gap_silence_ms
                end_ms = start_ms + len(block)
                source_timestamps.append((cat, start_ms, end_ms))
                current_position_ms = end_ms

    # Force the exact target length.
    if len(final_audio) > target_ms:
        final_audio = final_audio[:target_ms]
    elif len(final_audio) < target_ms:
        padding = AudioSegment.silent(duration=target_ms - len(final_audio))
        final_audio = final_audio + padding

    # Human-readable timestamp summary, e.g. "dog 0.0s-45.0s (45.0s), ...".
    timestamp_parts = []
    for cat, start_ms, end_ms in source_timestamps:
        start_s = round(start_ms / 1000.0, 2)
        end_s = round(end_ms / 1000.0, 2)
        duration_s = round((end_ms - start_ms) / 1000.0, 2)
        timestamp_parts.append(f"{cat} {start_s}s-{end_s}s ({duration_s}s)")
    timestamp_string = ", ".join(timestamp_parts)

    metadata = {
        'source_order': [cat for cat, _ in source_blocks],
        'slot_distribution': slot_distribution,
        'actual_durations_s': actual_durations,
        'total_audio_ms': total_audio_ms,
        'num_gaps': num_gaps,
        'final_duration_ms': len(final_audio),
        'source_timestamps': source_timestamps,
        'timestamp_string': timestamp_string
    }

    logger.debug(
        f"Duration task audio: {len(source_blocks)} sources, "
        f"order={metadata['source_order']}, duration={len(final_audio)}ms"
    )

    return final_audio, category_sequence, metadata
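
# Usage sketch (illustrative): one long "dog" block plus a short "rain" block.
#
#   lists = {
#       "dog": [AudioSegment.silent(duration=5000)],
#       "rain": [AudioSegment.silent(duration=5000)],
#   }
#   audio, seq, meta = build_duration_task_audio(
#       lists, {"dog": 4, "rain": 1}, {"dog": 5.0, "rain": 5.0}, 30.0
#   )
#   print(meta["timestamp_string"])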