Spaces:
Running
Running
| import gradio as gr | |
| import os | |
| import subprocess | |
| import re | |
| import time | |
| import shutil | |
| import glob | |
| import base64 | |
| import tempfile | |
| import requests | |
| import json | |
| from datetime import datetime | |
| # --------------------------------------------------------- | |
| # 0. Error Logging Helper (GitHub Gist) | |
| # --------------------------------------------------------- | |
| def log_to_gist(content): | |
| """ | |
| Silently appends the error content to a GitHub Gist file. | |
| Uses environment variables for security. | |
| """ | |
| token = os.getenv("GITHUB_TOKEN") | |
| gist_id = os.getenv("GIST_ID") | |
| if not token or not gist_id: | |
| print("⚠️ GitHub Token or Gist ID not set. Skipping error logging.", flush=True) | |
| return | |
| filename = "manim_error_history.txt" | |
| url = f"https://api.github.com/gists/{gist_id}" | |
| headers = { | |
| "Authorization": f"token {token}", | |
| "Accept": "application/vnd.github.v3+json" | |
| } | |
| try: | |
| # 1. Fetch existing content | |
| resp = requests.get(url, headers=headers, timeout=5) | |
| if resp.status_code != 200: | |
| print(f"⚠️ Failed to fetch Gist history: {resp.status_code}", flush=True) | |
| return | |
| current_data = resp.json() | |
| old_content = current_data.get('files', {}).get(filename, {}).get('content', "") | |
| # 2. Format new entry (Newest at the top) | |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| new_entry = f"{'='*40}\nDATE: {timestamp}\n{content}\n\n" | |
| updated_content = new_entry + old_content | |
| # 3. Update Gist | |
| payload = {"files": {filename: {"content": updated_content}}} | |
| patch_resp = requests.patch(url, headers=headers, data=json.dumps(payload), timeout=5) | |
| if patch_resp.status_code == 200: | |
| print(f"📋 Error logged to GitHub history. URL: https://gist.github.com/{gist_id}", flush=True) | |
| else: | |
| print(f"⚠️ Failed to update Gist: {patch_resp.text}", flush=True) | |
| except Exception as e: | |
| print(f"⚠️ Gist Logging Exception: {str(e)}", flush=True) | |
| # --------------------------------------------------------- | |
| # 1. Helper Functions | |
| # --------------------------------------------------------- | |
| def modify_animation_times(code: str, factor: float) -> str: | |
| """ | |
| Safely scales all numeric run_time and wait() values by a given factor. | |
| """ | |
| print(f"⚡ Scaling animation times by a factor of {factor}...", flush=True) | |
| MIN_RUN_TIME = 0.01 | |
| def scale_match(m, is_wait): | |
| try: | |
| val = float(m.group(2)) | |
| new_val = val * factor | |
| final_val = new_val if is_wait else max(new_val, MIN_RUN_TIME) | |
| return f"{m.group(1)}{final_val:.3f}" | |
| except ValueError: | |
| return m.group(0) | |
| code = re.sub(r"(run_time\s*=\s*)(\d+\.?\d*)", lambda m: scale_match(m, False), code) | |
| code = re.sub(r"(self\.wait\s*\(\s*)(\d+\.?\d*)", lambda m: scale_match(m, True), code) | |
| return code | |
| def run_manim_pre_check(code_str: str) -> (bool, str): | |
| """ | |
| Runs Manim with '-s'. | |
| """ | |
| print("🕵️ Running fast pre-check with 'manim -s'...", flush=True) | |
| fast_code = modify_animation_times(code_str, factor=0.1) | |
| with open("scene_pre_check.py", "w", encoding="utf-8") as f: | |
| f.write(fast_code) | |
| cmd = ["manim", "-ql", "--progress_bar", "none", "--disable_caching", "scene_pre_check.py", "GenScene", "-s", "-o", "pre_check_output"] | |
| try: | |
| process = subprocess.run(cmd, capture_output=True, timeout=30, check=False) | |
| if process.returncode == 0: | |
| print("✅ Pre-check passed.", flush=True) | |
| return True, "Pre-check successful." | |
| else: | |
| stderr_log = process.stderr.decode('utf-8', 'ignore') | |
| # --- LOGGING TO GITHUB --- | |
| log_to_gist(f"PRE-CHECK FAILED:\n{stderr_log}") | |
| print(f"❌ Pre-check failed with a critical error.\n{stderr_log}", flush=True) | |
| return False, f"⚠️ ERROR: Your code failed the pre-check.\n\n--- ERROR LOG ---\n{stderr_log}" | |
| except subprocess.TimeoutExpired: | |
| print("⌛ Pre-check timed out (30s). Soft Pass.", flush=True) | |
| return True, "⚠️ Warning: Pre-check timed out. Proceeding to full render..." | |
| def cleanup_media_directory(): | |
| media_dir = 'media' | |
| if os.path.exists(media_dir): | |
| try: shutil.rmtree(media_dir) | |
| except OSError: pass | |
| def make_even(n): | |
| return int(n) if int(n) % 2 == 0 else int(n) + 1 | |
| def get_resolution_flags(orientation, quality): | |
| qual_map = {"Preview (360p)": 360, "480p": 480, "720p": 720, "1080p": 1080, "4k": 2160} | |
| base_h = qual_map.get(quality, 1080) | |
| if orientation == "Landscape (16:9)": | |
| width, height = make_even(base_h * (16/9)), make_even(base_h) | |
| else: | |
| width, height = make_even(base_h), make_even(base_h * (16/9)) | |
| return f"{width},{height}" | |
| def run_manim(code_str, orientation, quality, timeout): | |
| """ | |
| Executes Manim. Partial stitching on Timeout. | |
| """ | |
| timeout_sec = float(timeout) if timeout and float(timeout) > 0 else None | |
| print(f"🎬 Starting Full Render: {orientation} @ {quality} (Timeout: {timeout_sec}s)...", flush=True) | |
| with open("scene.py", "w", encoding="utf-8") as f: f.write(code_str) | |
| timestamp = int(time.time()) | |
| output_filename = f"video_{timestamp}.mp4" | |
| res_str = get_resolution_flags(orientation, quality) | |
| frame_rate_flags = ["--frame_rate", "15"] if quality == "Preview (360p)" else [] | |
| cmd = ["manim", "--resolution", res_str, *frame_rate_flags, "--disable_caching", | |
| "--progress_bar", "none", "scene.py", "GenScene", "-o", output_filename] | |
| full_logs = "" | |
| try: | |
| process = subprocess.run(cmd, capture_output=True, timeout=timeout_sec, check=False) | |
| stdout_log = process.stdout.decode('utf-8', 'ignore') | |
| stderr_log = process.stderr.decode('utf-8', 'ignore') | |
| full_logs = f"--- MANIM STDOUT ---\n{stdout_log}\n\n--- MANIM STDERR ---\n{stderr_log}" | |
| if process.returncode != 0: | |
| print(f"❌ Render Failed (Process Error). Return Code: {process.returncode}", flush=True) | |
| except subprocess.TimeoutExpired as e: | |
| print(f"⌛ Render timed out after {timeout_sec} seconds. Attempting recovery...", flush=True) | |
| stdout_log = e.stdout.decode('utf-8', 'ignore') if e.stdout else "" | |
| stderr_log = e.stderr.decode('utf-8', 'ignore') if e.stderr else "" | |
| timeout_logs = f"--- MANIM STDOUT ---\n{stdout_log}\n\n--- MANIM STDERR ---\n{stderr_log}" | |
| full_logs = timeout_logs | |
| combined_log = stdout_log + "\n" + stderr_log | |
| path_matches = re.findall(r"movie file written in\s*'([^']+?)'", combined_log, flags=re.DOTALL) | |
| if path_matches: | |
| partial_files_dir = os.path.dirname("".join(path_matches[-1].split())) | |
| if os.path.exists(partial_files_dir): | |
| partial_files = sorted(glob.glob(os.path.join(partial_files_dir, 'uncached_*.mp4')), | |
| key=lambda f: int(re.search(r'(\d+)\.mp4$', f).group(1))) | |
| if partial_files: | |
| print(f"⚡ Found {len(partial_files)} partial chunks. Stitching...", flush=True) | |
| list_file_path = os.path.join(partial_files_dir, "file_list.txt") | |
| with open(list_file_path, 'w') as f: | |
| for pf in partial_files: | |
| f.write(f"file '{os.path.abspath(pf)}'\n") | |
| combined_video_path = os.path.join(os.path.dirname(partial_files_dir), f"combined_partial_{timestamp}.mp4") | |
| ffmpeg_cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_file_path, "-c", "copy", combined_video_path] | |
| ffmpeg_process = subprocess.run(ffmpeg_cmd, capture_output=True, text=True, check=False) | |
| if ffmpeg_process.returncode == 0 and os.path.exists(combined_video_path): | |
| print(f"✅ Recovery Successful: {combined_video_path}", flush=True) | |
| return combined_video_path, f"⚠️ WARNING: Render Timed Out. Recovered partial video.\n\n{timeout_logs}", True | |
| print("❌ Recovery failed or no partial files found.", flush=True) | |
| media_video_base = os.path.join("media", "videos", "scene") | |
| if os.path.exists(media_video_base): | |
| for root, _, files in os.walk(media_video_base): | |
| if output_filename in files: | |
| found_video_path = os.path.join(root, output_filename) | |
| print(f"✅ Video Render Success: {found_video_path}", flush=True) | |
| return found_video_path, f"✅ Rendering Successful\n\n{full_logs}", True | |
| # --- LOGGING TO GITHUB --- | |
| # If we reached here, the video was not found or failed completely. | |
| log_to_gist(f"RENDER FAILED:\n{full_logs}") | |
| return None, f"❌ Failure: Video file was not created.\n\n{full_logs}", False | |
| # --------------------------------------------------------- | |
| # 2. Main API Function | |
| # --------------------------------------------------------- | |
| def render_video_from_code(code, orientation, quality, timeout, preview_factor): | |
| try: | |
| is_valid, logs = run_manim_pre_check(code) | |
| if not is_valid: | |
| return None, logs, gr.Button(visible=True) | |
| cleanup_media_directory() | |
| if not code or "from manim import" not in code: | |
| return None, "Error: No valid code.", gr.Button(visible=False) | |
| if quality == "Preview (360p)": | |
| code_to_render = modify_animation_times(code, factor=float(preview_factor) or 0.5) | |
| else: | |
| code_to_render = code | |
| video_path, logs, success = run_manim(code_to_render, orientation, quality, timeout) | |
| return video_path, logs, gr.Button(visible=not success) | |
| except Exception as e: | |
| # --- LOGGING TO GITHUB --- | |
| log_to_gist(f"CRITICAL APP EXCEPTION:\n{str(e)}") | |
| return None, f"Rendering failed: {str(e)}", gr.Button(visible=True) | |
| # --------------------------------------------------------- | |
| # 3. Audio Merging Functions (API-Safe) | |
| # --------------------------------------------------------- | |
| def get_media_duration(file_path): | |
| """Uses ffprobe to get the duration of a media file.""" | |
| cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", file_path] | |
| try: | |
| result = subprocess.run(cmd, capture_output=True, text=True, check=True) | |
| return float(result.stdout.strip()) | |
| except (subprocess.CalledProcessError, FileNotFoundError): | |
| print(f"⚠️ Could not get duration for {file_path}. Is ffprobe installed?", flush=True) | |
| return None | |
| def build_atempo_filter(factor): | |
| """Builds a chained atempo filter for FFmpeg to handle extreme speed changes.""" | |
| filters = [] | |
| while factor > 100.0: | |
| filters.append("atempo=100.0"); factor /= 100.0 | |
| while factor < 0.5: | |
| filters.append("atempo=0.5"); factor /= 0.5 | |
| if 0.5 <= factor <= 100.0: | |
| filters.append(f"atempo={factor}") | |
| return ",".join(filters) | |
| def decode_base64_to_tempfile(data_obj): | |
| """Decodes a base64 string from a Gradio file object and saves to a temp file.""" | |
| if not data_obj or 'data' not in data_obj: return None | |
| header, encoded_data = data_obj['data'].split(",", 1) | |
| file_extension = header.split('/')[1].split(';')[0] | |
| decoded_data = base64.b64decode(encoded_data) | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as tmp_file: | |
| tmp_file.write(decoded_data) | |
| return tmp_file.name | |
| def merge_audio_to_video(video_input, audio_input): | |
| """ | |
| Merges audio into a video. Handles both file paths (from UI) and | |
| base64 dictionaries (from API) as inputs. | |
| """ | |
| video_path, audio_path = None, None | |
| temp_files_to_clean = [] | |
| try: | |
| if isinstance(video_input, str) and os.path.exists(video_input): | |
| video_path = video_input | |
| elif isinstance(video_input, dict): | |
| print("API call detected: Decoding video from base64.", flush=True) | |
| video_path = decode_base64_to_tempfile(video_input) | |
| if video_path: temp_files_to_clean.append(video_path) | |
| if isinstance(audio_input, str) and os.path.exists(audio_input): | |
| audio_path = audio_input | |
| elif isinstance(audio_input, dict): | |
| print("API call detected: Decoding audio from base64.", flush=True) | |
| audio_path = decode_base64_to_tempfile(audio_input) | |
| if audio_path: temp_files_to_clean.append(audio_path) | |
| if not video_path or not audio_path: | |
| return None, "Error: Missing video or audio file. Please provide both." | |
| gr.Info("Merging audio...") | |
| video_duration = get_media_duration(video_path) | |
| audio_duration = get_media_duration(audio_path) | |
| if video_duration is None or audio_duration is None: | |
| return None, "Error: Could not determine media durations." | |
| if video_duration == 0: | |
| return None, "Error: Input video has zero duration." | |
| speed_factor = audio_duration / video_duration | |
| atempo_filter = build_atempo_filter(speed_factor) | |
| output_dir = "temp_outputs"; os.makedirs(output_dir, exist_ok=True) | |
| timestamp = int(time.time()) | |
| output_path = os.path.join(output_dir, f"merged_video_{timestamp}.mp4") | |
| ffmpeg_cmd = [ | |
| "ffmpeg", "-y", | |
| "-i", video_path, | |
| "-i", audio_path, | |
| "-c:v", "copy", | |
| "-filter:a", atempo_filter, | |
| "-map", "0:v:0", | |
| "-map", "1:a:0", | |
| "-shortest", | |
| output_path | |
| ] | |
| process = subprocess.run(ffmpeg_cmd, capture_output=True, text=True) | |
| if process.returncode != 0: | |
| error_message = f"FFmpeg Error:\n{process.stderr}" | |
| print(error_message, flush=True) | |
| return None, error_message | |
| print(f"✅ Audio merged successfully: {output_path}", flush=True) | |
| return output_path, "✅ Audio merged successfully!" | |
| finally: | |
| print(f"Cleaning up {len(temp_files_to_clean)} temporary files...", flush=True) | |
| for f in temp_files_to_clean: | |
| try: os.remove(f) | |
| except OSError as e: print(f"Error removing temp file {f}: {e}", flush=True) | |
| # --------------------------------------------------------- | |
| # 4. Gradio Interface | |
| # --------------------------------------------------------- | |
| DEFAULT_CODE = """from manim import * | |
| class GenScene(Scene): | |
| def construct(self): | |
| c = Circle(color=BLUE, fill_opacity=0.5) | |
| self.play(Create(c)) | |
| self.wait(1) | |
| """ | |
| with gr.Blocks(title="Manim Render & Audio Tool") as demo: | |
| with gr.Tab("🎬 Manim Video Renderer"): | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| code_input = gr.Code(label="Python Code", language="python", value=DEFAULT_CODE, visible=False) | |
| orientation_opt = gr.Radio(choices=["Landscape (16:9)", "Portrait (9:16)"], value="Portrait (9:16)", label="Orientation", visible=False) | |
| quality_opt = gr.Dropdown(choices=["Preview (360p)", "480p", "720p", "1080p", "4k"], value="Preview (360p)", label="Quality", visible=False) | |
| timeout_input = gr.Number(label="Render Timeout (seconds)", value=60, visible=False) | |
| preview_speed_factor_input = gr.Number(label="Preview Speed Factor", value=0.5, visible=False) | |
| render_btn = gr.Button("Render", variant="primary") | |
| with gr.Column(scale=1): | |
| video_output = gr.Video(label="Result") | |
| status_output = gr.Textbox(label="Status/Logs") | |
| fix_btn_output = gr.Button("Fix Error & Re-render", variant="stop", visible=False) | |
| render_btn.click( | |
| fn=render_video_from_code, | |
| inputs=[code_input, orientation_opt, quality_opt, timeout_input, preview_speed_factor_input], | |
| outputs=[video_output, status_output, fix_btn_output], | |
| api_name="render" | |
| ) | |
| with gr.Tab("🎤 Add Audio to Video"): | |
| gr.Markdown("## Merge Audio into Video") | |
| gr.Markdown("Upload a video and an audio file. The audio will be automatically stretched or compressed to match the video's length, preserving original video quality.") | |
| with gr.Row(): | |
| with gr.Column(): | |
| video_input_audio_tab = gr.Video(label="Input Video (MP4)") | |
| audio_input_audio_tab = gr.Audio(label="Input Audio", type="filepath") | |
| merge_audio_btn = gr.Button("Merge Audio", variant="primary") | |
| with gr.Column(): | |
| video_output_audio_tab = gr.Video(label="Merged Video") | |
| status_audio_tab = gr.Textbox(label="Status") | |
| merge_audio_btn.click( | |
| fn=merge_audio_to_video, | |
| inputs=[video_input_audio_tab, audio_input_audio_tab], | |
| outputs=[video_output_audio_tab, status_audio_tab], | |
| api_name="add_audio" | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch(server_name="0.0.0.0", server_port=7860) |