import ast import os import re import shutil import subprocess import tempfile import time import uuid from pathlib import Path import streamlit as st # --- Configuration --- TIMEOUT_SECONDS = 600 # 10 minutes # Known Manim Scene base classes SCENE_BASE_CLASSES = { "Scene", "ThreeDScene", "MovingCameraScene", "ZoomedScene", "VectorScene", "LinearTransformationScene", "SampleSpaceScene", } st.set_page_config( page_title="Manim Render Studio", page_icon="🎬", layout="wide", initial_sidebar_state="expanded", ) st.markdown( """ """, unsafe_allow_html=True, ) # --- Helper Functions --- def create_temp_dir() -> Path: """Creates a unique temporary directory for this render session.""" unique_id = uuid.uuid4().hex[:8] temp_dir = Path(tempfile.gettempdir()) / f"manim_{unique_id}" temp_dir.mkdir(parents=True, exist_ok=True) return temp_dir def cleanup_temp_dir(temp_dir: Path) -> None: """Safely removes the temporary directory.""" try: if temp_dir.exists(): shutil.rmtree(temp_dir) except Exception: pass # Ignore cleanup errors def find_video_file(output_dir: Path, output_name: str = "output.mp4") -> Path | None: """Recursively finds the mp4 file in the output directory.""" # First try the expected output path expected_paths = [ output_dir / "videos" / "scene" / "480p15" / output_name, output_dir / "videos" / "scene" / "720p30" / output_name, output_dir / "videos" / "scene" / "1080p60" / output_name, output_dir / "videos" / "scene" / "1440p60" / output_name, output_dir / "videos" / "scene" / "2160p60" / output_name, output_dir / output_name, ] for path in expected_paths: if path.exists(): return path # Fallback: search for any mp4 file mp4_files = list(output_dir.glob("**/*.mp4")) valid_files = [f for f in mp4_files if "partial_movie_files" not in str(f)] if valid_files: return max(valid_files, key=os.path.getmtime) return None def get_quality_flag(quality_label: str) -> str: mapping = { "Low (480p, Fast)": "-ql", "Medium (720p, Standard)": "-qm", "High (1080p, HD)": "-qh", "Extra High (1440p)": "-qp", "4K (2160p)": "-qk", } return mapping.get(quality_label, "-qm") def extract_scene_classes(source_code: str) -> list[str]: """Extract all classes that inherit from any Scene type.""" try: tree = ast.parse(source_code) scene_classes: list[str] = [] for node in ast.walk(tree): if not isinstance(node, ast.ClassDef): continue for base in node.bases: base_name = None if isinstance(base, ast.Name): base_name = base.id elif isinstance(base, ast.Attribute): base_name = base.attr if base_name and (base_name in SCENE_BASE_CLASSES or "Scene" in base_name): scene_classes.append(node.name) break if scene_classes: return scene_classes except SyntaxError: pass # Fallback: regex-based detection pattern = r"class\s+(\w+)\s*\([^)]*(?:Scene|ThreeDScene|MovingCameraScene)[^)]*\)" matches = re.findall(pattern, source_code) return matches if matches else [] def count_animations(source_code: str) -> int: """Count the number of self.play() and self.wait() calls in the code.""" # Count self.play( and self.wait( calls play_count = len(re.findall(r"self\.play\s*\(", source_code)) wait_count = len(re.findall(r"self\.wait\s*\(", source_code)) return max(play_count + wait_count, 1) # At least 1 to avoid division by zero def ensure_manim_import(code: str) -> str: """Ensures the code has manim import if it uses manim classes.""" if "from manim import" in code or "import manim" in code: return code manim_indicators = ["Scene", "Circle", "Square", "Tex", "MathTex", "Create", "Write"] if any(indicator in code for indicator in manim_indicators): return "from manim import *\n\n" + code return code class RenderProgressTracker: """Tracks render progress in real-time based on manim output.""" # Render stages with their progress weight STAGES = { "init": (0, 5, "Initializing..."), "parsing": (5, 10, "Parsing scene..."), "latex": (10, 20, "Compiling LaTeX..."), "rendering": (20, 85, "Rendering animations..."), "combining": (85, 95, "Combining video segments..."), "writing": (95, 99, "Writing final video..."), "done": (100, 100, "Complete!"), } def __init__(self, total_animations: int): self.total_animations = total_animations self.current_animation = 0 self.current_stage = "init" self.current_animation_progress = 0 self.last_status = "" def parse_line(self, line: str) -> tuple[int, str]: """ Parse a line of manim output and return (progress_percent, status_message). """ line_lower = line.lower().strip() # Skip empty lines if not line_lower: return self._calculate_progress(), self.last_status # Detect stage transitions if "error" in line_lower or "traceback" in line_lower: return self._calculate_progress(), f"Error: {line[:50]}..." # LaTeX compilation if "tex" in line_lower and ("writing" in line_lower or "compiling" in line_lower): self.current_stage = "latex" self.last_status = "Compiling LaTeX..." return self._calculate_progress(), self.last_status # Animation detection - look for "Animation X:" pattern anim_match = re.search(r"animation\s+(\d+)", line_lower) if anim_match: self.current_stage = "rendering" self.current_animation = int(anim_match.group(1)) # Extract animation name if present name_match = re.search(r"animation\s+\d+\s*:\s*(\w+)", line, re.IGNORECASE) anim_name = name_match.group(1) if name_match else "animation" self.last_status = f"Rendering {anim_name} ({self.current_animation}/{self.total_animations})" # Progress percentage within current animation percent_match = re.search(r"(\d+)%", line) if percent_match and self.current_stage == "rendering": self.current_animation_progress = int(percent_match.group(1)) return self._calculate_progress(), self.last_status # Combining/concatenating partial movies if "partial movie" in line_lower or "combining" in line_lower or "concatenat" in line_lower: self.current_stage = "combining" self.last_status = "Combining video segments..." return self._calculate_progress(), self.last_status # Writing final file if ("writing" in line_lower or "saved" in line_lower) and "tex" not in line_lower: self.current_stage = "writing" self.last_status = "Writing final video..." return self._calculate_progress(), self.last_status # File ready if "file ready" in line_lower or "movie ready" in line_lower: self.current_stage = "done" self.last_status = "Complete!" return 100, self.last_status # Scene initialization if "scene" in line_lower and self.current_stage == "init": self.current_stage = "parsing" self.last_status = "Parsing scene..." return self._calculate_progress(), self.last_status or "Processing..." def _calculate_progress(self) -> int: """Calculate overall progress based on current stage and animation.""" stage_start, stage_end, _ = self.STAGES.get(self.current_stage, (0, 5, "")) if self.current_stage == "rendering" and self.total_animations > 0: # Calculate progress within rendering stage based on animations render_start, render_end, _ = self.STAGES["rendering"] render_range = render_end - render_start # Progress = completed animations + current animation progress completed_progress = (self.current_animation - 1) / self.total_animations current_progress = (self.current_animation_progress / 100) / self.total_animations total_anim_progress = completed_progress + current_progress return int(render_start + (render_range * total_anim_progress)) return stage_start def run_manim_with_progress( cmd: list, cwd: str, timeout: int, total_animations: int, progress_bar, status_text, ) -> tuple[bool, str]: """ Run manim command with real-time progress updates. Returns (success: bool, log_output: str). """ log_lines: list[str] = [] tracker = RenderProgressTracker(total_animations) # Log the command being run log_lines.append(f"Command: {' '.join(cmd)}\n") log_lines.append(f"Working directory: {cwd}\n") log_lines.append("-" * 50 + "\n") status_text.text("Starting render...") progress_bar.progress(0) try: process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, cwd=cwd, bufsize=1, universal_newlines=True, ) start_time = time.time() last_progress = 0 while True: # Check timeout elapsed = time.time() - start_time if elapsed > timeout: process.kill() raise subprocess.TimeoutExpired(cmd, timeout) if process.stdout is None: break line = process.stdout.readline() if not line and process.poll() is not None: break if line: log_lines.append(line) progress, status = tracker.parse_line(line) # Only update if progress increased (avoid flickering) if progress > last_progress: last_progress = progress progress_bar.progress(min(progress, 100)) if status: status_text.text(status) # Wait for process to complete and get return code return_code = process.wait() # Log the return code log_lines.append("-" * 50 + "\n") log_lines.append(f"Process exited with code: {return_code}\n") # Final progress if return_code == 0: progress_bar.progress(100) status_text.text("Render complete!") else: status_text.text("Render failed!") return (return_code == 0, "".join(log_lines)) except subprocess.TimeoutExpired: log_lines.append(f"\nProcess timed out after {timeout} seconds\n") raise except Exception as e: log_lines.append(f"\nException: {str(e)}\n") return (False, "".join(log_lines)) # --- UI Layout --- with st.sidebar: st.title("Settings") quality = st.selectbox( "Render Quality", [ "Low (480p, Fast)", "Medium (720p, Standard)", "High (1080p, HD)", "Extra High (1440p)", "4K (2160p)", ], index=1, ) st.info( """ **Instructions:** 1. Paste your Manim code on the right. 2. Select the Scene class to render. 3. Click 'Render Scene'. """ ) st.markdown("---") st.markdown("Made with Manim & Streamlit") st.title("Manim Render Studio") st.markdown("### Paste your code below and bring your math to life.") # Default code with LaTeX example default_code = '''from manim import * class DemoScene(Scene): def construct(self): # LaTeX formula formula = MathTex(r"e^{i\\pi} + 1 = 0", font_size=72) formula.set_color(BLUE) # Title title = Text("Euler's Identity", font_size=48) title.next_to(formula, UP, buff=0.8) # Animate self.play(Write(title)) self.play(Write(formula)) self.wait(1) # Transform circle = Circle(color=PINK, fill_opacity=0.5) self.play(ReplacementTransform(formula, circle)) self.wait(0.5) ''' code_input = st.text_area("Python Code", value=default_code, height=400) # Auto-detect scene classes scene_candidates = extract_scene_classes(code_input) if scene_candidates: scene_name = st.selectbox("Scene Class (auto-detected)", scene_candidates) else: scene_name = st.text_input( "Scene Class Name", value="DemoScene", help="Could not auto-detect. Enter the class name manually.", ) col1, col2 = st.columns([1, 4]) with col1: render_button = st.button("Render Scene") if render_button: if not code_input.strip(): st.error("Please enter some Manim code.") elif not scene_name: st.error("Please specify a Scene class name.") else: # Prepare code (add import if missing) final_code = ensure_manim_import(code_input) # Check for syntax errors first try: ast.parse(final_code) except SyntaxError as e: st.error(f"Syntax error in your code at line {e.lineno}: {e.msg}") st.stop() # Count animations for progress tracking total_animations = count_animations(final_code) # Create unique temp directory for this render temp_dir = create_temp_dir() script_path = temp_dir / "scene.py" # UI elements for progress (defined outside try for cleanup) status_text = st.empty() progress_bar = st.progress(0) try: # Write code to file with open(script_path, "w", encoding="utf-8") as f: f.write(final_code) # Construct Command quality_flag = get_quality_flag(quality) cmd = [ "manim", str(script_path), scene_name, "-o", "output.mp4", "--media_dir", str(temp_dir), quality_flag, "--disable_caching", "-v", "INFO", # Verbose output for progress tracking ] # Run render with progress tracking success, log_output = run_manim_with_progress( cmd=cmd, cwd=str(temp_dir), timeout=TIMEOUT_SECONDS, total_animations=total_animations, progress_bar=progress_bar, status_text=status_text, ) # Clear progress UI status_text.empty() progress_bar.empty() # Find video file video_file = find_video_file(temp_dir) if success and video_file and video_file.exists(): # SUCCESS - Show video and download st.success("Render Successful!") # Read video bytes BEFORE cleanup video_bytes = video_file.read_bytes() # Video preview st.video(video_bytes) # Download button st.download_button( label="Download Video", data=video_bytes, file_name=f"{scene_name}.mp4", mime="video/mp4", ) # Logs (collapsed) with st.expander("Render Logs"): st.code(log_output, language="text") elif success and not video_file: # Render succeeded but no video found all_files = list(temp_dir.rglob("*")) debug_info = "\n\nFiles in temp directory:\n" for f in all_files: if f.is_file(): debug_info += f" {f}\n" st.error("Render completed but video file was not found.") with st.expander("Render Logs", expanded=True): st.code(log_output + debug_info, language="text") else: # FAILED - Show error and logs st.error("Render Failed!") with st.expander("Render Logs", expanded=True): st.code(log_output, language="text") except subprocess.TimeoutExpired: status_text.empty() progress_bar.empty() st.error(f"Render timed out after {TIMEOUT_SECONDS // 60} minutes.") except Exception as e: status_text.empty() progress_bar.empty() st.error(f"An unexpected error occurred: {str(e)}") import traceback with st.expander("Error Details", expanded=True): st.code(traceback.format_exc(), language="text") finally: # Always cleanup temp directory cleanup_temp_dir(temp_dir)