# NOTE: removed non-code scrape residue ("Spaces: / Sleeping" hosting-status header) from the top of this file.
| import uuid | |
| import subprocess | |
| import tempfile | |
| import shutil | |
| from pathlib import Path | |
| from typing import List, Optional, Union | |
| import ast | |
| import logging | |
| import time | |
| import concurrent.futures | |
| import os | |
# Module-level logger for the whole rendering pipeline.
logger = logging.getLogger(__name__)
# Use absolute path for MEDIA_ROOT
# Final render outputs live under this directory, one subdirectory per run.
MEDIA_ROOT = Path(os.path.abspath("media/videos"))
# Created eagerly at import time so renders never fail on a missing root dir.
MEDIA_ROOT.mkdir(parents=True, exist_ok=True)
class RenderError(Exception):
    """Raised when any stage of the rendering pipeline fails."""
class ManimRenderer:
    """Context-managed wrapper around a Manim render session.

    Tracks every temporary directory created during a render and removes
    them all when the context exits, whether or not the render succeeded.
    """

    def __init__(self, quality: str = "m", timeout: int = 300):
        # Quality flag forwarded to manim ("l", "m" or "h") and the maximum
        # number of seconds a render command may run.
        self.quality = quality
        self.timeout = timeout
        # Directories scheduled for removal on context exit.
        self.temp_dirs = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always sweep temp directories, even when an exception is in flight.
        self.cleanup_temp_dirs()

    def cleanup_temp_dirs(self):
        """Delete all tracked temp directories; failures are logged, not raised."""
        for temp_dir in self.temp_dirs:
            try:
                if temp_dir.exists():
                    shutil.rmtree(temp_dir)
            except Exception as e:
                logger.warning(f"Failed to cleanup temp dir {temp_dir}: {e}")
        self.temp_dirs.clear()
