import base64
import glob
import json
import os
import re
import shutil
import subprocess
import tempfile
import time
from datetime import datetime

import gradio as gr
import requests

# ---------------------------------------------------------
# 0. Error Logging Helper (GitHub Gist)
# ---------------------------------------------------------

# Upper bound on the stored history. The Gist API only returns roughly the
# first megabyte of a file's content, so an unbounded history would be cut
# off by the API on fetch and the tail silently lost on the next update.
# Capping it here makes that trimming explicit (oldest entries go first,
# because new entries are prepended).
_MAX_GIST_HISTORY_CHARS = 500_000


def log_to_gist(content):
    """
    Prepend an error report to the history file stored in a GitHub Gist.

    The Gist is identified by the ``GIST_ID`` environment variable and is
    accessed with the token in ``GITHUB_TOKEN``. If either is missing the
    function prints a warning and returns without doing anything.

    This is a best-effort logger: it never raises. It is called from error
    paths (including ``except`` handlers), so any failure here is only
    printed to stdout.

    Args:
        content: Text of the error report to store.

    Returns:
        None.
    """
    token = os.getenv("GITHUB_TOKEN")
    gist_id = os.getenv("GIST_ID")

    if not token or not gist_id:
        print("⚠️ GitHub Token or Gist ID not set. Skipping error logging.", flush=True)
        return

    filename = "manim_error_history.txt"
    url = f"https://api.github.com/gists/{gist_id}"
    headers = {
        "Authorization": f"token {token}",
        "Accept": "application/vnd.github.v3+json",
    }

    try:
        # 1. Fetch existing content
        resp = requests.get(url, headers=headers, timeout=5)
        if resp.status_code != 200:
            print(f"⚠️ Failed to fetch Gist history: {resp.status_code}", flush=True)
            return

        # `or` fallbacks guard against keys that are present but null,
        # which would otherwise break the string concatenation below.
        files = resp.json().get("files") or {}
        old_content = (files.get(filename) or {}).get("content") or ""

        # 2. Format new entry (Newest at the top)
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        new_entry = f"{'=' * 40}\nDATE: {timestamp}\n{content}\n\n"
        updated_content = (new_entry + old_content)[:_MAX_GIST_HISTORY_CHARS]

        # 3. Update Gist
        payload = {"files": {filename: {"content": updated_content}}}
        patch_resp = requests.patch(url, headers=headers, data=json.dumps(payload), timeout=5)

        if patch_resp.status_code == 200:
            print(f"📋 Error logged to GitHub history. URL: https://gist.github.com/{gist_id}", flush=True)
        else:
            print(f"⚠️ Failed to update Gist: {patch_resp.text}", flush=True)
    except Exception as e:
        # Deliberately broad: logging must never mask the error being logged.
        print(f"⚠️ Gist Logging Exception: {str(e)}", flush=True)


# ---------------------------------------------------------
# 1.
# Helper Functions
# ---------------------------------------------------------


def modify_animation_times(code: str, factor: float) -> str:
    """
    Scale literal ``run_time=<number>`` and ``self.wait(<number>)`` values.

    Only plain decimal literals are rewritten (``2``, ``1.5``, ``.5``);
    expressions and argument-less ``self.wait()`` calls are left untouched.
    Scaled values are written with three decimals. Scaled ``run_time``
    values are clamped to a minimum of 0.01; ``wait`` values are not clamped.

    Args:
        code: Manim scene source code.
        factor: Multiplier applied to every matched duration.

    Returns:
        The source code with the matched durations replaced.
    """
    print(f"⚡ Scaling animation times by a factor of {factor}...", flush=True)
    min_run_time = 0.01
    # Decimal literal with an optional fractional part, or a leading-dot
    # form such as ".5".
    number = r"(\d+\.?\d*|\.\d+)"

    def scale_match(match, is_wait):
        scaled = float(match.group(2)) * factor
        if not is_wait:
            scaled = max(scaled, min_run_time)
        return f"{match.group(1)}{scaled:.3f}"

    # \b keeps longer identifiers such as `my_run_time` from being rewritten.
    code = re.sub(rf"(\brun_time\s*=\s*){number}", lambda m: scale_match(m, False), code)
    code = re.sub(rf"(\bself\.wait\s*\(\s*){number}", lambda m: scale_match(m, True), code)
    return code


