from __future__ import annotations import logging from datetime import UTC, datetime from pathlib import Path from uuid import UUID, uuid4 import httpx from ai_engine.config import default_agent_models_path, load_agent_models_yaml, load_runtime_limits from backend.core.config import settings from backend.services.job_store import RedisRenderJobStore from backend.services.redis_client import get_redis from backend.services.supabase_pipeline_rest import insert_worker_service_audit_row from shared.pipeline_log import ( get_pipeline_trace_id, pipeline_error, pipeline_event, ) from worker.renderer import render_manim_scene_to_disk from worker.supabase_storage import upload_render_artifact_if_configured logger = logging.getLogger(__name__) def execute_render_job(job_id: UUID) -> None: tid = get_pipeline_trace_id() logger.info("Worker: execute_render_job started job_id=%s trace_id=%s", job_id, tid) pipeline_event( "worker.render", "execute_start", "Worker starting render execution logic", job_id=str(job_id), trace_id=tid, ) store = RedisRenderJobStore(get_redis()) job = store.get(job_id) if job is None: logger.error("render job missing: %s", job_id) pipeline_event( "worker.render", "job_missing", "Render job not found in Redis", job_id=str(job_id), trace_id=tid, ) return pipeline_event( "worker.render", "job_loaded", "Loaded render job; starting pipeline", job_id=str(job_id), trace_id=tid, project_id=str(job.project_id), scene_id=str(job.scene_id) if job.scene_id else None, details={"job_type": job.job_type, "render_quality": job.render_quality}, ) store.update( job_id, status="rendering", started_at=datetime.now(tz=UTC), progress=5, logs="Starting Manim render...", ) job_dir: Path | None = None result = None job_dir = None try: quality = job.render_quality or "720p" pipeline_event( "worker.render", "manim_start", "Invoking Manim render", job_id=str(job_id), trace_id=tid, details={"quality": quality, "job_type": job.job_type}, ) yaml_path = ( Path(settings.agent_models_yaml).expanduser() if settings.agent_models_yaml else default_agent_models_path() ) yaml_data = load_agent_models_yaml(yaml_path) rt = load_runtime_limits(yaml_data) manim_timeout = rt.worker_man_render_timeout_seconds result = render_manim_scene_to_disk( job_id=job_id, job_type=job.job_type, quality=quality, timeout=manim_timeout, ) video_path = result.video_path job_dir = result.job_dir pipeline_event( "worker.render", "manim_done", "Manim finished", job_id=str(job_id), trace_id=tid, details={"video_path": str(video_path), "returncode_ok": True}, ) remote_url = upload_render_artifact_if_configured( video_path=video_path, project_id=job.project_id, job_id=job_id, ) asset_url = remote_url if remote_url else f"file://{video_path}" pipeline_event( "worker.render", "artifact_ready", "Upload/local asset URL resolved", job_id=str(job_id), trace_id=tid, details={"has_remote_url": bool(remote_url)}, ) video_duration = 0.0 try: from worker.tts_runtime import _ffprobe_duration_seconds video_duration = _ffprobe_duration_seconds(video_path) except Exception: logger.warning("Failed to calculate video duration for job_id=%s", job_id) store.update( job_id, status="completed", progress=100, asset_url=asset_url, completed_at=datetime.now(tz=UTC), logs="Render completed.", metadata={"video_duration": video_duration}, ) pipeline_event( "worker.render", "job_completed", "Render job marked completed in Redis", job_id=str(job_id), trace_id=tid, ) try: insert_worker_service_audit_row( audit_id=uuid4(), project_id=job.project_id, scene_id=job.scene_id, worker_kind="manim", worker_name=settings.worker_name, render_job_id=job_id, payload={ "status": "completed", "command": result.command, "stdout_tail": result.stdout_tail, "stderr_tail": result.stderr_tail, "asset_url": asset_url, "video_path": str(video_path), }, ) except Exception: logger.exception("Audit insertion failed for job_id=%s (non-fatal)", job_id) if job.webhook_url: try: _post_webhook( job.webhook_url, job_id=job_id, job_status="completed", asset_url=asset_url, error=None, ) except Exception: logger.exception("Webhook failed for job_id=%s (non-fatal)", job_id) except Exception as exc: # noqa: BLE001 — surface failure to job record logger.exception("Render failed job_id=%s", job_id) pipeline_error( "worker.render", "job_failed", "Render pipeline raised", job_id=str(job_id), trace_id=tid, details={"error": str(exc)[:2000]}, ) store.update( job_id, status="failed", error_code="render_failed", completed_at=datetime.now(tz=UTC), logs=str(exc), ) insert_worker_service_audit_row( audit_id=uuid4(), project_id=job.project_id, scene_id=job.scene_id, worker_kind="manim", worker_name=settings.worker_name, render_job_id=job_id, payload={"status": "failed", "error": str(exc)}, ) if job.webhook_url: _post_webhook( job.webhook_url, job_id=job_id, job_status="failed", asset_url=None, error=str(exc), ) finally: if job_dir and job_dir.exists(): import shutil try: # New Structured Storage: storage/outputs/// project_dir = Path(settings.output_dir) / str(job.project_id) scene_dir = project_dir / (str(job.scene_id) if job.scene_id else "misc") scene_dir.mkdir(parents=True, exist_ok=True) # 1. Final Combined (result.video_path) if result and result.video_path and result.video_path.exists(): shutil.copy2(result.video_path, scene_dir / "final_combined.mp4") # 2. Silent Manim (result.silent_video_path) if result and result.silent_video_path and result.silent_video_path.exists(): shutil.copy2(result.silent_video_path, scene_dir / "manim_silent.mp4") # 3. Voice Audio (result.audio_path) if result and result.audio_path and result.audio_path.exists(): shutil.copy2(result.audio_path, scene_dir / "voice_audio.wav") # Intermediates in sub-folder intermediates_dir = scene_dir / "intermediates" intermediates_dir.mkdir(parents=True, exist_ok=True) # Copy logs and generated script for file in job_dir.glob("*"): if file.is_file() and file.suffix != ".mp4" and file.suffix != ".wav": shutil.copy2(file, intermediates_dir / file.name) logger.info("Structured outputs saved to %s", scene_dir) except Exception as local_err: logger.warning("Failed to save structured outputs: %s", local_err) logger.info("Cleaning up job_dir: %s", job_dir) shutil.rmtree(job_dir, ignore_errors=True) def _post_webhook( url: str, *, job_id: UUID, job_status: str, asset_url: str | None, error: str | None, ) -> None: payload: dict[str, object] = { "job_id": str(job_id), "status": job_status, "asset_url": asset_url, "metadata": {"error": error, "worker": settings.worker_name}, } try: httpx.post(url, json=payload, timeout=10.0) except Exception: logger.exception("Webhook POST failed job_id=%s url=%s", job_id, url)