| def _extract_scene_names(code: str) -> List[str]: | |
| """ | |
| Parse the Python AST and return all Scene subclass names in definition order. | |
| Enhanced with better error handling. | |
| """ | |
| try: | |
| tree = ast.parse(code) | |
| except SyntaxError as e: | |
| raise RenderError(f"Invalid Python syntax in generated code: {e}") | |
| scene_names = [] | |
| for node in tree.body: | |
| if isinstance(node, ast.ClassDef): | |
| for base in node.bases: | |
| if isinstance(base, ast.Name) and base.id == "Scene": | |
| scene_names.append(node.name) | |
| elif isinstance(base, ast.Attribute) and base.attr == "Scene": | |
| scene_names.append(node.name) | |
| if not scene_names: | |
| raise RenderError("No Scene subclasses found in generated code") | |
| logger.info(f"Found {len(scene_names)} scene(s): {', '.join(scene_names)}") | |
| return scene_names | |
| def _validate_code_safety(code: str) -> None: | |
| """Validate code for potentially dangerous operations.""" | |
| dangerous_patterns = [ | |
| "import os", | |
| "import sys", | |
| "import subprocess", | |
| "__import__", | |
| "eval(", | |
| "exec(", | |
| "open(", | |
| "file(", | |
| "input(", | |
| "raw_input(" | |
| ] | |
| for pattern in dangerous_patterns: | |
| if pattern in code.lower(): | |
| logger.warning(f"Potentially unsafe pattern detected: {pattern}") | |
| # In production, you might want to raise an error here | |
| # raise RenderError(f"Unsafe code pattern detected: {pattern}") | |
| def _create_render_environment() -> Path: | |
| """ | |
| Create a clean, temporary environment for rendering. | |
| """ | |
| temp_dir = Path(tempfile.mkdtemp(prefix="manim_render_")) | |
| # Create subdirectories | |
| (temp_dir / "media").mkdir(exist_ok=True) | |
| (temp_dir / "scenes").mkdir(exist_ok=True) | |
| return temp_dir | |
| def _run_manim_command(cmd: List[str], timeout: int = 300) -> tuple[int, str, str]: | |
| """ | |
| Run a Manim command with proper error handling and timeout. | |
| """ | |
| logger.info(f"Running command: {' '.join(cmd)}") | |
| process = None | |
| try: | |
| process = subprocess.Popen( | |
| cmd, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| env={**os.environ, "MANIM_DISABLE_CACHING": "1"} | |
| ) | |
| stdout, stderr = process.communicate(timeout=timeout) | |
| return process.returncode, stdout, stderr | |
| except subprocess.TimeoutExpired: | |
| if process: | |
| try: | |
| process.kill() | |
| process.wait(timeout=5) # Wait for process to terminate | |
| except subprocess.TimeoutExpired: | |
| process.terminate() # Force terminate if kill doesn't work | |
| process.wait(timeout=5) | |
| stdout, stderr = process.communicate() if process else ("", "") | |
| raise RenderError(f"Manim command timed out after {timeout} seconds") | |
| except Exception as e: | |
| if process: | |
| try: | |
| process.kill() | |
| process.wait(timeout=5) | |
| except: | |
| pass | |
| raise RenderError(f"Failed to execute Manim command: {e}") | |
| finally: | |
| if process and process.poll() is None: | |
| try: | |
| process.kill() | |
| process.wait(timeout=5) | |
| except: | |
| pass | |
| def _concatenate_videos(video_paths: List[Path], output_path: Path) -> None: | |
| """ | |
| Concatenate multiple video files using ffmpeg with better error handling. | |
| """ | |
| if len(video_paths) == 1: | |
| # Just copy the single file | |
| try: | |
| shutil.copy2(video_paths[0], output_path) | |
| return | |
| except (PermissionError, OSError) as e: | |
| raise RenderError(f"Failed to copy video file: {e}") | |
| # Create a temporary file list for ffmpeg | |
| concat_file = None | |
| try: | |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: | |
| concat_file = Path(f.name) | |
| for video_path in video_paths: | |
| if not video_path.exists(): | |
| raise RenderError(f"Input video file not found: {video_path}") | |
| # Use as_posix() to ensure forward slashes, avoiding escape char issues on Windows | |
| f.write(f"file '{video_path.resolve().as_posix()}'\n") | |
| # Try lossless concatenation first | |
| cmd = [ | |
| "ffmpeg", "-y", | |
| "-f", "concat", | |
| "-safe", "0", | |
| "-i", str(concat_file), | |
| "-c", "copy", | |
| str(output_path) | |
| ] | |
| returncode, stdout, stderr = _run_manim_command(cmd, timeout=120) | |
| if returncode != 0: | |
| logger.warning("Lossless concatenation failed, trying re-encoding...") | |
| # Fallback to re-encoding | |
| cmd = [ | |
| "ffmpeg", "-y", | |
| "-f", "concat", | |
| "-safe", "0", | |
| "-i", str(concat_file), | |
| "-c:v", "libx264", | |
| "-c:a", "aac", | |
| "-preset", "fast", | |
| str(output_path) | |
| ] | |
| returncode, stdout, stderr = _run_manim_command(cmd, timeout=120) | |
| if returncode != 0: | |
| raise RenderError(f"Video concatenation failed: {stderr}") | |
| except (PermissionError, OSError) as e: | |
| raise RenderError(f"Failed to concatenate videos: {e}") | |
| finally: | |
| # Clean up temporary concat file | |
| if concat_file and concat_file.exists(): | |
| try: | |
| concat_file.unlink() | |
| except Exception as e: | |
| logger.warning(f"Failed to delete temporary concat file: {e}") | |
def render_and_concat_all(
    code: str,
    quality: str = "m",
    timeout: int = 300
) -> Path:
    """
    Render every scene in *code* with Manim and join the results into a
    single MP4 at ``MEDIA_ROOT/<run_id>/final_animation.mp4``.

    Args:
        code: Python source containing one or more Manim Scene subclasses.
        quality: Render quality flag: "l" (low), "m" (medium) or "h" (high).
        timeout: Maximum seconds allowed for the Manim render step.

    Returns:
        Path to the final rendered video.

    Raises:
        RenderError: on invalid input or any failure in the pipeline.
    """
    start_time = time.time()

    # --- input validation --------------------------------------------------
    if not isinstance(code, str) or not code.strip():
        raise RenderError("Code must be a non-empty string")
    if quality not in ["l", "m", "h"]:
        raise RenderError(f"Invalid quality '{quality}'. Must be 'l', 'm', or 'h'")

    _validate_code_safety(code)  # advisory only: logs warnings, never raises
    scene_names = _extract_scene_names(code)

    with ManimRenderer(quality, timeout) as renderer:
        # Isolated scratch directory; the renderer context removes it on exit.
        work_dir = _create_render_environment()
        renderer.temp_dirs.append(work_dir)

        # Unique output directory per run so concurrent renders never clash.
        run_id = f"{uuid.uuid4().hex}_{int(time.time())}"
        final_output_dir = MEDIA_ROOT / run_id
        final_output_dir.mkdir(parents=True, exist_ok=True)

        try:
            # Write the generated code to a script Manim can execute.
            script_path = work_dir / "animation_script.py"
            script_path.write_text(code, encoding="utf-8")

            media_dir = work_dir / "media"
            cmd = [
                "manim",
                "render",
                str(script_path),
                *scene_names,
                f"-q{quality}",
                "--disable_caching",
                "--media_dir", str(media_dir),
                "--verbosity", "WARNING"  # Reduce log verbosity
            ]
            if quality == "l":
                # Low quality is for previews; a lower frame rate cuts
                # render time further.
                cmd.extend(["--frame_rate", "15"])

            logger.info(f"Starting Manim render for {len(scene_names)} scene(s)")
            returncode, stdout, stderr = _run_manim_command(cmd, timeout)
            if returncode != 0:
                error_msg = f"Manim rendering failed (exit code {returncode})"
                if stderr:
                    error_msg += f"\nError output:\n{stderr}"
                if stdout:
                    error_msg += f"\nStandard output:\n{stdout}"
                raise RenderError(error_msg)

            # Collect everything Manim produced under the media dir.
            video_files = list(media_dir.rglob("*.mp4"))
            if not video_files:
                raise RenderError(f"No video files generated in {media_dir}")
            logger.info(f"Found {len(video_files)} video file(s)")

            if len(video_files) > 1:
                # Reorder outputs to match scene definition order. The `seen`
                # set guards against the same file being added twice when one
                # scene name is a substring of another (e.g. "Intro"/"Intro2"),
                # which previously could duplicate footage in the final video.
                ordered_videos = []
                seen = set()
                for scene_name in scene_names:
                    for video in video_files:
                        if scene_name in video.name and video not in seen:
                            ordered_videos.append(video)
                            seen.add(video)
                # Append anything that matched no scene name.
                for video in video_files:
                    if video not in seen:
                        ordered_videos.append(video)
                        seen.add(video)
                video_files = ordered_videos

            final_video_path = final_output_dir / "final_animation.mp4"
            if len(video_files) == 1:
                logger.info("Single video file, copying to final location")
                shutil.copy2(video_files[0], final_video_path)
            else:
                logger.info(f"Concatenating {len(video_files)} video files")
                _concatenate_videos(video_files, final_video_path)

            # Sanity-check the final artifact.
            if not final_video_path.exists():
                raise RenderError("Final video file was not created")
            file_size = final_video_path.stat().st_size
            if file_size < 1024:  # Less than 1KB is suspicious
                raise RenderError(f"Generated video file is too small ({file_size} bytes)")

            elapsed_time = time.time() - start_time
            logger.info(f"Rendering completed successfully in {elapsed_time:.2f}s, output: {final_video_path}")
            logger.info(f"Final video size: {file_size / 1024 / 1024:.2f} MB")
            return final_video_path

        except Exception as e:
            logger.error(f"Rendering failed: {str(e)}")
            # Remove the partial output dir so failed runs leave no debris.
            if final_output_dir.exists():
                try:
                    shutil.rmtree(final_output_dir)
                except Exception:
                    pass
            # Re-raise RenderErrors untouched (the old code double-wrapped
            # them, mangling the message); wrap anything else with the
            # original exception chained for debugging.
            if isinstance(e, RenderError):
                raise
            raise RenderError(f"Rendering pipeline failed: {str(e)}") from e
