Spaces:
Running
Running
| # src/rendering/scene_renderer.py | |
| import subprocess | |
| import tempfile | |
| import shutil | |
| from pathlib import Path | |
| from typing import Optional | |
| from src.core.ai_debugger import AIDebugger | |
| from src.utils.smart_logger import logger | |
| import os | |
| def render_and_combine_scene(project_config: dict, scene_name: str, code_file: str, manim_executable: str, renders_cache: Path, debugger: AIDebugger, retry_count: int = 0, max_retries: int = 2, regen_count: int = 0, max_regens: int = 1) -> Optional[str]: | |
| """Renders a scene with Manim and combines audio using subprocess calls.""" | |
| operation_id = f"render_{scene_name}" | |
| if retry_count == 0 and regen_count == 0: | |
| logger.start_operation(operation_id, f"π¬ Rendering {scene_name}") | |
| logger.quick_log("info", f"π Using scene file: {Path(code_file).name}") | |
| else: | |
| logger.quick_log("info", f"π Retry #{retry_count} for {scene_name} (regen: {regen_count})") | |
| if not code_file or not scene_name: | |
| logger.end_operation(operation_id, "β Missing code file or scene name") | |
| return None | |
| output_dir = renders_cache / scene_name | |
| output_dir.mkdir(exist_ok=True) | |
| final_video_path = output_dir / f"{scene_name}_final.mp4" | |
| absolute_code_file = Path(code_file).resolve() | |
| manim_command = [ | |
| manim_executable, "render", "-o", f'{scene_name}.mp4', "-ql", | |
| str(absolute_code_file), scene_name, | |
| ] | |
| logger.quick_log("info", f"Manim cmd: {manim_executable} render {scene_name}") | |
| try: | |
| with tempfile.TemporaryDirectory() as temp_dir: | |
| process = subprocess.run( | |
| manim_command, check=True, capture_output=True, text=True, | |
| cwd=temp_dir, timeout=300 | |
| ) | |
| # Find rendered file | |
| possible_files = [ | |
| Path(temp_dir) / f"{scene_name}.mp4", | |
| Path(temp_dir) / "media" / "videos" / "1080p60" / f"{scene_name}.mp4", | |
| Path(temp_dir) / "media" / "videos" / f"{scene_name}.mp4", | |
| ] | |
| rendered_file = None | |
| for possible_file in possible_files: | |
| if possible_file.exists(): | |
| rendered_file = possible_file | |
| break | |
| if not rendered_file: | |
| for file_path in Path(temp_dir).rglob("*.mp4"): | |
| if scene_name in file_path.name: | |
| rendered_file = file_path | |
| break | |
| if not rendered_file: | |
| logger.end_operation(operation_id, "No output file produced") | |
| return None | |
| scene_data = project_config["scenes"][scene_name] | |
| audio_path = scene_data.get("audio_file") | |
| if audio_path and Path(audio_path).exists(): | |
| # Get durations and combine with audio | |
| try: | |
| probe_cmd = ['ffprobe', '-v', 'quiet', '-show_entries', 'format=duration', '-of', 'csv=p=0', str(audio_path)] | |
| audio_duration_result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True) | |
| audio_duration = float(audio_duration_result.stdout.strip()) | |
| video_probe_cmd = ['ffprobe', '-v', 'quiet', '-show_entries', 'format=duration', '-of', 'csv=p=0', str(rendered_file)] | |
| video_duration_result = subprocess.run(video_probe_cmd, capture_output=True, text=True, check=True) | |
| video_duration = float(video_duration_result.stdout.strip()) | |
| if audio_duration > video_duration: | |
| # Extend video to match audio | |
| ffmpeg_command = [ | |
| 'ffmpeg', '-y', '-i', str(rendered_file), '-i', str(audio_path), | |
| '-filter_complex', f'[0:v]tpad=stop_mode=clone:stop_duration={audio_duration - video_duration}[v]', | |
| '-map', '[v]', '-map', '1:a', '-c:a', 'aac', '-r', '30', str(final_video_path) | |
| ] | |
| else: | |
| # Standard combine | |
| ffmpeg_command = [ | |
| 'ffmpeg', '-y', '-i', str(rendered_file), '-i', str(audio_path), | |
| '-c:v', 'copy', '-c:a', 'aac', '-shortest', str(final_video_path) | |
| ] | |
| subprocess.run(ffmpeg_command, check=True, capture_output=True, text=True) | |
| video_size = final_video_path.stat().st_size / (1024*1024) | |
| logger.end_operation(operation_id, "Video+audio combined", f"{video_size:.1f}MB") | |
| except Exception: | |
| # Fallback to simple combination | |
| ffmpeg_command = [ | |
| 'ffmpeg', '-y', '-i', str(rendered_file), '-i', str(audio_path), | |
| '-c:v', 'copy', '-c:a', 'aac', '-shortest', str(final_video_path) | |
| ] | |
| subprocess.run(ffmpeg_command, check=True, capture_output=True, text=True) | |
| logger.end_operation(operation_id, "Video+audio combined (fallback)") | |
| return str(final_video_path) | |
| else: | |
| # Video only | |
| shutil.move(str(rendered_file), str(final_video_path)) | |
| video_size = final_video_path.stat().st_size / (1024*1024) | |
| logger.end_operation(operation_id, "Video-only rendered", f"{video_size:.1f}MB") | |
| return str(final_video_path) | |
| except subprocess.TimeoutExpired: | |
| logger.end_operation(operation_id, "Manim rendering timed out (5min)") | |
| return None | |
| except subprocess.CalledProcessError as e: | |
| logger.error_with_context(f"β Manim failed for {scene_name}", f"Exit code: {e.returncode}") | |
| # Quick exit - if we've already tried a lot, just skip | |
| if retry_count >= max_retries or regen_count >= max_regens: | |
| logger.quick_log("error", f"β Scene {scene_name} exhausted all attempts - SKIPPING") | |
| logger.end_operation(operation_id, f"β {scene_name} SKIPPED") | |
| return None | |
| if "error" in e.stderr.lower() and retry_count < max_retries: | |
| # Try AI debugging first (up to max_retries times) | |
| logger.quick_log("info", f"π§ Attempting AI auto-debug ({retry_count + 1}/{max_retries})") | |
| fixed_file_path = debugger.debug_and_fix(code_file, e.stderr, scene_name, project_config) | |
| if fixed_file_path: | |
| logger.quick_log("success", f"β AI created NEW fixed file, retrying") | |
| return render_and_combine_scene(project_config, scene_name, fixed_file_path, manim_executable, renders_cache, debugger, retry_count + 1, max_retries, regen_count, max_regens) | |
| else: | |
| logger.quick_log("error", "β AI debugger failed - SKIPPING") | |
| return None | |
| # If AI debugging failed or we've reached max retries, try code regeneration ONCE | |
| if retry_count >= max_retries and regen_count < max_regens: | |
| logger.quick_log("info", f"π Trying code regeneration - LAST ATTEMPT") | |
| from src.generation.code_generator import regenerate_single_scene_code | |
| # Get the gemini client from the environment | |
| import os | |
| from google import genai | |
| api_key = os.environ.get("GEMINI_API_KEY") | |
| if api_key: | |
| try: | |
| gemini_client = genai.Client(api_key=api_key) | |
| temp_dir = Path(code_file).parent | |
| if regenerate_single_scene_code(project_config, gemini_client, str(temp_dir), scene_name): | |
| logger.quick_log("success", "β NEW scene code generated - FINAL ATTEMPT") | |
| new_code_file = project_config["scenes"][scene_name]["code_file"] | |
| return render_and_combine_scene(project_config, scene_name, new_code_file, manim_executable, renders_cache, debugger, 0, max_retries, regen_count + 1, max_regens) | |
| else: | |
| logger.quick_log("error", "β Code regeneration failed - SKIPPING") | |
| return None | |
| except Exception as regen_e: | |
| logger.error_with_context("β Code regeneration error - SKIPPING", str(regen_e)) | |
| return None | |
| logger.end_operation(operation_id, f"β {scene_name} failed - SKIPPED") | |
| return None | |
| except Exception as e: | |
| logger.error_with_context(f"Unexpected rendering error for {scene_name}", str(e)) | |
| logger.end_operation(operation_id, "Unexpected error") | |
| return None | |
| def render_scenes(project_config: dict, manim_executable: str, renders_cache: Path, debugger: AIDebugger): | |
| """Workflow Step 4: Render all scenes using Manim.""" | |
| logger.start_operation("render_scenes", "Starting scene rendering") | |
| # Validate manim executable | |
| if not manim_executable or manim_executable == "manim": | |
| possible_paths = [ | |
| "/home/gokulbarath/anaconda3/bin/manim", "manim", "/usr/local/bin/manim", | |
| "/usr/bin/manim", "/opt/anaconda3/bin/manim", "/conda/bin/manim", "/opt/conda/bin/manim" | |
| ] | |
| for path in possible_paths: | |
| if Path(path).exists(): | |
| manim_executable = path | |
| break | |
| else: | |
| logger.end_operation("render_scenes", "Manim executable not found") | |
| return | |
| logger.quick_log("info", f"Using manim: {manim_executable}") | |
| scenes_to_render = [ | |
| (name, data) for name, data in project_config.get("scenes", {}).items() | |
| if data.get("status") not in ["rendered"] | |
| ] | |
| if not scenes_to_render: | |
| logger.end_operation("render_scenes", "All scenes already rendered") | |
| return | |
| logger.quick_log("info", f"Rendering {len(scenes_to_render)} scenes") | |
| for i, (scene_name, scene_data) in enumerate(scenes_to_render, 1): | |
| logger.progress_update(i, len(scenes_to_render), f"Rendering {scene_name}") | |
| # Example: Before rendering | |
| logger.quick_log("info", f"Rendering scene from file: {scene_data.get('code_file')}") | |
| scene_file_path = scene_data.get('code_file') | |
| if not os.path.exists(scene_file_path): | |
| logger.quick_log("error", f"Scene file not found: {scene_file_path}") | |
| # Skip rendering for this scene | |
| continue | |
| try: | |
| video_path = render_and_combine_scene( | |
| project_config, scene_name, scene_data.get("code_file"), | |
| manim_executable, renders_cache, debugger, 0, 1, 0, 1 # VERY CONSERVATIVE: 1 AI retry, 1 code regen max | |
| ) | |
| if video_path and Path(video_path).exists(): | |
| project_config["scenes"][scene_name]["final_video_path"] = str(video_path) | |
| project_config["scenes"][scene_name]["status"] = "rendered" | |
| else: | |
| project_config["scenes"][scene_name]["status"] = "render_failed" | |
| except Exception as e: | |
| logger.error_with_context(f"Rendering {scene_name} failed", str(e)) | |
| project_config["scenes"][scene_name]["status"] = "render_failed" | |
| success_count = sum(1 for _, data in project_config["scenes"].items() if data.get("status") == "rendered") | |
| logger.end_operation("render_scenes", f"Rendering complete: {success_count}/{len(scenes_to_render)} successful") | |