def run_manim_pre_check(code_str: str) -> "tuple[bool, str]":
    """
    Run a quick Manim pass over the code to catch errors before a full render.

    The code is first sped up with ``modify_animation_times`` (factor 0.1),
    written to ``scene_pre_check.py`` and run with ``manim -s`` against the
    ``GenScene`` class, with a 30 second limit.

    Args:
        code_str: Manim scene source code defining ``GenScene``.

    Returns:
        ``(ok, message)``. ``ok`` is False only when Manim exits with a
        non-zero status; in that case the message contains Manim's stderr and
        the failure is also sent to ``log_to_gist``. A timeout is treated as
        a soft pass (``ok`` is True with a warning message).
    """
    print("🕵️ Running fast pre-check with 'manim -s'...", flush=True)
    fast_code = modify_animation_times(code_str, factor=0.1)
    with open("scene_pre_check.py", "w", encoding="utf-8") as f:
        f.write(fast_code)

    cmd = [
        "manim", "-ql", "--progress_bar", "none", "--disable_caching",
        "scene_pre_check.py", "GenScene", "-s", "-o", "pre_check_output",
    ]
    try:
        process = subprocess.run(cmd, capture_output=True, timeout=30, check=False)
    except subprocess.TimeoutExpired:
        print("⌛ Pre-check timed out (30s). Soft Pass.", flush=True)
        return True, "⚠️ Warning: Pre-check timed out. Proceeding to full render..."

    if process.returncode == 0:
        print("✅ Pre-check passed.", flush=True)
        return True, "Pre-check successful."

    stderr_log = process.stderr.decode("utf-8", "ignore")
    # --- LOGGING TO GITHUB ---
    log_to_gist(f"PRE-CHECK FAILED:\n{stderr_log}")
    print(f"❌ Pre-check failed with a critical error.\n{stderr_log}", flush=True)
    return False, f"⚠️ ERROR: Your code failed the pre-check.\n\n--- ERROR LOG ---\n{stderr_log}"
def cleanup_media_directory():
    """Delete the local ``media`` output directory, if it exists.

    Failure to delete is reported on stdout but does not raise, so a stale
    directory never blocks a render.
    """
    media_dir = "media"
    try:
        shutil.rmtree(media_dir)
    except FileNotFoundError:
        pass
    except OSError as e:
        print(f"⚠️ Could not remove '{media_dir}': {e}", flush=True)


def make_even(n):
    """Truncate ``n`` to an int and round odd results up to the next even int."""
    whole = int(n)
    return whole if whole % 2 == 0 else whole + 1


def get_resolution_flags(orientation, quality):
    """
    Build the ``"width,height"`` string passed to Manim's ``--resolution``.

    Args:
        orientation: ``"Landscape (16:9)"`` for landscape; any other value is
            treated as portrait.
        quality: One of the quality labels offered by the UI; unknown labels
            fall back to 1080.

    Returns:
        ``"<width>,<height>"`` with both dimensions even.
    """
    qual_map = {"Preview (360p)": 360, "480p": 480, "720p": 720, "1080p": 1080, "4k": 2160}
    short_side = qual_map.get(quality, 1080)
    # Multiply before dividing so exact multiples of 9 stay exact instead of
    # depending on how the float 16/9 happens to round.
    long_side = short_side * 16 / 9

    if orientation == "Landscape (16:9)":
        width, height = make_even(long_side), make_even(short_side)
    else:
        width, height = make_even(short_side), make_even(long_side)
    return f"{width},{height}"