def get_video_info(video_path: Path) -> dict:
    """Return size and timestamp metadata for a rendered video.

    Returns a dict with path/size/timestamps, or ``{"error": ...}`` when
    the file is missing or cannot be stat'ed.
    """
    if not video_path.exists():
        return {"error": "Video file not found"}
    try:
        st = video_path.stat()
    except Exception as e:
        return {"error": f"Failed to get video info: {e}"}
    return {
        "path": str(video_path),
        "size_bytes": st.st_size,
        "size_mb": round(st.st_size / 1024 / 1024, 2),
        "created_time": st.st_ctime,
        "modified_time": st.st_mtime,
    }
def cleanup_old_renders(max_age_hours: int = 24, max_total_size_gb: float = 5.0):
    """
    Prune old render directories under MEDIA_ROOT to reclaim disk space.

    Directories older than *max_age_hours* are always removed; if the
    combined size still exceeds *max_total_size_gb*, the oldest remaining
    directories are removed as well until it does not.

    Args:
        max_age_hours: Delete directories older than this many hours.
        max_total_size_gb: Size budget that triggers oldest-first deletion.

    Returns:
        A summary dict of the cleanup, or ``{"error": ...}`` on failure.
    """
    try:
        now = time.time()
        cutoff_time = now - max_age_hours * 3600
        size_limit = max_total_size_gb * 1024 * 1024 * 1024

        # --- scan: collect every render directory with size and mtime ------
        render_dirs = []
        total_size = 0
        for entry in MEDIA_ROOT.iterdir():
            if not entry.is_dir():
                continue
            try:
                files = [f for f in entry.rglob('*') if f.is_file()]
                dir_size = sum(f.stat().st_size for f in files)
                # Empty directories get mtime 0, i.e. "infinitely old".
                dir_mtime = max((f.stat().st_mtime for f in files), default=0)
            except Exception:
                continue  # unreadable directory: skip, never abort the scan
            render_dirs.append({
                'path': entry,
                'size': dir_size,
                'mtime': dir_mtime,
                'age_hours': (now - dir_mtime) / 3600,
            })
            total_size += dir_size

        # --- delete: oldest first ------------------------------------------
        deleted_count = 0
        freed_space = 0
        render_dirs.sort(key=lambda d: d['mtime'])

        for dir_info in render_dirs:
            if dir_info['mtime'] < cutoff_time:
                reason = f"older than {max_age_hours} hours"
            elif total_size > size_limit:
                reason = f"total size exceeded {max_total_size_gb}GB"
            else:
                continue  # young enough and within budget: keep it
            try:
                shutil.rmtree(dir_info['path'])
            except Exception as e:
                logger.warning(f"Failed to delete {dir_info['path']}: {e}")
                continue  # failed deletions do not count toward the totals
            deleted_count += 1
            freed_space += dir_info['size']
            total_size -= dir_info['size']
            logger.info(f"Deleted render directory {dir_info['path'].name} ({reason})")

        if deleted_count > 0:
            logger.info(f"Cleanup completed: deleted {deleted_count} directories, freed {freed_space / 1024 / 1024:.1f} MB")

        return {
            "deleted_directories": deleted_count,
            "freed_space_mb": round(freed_space / 1024 / 1024, 1),
            "remaining_directories": len(render_dirs) - deleted_count,
            "total_size_mb": round(total_size / 1024 / 1024, 1),
        }
    except Exception as e:
        logger.error(f"Cleanup failed: {e}")
        return {"error": str(e)}