"""
Stage 7 — Audio Assembly

Stitches all TTS segments into a single continuous dubbed audio track.
Places each segment at the correct timestamp with silence padding.
"""

import logging
import subprocess
import json
from pathlib import Path
from typing import Callable, Dict, List, Optional

from config import TTS_CROSSFADE_MS

logger = logging.getLogger(__name__)

# Sample rate (Hz) of the assembled track; used by both assembly paths.
_SAMPLE_RATE = 24000
# Above this many segments the single ffmpeg filter graph is skipped in
# favour of the pydub path, to avoid ffmpeg filter complexity limits.
_MAX_FFMPEG_SEGMENTS = 100
# Upper bound, in seconds, on the ffmpeg invocation.
_FFMPEG_TIMEOUT_S = 600


def _start_ms(seg: Dict) -> int:
    """Return the segment's start offset in whole milliseconds, clamped to >= 0.

    A negative offset is not a valid ``adelay`` value and would make pydub
    slice from the end of the track, so it is treated as "start of track".
    """
    return max(0, int(seg["start"] * 1000))


def _build_filter_complex(segments: List[Dict]) -> str:
    """Build the ffmpeg ``-filter_complex`` graph for *segments*.

    Input ``[0]`` is the silent base track; inputs ``[1]..[n]`` are the TTS
    segments in the same order as *segments*. Each segment is resampled,
    delayed to its start time and mixed onto the base; the result is
    labelled ``[out]``.
    """
    chains = [f"[0]aresample={_SAMPLE_RATE}[base]"]
    labels = ["[base]"]

    for idx, seg in enumerate(segments):
        delay_ms = _start_ms(seg)
        # The delay is given twice (one value per channel) so that a stereo
        # input is shifted as a whole.
        chains.append(
            f"[{idx + 1}]aresample={_SAMPLE_RATE},"
            f"adelay={delay_ms}|{delay_ms}[s{idx}]"
        )
        labels.append(f"[s{idx}]")

    mix_inputs = "".join(labels)
    # normalize=0: sum the inputs instead of scaling every one of them down
    # by the number of active inputs, which would otherwise make the dubbed
    # track far quieter than the source segments. Requires FFmpeg 4.4+;
    # older builds reject the option and the caller falls back to pydub.
    chains.append(
        f"{mix_inputs}amix=inputs={len(labels)}:duration=longest"
        ":dropout_transition=0:normalize=0[out]"
    )
    return ";".join(chains)


