File size: 53,139 Bytes
fec9168 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 |
"""
Audio processing utilities for temporal reasoning dataset generation.
"""
import os
import random
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
import numpy as np
from pydub import AudioSegment
try:
import pyloudnorm as pyln
PYLOUDNORM_AVAILABLE = True
except ImportError:
PYLOUDNORM_AVAILABLE = False
from .logger import setup_logger
logger = setup_logger(__name__)
def get_lufs_loudness(audio: AudioSegment) -> float:
    """
    Calculate integrated LUFS loudness (perceived loudness) of an audio segment.

    LUFS (Loudness Units Full Scale) is the broadcast standard for measuring
    perceived loudness. It accounts for human hearing sensitivity to different
    frequencies using K-weighting.

    Args:
        audio: Input audio segment (pydub AudioSegment)

    Returns:
        Loudness in LUFS (negative values, typically -70 to 0).
        Returns dBFS if pyloudnorm is not available (fallback).
    """
    if not PYLOUDNORM_AVAILABLE:
        logger.warning("pyloudnorm not available, falling back to dBFS")
        return audio.dBFS

    # Convert pydub AudioSegment to a numpy array of interleaved samples
    samples = np.array(audio.get_array_of_samples())

    # De-interleave any multi-channel audio into shape (frames, channels).
    # Previously only stereo (channels == 2) was reshaped; >2-channel audio
    # would have been passed through interleaved and mis-measured.
    if audio.channels > 1:
        samples = samples.reshape((-1, audio.channels))

    # Normalize integer PCM to float in [-1, 1] based on sample width
    if audio.sample_width == 1:
        # 8-bit PCM is unsigned (0..255): scale then shift to [-1, 1)
        samples = samples.astype(np.float64) / 128.0 - 1.0
    elif audio.sample_width == 2:
        samples = samples.astype(np.float64) / 32768.0
    elif audio.sample_width == 4:
        samples = samples.astype(np.float64) / 2147483648.0
    else:
        samples = samples.astype(np.float64) / 32768.0  # default to 16-bit

    # K-weighted meter at the audio's native sample rate
    meter = pyln.Meter(audio.frame_rate)
    try:
        loudness = meter.integrated_loudness(samples)
        # Silent audio measures as -inf; clamp to a very quiet finite value
        if np.isinf(loudness):
            loudness = -70.0  # Return very quiet value instead of -inf
        return loudness
    except Exception as e:
        logger.warning(f"LUFS measurement failed: {e}, falling back to dBFS")
        return audio.dBFS
def normalize_to_lufs(audio: AudioSegment, target_lufs: float = -23.0) -> AudioSegment:
    """
    Normalize audio to a target LUFS level (perceived loudness normalization).

    Preferable to plain dBFS normalization when comparing different sound
    types, because LUFS accounts for human hearing sensitivity.

    Args:
        audio: Input audio segment
        target_lufs: Target loudness level in LUFS (default: -23 LUFS, EBU R128 standard)

    Returns:
        Loudness-normalized audio segment
    """
    if not PYLOUDNORM_AVAILABLE:
        # Degrade gracefully: approximate by matching average dBFS instead
        logger.warning("pyloudnorm not available, falling back to dBFS normalization")
        return audio.apply_gain(target_lufs - audio.dBFS)

    measured_lufs = get_lufs_loudness(audio)
    # Gain needed to move measured loudness onto the target
    normalized = audio.apply_gain(target_lufs - measured_lufs)
    logger.debug(f"Normalized LUFS: {measured_lufs:.2f} -> {get_lufs_loudness(normalized):.2f} LUFS")
    return normalized
