Spaces:
Build error
Build error
| """ | |
| Stage 7 — Audio Assembly | |
| Stitches all TTS segments into a single continuous dubbed audio track. | |
| Places each segment at the correct timestamp with silence padding. | |
| """ | |
| import logging | |
| import subprocess | |
| import json | |
| from pathlib import Path | |
| from typing import List, Dict | |
| from config import TTS_CROSSFADE_MS | |
| logger = logging.getLogger(__name__) | |
# Above this many clips the single ffmpeg filter graph is skipped in favour
# of the pydub path, to avoid ffmpeg filter complexity limits.
_MAX_FFMPEG_SEGMENTS = 100

# Sample rate (Hz) of the assembled track.
_ASSEMBLY_SAMPLE_RATE = 24000

# Upper bound (seconds) on the ffmpeg run before falling back to pydub.
_FFMPEG_TIMEOUT_S = 600


def assemble_dubbed_audio(
    segments: List[Dict],
    total_duration: float,
    output_dir: Path,
    progress_callback=None
) -> Path:
    """
    Create a full-length audio track by placing each TTS segment
    at its original timestamp position.

    Segments without a "tts_audio_path", or whose file does not exist,
    are ignored. Up to _MAX_FFMPEG_SEGMENTS clips are mixed with a single
    ffmpeg invocation; larger jobs, and any ffmpeg failure (non-zero exit,
    timeout, or ffmpeg not being runnable), are handed to
    _assemble_chunked instead.

    Args:
        segments: Dicts with a "start" time in seconds and a
            "tts_audio_path" pointing at the synthesized clip.
        total_duration: Length in seconds of the silent base track.
        output_dir: Directory that receives "dubbed_audio_full.wav".
        progress_callback: Optional callable taking an integer percentage.

    Returns:
        Path to the assembled dubbed audio file.

    Raises:
        RuntimeError: If no segment has an existing TTS audio file.
    """
    output_path = output_dir / "dubbed_audio_full.wav"

    # Keep only segments whose TTS audio actually exists on disk.
    valid_segments = [
        s for s in segments
        if s.get("tts_audio_path") and Path(s["tts_audio_path"]).exists()
    ]
    if not valid_segments:
        raise RuntimeError("No valid TTS segments to assemble.")

    logger.info(f"Assembling {len(valid_segments)} segments into {total_duration:.1f}s track")

    # Decide on the fallback before building a command that would be discarded.
    if len(valid_segments) > _MAX_FFMPEG_SEGMENTS:
        return _assemble_chunked(valid_segments, total_duration, output_dir, output_path, progress_callback)

    cmd = _build_ffmpeg_mix_command(valid_segments, total_duration, output_path)

    logger.info("Running ffmpeg audio assembly...")
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            errors="replace",  # ffmpeg diagnostics are not guaranteed to be valid UTF-8
            timeout=_FFMPEG_TIMEOUT_S,
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        # A missing ffmpeg binary or a timeout gets the same fallback as a
        # non-zero exit code instead of aborting the stage.
        logger.warning(f"FFmpeg could not complete ({exc}). Trying chunked assembly.")
        return _assemble_chunked(valid_segments, total_duration, output_dir, output_path, progress_callback)

    if result.returncode != 0:
        # ffmpeg prints its banner first; the actual error is at the end of stderr.
        logger.warning(f"FFmpeg filter_complex failed. Trying chunked assembly. Error: {result.stderr[-500:]}")
        return _assemble_chunked(valid_segments, total_duration, output_dir, output_path, progress_callback)

    logger.info(f"Audio assembly complete: {output_path}")
    if progress_callback:
        progress_callback(100)
    return output_path


