Manim-Agent / worker /renderer.py
github-actions[bot]
[API] Cuong2004/Manim-Agent @ 1d7c417 (run 25583057312)
9bed109
import json
import logging
import subprocess
import tempfile
from pathlib import Path
from typing import NamedTuple
from uuid import UUID
from backend.core.config import settings
from backend.db.content_store import get_content_store
from backend.services.job_store import RedisRenderJobStore
from backend.services.redis_client import get_redis
from shared.pipeline_log import (
get_pipeline_trace_id,
pipeline_debug,
pipeline_error,
pipeline_event,
)
from shared.schemas.render_job import RenderJobType, RenderQuality
# Module-level logger, named after this module's import path (``__name__``).
logger = logging.getLogger(__name__)
class RenderManimResult(NamedTuple):
video_path: Path
job_dir: Path
stdout_tail: str
stderr_tail: str
command: list[str]
silent_video_path: Path | None = None
audio_path: Path | None = None
def manim_quality_flags(*, job_type: RenderJobType, quality: RenderQuality) -> list[str]:
    """Translate a job's type and requested quality into Manim CLI quality flags.

    Args:
        job_type: Kind of render job; ``"preview"`` always yields ``-qh``.
        quality: Requested output quality; only ``"4k"`` on a non-preview job
            selects ``-qk``.

    Returns:
        A fresh single-element list: ``["-qk"]`` for a non-preview 4k job and
        ``["-qh"]`` for everything else (previews, ``"1080p"``, or any other
        quality value).
    """
    wants_4k = job_type != "preview" and quality == "4k"
    return ["-qk"] if wants_4k else ["-qh"]
def _inject_beat_durations(manim_code: str, sync_segments: object) -> str:
    """Return ``manim_code`` with a ``BEAT_DURATIONS`` assignment spliced in.

    ``from __future__ import ...`` lines are hoisted above the injected
    assignment, because Python raises ``SyntaxError`` when any other statement
    precedes them.
    """
    # Round-trip through JSON (keeping the original JSON-serialisability
    # requirement) and emit the result with repr(): raw JSON text is not always
    # valid Python because ``true``/``false``/``null`` are undefined names.
    durations_literal = repr(json.loads(json.dumps(sync_segments or {}, ensure_ascii=False)))
    metadata_injection = (
        f"\n# Dynamic Template Metadata\nBEAT_DURATIONS = {durations_literal}\n\n"
    )

    future_lines: list[str] = []
    other_lines: list[str] = []
    for line in manim_code.splitlines(keepends=True):
        # Match real ``from __future__ import`` statements only, not comments or
        # strings that merely mention the words.
        if line.split()[:3] == ["from", "__future__", "import"]:
            future_lines.append(line)
        else:
            other_lines.append(line)
    return "".join(future_lines) + metadata_injection + "".join(other_lines)


def _resolve_audio_url(audio_url: str, base_url: str | None) -> str:
    """Turn a possibly storage-relative ``audio_url`` into an absolute URL.

    Absolute ``http://`` / ``https://`` URLs are returned unchanged. Anything
    else is joined onto ``base_url`` (with ``/storage/v1`` appended when it is
    not already the suffix) using exactly one ``/`` separator.

    Raises:
        ValueError: If ``audio_url`` is relative and ``base_url`` is empty.
    """
    if audio_url.lower().startswith(("http://", "https://")):
        return audio_url
    base = (base_url or "").strip().rstrip("/")
    if not base:
        msg = f"Cannot resolve relative audio URL {audio_url!r}: storage base URL is not configured"
        raise ValueError(msg)
    if not base.endswith("/storage/v1"):
        base = f"{base}/storage/v1"
    return f"{base}/{audio_url.lstrip('/')}"