def assemble_dubbed_audio(
    segments: List[Dict],
    total_duration: float,
    output_dir: Path,
    progress_callback: Optional[Callable[[int], None]] = None,
) -> Path:
    """
    Create a full-length audio track by placing each TTS segment at its
    original timestamp position.

    Args:
        segments: Segment dicts. Only those whose ``"tts_audio_path"`` is set
            and exists on disk are used; each of those must also carry a
            numeric ``"start"`` (seconds).
        total_duration: Length of the silent base track, in seconds.
        output_dir: Directory that receives ``dubbed_audio_full.wav``.
        progress_callback: Optional callable invoked with an integer
            percentage (0-100).

    Returns:
        Path to the assembled dubbed audio file.

    Raises:
        RuntimeError: If no segment has usable TTS audio, or if the pydub
            fallback is needed but pydub is not installed.
    """
    output_path = output_dir / "dubbed_audio_full.wav"

    # Keep only segments that actually have TTS audio on disk.
    valid_segments = [
        s for s in segments
        if s.get("tts_audio_path") and Path(s["tts_audio_path"]).exists()
    ]
    if not valid_segments:
        raise RuntimeError("No valid TTS segments to assemble.")

    logger.info(
        "Assembling %d segments into %.1fs track",
        len(valid_segments),
        total_duration,
    )

    # Decide on the pydub path before building any ffmpeg arguments.
    if len(valid_segments) > _MAX_FFMPEG_SEGMENTS:
        return _assemble_chunked(
            valid_segments, total_duration, output_dir, output_path,
            progress_callback,
        )

    # Input 0: silence for the full duration; inputs 1..n: the TTS segments.
    inputs = [
        "-f", "lavfi",
        "-t", str(total_duration),
        "-i", f"anullsrc=r={_SAMPLE_RATE}:cl=mono",
    ]
    for seg in valid_segments:
        inputs.extend(["-i", str(seg["tts_audio_path"])])

    cmd = [
        "ffmpeg", "-y",
        *inputs,
        "-filter_complex", _build_filter_complex(valid_segments),
        "-map", "[out]",
        "-acodec", "pcm_s16le",
        "-ar", str(_SAMPLE_RATE),
        "-ac", "1",
        str(output_path),
    ]

    logger.info("Running ffmpeg audio assembly...")
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            errors="replace",  # ffmpeg output is not guaranteed to be UTF-8
            timeout=_FFMPEG_TIMEOUT_S,
        )
    except (OSError, subprocess.TimeoutExpired) as err:
        # ffmpeg missing / not executable, or it ran past the timeout: use
        # the fallback instead of aborting the whole stage.
        logger.warning(
            "FFmpeg could not complete (%s). Trying chunked assembly.", err
        )
        return _assemble_chunked(
            valid_segments, total_duration, output_dir, output_path,
            progress_callback,
        )

    if result.returncode != 0:
        # Log the tail of stderr: the start is normally the version banner,
        # the actual error message comes last.
        logger.warning(
            "FFmpeg filter_complex failed. Trying chunked assembly. Error: %s",
            result.stderr[-500:],
        )
        return _assemble_chunked(
            valid_segments, total_duration, output_dir, output_path,
            progress_callback,
        )

    logger.info("Audio assembly complete: %s", output_path)
    if progress_callback:
        progress_callback(100)

    return output_path


def _assemble_chunked(
    segments: List[Dict],
    total_duration: float,
    output_dir: Path,
    output_path: Path,
    progress_callback: Optional[Callable[[int], None]] = None,
) -> Path:
    """
    Fallback: assemble the track with pydub.

    Builds one silent in-memory track of ``total_duration`` seconds and
    overlays the segments onto it one at a time. A segment that fails to
    load or overlay is logged and skipped so that one bad file does not
    abort the whole assembly.

    Args:
        segments: Segment dicts with ``"tts_audio_path"`` and ``"start"``.
        total_duration: Length of the track, in seconds.
        output_dir: Unused here; kept so the signature stays unchanged.
        output_path: Destination WAV file.
        progress_callback: Optional callable invoked with an integer
            percentage (0-100).

    Returns:
        ``output_path``.

    Raises:
        RuntimeError: If pydub is not installed.
    """
    try:
        from pydub import AudioSegment
    except ImportError as err:
        raise RuntimeError("pydub not installed. Run: pip install pydub") from err

    logger.info("Using pydub chunked assembly (fallback)...")

    # Silent base track that the segments are overlaid onto.
    total_ms = int(total_duration * 1000)
    base = AudioSegment.silent(duration=total_ms, frame_rate=_SAMPLE_RATE)

    total_segs = len(segments)
    for idx, seg in enumerate(segments):
        try:
            tts_audio = AudioSegment.from_file(seg["tts_audio_path"])
            # Match the base track's format: mono at the target sample rate.
            tts_audio = tts_audio.set_frame_rate(_SAMPLE_RATE).set_channels(1)
            base = base.overlay(tts_audio, position=_start_ms(seg))
        except Exception as e:
            # Deliberately best-effort: skip the bad segment, keep going.
            logger.warning("Failed to overlay segment %s: %s", idx, e)
            continue

        if progress_callback and idx % 50 == 0:
            progress_callback(int((idx + 1) / total_segs * 100))

    # export() hands back an open file handle; close it so it is not leaked.
    exported = base.export(str(output_path), format="wav")
    exported.close()

    logger.info("Chunked assembly complete: %s", output_path)
    # Report completion here too, matching the ffmpeg path.
    if progress_callback:
        progress_callback(100)

    return output_path