class AudioProcessor:
    """Handles audio loading, processing, and concatenation."""

    def __init__(
        self,
        crossfade_duration: int = 500,
        silence_duration: int = 1000,
        with_silence: bool = True,
        normalize: bool = False,
        normalize_target_dBFS: float = -20.0,
        synthetic_silence_path: Optional[str] = None
    ):
        """
        Initialize the audio processor.

        Args:
            crossfade_duration: Duration of crossfade in milliseconds
            silence_duration: Duration of silence between clips in milliseconds
            with_silence: Whether to add silence between clips
            normalize: Whether to normalize audio levels
            normalize_target_dBFS: Target dBFS level for normalization
            synthetic_silence_path: Path to synthetic silence audio files
        """
        self.crossfade_duration = crossfade_duration
        self.silence_duration = silence_duration
        self.with_silence = with_silence
        self.normalize = normalize
        self.normalize_target_dBFS = normalize_target_dBFS
        self.synthetic_silence_path = synthetic_silence_path
        # Cache of silence segments keyed by duration (ms), see get_silence()
        self._silence_cache = {}

    def load_audio(self, audio_path: str) -> AudioSegment:
        """
        Load an audio file.

        Note: the file is always decoded as WAV (format="wav"), regardless of
        its extension.

        Args:
            audio_path: Path to the audio file

        Returns:
            Loaded audio segment

        Raises:
            Exception: Re-raises whatever pydub/ffmpeg raises on decode failure,
                after logging it.
        """
        try:
            audio = AudioSegment.from_file(audio_path, format="wav")
            logger.debug(f"Loaded audio: {audio_path}, duration: {len(audio)}ms")
            return audio
        except Exception as e:
            logger.error(f"Error loading audio {audio_path}: {e}")
            raise

    def normalize_audio(self, audio: AudioSegment, target_dBFS: Optional[float] = None) -> AudioSegment:
        """
        Normalize audio to a target dBFS level.

        Args:
            audio: Input audio segment
            target_dBFS: Target dBFS level (uses the instance default if None)

        Returns:
            Normalized audio segment
        """
        if target_dBFS is None:
            target_dBFS = self.normalize_target_dBFS
        # Uniform gain shift so the average level lands on the target
        change_in_dBFS = target_dBFS - audio.dBFS
        normalized = audio.apply_gain(change_in_dBFS)
        logger.debug(f"Normalized audio: {audio.dBFS:.2f} dBFS -> {normalized.dBFS:.2f} dBFS")
        return normalized

    def adjust_volume(self, audio: AudioSegment, volume_db: float) -> AudioSegment:
        """
        Adjust audio volume by a specific dB amount.

        Args:
            audio: Input audio segment
            volume_db: Volume adjustment in dB (positive = louder, negative = quieter)

        Returns:
            Volume-adjusted audio segment
        """
        adjusted = audio.apply_gain(volume_db)
        logger.debug(f"Adjusted volume by {volume_db} dB: {audio.dBFS:.2f} -> {adjusted.dBFS:.2f} dBFS")
        return adjusted

    def get_silence(self, duration: Optional[int] = None) -> AudioSegment:
        """
        Get a silence audio segment, using synthetic silence if available.

        NOTE(review): when synthetic silence is used, a *randomly chosen* file
        is loaded once and then cached per duration, so every later call with
        the same duration returns that same segment — confirm this reuse is
        intended rather than a fresh random pick per call.

        Args:
            duration: Duration in milliseconds (uses the instance default if None)

        Returns:
            Silence audio segment
        """
        if duration is None:
            duration = self.silence_duration
        # Check cache first
        if duration in self._silence_cache:
            return self._silence_cache[duration]
        # Try to load synthetic silence (room-tone style WAVs)
        if self.synthetic_silence_path and os.path.exists(self.synthetic_silence_path):
            silence_files = list(Path(self.synthetic_silence_path).glob("*.wav"))
            if silence_files:
                silence = self.load_audio(str(random.choice(silence_files)))
                # Adjust duration if needed
                if len(silence) < duration:
                    # Repeat the silence enough times, then trim below
                    repetitions = (duration // len(silence)) + 1
                    silence = silence * repetitions
                silence = silence[:duration]
                self._silence_cache[duration] = silence
                logger.debug(f"Using synthetic silence: {duration}ms")
                return silence
        # Fall back to pure (digital) silence
        silence = AudioSegment.silent(duration=duration)
        self._silence_cache[duration] = silence
        logger.debug(f"Using pure silence: {duration}ms")
        return silence

    def concatenate_audios(
        self,
        audio_list: List[AudioSegment],
        normalize_each: bool = False,
        volume_adjustments: Optional[List[float]] = None
    ) -> AudioSegment:
        """
        Concatenate multiple audio segments with crossfade and optional silence.

        Normalization is applied only when BOTH `normalize_each` and the
        instance-level `self.normalize` flag are true.

        Args:
            audio_list: List of audio segments to concatenate
            normalize_each: Whether to normalize each audio before concatenation
            volume_adjustments: Optional list of volume adjustments (in dB) for each audio

        Returns:
            Concatenated audio segment

        Raises:
            ValueError: If audio_list is empty.
        """
        if not audio_list:
            raise ValueError("audio_list cannot be empty")

        if len(audio_list) == 1:
            # Single-segment shortcut: apply per-clip processing and return
            audio = audio_list[0]
            if normalize_each and self.normalize:
                audio = self.normalize_audio(audio)
            if volume_adjustments and len(volume_adjustments) > 0:
                audio = self.adjust_volume(audio, volume_adjustments[0])
            return audio

        # Process first audio
        merged = audio_list[0]
        if normalize_each and self.normalize:
            merged = self.normalize_audio(merged)
        if volume_adjustments and len(volume_adjustments) > 0:
            merged = self.adjust_volume(merged, volume_adjustments[0])

        # Concatenate remaining audios
        for i, audio in enumerate(audio_list[1:], start=1):
            # Process current audio
            current = audio
            if normalize_each and self.normalize:
                current = self.normalize_audio(current)
            if volume_adjustments and len(volume_adjustments) > i:
                current = self.adjust_volume(current, volume_adjustments[i])
            # Add silence if configured
            if self.with_silence:
                silence = self.get_silence()
                # Crossfade between audio and silence for smooth transition.
                # NOTE(review): pydub requires crossfade <= len(silence);
                # confirm crossfade_duration is always <= silence_duration.
                merged = merged.append(silence, crossfade=self.crossfade_duration)
            # Append current audio WITHOUT crossfade to avoid cutting it
            # The crossfade with silence already provides smooth transition
            merged = merged.append(current, crossfade=0)

        logger.debug(f"Concatenated {len(audio_list)} audio segments, total duration: {len(merged)}ms")
        return merged

    def concatenate_audio_files(
        self,
        audio_paths: List[str],
        output_path: str,
        normalize_each: bool = False,
        volume_adjustments: Optional[List[float]] = None,
        target_durations: Optional[List[float]] = None
    ) -> Tuple[AudioSegment, dict]:
        """
        Load, concatenate, and save multiple audio files.

        Args:
            audio_paths: List of paths to audio files
            output_path: Path to save the concatenated audio (WAV; parent dirs created)
            normalize_each: Whether to normalize each audio before concatenation
            volume_adjustments: Optional list of volume adjustments (in dB) for each audio
            target_durations: Optional list of target durations (in seconds) for each clip

        Returns:
            Tuple of (concatenated audio segment, metadata dict)
        """
        # Load all audio files
        audio_segments = []
        for i, path in enumerate(audio_paths):
            audio = self.load_audio(path)
            # Adjust duration if specified
            if target_durations and i < len(target_durations):
                target_ms = int(target_durations[i] * 1000)
                # trim_or_repeat_audio is defined elsewhere in this module/package
                audio = trim_or_repeat_audio(audio, target_ms)
                logger.debug(f"Adjusted clip {i} to {len(audio)}ms (target: {target_ms}ms)")
            audio_segments.append(audio)

        # Concatenate
        merged = self.concatenate_audios(audio_segments, normalize_each, volume_adjustments)

        # Save (creating parent directories as needed)
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        merged.export(str(output_path), format="wav")
        logger.info(f"Saved concatenated audio: {output_path}")

        # Create metadata describing the result and its sources
        metadata = {
            "output_path": str(output_path),
            "source_files": audio_paths,
            "num_sources": len(audio_paths),
            "total_duration_ms": len(merged),
            "total_duration_s": len(merged) / 1000.0,
            "individual_durations_ms": [len(a) for a in audio_segments],
            "individual_durations_s": [len(a) / 1000.0 for a in audio_segments],
            "target_durations_s": target_durations if target_durations else [],
            "volume_adjustments_db": volume_adjustments if volume_adjustments else []
        }
        return merged, metadata
def generate_sample_durations_for_task(
    task_duration_hours: float,
    min_clip_duration: float,
    max_clip_duration: float
) -> list:
    """
    Generate sample durations that exactly fill the target task duration.

    Algorithm:
        1. Start with remaining = total_seconds
        2. While remaining >= min_clip_duration:
           - Sample d ~ Uniform(min, min(max, remaining))
           - Append d to durations list
           - Subtract d from remaining
        3. Return shuffled list of durations

    This ensures:
        - Total of all durations ≈ task_duration (within min_clip_duration tolerance)
        - Each duration is uniformly sampled within valid range
        - No overshoot of target duration

    Args:
        task_duration_hours: Total duration for the task in hours
        min_clip_duration: Minimum duration per clip in seconds (must be > 0)
        max_clip_duration: Maximum duration per clip in seconds

    Returns:
        List of sample durations in seconds (shuffled). Empty when the task
        duration cannot fit even one minimum-length clip.

    Raises:
        ValueError: If min_clip_duration is not positive (a non-positive
            minimum would make the sampling loop non-terminating).
    """
    # Guard: with min_clip_duration <= 0 the loop below could never terminate
    # (remaining >= 0 stays true while each sampled d may be 0).
    if min_clip_duration <= 0:
        raise ValueError("min_clip_duration must be positive")

    task_duration_seconds = task_duration_hours * 3600
    remaining = task_duration_seconds
    durations = []

    while remaining >= min_clip_duration:
        # Cap max at remaining to avoid overshoot
        effective_max = min(max_clip_duration, remaining)
        # If remaining is less than min, we can't fit another sample
        if effective_max < min_clip_duration:
            break
        # Sample uniformly within valid range
        d = random.uniform(min_clip_duration, effective_max)
        durations.append(d)
        remaining -= d

    # Shuffle to randomize order (durations were generated sequentially)
    random.shuffle(durations)

    total_duration = sum(durations)
    logger.info(f"Task duration target: {task_duration_hours}h ({task_duration_seconds:.1f}s)")
    logger.info(f"Generated {len(durations)} sample durations, total: {total_duration:.1f}s")
    if durations:
        # Guard: min()/max() raise ValueError and the mean divides by zero on
        # an empty list (e.g. task shorter than one min-length clip).
        logger.info(f"Duration range: [{min(durations):.1f}s, {max(durations):.1f}s], "
                    f"mean: {total_duration/len(durations):.1f}s")
        logger.info(f"Unused remainder: {remaining:.1f}s ({remaining/task_duration_seconds*100:.2f}%)")
    return durations
def calculate_num_samples_for_task(
    task_duration_hours: float,
    min_clip_duration: float,
    max_clip_duration: float
) -> int:
    """
    Estimate the number of samples needed to fill the task duration.

    DEPRECATED: Use generate_sample_durations_for_task() instead for exact
    duration filling. Kept only for backward compatibility; relies on an
    average-clip-length estimate.

    Args:
        task_duration_hours: Total duration for the task in hours
        min_clip_duration: Minimum duration per clip in seconds
        max_clip_duration: Maximum duration per clip in seconds

    Returns:
        Number of samples to generate (estimate, at least 1)
    """
    total_seconds = task_duration_hours * 3600
    mean_clip_seconds = (min_clip_duration + max_clip_duration) / 2
    estimate = int(total_seconds / mean_clip_seconds)

    logger.info(f"Task duration: {task_duration_hours}h ({total_seconds}s)")
    logger.info(f"Avg clip duration: {mean_clip_seconds}s (min: {min_clip_duration}s, max: {max_clip_duration}s)")
    logger.info(f"Calculated number of samples: {estimate}")

    # Never return zero — callers expect at least one sample
    return max(1, estimate)
def generate_single_clip_duration(
    min_duration: float,
    max_duration: float
) -> float:
    """
    Draw one random clip duration uniformly between min and max.

    Args:
        min_duration: Minimum duration in seconds
        max_duration: Maximum duration in seconds

    Returns:
        Random duration in seconds
    """
    lower, upper = min_duration, max_duration
    return random.uniform(lower, upper)
def concatenate_to_target_duration(
    base_audio: AudioSegment,
    target_duration_seconds: float,
    crossfade_ms: int = 0
) -> AudioSegment:
    """
    Concatenate a base audio clip to reach target duration.

    This takes a short clip (e.g. a 5-second ESC-50 clip) and repeats it to
    create a longer clip of exactly the target duration.

    Args:
        base_audio: Original short audio segment
        target_duration_seconds: Target duration in seconds
        crossfade_ms: Crossfade between repetitions in milliseconds

    Returns:
        Audio segment of target duration
    """
    target_duration_ms = int(target_duration_seconds * 1000)
    base_duration_ms = len(base_audio)

    if target_duration_ms <= base_duration_ms:
        # Just trim if target is shorter
        return base_audio[:target_duration_ms]

    # Repeat until the target length is reached. A fixed repetition count of
    # (target // base) + 1 undershoots whenever crossfade_ms > 0, because each
    # crossfaded join shortens the result by crossfade_ms — so we loop on the
    # actual length instead.
    result = base_audio
    while len(result) < target_duration_ms:
        previous_len = len(result)
        if crossfade_ms > 0:
            result = result.append(base_audio, crossfade=crossfade_ms)
        else:
            result = result + base_audio
        if len(result) <= previous_len:
            # Safety: the join did not grow the result (e.g. crossfade consumed
            # the whole clip) — bail out to avoid an infinite loop.
            break

    # Trim to exact duration
    return result[:target_duration_ms]
def set_random_seed(seed: int):
    """Seed both the stdlib `random` and `numpy.random` generators for reproducibility."""
    np.random.seed(seed)
    random.seed(seed)
    logger.info(f"Random seed set to: {seed}")
def get_max_clip_num_to_be_joined(
    target_duration_seconds: float,
    source_clip_duration_seconds: float,
    min_silence_ms: int = 100
) -> Tuple[int, float]:
    """
    Calculate the maximum number of source clips needed to reach target duration.

    Pipeline: pick dataset -> pick class -> pick audio clip -> get duration ->
    concatenate clips to reach target duration -> modulo to get num clips ->
    inserting silences randomly based on remainder.

    Args:
        target_duration_seconds: Target total duration in seconds
        source_clip_duration_seconds: Duration of each source clip (e.g., 5s for ESC-50)
        min_silence_ms: Minimum silence between clips in milliseconds

    Returns:
        Tuple of (num_clips_needed, remainder_seconds_for_silences)
        - num_clips_needed: How many source clips to concatenate
        - remainder_seconds_for_silences: Extra time to distribute as random silences

    Example:
        target=30s, source=5s -> (6, 0.0) - exactly 6 clips, no extra silence
        target=32s, source=5s -> (6, 2.0) - 6 clips + 2s distributed as silences
    """
    target_ms = target_duration_seconds * 1000
    source_ms = source_clip_duration_seconds * 1000

    def _required_silence(n: int) -> float:
        # N clips leave (N-1) gaps, each needing at least min_silence_ms
        return max(0, n - 1) * min_silence_ms

    # First estimate by floor division, never dropping below a single clip
    num_clips = max(1, int(target_ms // source_ms))

    # Shrink the count until clip content plus mandatory gap silence fits
    while num_clips > 1 and num_clips * source_ms + _required_silence(num_clips) > target_ms:
        num_clips -= 1

    clips_duration_ms = num_clips * source_ms
    min_total_silence_ms = _required_silence(num_clips)

    # Whatever time is left over becomes extra, randomly-placed silence
    remainder_seconds = max(0, (target_ms - clips_duration_ms - min_total_silence_ms) / 1000.0)

    logger.debug(
        f"get_max_clip_num: target={target_duration_seconds}s, source={source_clip_duration_seconds}s "
        f"-> {num_clips} clips, {remainder_seconds:.3f}s remainder for extra silences"
    )
    return num_clips, remainder_seconds
def build_clip_sequence_with_silences(
    audio_segments: List[AudioSegment],
    target_duration_seconds: float,
    min_silence_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_ms: int = 0
) -> AudioSegment:
    """
    Build a final audio clip by concatenating segments with guaranteed silences.

    Ensures:
        1. All clips are joined with at least min_silence_ms between them
        2. Any remainder duration is distributed as random extra silences in gaps
        3. Final duration matches target_duration_seconds exactly

    Args:
        audio_segments: List of audio segments to concatenate
        target_duration_seconds: Target total duration in seconds
        min_silence_ms: Minimum silence between each pair of clips (always inserted)
        max_extra_silence_per_gap_ms: Maximum extra silence to add per gap
        crossfade_ms: Crossfade duration in ms (applied when joining)

    Returns:
        Concatenated audio segment of exact target duration

    Raises:
        ValueError: If audio_segments is empty.
    """
    if not audio_segments:
        raise ValueError("audio_segments cannot be empty")

    target_ms = int(target_duration_seconds * 1000)

    # Single-clip shortcut: trim when long enough, otherwise loop it to length
    if len(audio_segments) == 1:
        only_clip = audio_segments[0]
        if len(only_clip) >= target_ms:
            return only_clip[:target_ms]
        return concatenate_to_target_duration(only_clip, target_duration_seconds, crossfade_ms)

    # Time budget: clip content + mandatory per-gap silence
    content_ms = sum(len(seg) for seg in audio_segments)
    num_gaps = len(audio_segments) - 1
    mandatory_silence_ms = num_gaps * min_silence_ms

    spare_ms = target_ms - content_ms - mandatory_silence_ms
    if spare_ms < 0:
        # Not enough room - the final trim below will enforce the target
        logger.warning(
            f"Clips too long for target duration. Total audio: {content_ms}ms, "
            f"target: {target_ms}ms. Will trim final result."
        )
        spare_ms = 0

    # Spread the spare time randomly across the gaps
    per_gap_extras = distribute_remainder_as_silences(
        spare_ms,
        num_gaps,
        max_extra_silence_per_gap_ms
    )

    # Assemble: clip, silence, clip, silence, ...
    result = audio_segments[0]
    for gap_index, clip in enumerate(audio_segments[1:]):
        pause_ms = min_silence_ms + per_gap_extras[gap_index]
        pause = AudioSegment.silent(duration=pause_ms)
        if 0 < crossfade_ms < pause_ms:
            # Crossfade audio->silence for smooth transition, but NOT silence->audio
            result = result.append(pause, crossfade=crossfade_ms)
            result = result.append(clip, crossfade=0)  # No crossfade to avoid cutting audio
        else:
            result = result + pause + clip

    # Enforce the exact target length: trim overshoot, pad undershoot
    if len(result) > target_ms:
        result = result[:target_ms]
    elif len(result) < target_ms:
        result = result + AudioSegment.silent(duration=target_ms - len(result))

    logger.debug(
        f"Built clip sequence: {len(audio_segments)} segments, "
        f"final duration: {len(result)}ms (target: {target_ms}ms)"
    )
    return result
def distribute_remainder_as_silences(
    remainder_ms: float,
    num_gaps: int,
    max_per_gap_ms: int = 500
) -> List[int]:
    """
    Distribute remainder time as random silences across gaps.

    Args:
        remainder_ms: Total extra time to distribute (in ms)
        num_gaps: Number of gaps between clips
        max_per_gap_ms: Maximum extra silence per gap

    Returns:
        List of extra silence durations (in ms) for each gap
    """
    if num_gaps <= 0:
        return []

    budget = int(max(0, remainder_ms))
    if budget == 0:
        return [0] * num_gaps

    # Random weights decide each gap's share of the budget
    weights = [random.random() for _ in range(num_gaps)]
    weight_sum = sum(weights)
    if weight_sum == 0:
        # Degenerate weights: fall back to an even split
        weights = [1.0] * num_gaps
        weight_sum = num_gaps

    # First pass: proportional shares, each capped at max_per_gap_ms
    allocations = []
    left = budget
    for gap_index, weight in enumerate(weights):
        if gap_index == num_gaps - 1:
            # Final gap absorbs whatever remains (up to the cap)
            share = min(left, max_per_gap_ms)
        else:
            share = int(budget * (weight / weight_sum))
            share = min(share, max_per_gap_ms, left)
        allocations.append(share)
        left -= share
        weight_sum -= weight

    # Second pass: top up gaps still under the cap until the budget is spent
    while left > 0:
        for gap_index in range(num_gaps):
            if allocations[gap_index] < max_per_gap_ms and left > 0:
                top_up = min(left, max_per_gap_ms - allocations[gap_index])
                allocations[gap_index] += top_up
                left -= top_up
        if left > 0:
            # Every gap is at the cap; the rest cannot be placed
            break

    logger.debug(f"Distributed {budget}ms across {num_gaps} gaps: {allocations}")
    return allocations
def repeat_clips_to_fill_duration(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    source_clip_duration_seconds: float = 5.0,
    min_silence_ms: int = 100
) -> Tuple[List[AudioSegment], List[str], int]:
    """
    Repeat source clips to fill target duration, cycling through all sources.

    This ensures all unique sources appear and are repeated proportionally.

    Args:
        source_audios: List of unique source audio segments
        source_categories: List of category names corresponding to source_audios
        target_duration_seconds: Target total duration
        source_clip_duration_seconds: Duration of each source clip
        min_silence_ms: Minimum silence between clips

    Returns:
        Tuple of (expanded_audio_list, expanded_categories, num_clips)

    Raises:
        ValueError: If source_audios is empty.
    """
    num_clips, remainder = get_max_clip_num_to_be_joined(
        target_duration_seconds,
        source_clip_duration_seconds,
        min_silence_ms
    )

    num_sources = len(source_audios)
    if num_sources == 0:
        raise ValueError("source_audios cannot be empty")

    # Round-robin over the sources until num_clips entries are produced
    expanded_audios = [source_audios[i % num_sources] for i in range(num_clips)]
    expanded_categories = [source_categories[i % num_sources] for i in range(num_clips)]

    logger.debug(
        f"Repeated {num_sources} sources to {num_clips} clips for "
        f"{target_duration_seconds}s target duration"
    )
    return expanded_audios, expanded_categories, num_clips
def build_consecutive_sources_for_count_task(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    source_clip_duration_seconds: float = 5.0,
    min_silence_between_sources_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_within_source_ms: int = 50
) -> Tuple[AudioSegment, List[str], dict]:
    """
    Build audio for COUNT task with consecutive same-class clips.

    For count task, same-class clips must be consecutive (AAA BBB CCC) so they
    are perceived as ONE sound source. Silences are only inserted BETWEEN
    different classes, not within same-class repetitions.

    Pipeline: pick classes -> for each class concatenate clips consecutively ->
    insert silences only between different classes -> distribute remainder

    Args:
        source_audios: List of unique source audio segments (one per class)
        source_categories: List of category names
        target_duration_seconds: Target total duration
        source_clip_duration_seconds: Duration of each source clip
        min_silence_between_sources_ms: Minimum silence between different sources
        max_extra_silence_per_gap_ms: Max extra silence per gap for remainder distribution
        crossfade_within_source_ms: Small crossfade within same-source repetitions

    Returns:
        Tuple of (final_audio, category_sequence, metadata_dict)

    Raises:
        ValueError: If source_audios is empty.
    """
    target_ms = int(target_duration_seconds * 1000)
    source_ms = int(source_clip_duration_seconds * 1000)  # NOTE(review): unused below
    num_sources = len(source_audios)
    if num_sources == 0:
        raise ValueError("source_audios cannot be empty")

    # Calculate total clips needed (remainder_seconds is recomputed later from
    # actual block lengths, so it is not used directly here)
    num_clips, remainder_seconds = get_max_clip_num_to_be_joined(
        target_duration_seconds,
        source_clip_duration_seconds,
        min_silence_between_sources_ms
    )

    # Safety check: if more sources than clips can fit, warn
    if num_sources > num_clips:
        logger.warning(
            f"More sources ({num_sources}) than clips that fit ({num_clips}). "
            f"Each source needs at least 1 clip, so output may exceed target duration. "
            f"Consider capping n_unique_audios <= max_clips in task_count.py"
        )
        # Each source gets exactly 1 rep if there are more sources than clips
        num_clips = num_sources  # This will exceed target but ensures each source is included

    # Distribute clips across sources as evenly as possible
    # Each source gets at least 1 clip since num_sources <= num_clips
    base_reps = num_clips // num_sources
    extra_reps = num_clips % num_sources
    repetitions_per_source = []
    for i in range(num_sources):
        # The first `extra_reps` sources take one extra clip each
        reps = base_reps + (1 if i < extra_reps else 0)
        repetitions_per_source.append(reps)
    # Shuffle repetition assignment to add variety
    random.shuffle(repetitions_per_source)

    # Build each source's audio block (consecutive clips of same class)
    source_blocks = []
    category_sequence = []
    for i, (audio, category, reps) in enumerate(zip(source_audios, source_categories, repetitions_per_source)):
        if reps == 0:
            # NOTE(review): reps can only be 0 if base_reps == 0, which the
            # num_sources <= num_clips invariant should prevent — kept as a guard
            continue
        # Concatenate same-source clips with minimal/no gap (just small crossfade)
        block = audio
        for _ in range(reps - 1):
            if crossfade_within_source_ms > 0:
                block = block.append(audio, crossfade=crossfade_within_source_ms)
            else:
                block = block + audio
        source_blocks.append(block)
        category_sequence.append(category)

    # Now we have N source blocks, need to join them with silences
    # Number of gaps = num_source_blocks - 1
    num_gaps = len(source_blocks) - 1
    if num_gaps <= 0:
        # Only one source block
        final_audio = source_blocks[0]
    else:
        # Calculate total audio duration from blocks
        total_blocks_ms = sum(len(block) for block in source_blocks)
        min_total_silence_ms = num_gaps * min_silence_between_sources_ms
        # Available for extra silences
        available_extra_ms = target_ms - total_blocks_ms - min_total_silence_ms
        available_extra_ms = max(0, available_extra_ms)
        # Distribute extra silence across gaps
        extra_silences = distribute_remainder_as_silences(
            available_extra_ms,
            num_gaps,
            max_extra_silence_per_gap_ms
        )
        # Build final audio with silences between source blocks
        final_audio = source_blocks[0]
        for i, block in enumerate(source_blocks[1:]):
            gap_silence_ms = min_silence_between_sources_ms + extra_silences[i]
            silence = AudioSegment.silent(duration=gap_silence_ms)
            final_audio = final_audio + silence + block

    # Trim or pad to exact target duration
    if len(final_audio) > target_ms:
        final_audio = final_audio[:target_ms]
    elif len(final_audio) < target_ms:
        padding = AudioSegment.silent(duration=target_ms - len(final_audio))
        final_audio = final_audio + padding

    # Create metadata; repetitions_per_source maps categories to the SHUFFLED
    # rep counts, i.e. the assignment actually used above
    metadata = {
        'num_unique_sources': num_sources,
        'total_clips': num_clips,
        'ordering_mode': 'consecutive',
        'repetitions_per_source': dict(zip(source_categories, repetitions_per_source)),
        'target_duration_ms': target_ms,
        'actual_duration_ms': len(final_audio),
        'num_gaps_between_sources': num_gaps
    }
    logger.debug(
        f"Count task (consecutive): {num_sources} sources, {num_clips} total clips, "
        f"reps={repetitions_per_source}, duration={len(final_audio)}ms"
    )
    return final_audio, category_sequence, metadata
def build_random_order_for_count_task(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    source_clip_duration_seconds: float = 5.0,
    min_silence_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500
) -> Tuple[AudioSegment, List[str], dict]:
    """
    Build audio for the COUNT task with RANDOM ordering of clips.

    Clips from different sources are shuffled (A B A C B A C ...), which
    tests whether the model can recognize recurring sounds as the same
    source.  Silences are inserted between ALL clips, same-source or not.

    Pipeline:
        1. Compute how many clips fit in the target duration.
        2. Spread the clip budget across sources as evenly as possible.
        3. Expand into one entry per clip instance and shuffle.
        4. Join everything with silences (minimum + random extra per gap).

    Args:
        source_audios: One audio segment per unique source/class.
        source_categories: Category name per source (parallel list).
        target_duration_seconds: Desired total duration of the output.
        source_clip_duration_seconds: Nominal duration of one source clip.
        min_silence_ms: Minimum silence inserted between consecutive clips.
        max_extra_silence_per_gap_ms: Cap on extra silence added per gap.

    Returns:
        Tuple of (final_audio, clip_sequence, metadata_dict).

    Raises:
        ValueError: If source_audios is empty.
    """
    target_ms = int(target_duration_seconds * 1000)
    clip_ms = int(source_clip_duration_seconds * 1000)  # nominal clip length (informational)
    num_sources = len(source_audios)
    if num_sources == 0:
        raise ValueError("source_audios cannot be empty")
    # How many clips (plus mandatory gaps) fit into the target duration.
    num_clips, remainder_seconds = get_max_clip_num_to_be_joined(
        target_duration_seconds,
        source_clip_duration_seconds,
        min_silence_ms
    )
    # Every source must appear at least once; if they cannot all fit,
    # accept exceeding the target duration rather than dropping sources.
    if num_sources > num_clips:
        logger.warning(
            f"More sources ({num_sources}) than clips that fit ({num_clips}). "
            f"Each source needs at least 1 clip, so output may exceed target duration. "
            f"Consider capping n_unique_audios <= max_clips in task_count.py"
        )
        num_clips = num_sources
    # Even split: the first `extra_reps` sources get one clip more.
    base_reps, extra_reps = divmod(num_clips, num_sources)
    repetitions_per_source = [
        base_reps + 1 if idx < extra_reps else base_reps
        for idx in range(num_sources)
    ]
    # One (audio, category) entry per clip instance, then shuffle in place.
    expanded_clips = [
        (audio, category)
        for audio, category, reps in zip(source_audios, source_categories, repetitions_per_source)
        for _ in range(reps)
    ]
    random.shuffle(expanded_clips)
    shuffled_audios = [audio for audio, _ in expanded_clips]
    clip_sequence = [category for _, category in expanded_clips]
    # Join all clips with (possibly jittered) silences between them.
    final_audio = build_clip_sequence_with_silences(
        shuffled_audios,
        target_duration_seconds,
        min_silence_ms=min_silence_ms,
        max_extra_silence_per_gap_ms=max_extra_silence_per_gap_ms,
        crossfade_ms=0  # random ordering never crossfades adjacent clips
    )
    metadata = {
        'num_unique_sources': num_sources,
        'total_clips': len(expanded_clips),
        'ordering_mode': 'random',
        'repetitions_per_source': dict(zip(source_categories, repetitions_per_source)),
        'clip_sequence': clip_sequence,
        'target_duration_ms': target_ms,
        'actual_duration_ms': len(final_audio),
        'num_gaps': len(expanded_clips) - 1
    }
    logger.debug(
        f"Count task (random): {num_sources} sources, {len(expanded_clips)} clips, "
        f"sequence={clip_sequence[:5]}..., duration={len(final_audio)}ms"
    )
    return final_audio, clip_sequence, metadata
def build_count_task_audio(
    source_audios: List[AudioSegment],
    source_categories: List[str],
    target_duration_seconds: float,
    ordering_mode: str = "random",
    source_clip_duration_seconds: float = 5.0,
    min_silence_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_within_source_ms: int = 50
) -> Tuple[AudioSegment, List[str], dict]:
    """
    Build audio for the COUNT task, dispatching on the ordering mode.

    Args:
        source_audios: One audio segment per unique source/class.
        source_categories: Category name per source (parallel list).
        target_duration_seconds: Desired total duration of the output.
        ordering_mode: "consecutive" groups same-source clips
            (AAA BBB CCC, the easier variant); any other value falls back
            to "random" (A B A C B A C), which tests sound recognition.
        source_clip_duration_seconds: Nominal duration of one source clip.
        min_silence_ms: Minimum silence between clips.
        max_extra_silence_per_gap_ms: Cap on extra silence added per gap.
        crossfade_within_source_ms: Crossfade used by consecutive mode only.

    Returns:
        Tuple of (final_audio, clip_sequence, metadata_dict).
    """
    if ordering_mode == "consecutive":
        return build_consecutive_sources_for_count_task(
            source_audios,
            source_categories,
            target_duration_seconds,
            source_clip_duration_seconds,
            min_silence_ms,
            max_extra_silence_per_gap_ms,
            crossfade_within_source_ms
        )
    # Default path: random ordering (takes no crossfade parameter).
    return build_random_order_for_count_task(
        source_audios,
        source_categories,
        target_duration_seconds,
        source_clip_duration_seconds,
        min_silence_ms,
        max_extra_silence_per_gap_ms
    )
# =============================================================================
# DURATION TASK FUNCTIONS
# =============================================================================
def calculate_duration_slot_distribution(
    target_total_duration_s: float,
    effective_durations: Dict[str, float],
    target_category: str,
    question_type: str,
    multiplier_longest: float = 1.5,
    multiplier_shortest: float = 0.5,
    min_silence_between_sources_ms: int = 100
) -> Tuple[Dict[str, int], bool, Dict]:
    """
    Calculate how many repetitions each source gets for the duration task.

    For "longest": the target gets as many repetitions as fit in the
    remaining budget, backgrounds get 1 each.
    For "shortest": the target gets 1 repetition, backgrounds share the
    remaining duration (at least enough reps to clearly exceed the target).

    Args:
        target_total_duration_s: Target total audio duration in seconds.
        effective_durations: Dict mapping category -> effective clip duration (s).
        target_category: The category that should end up longest/shortest.
        question_type: "longest" or "shortest" (any other value falls into
            the "shortest" else-branch).
        multiplier_longest: Constraint: target >= max_background * this.
        multiplier_shortest: Constraint: target <= min_background * this.
        min_silence_between_sources_ms: Minimum silence between different sources.

    Returns:
        Tuple of (slot_distribution, gap_satisfied, metadata):
            slot_distribution: Dict mapping category -> number of repetitions.
            gap_satisfied: Whether the duration-gap constraint is met.
            metadata: Intermediate values from the calculation (debugging aid).
    """
    categories = list(effective_durations.keys())
    n_sources = len(categories)
    if n_sources < 2:
        # Single source: no competing backgrounds, so the gap constraint
        # holds trivially; fill the target with whole repetitions.
        reps = max(1, int(target_total_duration_s / effective_durations[target_category]))
        return {target_category: reps}, True, {'note': 'single_source'}
    # Budget reserved for the mandatory silences between the n source blocks.
    total_silence_s = (n_sources - 1) * min_silence_between_sources_ms / 1000.0
    available_for_audio_s = target_total_duration_s - total_silence_s
    background_categories = [c for c in categories if c != target_category]
    if question_type == "longest":
        # Backgrounds get exactly 1 rep each...
        background_duration_s = sum(effective_durations[c] for c in background_categories)
        # ...and the target fills whatever audio budget remains.
        remaining_for_target_s = available_for_audio_s - background_duration_s
        target_duration_per_rep = effective_durations[target_category]
        # Whole repetitions only (floor). NOTE(review): if
        # remaining_for_target_s is negative, max(1, ...) still forces one
        # rep, which can push the total past the target duration.
        target_reps = max(1, int(remaining_for_target_s / target_duration_per_rep))
        actual_target_duration = target_reps * target_duration_per_rep
        # Gap check: target must be >= multiplier_longest x longest background.
        max_background_duration = max(effective_durations[c] for c in background_categories)
        required_target_duration = max_background_duration * multiplier_longest
        gap_satisfied = actual_target_duration >= required_target_duration
        slot_distribution = {c: 1 for c in background_categories}
        slot_distribution[target_category] = target_reps
        metadata = {
            'available_for_audio_s': available_for_audio_s,
            'background_duration_s': background_duration_s,
            'remaining_for_target_s': remaining_for_target_s,
            'target_reps': target_reps,
            'actual_target_duration_s': actual_target_duration,
            'max_background_duration_s': max_background_duration,
            'required_target_duration_s': required_target_duration,
            'multiplier_used': multiplier_longest
        }
    else:  # shortest
        # Target gets exactly 1 rep.
        target_duration_s = effective_durations[target_category]
        # Remaining budget is shared among the backgrounds.
        remaining_for_backgrounds_s = available_for_audio_s - target_duration_s
        slot_distribution = {target_category: 1}
        # Each background must run longer than target / multiplier_shortest
        # so the target is unambiguously the shortest.
        min_background_required = target_duration_s / multiplier_shortest
        background_reps = {}
        for cat in background_categories:
            eff_dur = effective_durations[cat]
            # int(x) + 1 guarantees strictly exceeding the requirement
            # (slightly overshoots when the ratio is a whole number).
            min_reps = max(1, int(min_background_required / eff_dur) + 1)
            background_reps[cat] = min_reps
        # Audio the minimum background reps would consume in total.
        total_background_needed = sum(
            background_reps[c] * effective_durations[c]
            for c in background_categories
        )
        if total_background_needed <= remaining_for_backgrounds_s:
            # Leftover budget: hand out whole extra reps round-robin until
            # no background clip fits in what remains.
            extra_available = remaining_for_backgrounds_s - total_background_needed
            while extra_available > 0:
                added_any = False
                for cat in background_categories:
                    eff_dur = effective_durations[cat]
                    if extra_available >= eff_dur:
                        background_reps[cat] += 1
                        extra_available -= eff_dur
                        added_any = True
                if not added_any:
                    # No single background clip fits anymore; stop.
                    break
            slot_distribution.update(background_reps)
            gap_satisfied = True
        else:
            # Not enough room: keep the minimum reps and flag the shortfall.
            slot_distribution.update(background_reps)
            gap_satisfied = False
        # Re-verify the gap from the actual per-category durations; this
        # deliberately overrides the provisional gap_satisfied set above.
        actual_durations = {
            cat: slot_distribution[cat] * effective_durations[cat]
            for cat in categories
        }
        min_background_actual = min(
            actual_durations[c] for c in background_categories
        )
        gap_satisfied = actual_durations[target_category] <= min_background_actual * multiplier_shortest
        metadata = {
            'available_for_audio_s': available_for_audio_s,
            'target_duration_s': target_duration_s,
            'remaining_for_backgrounds_s': remaining_for_backgrounds_s,
            'min_background_required_s': min_background_required,
            'actual_durations_s': actual_durations,
            'min_background_actual_s': min_background_actual,
            'multiplier_used': multiplier_shortest
        }
    return slot_distribution, gap_satisfied, metadata
def build_duration_task_audio(
    source_audio_lists: Dict[str, List[AudioSegment]],
    slot_distribution: Dict[str, int],
    effective_durations: Dict[str, float],
    target_total_duration_s: float,
    min_silence_between_sources_ms: int = 100,
    max_extra_silence_per_gap_ms: int = 500,
    crossfade_within_source_ms: int = 50
) -> Tuple[AudioSegment, List[str], Dict]:
    """
    Build audio for the DURATION task with consecutive ordering per source.

    Structure: [SourceA x n] + silence + [SourceB x m] + silence + ...
    Order of sources is randomized to avoid positional patterns.

    Bug fix vs. previous version: when the audio->silence crossfade branch
    is taken, pydub's append(crossfade=c) overlaps c ms of the silence with
    the preceding audio, shortening the rendered gap by c ms.  Timestamps
    now account for that, so 'source_timestamps' matches the audio that is
    actually rendered instead of drifting by the crossfade per gap.

    Args:
        source_audio_lists: Dict mapping category -> list of audio segments.
        slot_distribution: Dict mapping category -> number of repetitions.
        effective_durations: Dict mapping category -> effective duration per
            clip in seconds (informational; actual durations are measured
            from the built blocks).
        target_total_duration_s: Target total duration in seconds.
        min_silence_between_sources_ms: Min silence between different sources.
        max_extra_silence_per_gap_ms: Max extra silence per gap.
        crossfade_within_source_ms: Crossfade between same-source repetitions.

    Returns:
        Tuple of (final_audio, category_sequence, metadata).  metadata
        includes 'source_timestamps' as (category, start_ms, end_ms) tuples
        and a human-readable 'timestamp_string'.

    Raises:
        ValueError: If slot_distribution yields no source blocks.
    """
    categories = list(slot_distribution.keys())
    # Randomize source order so the answer position carries no signal.
    random.shuffle(categories)
    # Build one concatenated block per source.
    source_blocks = []          # list of (category, block)
    category_sequence = []      # one entry per repetition, in render order
    actual_durations = {}       # category -> block length in seconds
    block_durations_ms = []     # per-block lengths (kept for debugging)
    for category in categories:
        reps = slot_distribution[category]
        if reps == 0:
            # Skip before touching source_audio_lists so a 0-rep category
            # absent from the dict cannot raise KeyError.
            continue
        audio_list = source_audio_lists[category]
        # Concatenate `reps` clips, cycling through the available clips and
        # crossfading between repetitions when both sides are long enough.
        block = audio_list[0]
        for i in range(1, reps):
            next_clip = audio_list[i % len(audio_list)]
            if (crossfade_within_source_ms > 0
                    and len(block) > crossfade_within_source_ms
                    and len(next_clip) > crossfade_within_source_ms):
                block = block.append(next_clip, crossfade=crossfade_within_source_ms)
            else:
                # Crossfade would exceed a segment's length; butt-join.
                block = block + next_clip
        source_blocks.append((category, block))
        block_durations_ms.append(len(block))
        category_sequence.extend([category] * reps)
        actual_durations[category] = len(block) / 1000.0
    if not source_blocks:
        # Previously this fell through to an IndexError below.
        raise ValueError("slot_distribution produced no source blocks (all reps are 0)")
    # Silence budget: what remains of the target after audio + minimum gaps.
    total_audio_ms = sum(len(block) for _, block in source_blocks)
    num_gaps = len(source_blocks) - 1
    min_total_silence_ms = num_gaps * min_silence_between_sources_ms
    target_ms = int(target_total_duration_s * 1000)
    available_extra_ms = target_ms - total_audio_ms - min_total_silence_ms
    if available_extra_ms > 0 and num_gaps > 0:
        extra_silences = distribute_remainder_as_silences(
            available_extra_ms,
            num_gaps,
            max_extra_silence_per_gap_ms
        )
    else:
        extra_silences = [0] * max(num_gaps, 1)
    # Concatenate with silences, tracking each block's position in the
    # rendered audio (single-block case is handled by the empty loop).
    source_timestamps = []  # list of (category, start_ms, end_ms)
    first_cat, first_block = source_blocks[0]
    final_audio = first_block
    source_timestamps.append((first_cat, 0, len(first_block)))
    current_position_ms = len(first_block)
    for i, (cat, block) in enumerate(source_blocks[1:]):
        gap_silence_ms = min_silence_between_sources_ms + extra_silences[i]
        silence = AudioSegment.silent(duration=gap_silence_ms)
        # Prefer crossfading audio -> silence for a smooth fade-out, but
        # never silence -> audio (that would trim the next clip's start).
        # Safe only when the crossfade is strictly shorter than the gap and
        # both segments are longer than the crossfade.
        crossfade_ms = min(500, gap_silence_ms)
        if (0 < crossfade_ms < gap_silence_ms
                and len(final_audio) > crossfade_ms
                and len(block) > crossfade_ms):
            final_audio = final_audio.append(silence, crossfade=crossfade_ms)
            # Append the next block without crossfade to keep its start intact.
            final_audio = final_audio.append(block, crossfade=0)
            # The crossfade overlaps crossfade_ms of silence with the
            # preceding audio, so the rendered gap is that much shorter.
            effective_gap_ms = gap_silence_ms - crossfade_ms
        else:
            # Fall back to simple concatenation.
            final_audio = final_audio + silence + block
            effective_gap_ms = gap_silence_ms
        start_ms = current_position_ms + effective_gap_ms
        end_ms = start_ms + len(block)
        source_timestamps.append((cat, start_ms, end_ms))
        current_position_ms = end_ms
    # Trim or pad so the output is exactly the target length.
    if len(final_audio) > target_ms:
        final_audio = final_audio[:target_ms]
    elif len(final_audio) < target_ms:
        padding = AudioSegment.silent(duration=target_ms - len(final_audio))
        final_audio = final_audio + padding
    # Human-readable summary: "category start_s-end_s (duration_s), ..."
    timestamp_parts = []
    for cat, start_ms, end_ms in source_timestamps:
        start_s = round(start_ms / 1000.0, 2)
        end_s = round(end_ms / 1000.0, 2)
        duration_s = round((end_ms - start_ms) / 1000.0, 2)
        timestamp_parts.append(f"{cat} {start_s}s-{end_s}s ({duration_s}s)")
    timestamp_string = ", ".join(timestamp_parts)
    metadata = {
        'source_order': [cat for cat, _ in source_blocks],
        'slot_distribution': slot_distribution,
        'actual_durations_s': actual_durations,
        'total_audio_ms': total_audio_ms,
        'num_gaps': num_gaps,
        'final_duration_ms': len(final_audio),
        'source_timestamps': source_timestamps,  # (category, start_ms, end_ms)
        'timestamp_string': timestamp_string  # human-readable format
    }
    logger.debug(
        f"Duration task audio: {len(source_blocks)} sources, "
        f"order={metadata['source_order']}, duration={len(final_audio)}ms"
    )
    return final_audio, category_sequence, metadata
|