import json
import logging
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Any, NamedTuple
from uuid import UUID

from backend.core.config import settings
from backend.db.content_store import get_content_store
from backend.services.job_store import RedisRenderJobStore
from backend.services.redis_client import get_redis
from shared.pipeline_log import (
    get_pipeline_trace_id,
    pipeline_debug,
    pipeline_error,
    pipeline_event,
)
from shared.schemas.render_job import RenderJobType, RenderQuality

logger = logging.getLogger(__name__)

# A genuine `from __future__ import ...` statement at the start of a line.
# Anchoring at column 0 keeps comments/strings that merely mention
# "__future__" from being hoisted to the top of the generated file.
_FUTURE_IMPORT_RE = re.compile(r"from\s+__future__\s+import\b")

# How much of the subprocess output is kept on the result / in error details.
_OUTPUT_TAIL_CHARS = 8000
_LOGGED_STDERR_CHARS = 1500


class RenderManimResult(NamedTuple):
    """Paths and captured output produced by `render_manim_scene_to_disk`."""

    # Final video: the audio-merged file when merging succeeded, otherwise
    # the mp4 written by manim.
    video_path: Path
    # Per-job temporary directory that holds every artifact of the render.
    job_dir: Path
    # Last characters of manim's stdout / stderr.
    stdout_tail: str
    stderr_tail: str
    # The manim argv that was executed.
    command: list[str]
    # The mp4 written by manim, before any audio merge.
    silent_video_path: Path | None = None
    # Downloaded audio file, or None when no audio was downloaded.
    audio_path: Path | None = None


def manim_quality_flags(*, job_type: RenderJobType, quality: RenderQuality) -> list[str]:
    """Return the manim CLI quality flag(s) for a job type and quality.

    Preview jobs always render with ``-qh``; other jobs use ``-qk`` for
    ``"4k"`` and ``-qh`` for everything else.
    """
    if job_type == "preview":
        return ["-qh"]
    if quality == "4k":
        return ["-qk"]
    if quality == "1080p":
        return ["-qh"]
    return ["-qh"]  # Default to high for non-preview if not specified


def _build_generated_scene_source(mcode: str, sync_segments: Any) -> str:
    """Return scene source with BEAT_DURATIONS injected after future imports."""
    durations_json = json.dumps(sync_segments or {}, ensure_ascii=False)
    # JSON text is not always a valid Python literal (null/true/false/NaN),
    # so the generated module decodes it at import time instead of pasting
    # the JSON in as an expression.
    metadata_injection = (
        "\n# Dynamic Template Metadata\n"
        "import json as _beat_json\n"
        f"BEAT_DURATIONS = _beat_json.loads({durations_json!r})\n\n"
    )

    # `from __future__` imports must stay ahead of the injected assignment
    # (SyntaxError otherwise), so they are pulled to the very top.
    future_lines: list[str] = []
    other_lines: list[str] = []
    for line in mcode.splitlines(keepends=True):
        if _FUTURE_IMPORT_RE.match(line):
            future_lines.append(line)
        else:
            other_lines.append(line)
    return "".join(future_lines) + metadata_injection + "".join(other_lines)


def _resolve_scene_source(
    *, job_id: UUID, job_dir: Path, repo_root: Path
) -> tuple[Any, Path, str]:
    """Look up the job and return ``(scene, scene_file, scene_class)``.

    ``scene`` is None when the job has no ``scene_id``; in that case the
    configured static scene file and class are used.
    """
    job = RedisRenderJobStore(get_redis()).get(job_id)
    if job is None:
        msg = f"Render job not found: {job_id}"
        raise RuntimeError(msg)

    if job.scene_id is None:
        scene_file = (repo_root / settings.manim_scene_file).resolve()
        if not scene_file.is_file():
            msg = f"Scene file not found: {scene_file}"
            raise FileNotFoundError(msg)
        return None, scene_file, settings.manim_scene_class

    scene = get_content_store().get_scene(job.scene_id)
    if scene is None:
        msg = f"Scene {job.scene_id} not found in content store for job {job_id}"
        logger.error(msg)
        raise RuntimeError(msg)

    mcode = (scene.manim_code or "").strip()
    logger.info(
        "Retrieved scene_id=%s manim_code_len=%d version=%d",
        job.scene_id,
        len(mcode),
        scene.manim_code_version,
    )
    if not mcode:
        msg = f"Scene {job.scene_id} missing manim_code for render (job_id={job_id})"
        logger.error(msg)
        raise RuntimeError(msg)

    scene_file = (job_dir / "generated_scene.py").resolve()
    scene_file.write_text(
        _build_generated_scene_source(mcode, scene.sync_segments),
        encoding="utf-8",
    )
    return scene, scene_file, settings.generated_scene_class