def _partial_chunk_index(path):
    """Return the trailing number of a partial movie file name, or -1 if none."""
    match = re.search(r"(\d+)\.mp4$", path)
    return int(match.group(1)) if match else -1


def _stitch_partial_chunks(combined_log, timestamp):
    """Concatenate Manim's partial movie files found via its log output.

    Returns the path of the stitched video, or None if nothing was recovered.
    """
    path_matches = re.findall(r"movie file written in\s*'([^']+?)'", combined_log, flags=re.DOTALL)
    if not path_matches:
        return None

    # The last reported path is the most recent chunk. All whitespace is
    # removed from it -- presumably because the logged path can be wrapped
    # across lines (the pattern above also matches across newlines).
    partial_files_dir = os.path.dirname("".join(path_matches[-1].split()))
    if not os.path.exists(partial_files_dir):
        return None

    partial_files = sorted(
        glob.glob(os.path.join(partial_files_dir, "uncached_*.mp4")),
        key=_partial_chunk_index,
    )
    if not partial_files:
        return None

    print(f"⚡ Found {len(partial_files)} partial chunks. Stitching...", flush=True)
    list_file_path = os.path.join(partial_files_dir, "file_list.txt")
    with open(list_file_path, "w", encoding="utf-8") as f:
        for pf in partial_files:
            # Single quotes inside a quoted concat-list entry must be escaped.
            escaped = os.path.abspath(pf).replace("'", "'\\''")
            f.write(f"file '{escaped}'\n")

    combined_video_path = os.path.join(
        os.path.dirname(partial_files_dir), f"combined_partial_{timestamp}.mp4"
    )
    ffmpeg_cmd = [
        "ffmpeg", "-y", "-f", "concat", "-safe", "0",
        "-i", list_file_path, "-c", "copy", combined_video_path,
    ]
    ffmpeg_process = subprocess.run(ffmpeg_cmd, capture_output=True, text=True, check=False)
    if ffmpeg_process.returncode == 0 and os.path.exists(combined_video_path):
        return combined_video_path
    return None


def run_manim(code_str, orientation, quality, timeout):
    """
    Executes Manim. Partial stitching on Timeout.

    The code is written to ``scene.py`` and the ``GenScene`` class is
    rendered. If the render exceeds ``timeout``, the partial movie files
    reported in Manim's output are concatenated with FFmpeg and returned
    instead.

    Args:
        code_str: Manim scene source code defining ``GenScene``.
        orientation: Orientation label (see ``get_resolution_flags``).
        quality: Quality label; ``"Preview (360p)"`` also lowers the frame
            rate to 15.
        timeout: Render time limit in seconds; empty, zero or negative means
            no limit.

    Returns:
        ``(video_path, message, success)``. ``video_path`` is None and
        ``success`` is False when no video was produced; that failure is also
        sent to ``log_to_gist``.
    """
    timeout_sec = float(timeout) if timeout and float(timeout) > 0 else None
    print(f"🎬 Starting Full Render: {orientation} @ {quality} (Timeout: {timeout_sec}s)...", flush=True)

    with open("scene.py", "w", encoding="utf-8") as f:
        f.write(code_str)

    timestamp = int(time.time())
    output_filename = f"video_{timestamp}.mp4"
    res_str = get_resolution_flags(orientation, quality)
    frame_rate_flags = ["--frame_rate", "15"] if quality == "Preview (360p)" else []
    cmd = [
        "manim", "--resolution", res_str, *frame_rate_flags,
        "--disable_caching", "--progress_bar", "none",
        "scene.py", "GenScene", "-o", output_filename,
    ]

    full_logs = ""
    try:
        process = subprocess.run(cmd, capture_output=True, timeout=timeout_sec, check=False)
        stdout_log = process.stdout.decode("utf-8", "ignore")
        stderr_log = process.stderr.decode("utf-8", "ignore")
        full_logs = f"--- MANIM STDOUT ---\n{stdout_log}\n\n--- MANIM STDERR ---\n{stderr_log}"
        if process.returncode != 0:
            print(f"❌ Render Failed (Process Error). Return Code: {process.returncode}", flush=True)
    except subprocess.TimeoutExpired as e:
        print(f"⌛ Render timed out after {timeout_sec} seconds. Attempting recovery...", flush=True)
        stdout_log = e.stdout.decode("utf-8", "ignore") if e.stdout else ""
        stderr_log = e.stderr.decode("utf-8", "ignore") if e.stderr else ""
        timeout_logs = f"--- MANIM STDOUT ---\n{stdout_log}\n\n--- MANIM STDERR ---\n{stderr_log}"
        full_logs = timeout_logs

        combined_video_path = _stitch_partial_chunks(stdout_log + "\n" + stderr_log, timestamp)
        if combined_video_path:
            print(f"✅ Recovery Successful: {combined_video_path}", flush=True)
            return (
                combined_video_path,
                f"⚠️ WARNING: Render Timed Out. Recovered partial video.\n\n{timeout_logs}",
                True,
            )
        print("❌ Recovery failed or no partial files found.", flush=True)

    # Look for the finished file anywhere under Manim's output tree.
    media_video_base = os.path.join("media", "videos", "scene")
    if os.path.exists(media_video_base):
        for root, _, files in os.walk(media_video_base):
            if output_filename in files:
                found_video_path = os.path.join(root, output_filename)
                print(f"✅ Video Render Success: {found_video_path}", flush=True)
                return found_video_path, f"✅ Rendering Successful\n\n{full_logs}", True

    # --- LOGGING TO GITHUB ---
    # If we reached here, the video was not found or failed completely.
    log_to_gist(f"RENDER FAILED:\n{full_logs}")
    return None, f"❌ Failure: Video file was not created.\n\n{full_logs}", False