def _merge_audio_best_effort(
    *,
    audio_url: str,
    video_path: Path,
    job_dir: Path,
    scene_class: str,
    job_id: UUID,
    trace_id: str | None,
    timeout: int,
) -> tuple[Path, Path | None]:
    """Download the scene audio and mux it onto ``video_path`` with ffmpeg.

    This step is best-effort: any failure is logged and reported through
    ``pipeline_error`` and the silent video is kept.

    Returns:
        ``(final_video_path, local_audio_path)``. ``final_video_path`` is the
        merged file on success, otherwise the unchanged ``video_path``.
        ``local_audio_path`` is set once the download has completed, even if
        the merge itself fails afterwards.
    """
    final_video = video_path
    local_audio_path: Path | None = None
    try:
        logger.info("Merging audio for scene: %s, URL: %s", job_id, audio_url)
        pipeline_event(
            "worker.manim",
            "audio_merge_start",
            "Downloading and merging audio",
            trace_id=trace_id,
            job_id=str(job_id),
            details={"audio_url": audio_url},
        )
        audio_ext = "wav" if ".wav" in audio_url.lower() else "mp3"
        local_audio = job_dir / f"voice.{audio_ext}"

        import httpx

        full_audio_url = _resolve_audio_url(audio_url, settings.supabase_url)
        logger.info("Downloading audio from: %s", full_audio_url)
        pipeline_event(
            "worker.manim",
            "audio_download_start",
            "Downloading audio from resolved URL",
            trace_id=trace_id,
            job_id=str(job_id),
            details={"url": full_audio_url},
        )
        with httpx.stream("GET", full_audio_url, follow_redirects=True) as response:
            response.raise_for_status()
            with open(local_audio, "wb") as audio_file:
                for chunk in response.iter_bytes():
                    audio_file.write(chunk)
        local_audio_path = local_audio
        logger.info(
            "Audio downloaded to: %s (Size: %s bytes)", local_audio, local_audio.stat().st_size
        )

        merged_video = job_dir / f"{scene_class}_merged.mp4"
        merge_cmd = [
            "ffmpeg",
            "-y",
            "-i",
            str(video_path),
            "-i",
            str(local_audio),
            "-c:v",
            "copy",
            "-c:a",
            "aac",
            "-map",
            "0:v:0",
            "-map",
            "1:a:0",
            str(merged_video),
        ]
        logger.info("Running merge command: %s", " ".join(merge_cmd))
        # Bound the merge so a stuck ffmpeg cannot block the worker forever; a
        # TimeoutExpired is handled by the best-effort handler below.
        merge_proc = subprocess.run(
            merge_cmd,
            check=True,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
        logger.info("FFmpeg merge stdout: %s", merge_proc.stdout)
        logger.info("FFmpeg merge stderr: %s", merge_proc.stderr)
        if merged_video.exists():
            final_video = merged_video
            logger.info("Audio merged successfully. New video_path: %s", final_video)
            pipeline_event(
                "worker.manim",
                "audio_merge_ok",
                "Audio merged successfully",
                trace_id=trace_id,
                job_id=str(job_id),
            )
    except Exception as e:
        # Deliberate best-effort boundary: a missing voice track must not fail
        # the whole render, so log loudly and fall back to the silent video.
        logger.exception("Failed to merge audio: %s", e)
        pipeline_error(
            "worker.manim",
            "audio_merge_failed",
            str(e),
            trace_id=trace_id,
            job_id=str(job_id),
        )
    return final_video, local_audio_path


def _render_into_job_dir(
    *,
    job_id: UUID,
    job_type: RenderJobType,
    quality: RenderQuality,
    timeout: int,
    repo_root: Path,
    job_dir: Path,
) -> RenderManimResult:
    """Resolve the scene, run ``manim render`` and optionally merge audio.

    All output is written under ``job_dir``; the caller owns that directory's
    lifetime.
    """
    media_dir = job_dir / "media"
    media_dir.mkdir(parents=True, exist_ok=True)

    job = RedisRenderJobStore(get_redis()).get(job_id)
    if job is None:
        msg = f"Render job not found: {job_id}"
        raise RuntimeError(msg)

    scene = None
    if job.scene_id is not None:
        scene = get_content_store().get_scene(job.scene_id)
        if scene is None:
            msg = f"Scene {job.scene_id} not found in content store for job {job_id}"
            logger.error(msg)
            raise RuntimeError(msg)
        mcode = (scene.manim_code or "").strip()
        logger.info(
            "Retrieved scene_id=%s manim_code_len=%d version=%d",
            job.scene_id,
            len(mcode),
            scene.manim_code_version,
        )
        if not mcode:
            msg = f"Scene {job.scene_id} missing manim_code for render (job_id={job_id})"
            logger.error(msg)
            raise RuntimeError(msg)
        # Inject metadata: durations for Dynamic Template.
        scene_file = (job_dir / "generated_scene.py").resolve()
        scene_file.write_text(
            _inject_beat_durations(mcode, scene.sync_segments),
            encoding="utf-8",
        )
        scene_class = settings.generated_scene_class
    else:
        # No stored scene: fall back to the scene file configured in settings.
        scene_file = (repo_root / settings.manim_scene_file).resolve()
        if not scene_file.is_file():
            msg = f"Scene file not found: {scene_file}"
            raise FileNotFoundError(msg)
        scene_class = settings.manim_scene_class

    q_flags = manim_quality_flags(job_type=job_type, quality=quality)
    cmd = [
        "manim",
        "render",
        *q_flags,
        str(scene_file),
        scene_class,
        "--media_dir",
        str(media_dir),
    ]
    tid = get_pipeline_trace_id()
    pipeline_event(
        "worker.manim",
        "subprocess_start",
        "Running manim render",
        trace_id=tid,
        job_id=str(job_id),
        details={"scene_class": scene_class, "flags": q_flags, "scene_file": str(scene_file)},
    )
    pipeline_debug(
        "worker.manim",
        "subprocess_cmd",
        "Full manim command",
        trace_id=tid,
        job_id=str(job_id),
        details={"argv": cmd},
    )
    try:
        proc = subprocess.run(
            cmd,
            cwd=str(repo_root),
            check=True,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        pipeline_error(
            "worker.manim",
            "subprocess_timeout",
            "manim render timed out",
            trace_id=tid,
            job_id=str(job_id),
            details={"timeout_seconds": timeout},
        )
        raise RuntimeError(f"Manim render timed out after {timeout} seconds") from exc
    except subprocess.CalledProcessError as exc:
        stdout_tail = (exc.stdout or "")[-8000:]
        stderr_tail = (exc.stderr or "")[-8000:]
        pipeline_error(
            "worker.manim",
            "subprocess_failed",
            "manim exited non-zero",
            trace_id=tid,
            job_id=str(job_id),
            details={
                "returncode": exc.returncode,
                "stderr_tail": (stderr_tail or "")[:1500],
            },
        )
        # Prefer stderr, then stdout, then the bare exit code as the message.
        detail = (stderr_tail or stdout_tail or "").strip() or f"manim exit code {exc.returncode}"
        raise RuntimeError(detail) from exc

    stdout_tail = (proc.stdout or "")[-8000:]
    stderr_tail = (proc.stderr or "")[-8000:]

    matches = sorted(media_dir.rglob(f"{scene_class}.mp4"))
    if not matches:
        msg = f"No mp4 produced under {media_dir}"
        raise FileNotFoundError(msg)
    # With several matches, keep the lexicographically last path.
    silent_video = matches[-1]
    pipeline_event(
        "worker.manim",
        "subprocess_ok",
        "mp4 produced",
        trace_id=tid,
        job_id=str(job_id),
        details={"mp4": str(silent_video)},
    )

    video_path = silent_video
    local_audio_path = None
    # --- Audio Merging Logic ---
    if scene and scene.audio_url:
        video_path, local_audio_path = _merge_audio_best_effort(
            audio_url=scene.audio_url,
            video_path=silent_video,
            job_dir=job_dir,
            scene_class=scene_class,
            job_id=job_id,
            trace_id=tid,
            timeout=timeout,
        )

    return RenderManimResult(
        video_path=video_path,
        job_dir=job_dir,
        stdout_tail=stdout_tail,
        stderr_tail=stderr_tail,
        command=cmd,
        silent_video_path=silent_video,
        audio_path=local_audio_path,
    )


def render_manim_scene_to_disk(
    *,
    job_id: UUID,
    job_type: RenderJobType,
    quality: RenderQuality,
    timeout: int = 1200,
) -> RenderManimResult:
    """Run `manim render` into an isolated per-job output directory.

    The job is looked up in the Redis job store. If it references a stored
    scene, that scene's Manim code is written to ``generated_scene.py`` inside
    the job directory (with ``BEAT_DURATIONS`` injected); otherwise the scene
    file configured in ``settings`` is rendered. When the scene has an
    ``audio_url``, the audio is downloaded and merged with ffmpeg on a
    best-effort basis.

    Args:
        job_id: Identifier of the render job to process.
        job_type: Job kind, used to choose Manim quality flags.
        quality: Requested quality, used to choose Manim quality flags.
        timeout: Maximum seconds allowed for the ``manim`` subprocess, and
            separately for the ffmpeg merge subprocess.

    Returns:
        A ``RenderManimResult``. On success the caller owns ``job_dir`` and is
        responsible for removing it.

    Raises:
        RuntimeError: The job or scene is missing, the scene has no code, or
            manim timed out or exited non-zero.
        FileNotFoundError: The configured scene file does not exist, or manim
            produced no matching mp4.
    """
    import shutil  # local import: only needed for failure-path cleanup

    repo_root = Path(settings.repo_root).resolve()
    # Create a temporary directory for this job to ensure no local leakage.
    job_dir = Path(tempfile.mkdtemp(prefix=f"manim_job_{job_id}_")).resolve()
    try:
        return _render_into_job_dir(
            job_id=job_id,
            job_type=job_type,
            quality=quality,
            timeout=timeout,
            repo_root=repo_root,
            job_dir=job_dir,
        )
    except BaseException:
        # On failure the caller never learns job_dir, so nobody else can clean
        # it up; remove it here instead of leaking one temp dir per failed job.
        shutil.rmtree(job_dir, ignore_errors=True)
        raise