def _run_manim(
    cmd: list[str], *, repo_root: Path, timeout: int, tid: Any, job_id: UUID
) -> "subprocess.CompletedProcess[str]":
    """Run the manim command, converting timeouts/failures to RuntimeError."""
    try:
        return subprocess.run(
            cmd,
            cwd=str(repo_root),
            check=True,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        pipeline_error(
            "worker.manim",
            "subprocess_timeout",
            "manim render timed out",
            trace_id=tid,
            job_id=str(job_id),
            details={"timeout_seconds": timeout},
        )
        raise RuntimeError(f"Manim render timed out after {timeout} seconds") from exc
    except subprocess.CalledProcessError as exc:
        stdout_tail = (exc.stdout or "")[-_OUTPUT_TAIL_CHARS:]
        stderr_tail = (exc.stderr or "")[-_OUTPUT_TAIL_CHARS:]
        pipeline_error(
            "worker.manim",
            "subprocess_failed",
            "manim exited non-zero",
            trace_id=tid,
            job_id=str(job_id),
            details={
                "returncode": exc.returncode,
                # Keep the END of stderr: the actual error is reported last.
                "stderr_tail": stderr_tail[-_LOGGED_STDERR_CHARS:],
            },
        )
        detail = (stderr_tail or stdout_tail or "").strip() or f"manim exit code {exc.returncode}"
        raise RuntimeError(detail) from exc


def _merge_audio(
    *,
    scene: Any,
    video_path: Path,
    job_dir: Path,
    scene_class: str,
    job_id: UUID,
    tid: Any,
    timeout: int,
) -> tuple[Path, Path | None]:
    """Download the scene audio and mux it into the video (best effort).

    Returns ``(video_path, audio_path)``. On any failure the error is logged
    and the incoming silent ``video_path`` is returned; ``audio_path`` is
    set as soon as the download has completed, even if the merge then fails.
    """
    local_audio_path: Path | None = None
    try:
        logger.info("Merging audio for scene: %s, URL: %s", job_id, scene.audio_url)
        pipeline_event(
            "worker.manim",
            "audio_merge_start",
            "Downloading and merging audio",
            trace_id=tid,
            job_id=str(job_id),
            details={"audio_url": scene.audio_url},
        )

        audio_ext = "wav" if ".wav" in scene.audio_url.lower() else "mp3"
        local_audio = job_dir / f"voice.{audio_ext}"

        import httpx

        full_audio_url = scene.audio_url
        if not full_audio_url.startswith("http"):
            # Relative storage path: prefix it with the storage base URL,
            # joining with exactly one slash.
            base_url = (settings.supabase_url or "").strip().rstrip("/")
            if base_url and not base_url.endswith("/storage/v1"):
                base_url = f"{base_url}/storage/v1"
            full_audio_url = f"{base_url}/{scene.audio_url.lstrip('/')}"

        logger.info("Downloading audio from: %s", full_audio_url)
        pipeline_event(
            "worker.manim",
            "audio_download_start",
            "Downloading audio from resolved URL",
            trace_id=tid,
            job_id=str(job_id),
            details={"url": full_audio_url},
        )

        with httpx.stream("GET", full_audio_url, follow_redirects=True) as r:
            r.raise_for_status()
            with open(local_audio, "wb") as f:
                for chunk in r.iter_bytes():
                    f.write(chunk)

        local_audio_path = local_audio
        logger.info(
            "Audio downloaded to: %s (Size: %s bytes)",
            local_audio,
            local_audio.stat().st_size,
        )

        merged_video = job_dir / f"{scene_class}_merged.mp4"
        merge_cmd = [
            "ffmpeg",
            "-y",
            "-i",
            str(video_path),
            "-i",
            str(local_audio),
            "-c:v",
            "copy",
            "-c:a",
            "aac",
            "-map",
            "0:v:0",
            "-map",
            "1:a:0",
            str(merged_video),
        ]
        logger.info("Running merge command: %s", " ".join(merge_cmd))
        # Bounded by the same timeout as the render so a stalled ffmpeg
        # cannot block the worker; a timeout falls through to the handler
        # below and the silent video is used.
        merge_proc = subprocess.run(
            merge_cmd,
            check=True,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        logger.info("FFmpeg merge stdout: %s", merge_proc.stdout)
        logger.info("FFmpeg merge stderr: %s", merge_proc.stderr)

        if merged_video.exists():
            video_path = merged_video
            logger.info("Audio merged successfully. New video_path: %s", video_path)
            pipeline_event(
                "worker.manim",
                "audio_merge_ok",
                "Audio merged successfully",
                trace_id=tid,
                job_id=str(job_id),
            )
    except Exception as e:
        # Deliberately broad: audio is optional, so any failure here must not
        # fail the render. The silent video is returned instead.
        logger.exception("Failed to merge audio: %s", e)
        pipeline_error(
            "worker.manim",
            "audio_merge_failed",
            str(e),
            trace_id=tid,
            job_id=str(job_id),
        )
    return video_path, local_audio_path


def render_manim_scene_to_disk(
    *,
    job_id: UUID,
    job_type: RenderJobType,
    quality: RenderQuality,
    timeout: int = 1200,
) -> RenderManimResult:
    """Run `manim render` into an isolated per-job output directory.

    The job is looked up in the Redis job store. When it references a scene,
    that scene's code is written to ``generated_scene.py`` inside the job
    directory (with ``BEAT_DURATIONS`` injected); otherwise the configured
    static scene file is rendered. If the scene has an ``audio_url``, the
    audio is downloaded and merged with ffmpeg on a best-effort basis.

    Args:
        job_id: Identifier of the render job.
        job_type: Job type, used to pick the manim quality flag.
        quality: Requested quality, used to pick the manim quality flag.
        timeout: Seconds allowed for the manim subprocess (and, separately,
            for the ffmpeg merge).

    Returns:
        A `RenderManimResult`; the caller owns ``job_dir`` from then on.

    Raises:
        RuntimeError: Job or scene not found, scene has no code, manim timed
            out, or manim exited non-zero.
        FileNotFoundError: The static scene file is missing, or manim
            produced no ``<scene_class>.mp4``.
    """
    repo_root = Path(settings.repo_root).resolve()

    # Create a temporary directory for this job to ensure no local leakage
    job_dir = Path(tempfile.mkdtemp(prefix=f"manim_job_{job_id}_")).resolve()
    try:
        media_dir = job_dir / "media"
        media_dir.mkdir(parents=True, exist_ok=True)

        scene, scene_file, scene_class = _resolve_scene_source(
            job_id=job_id, job_dir=job_dir, repo_root=repo_root
        )

        q_flags = manim_quality_flags(job_type=job_type, quality=quality)
        cmd = [
            "manim",
            "render",
            *q_flags,
            str(scene_file),
            scene_class,
            "--media_dir",
            str(media_dir),
        ]

        tid = get_pipeline_trace_id()
        pipeline_event(
            "worker.manim",
            "subprocess_start",
            "Running manim render",
            trace_id=tid,
            job_id=str(job_id),
            details={
                "scene_class": scene_class,
                "flags": q_flags,
                "scene_file": str(scene_file),
            },
        )
        pipeline_debug(
            "worker.manim",
            "subprocess_cmd",
            "Full manim command",
            trace_id=tid,
            job_id=str(job_id),
            details={"argv": cmd},
        )

        proc = _run_manim(cmd, repo_root=repo_root, timeout=timeout, tid=tid, job_id=job_id)
        stdout_tail = (proc.stdout or "")[-_OUTPUT_TAIL_CHARS:]
        stderr_tail = (proc.stderr or "")[-_OUTPUT_TAIL_CHARS:]

        matches = sorted(media_dir.rglob(f"{scene_class}.mp4"))
        if not matches:
            msg = f"No mp4 produced under {media_dir}"
            raise FileNotFoundError(msg)
        silent_video = matches[-1]
        pipeline_event(
            "worker.manim",
            "subprocess_ok",
            "mp4 produced",
            trace_id=tid,
            job_id=str(job_id),
            details={"mp4": str(silent_video)},
        )

        video_path = silent_video
        local_audio_path = None

        # --- Audio Merging Logic ---
        if scene and scene.audio_url:
            video_path, local_audio_path = _merge_audio(
                scene=scene,
                video_path=silent_video,
                job_dir=job_dir,
                scene_class=scene_class,
                job_id=job_id,
                tid=tid,
                timeout=timeout,
            )

        return RenderManimResult(
            video_path=video_path,
            job_dir=job_dir,
            stdout_tail=stdout_tail,
            stderr_tail=stderr_tail,
            command=cmd,
            silent_video_path=silent_video,
            audio_path=local_audio_path,
        )
    except BaseException:
        # On failure the caller never learns job_dir, so nobody else can
        # remove it; clean up here to avoid leaking render artifacts.
        shutil.rmtree(job_dir, ignore_errors=True)
        raise