# ---------------------------------------------------------
# 2. Main API Function
# ---------------------------------------------------------
def _coerce_preview_factor(preview_factor, default=0.5):
    """Return ``preview_factor`` as a positive float, or ``default`` if unusable."""
    try:
        factor = float(preview_factor)
    except (TypeError, ValueError):
        return default
    return factor if factor > 0 else default


def render_video_from_code(code, orientation, quality, timeout, preview_factor):
    """
    Validate, pre-check and render Manim code; entry point for the UI and API.

    Args:
        code: Manim scene source; must contain ``from manim import``.
        orientation: Orientation label (see ``get_resolution_flags``).
        quality: Quality label. For ``"Preview (360p)"`` the animation times
            are scaled by ``preview_factor`` before rendering.
        timeout: Render time limit in seconds (see ``run_manim``).
        preview_factor: Speed factor for preview renders; missing, invalid or
            non-positive values fall back to 0.5.

    Returns:
        ``(video_path, message, button)`` where ``button`` is a ``gr.Button``
        whose visibility signals whether the "fix" action should be offered.
    """
    try:
        # Reject unusable input before spending time on the pre-check.
        if not code or "from manim import" not in code:
            return None, "Error: No valid code.", gr.Button(visible=False)

        is_valid, logs = run_manim_pre_check(code)
        if not is_valid:
            return None, logs, gr.Button(visible=True)

        cleanup_media_directory()

        if quality == "Preview (360p)":
            code_to_render = modify_animation_times(code, factor=_coerce_preview_factor(preview_factor))
        else:
            code_to_render = code

        video_path, logs, success = run_manim(code_to_render, orientation, quality, timeout)
        return video_path, logs, gr.Button(visible=not success)
    except Exception as e:
        # --- LOGGING TO GITHUB ---
        log_to_gist(f"CRITICAL APP EXCEPTION:\n{str(e)}")
        return None, f"Rendering failed: {str(e)}", gr.Button(visible=True)