def _build_ffmpeg_mix_command(segments, total_duration, output_path):
    """Return the ffmpeg argv that mixes *segments* onto a silent base track."""
    rate = _ASSEMBLY_SAMPLE_RATE

    # Input 0 is generated silence for the full duration; inputs 1..N are
    # the TTS clips in list order.
    cmd = [
        "ffmpeg", "-y",
        "-f", "lavfi",
        "-t", str(total_duration),
        "-i", f"anullsrc=r={rate}:cl=mono",
    ]
    for seg in segments:
        cmd.extend(["-i", str(seg["tts_audio_path"])])

    filter_parts = [f"[0]aresample={rate}[base]"]
    mix_labels = ["[base]"]
    for idx, seg in enumerate(segments):
        # A negative delay is not valid for adelay, so clamp to track start.
        delay_ms = max(0, int(seg["start"] * 1000))
        # Input index is idx + 1 because [0] is the silence base.
        filter_parts.append(
            f"[{idx + 1}]aresample={rate},adelay={delay_ms}|{delay_ms}[s{idx}]"
        )
        mix_labels.append(f"[s{idx}]")

    # normalize=0 makes amix sum its inputs; with the default normalization
    # every clip is scaled down according to the number of inputs, which
    # leaves the dub far too quiet. An ffmpeg build that does not know the
    # option exits non-zero, and the caller then uses the pydub fallback.
    joined_labels = "".join(mix_labels)
    filter_parts.append(
        f"{joined_labels}amix=inputs={len(segments) + 1}"
        ":duration=longest:dropout_transition=0:normalize=0[out]"
    )

    cmd.extend([
        "-filter_complex", ";".join(filter_parts),
        "-map", "[out]",
        "-acodec", "pcm_s16le",
        "-ar", str(rate),
        "-ac", "1",
        str(output_path),
    ])
    return cmd
def _assemble_chunked(
    segments: List[Dict],
    total_duration: float,
    output_dir: Path,
    output_path: Path,
    progress_callback=None
) -> Path:
    """
    Fallback: assemble the dubbed track in-process with pydub.

    A silent 24 kHz base track of total_duration seconds is created and
    every segment is overlaid onto it at its "start" position, one clip
    at a time. A segment that cannot be loaded or overlaid is logged and
    skipped so that one bad clip does not abort the whole track.

    Args:
        segments: Dicts with a "start" time in seconds and a
            "tts_audio_path" pointing at the synthesized clip.
        total_duration: Length in seconds of the silent base track.
        output_dir: Unused here; kept so the signature matches the caller.
        output_path: Destination WAV file.
        progress_callback: Optional callable taking an integer percentage.

    Returns:
        output_path, after the track has been exported.

    Raises:
        RuntimeError: If pydub is not installed.
    """
    try:
        from pydub import AudioSegment
    except ImportError as exc:
        raise RuntimeError("pydub not installed. Run: pip install pydub") from exc

    logger.info("Using pydub chunked assembly (fallback)...")

    # Create silent base track
    total_ms = int(total_duration * 1000)
    base = AudioSegment.silent(duration=total_ms, frame_rate=24000)

    total_segs = len(segments)
    failed = 0
    for idx, seg in enumerate(segments):
        try:
            tts_audio = AudioSegment.from_file(seg["tts_audio_path"])
            # Match the ffmpeg path's output format: 24 kHz, mono, 16-bit.
            tts_audio = (
                tts_audio
                .set_frame_rate(24000)
                .set_channels(1)
                .set_sample_width(2)
            )
            # Clamp so a negative start cannot become a negative position.
            position_ms = max(0, int(seg["start"] * 1000))
            base = base.overlay(tts_audio, position=position_ms)
        except Exception as e:
            # Deliberately best-effort: skip the bad clip, keep the rest.
            failed += 1
            logger.warning(f"Failed to overlay segment {idx}: {e}")

        # Report progress for skipped clips too, so the bar keeps moving.
        if progress_callback and idx % 50 == 0:
            progress_callback(int((idx + 1) / total_segs * 100))

    if failed:
        logger.warning(f"Chunked assembly skipped {failed} of {total_segs} segments")

    # Export
    base.export(str(output_path), format="wav")
    logger.info(f"Chunked assembly complete: {output_path}")

    # The loop only reports every 50th segment, so signal completion explicitly.
    if progress_callback:
        progress_callback(100)
    return output_path