Spaces:
Build error
Build error
File size: 4,989 Bytes
4ec3855 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 | """
Stage 7 — Audio Assembly
Stitches all TTS segments into a single continuous dubbed audio track.
Places each segment at the correct timestamp with silence padding.
"""
import logging
import subprocess
import json
from pathlib import Path
from typing import List, Dict
from config import TTS_CROSSFADE_MS
logger = logging.getLogger(__name__)
def assemble_dubbed_audio(
    segments: List[Dict],
    total_duration: float,
    output_dir: Path,
    progress_callback=None
) -> Path:
    """
    Create a full-length audio track by placing each TTS segment
    at its original timestamp position.

    Args:
        segments: Segment dicts. Only entries whose "tts_audio_path" points
            to an existing file are used; each of those must also provide a
            "start" time in seconds.
        total_duration: Length in seconds of the silent base track.
        output_dir: Directory that receives "dubbed_audio_full.wav".
        progress_callback: Optional callable. Called with 100 once the
            ffmpeg path succeeds; also forwarded to the pydub fallback.

    Returns:
        Path to the assembled dubbed audio file.

    Raises:
        RuntimeError: If no segment has a usable TTS audio file, or if the
            pydub fallback is needed but pydub is not installed.
    """
    output_path = output_dir / "dubbed_audio_full.wav"

    # Filter segments that have TTS audio
    valid_segments = [
        s for s in segments
        if s.get("tts_audio_path") and Path(s["tts_audio_path"]).exists()
    ]
    if not valid_segments:
        raise RuntimeError("No valid TTS segments to assemble.")

    n_segs = len(valid_segments)
    logger.info(f"Assembling {n_segs} segments into {total_duration:.1f}s track")

    # For large numbers of segments, skip the single ffmpeg invocation
    # to avoid ffmpeg filter complexity limits
    if n_segs > 100:
        return _assemble_chunked(valid_segments, total_duration, output_dir, output_path, progress_callback)

    # Input [0] is silence for the full duration; [1], [2], ... are the
    # TTS segments in the same order the filter graph refers to them.
    cmd = [
        "ffmpeg", "-y",
        "-f", "lavfi",
        "-t", str(total_duration),
        "-i", "anullsrc=r=24000:cl=mono",
    ]
    for seg in valid_segments:
        cmd.extend(["-i", str(seg["tts_audio_path"])])
    cmd.extend([
        "-filter_complex", _build_mix_filter(valid_segments),
        "-map", "[out]",
        "-acodec", "pcm_s16le",
        "-ar", "24000",
        "-ac", "1",
        str(output_path),
    ])

    logger.info("Running ffmpeg audio assembly...")
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=600)
    except (subprocess.TimeoutExpired, OSError) as exc:
        # A missing ffmpeg binary or a hung process must not bypass the
        # fallback that a non-zero exit code already gets.
        logger.warning(f"FFmpeg could not complete ({exc}). Trying chunked assembly.")
        return _assemble_chunked(valid_segments, total_duration, output_dir, output_path, progress_callback)

    if result.returncode != 0:
        # ffmpeg prints its banner first; the actual error is at the end of stderr.
        logger.warning(f"FFmpeg filter_complex failed. Trying chunked assembly. Error: {result.stderr[-500:]}")
        return _assemble_chunked(valid_segments, total_duration, output_dir, output_path, progress_callback)

    logger.info(f"Audio assembly complete: {output_path}")
    if progress_callback:
        progress_callback(100)
    return output_path


def _build_mix_filter(segments: List[Dict]) -> str:
    """Build the ffmpeg filter_complex graph that delays each segment to its start time and mixes all of them onto the silent base (input 0)."""
    filter_parts = ["[0]aresample=24000[base]"]
    mix_inputs = ["[base]"]

    for idx, seg in enumerate(segments):
        input_idx = idx + 1  # +1 because [0] is silence
        # adelay does not accept negative delays; clamp to the track start
        delay_ms = max(0, int(seg["start"] * 1000))
        # Resample to 24kHz, apply delay
        filter_parts.append(
            f"[{input_idx}]aresample=24000,adelay={delay_ms}|{delay_ms}[s{idx}]"
        )
        mix_inputs.append(f"[s{idx}]")

    # normalize=0 sums the inputs instead of scaling them by the number of
    # active inputs, so loudness does not depend on how many segments exist
    # (this matches the plain overlay done by the pydub fallback).
    # NOTE(review): the normalize option needs a reasonably recent ffmpeg
    # (4.4+ per the ffmpeg docs — confirm for the deployment image); older
    # builds reject it and the caller then falls back to chunked assembly.
    all_inputs = "".join(mix_inputs)
    filter_parts.append(
        f"{all_inputs}amix=inputs={len(segments) + 1}"
        ":duration=longest:dropout_transition=0:normalize=0[out]"
    )
    return ";".join(filter_parts)
def _assemble_chunked(
    segments: List[Dict],
    total_duration: float,
    output_dir: Path,
    output_path: Path,
    progress_callback=None
) -> Path:
    """
    Fallback: assemble audio using pydub.

    Builds one silent base track of total_duration seconds and overlays
    the segments onto it one at a time. Despite the name, the whole track
    is held in memory as a single AudioSegment.

    Args:
        segments: Segment dicts with "tts_audio_path" and "start" (seconds).
        total_duration: Length in seconds of the silent base track.
        output_dir: Not used here; kept so the signature stays unchanged.
        output_path: Destination WAV file.
        progress_callback: Optional callable receiving an integer percentage;
            called for every 50th segment and with 100 after the export.

    Returns:
        output_path, after the WAV file has been exported.

    Raises:
        RuntimeError: If pydub is not installed.
    """
    try:
        from pydub import AudioSegment
    except ImportError as exc:
        # Chain the cause so the original import failure stays visible
        raise RuntimeError("pydub not installed. Run: pip install pydub") from exc

    logger.info("Using pydub chunked assembly (fallback)...")

    # Create silent base track
    total_ms = int(total_duration * 1000)
    base = AudioSegment.silent(duration=total_ms, frame_rate=24000)

    total_segs = len(segments)
    failed = 0
    for idx, seg in enumerate(segments):
        try:
            tts_audio = AudioSegment.from_file(seg["tts_audio_path"])
            # Resample to 24kHz mono
            tts_audio = tts_audio.set_frame_rate(24000).set_channels(1)
            # A negative position would be treated as an offset from the
            # end of the base by slicing, so clamp to the track start
            position_ms = max(0, int(seg["start"] * 1000))
            # Overlay at the correct position
            base = base.overlay(tts_audio, position=position_ms)
        except Exception as e:
            # Best effort: one unreadable segment must not abort the track
            failed += 1
            logger.warning(f"Failed to overlay segment {idx}: {e}")
            continue
        if progress_callback and idx % 50 == 0:
            progress_callback(int((idx + 1) / total_segs * 100))

    if failed:
        logger.warning(f"Skipped {failed} of {total_segs} segments during chunked assembly")

    # Export
    base.export(str(output_path), format="wav")
    logger.info(f"Chunked assembly complete: {output_path}")

    # The in-loop updates only fire on every 50th segment, so report
    # completion explicitly.
    if progress_callback:
        progress_callback(100)
    return output_path
|