# ---------------------------------------------------------
# 3.
# Audio Merging Functions (API-Safe)
# ---------------------------------------------------------
def get_media_duration(file_path):
    """Uses ffprobe to get the duration of a media file.

    Returns:
        The duration in seconds as a float, or None if ffprobe is missing,
        fails, or prints something that is not a number.
    """
    cmd = [
        "ffprobe", "-v", "error", "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1", file_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return float(result.stdout.strip())
    except (subprocess.CalledProcessError, FileNotFoundError, ValueError):
        # ValueError covers non-numeric output such as "N/A" or an empty string.
        print(f"⚠️ Could not get duration for {file_path}. Is ffprobe installed?", flush=True)
        return None


def build_atempo_filter(factor):
    """Builds a chained atempo filter for FFmpeg to handle extreme speed changes.

    Each ``atempo`` stage is kept within 0.5-100.0, so factors outside that
    range are split into several chained stages.

    Args:
        factor: Overall tempo multiplier; must be positive and finite.

    Returns:
        A comma-separated filter string such as ``"atempo=0.5,atempo=0.8"``.

    Raises:
        ValueError: If ``factor`` is zero, negative, infinite or NaN. Such
            values can never be brought into range by the loops below.
    """
    if not 0 < factor < float("inf"):
        raise ValueError(f"atempo factor must be positive and finite, got {factor!r}")

    filters = []
    while factor > 100.0:
        filters.append("atempo=100.0")
        factor /= 100.0
    while factor < 0.5:
        filters.append("atempo=0.5")
        factor /= 0.5
    if 0.5 <= factor <= 100.0:
        filters.append(f"atempo={factor}")
    return ",".join(filters)


def decode_base64_to_tempfile(data_obj):
    """Decodes a base64 string from a Gradio file object and saves to a temp file.

    Args:
        data_obj: Dict whose ``"data"`` entry is a data URL of the form
            ``"data:<type>/<subtype>;base64,<payload>"``.

    Returns:
        Path of the temporary file (the caller is responsible for deleting
        it), or None if ``data_obj`` is empty or not in the expected form.
    """
    if not data_obj or "data" not in data_obj:
        return None

    header, separator, encoded_data = data_obj["data"].partition(",")
    if not separator or "/" not in header:
        return None

    # "data:video/mp4;base64" -> "mp4"
    file_extension = header.split("/")[1].split(";")[0]
    decoded_data = base64.b64decode(encoded_data)
    with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as tmp_file:
        tmp_file.write(decoded_data)
        return tmp_file.name


def _resolve_media_input(media_input, kind, temp_files):
    """Turn a file path or base64 dict into a local path, tracking temp files."""
    if isinstance(media_input, str) and os.path.exists(media_input):
        return media_input
    if isinstance(media_input, dict):
        print(f"API call detected: Decoding {kind} from base64.", flush=True)
        path = decode_base64_to_tempfile(media_input)
        if path:
            temp_files.append(path)
        return path
    return None


def merge_audio_to_video(video_input, audio_input):
    """
    Merges audio into a video.

    Handles both file paths (from UI) and base64 dictionaries (from API) as
    inputs. The audio tempo is changed so that its length matches the
    video's; the video stream is copied unchanged.

    Args:
        video_input: Path to a video file, or a dict with a base64 data URL.
        audio_input: Path to an audio file, or a dict with a base64 data URL.

    Returns:
        ``(output_path, message)``; ``output_path`` is None on failure.
    """
    temp_files_to_clean = []
    try:
        video_path = _resolve_media_input(video_input, "video", temp_files_to_clean)
        audio_path = _resolve_media_input(audio_input, "audio", temp_files_to_clean)

        if not video_path or not audio_path:
            return None, "Error: Missing video or audio file. Please provide both."

        gr.Info("Merging audio...")
        video_duration = get_media_duration(video_path)
        audio_duration = get_media_duration(audio_path)

        if video_duration is None or audio_duration is None:
            return None, "Error: Could not determine media durations."
        if video_duration <= 0:
            return None, "Error: Input video has zero duration."
        if audio_duration <= 0:
            # A zero factor can never be expressed as an atempo chain.
            return None, "Error: Input audio has zero duration."

        speed_factor = audio_duration / video_duration
        atempo_filter = build_atempo_filter(speed_factor)

        output_dir = "temp_outputs"
        os.makedirs(output_dir, exist_ok=True)
        timestamp = int(time.time())
        output_path = os.path.join(output_dir, f"merged_video_{timestamp}.mp4")

        ffmpeg_cmd = [
            "ffmpeg", "-y", "-i", video_path, "-i", audio_path,
            "-c:v", "copy", "-filter:a", atempo_filter,
            "-map", "0:v:0", "-map", "1:a:0", "-shortest", output_path,
        ]
        process = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)

        if process.returncode != 0:
            error_message = f"FFmpeg Error:\n{process.stderr}"
            print(error_message, flush=True)
            return None, error_message

        print(f"✅ Audio merged successfully: {output_path}", flush=True)
        return output_path, "✅ Audio merged successfully!"
    finally:
        # Only files this call decoded from base64 are removed; caller-owned
        # paths are left alone.
        print(f"Cleaning up {len(temp_files_to_clean)} temporary files...", flush=True)
        for f in temp_files_to_clean:
            try:
                os.remove(f)
            except OSError as e:
                print(f"Error removing temp file {f}: {e}", flush=True)


# ---------------------------------------------------------
# 4. Gradio Interface
# ---------------------------------------------------------
DEFAULT_CODE = """from manim import *

class GenScene(Scene):
    def construct(self):
        c = Circle(color=BLUE, fill_opacity=0.5)
        self.play(Create(c))
        self.wait(1)
"""

with gr.Blocks(title="Manim Render & Audio Tool") as demo:
    with gr.Tab("🎬 Manim Video Renderer"):
        with gr.Row():
            with gr.Column(scale=1):
                # The render inputs are created hidden; they still carry the
                # default values passed to the render function.
                code_input = gr.Code(label="Python Code", language="python", value=DEFAULT_CODE, visible=False)
                orientation_opt = gr.Radio(
                    choices=["Landscape (16:9)", "Portrait (9:16)"],
                    value="Portrait (9:16)",
                    label="Orientation",
                    visible=False,
                )
                quality_opt = gr.Dropdown(
                    choices=["Preview (360p)", "480p", "720p", "1080p", "4k"],
                    value="Preview (360p)",
                    label="Quality",
                    visible=False,
                )
                timeout_input = gr.Number(label="Render Timeout (seconds)", value=60, visible=False)
                preview_speed_factor_input = gr.Number(label="Preview Speed Factor", value=0.5, visible=False)
                render_btn = gr.Button("Render", variant="primary")
            with gr.Column(scale=1):
                video_output = gr.Video(label="Result")
                status_output = gr.Textbox(label="Status/Logs")
                fix_btn_output = gr.Button("Fix Error & Re-render", variant="stop", visible=False)

        render_btn.click(
            fn=render_video_from_code,
            inputs=[code_input, orientation_opt, quality_opt, timeout_input, preview_speed_factor_input],
            outputs=[video_output, status_output, fix_btn_output],
            api_name="render",
        )

    with gr.Tab("🎤 Add Audio to Video"):
        gr.Markdown("## Merge Audio into Video")
        gr.Markdown("Upload a video and an audio file. The audio will be automatically stretched or compressed to match the video's length, preserving original video quality.")
        with gr.Row():
            with gr.Column():
                video_input_audio_tab = gr.Video(label="Input Video (MP4)")
                audio_input_audio_tab = gr.Audio(label="Input Audio", type="filepath")
                merge_audio_btn = gr.Button("Merge Audio", variant="primary")
            with gr.Column():
                video_output_audio_tab = gr.Video(label="Merged Video")
                status_audio_tab = gr.Textbox(label="Status")

        merge_audio_btn.click(
            fn=merge_audio_to_video,
            inputs=[video_input_audio_tab, audio_input_audio_tab],
            outputs=[video_output_audio_tab, status_audio_tab],
            api_name="add_audio",
        )

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)