# AnimateMyIdea1 / app.py
# Page header captured with this file: uploaded by devendergarg14,
# commit 41c110f ("Update app.py", verified), file size 15.9 kB.
import gradio as gr
import os
import subprocess
import re
import time
import shutil
import glob
import base64
import tempfile
# ---------------------------------------------------------
# 1. Helper Functions
# ---------------------------------------------------------
def modify_animation_times(code: str, factor: float) -> str:
    """
    Scales every literal ``run_time=<number>`` and ``self.wait(<number>)`` by ``factor``.

    Only explicit, unsigned numeric literals are rewritten (``2``, ``1.5``,
    ``3.`` and ``.5``). Variables and other expressions never match the
    patterns, so they are left exactly as written. Used for both the
    pre-check and the preview render.

    Args:
        code: Manim scene source code.
        factor: Multiplier applied to each matched duration.

    Returns:
        The source with each matched literal replaced by the scaled value,
        formatted with three decimals.
    """
    print(f"⚡ Scaling animation times by a factor of {factor}...", flush=True)
    # A safe minimum to prevent Manim from crashing with run_time=0
    MIN_RUN_TIME = 0.01
    # Unsigned decimal literal. The second alternative covers the leading-dot
    # form (".5"), which would otherwise be skipped and keep its full duration.
    number = r"(\d+\.?\d*|\.\d+)"

    def scale_match(m, is_wait):
        # group(1) is the "run_time=" / "self.wait(" prefix, group(2) the literal.
        new_val = float(m.group(2)) * factor
        # Waits are scaled as-is; run_time is clamped to stay above zero.
        final_val = new_val if is_wait else max(new_val, MIN_RUN_TIME)
        return f"{m.group(1)}{final_val:.3f}"

    code = re.sub(r"(run_time\s*=\s*)" + number, lambda m: scale_match(m, False), code)
    code = re.sub(r"(self\.wait\s*\(\s*)" + number, lambda m: scale_match(m, True), code)
    return code
def run_manim_pre_check(code_str: str) -> "tuple[bool, str]":
    """
    Runs a quick validation render of ``code_str`` with ``manim -s``.

    The code is first passed through ``modify_animation_times`` with a factor
    of 0.1, written to ``scene_pre_check.py`` in the current working
    directory, and rendered as scene ``GenScene`` with a 30 second limit.

    Returns:
        ``(ok, message)``:
        - ``(True, ...)`` when Manim exits with code 0.
        - ``(True, warning)`` when the 30 second limit is hit (soft pass, the
          full render is still attempted by the caller).
        - ``(False, error text)`` when Manim exits non-zero or cannot be
          started at all.
    """
    print("🕵️ Running fast pre-check with 'manim -s'...", flush=True)
    # Shrink every literal duration to a tenth so the check finishes quickly.
    fast_code = modify_animation_times(code_str, factor=0.1)
    with open("scene_pre_check.py", "w", encoding="utf-8") as f:
        f.write(fast_code)
    cmd = ["manim", "-ql", "--progress_bar", "none", "--disable_caching", "scene_pre_check.py", "GenScene", "-s", "-o", "pre_check_output"]
    try:
        process = subprocess.run(cmd, capture_output=True, timeout=30, check=False)
    except subprocess.TimeoutExpired:
        print("⌛ Pre-check timed out (30s). Soft Pass.", flush=True)
        return True, "⚠️ Warning: Pre-check timed out. Proceeding to full render..."
    except FileNotFoundError:
        # The manim executable itself is missing; report that clearly instead
        # of letting a bare OS error reach the user.
        print("❌ Pre-check could not start: 'manim' executable not found.", flush=True)
        return False, "⚠️ ERROR: The 'manim' executable was not found, so the pre-check could not run."
    if process.returncode == 0:
        print("✅ Pre-check passed.", flush=True)
        return True, "Pre-check successful."
    # Any non-zero exit is treated as a real problem in the submitted code.
    stderr_log = process.stderr.decode('utf-8', 'ignore')
    print(f"❌ Pre-check failed with a critical error.\n{stderr_log}", flush=True)
    return False, f"⚠️ ERROR: Your code failed the pre-check.\n\n--- ERROR LOG ---\n{stderr_log}"
def cleanup_media_directory():
    """
    Removes the ``media`` directory (relative to the current working directory).

    This is best-effort: if the directory cannot be removed the failure is
    printed and the function returns normally, so a stale directory never
    aborts a render.
    """
    media_dir = 'media'
    if not os.path.exists(media_dir):
        return
    try:
        shutil.rmtree(media_dir)
    except OSError as e:
        # Keep going, but leave a trace instead of silently ignoring the error.
        print(f"⚠️ Could not remove '{media_dir}' directory: {e}", flush=True)
def make_even(n):
    """Truncates ``n`` to an int and bumps an odd result up to the next even number."""
    whole = int(n)
    # In Python, whole % 2 is 1 for every odd int (negatives included) and 0 for even ones.
    return whole + whole % 2
def get_resolution_flags(orientation, quality):
    """
    Builds the ``"width,height"`` string for the requested orientation and quality.

    ``quality`` selects the short side in pixels (unknown labels fall back to
    1080); the long side is 16/9 times that. Only the exact label
    ``"Landscape (16:9)"`` yields a wide frame; any other orientation yields a
    tall one. Both sides are passed through ``make_even``.
    """
    short_side_by_quality = {"Preview (360p)": 360, "480p": 480, "720p": 720, "1080p": 1080, "4k": 2160}
    short_side = short_side_by_quality.get(quality, 1080)
    long_side = short_side * (16/9)
    if orientation == "Landscape (16:9)":
        dimensions = (long_side, short_side)
    else:
        dimensions = (short_side, long_side)
    width, height = (make_even(side) for side in dimensions)
    return f"{width},{height}"
def _stitch_partial_chunks(log_text, timestamp):
    """
    Joins the partial movie chunks of an interrupted render into one video.

    Finds the last "movie file written in '<path>'" message in ``log_text``,
    collects the ``uncached_*.mp4`` files in that path's directory, orders
    them by their numeric suffix and concatenates them with ffmpeg.

    Returns:
        Path of the combined video, or ``None`` when nothing could be recovered.
    """
    path_matches = re.findall(r"movie file written in\s*'([^']+?)'", log_text)
    if not path_matches:
        return None
    # All whitespace is stripped from the captured path, presumably because
    # the log output wraps long paths across lines.
    partial_files_dir = os.path.dirname("".join(path_matches[-1].split()))
    if not os.path.exists(partial_files_dir):
        return None

    # Keep only chunks with a numeric suffix; a stray file matching the glob
    # without one would otherwise crash the sort key.
    numbered_chunks = []
    for chunk_path in glob.glob(os.path.join(partial_files_dir, 'uncached_*.mp4')):
        suffix = re.search(r'(\d+)\.mp4$', chunk_path)
        if suffix:
            numbered_chunks.append((int(suffix.group(1)), chunk_path))
    partial_files = [path for _, path in sorted(numbered_chunks, key=lambda item: item[0])]
    if not partial_files:
        return None

    print(f"⚡ Found {len(partial_files)} partial chunks. Stitching...", flush=True)
    list_file_path = os.path.join(partial_files_dir, "file_list.txt")
    with open(list_file_path, 'w', encoding="utf-8") as f:
        for pf in partial_files:
            # Escape single quotes so the quoted entry stays well-formed.
            escaped = os.path.abspath(pf).replace("'", "'\\''")
            f.write(f"file '{escaped}'\n")
    combined_video_path = os.path.join(os.path.dirname(partial_files_dir), f"combined_partial_{timestamp}.mp4")
    ffmpeg_cmd = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_file_path, "-c", "copy", combined_video_path]
    try:
        ffmpeg_process = subprocess.run(ffmpeg_cmd, capture_output=True, text=True, check=False)
    except FileNotFoundError:
        # Without ffmpeg there is nothing to recover; let the caller report failure.
        print("⚠️ ffmpeg executable not found; cannot stitch partial chunks.", flush=True)
        return None
    if ffmpeg_process.returncode == 0 and os.path.exists(combined_video_path):
        return combined_video_path
    return None


def run_manim(code_str, orientation, quality, timeout):
    """
    Executes Manim. Partial stitching on Timeout. No recovery on other errors.

    Writes ``code_str`` to ``scene.py`` and renders scene ``GenScene`` into
    ``video_<timestamp>.mp4`` at the resolution given by
    ``get_resolution_flags`` (15 fps for the preview quality).

    Args:
        code_str: Manim source code to render.
        orientation: Orientation label passed to ``get_resolution_flags``.
        quality: Quality label passed to ``get_resolution_flags``.
        timeout: Render limit in seconds; empty, zero or negative means no limit.

    Returns:
        ``(video_path, logs, success)``. ``video_path`` is ``None`` and
        ``success`` is ``False`` when no video file was produced. A video
        recovered from partial chunks after a timeout counts as success.
    """
    # Parse once; anything that is not a positive number disables the limit.
    timeout_sec = float(timeout) if timeout else None
    if timeout_sec is not None and not timeout_sec > 0:
        timeout_sec = None
    print(f"🎬 Starting Full Render: {orientation} @ {quality} (Timeout: {timeout_sec}s)...", flush=True)
    with open("scene.py", "w", encoding="utf-8") as f:
        f.write(code_str)
    timestamp = int(time.time())
    output_filename = f"video_{timestamp}.mp4"
    res_str = get_resolution_flags(orientation, quality)
    frame_rate_flags = ["--frame_rate", "15"] if quality == "Preview (360p)" else []
    cmd = ["manim", "--resolution", res_str, *frame_rate_flags, "--disable_caching",
           "--progress_bar", "none", "scene.py", "GenScene", "-o", output_filename]
    full_logs = ""
    try:
        process = subprocess.run(cmd, capture_output=True, timeout=timeout_sec, check=False)
        stdout_log = process.stdout.decode('utf-8', 'ignore')
        stderr_log = process.stderr.decode('utf-8', 'ignore')
        full_logs = f"--- MANIM STDOUT ---\n{stdout_log}\n\n--- MANIM STDERR ---\n{stderr_log}"
        if process.returncode != 0:
            print(f"❌ Render Failed (Process Error). Return Code: {process.returncode}", flush=True)
    except subprocess.TimeoutExpired as e:
        print(f"⌛ Render timed out after {timeout_sec} seconds. Attempting recovery...", flush=True)
        # Captured output may be missing on the exception object.
        stdout_log = e.stdout.decode('utf-8', 'ignore') if e.stdout else ""
        stderr_log = e.stderr.decode('utf-8', 'ignore') if e.stderr else ""
        full_logs = f"--- MANIM STDOUT ---\n{stdout_log}\n\n--- MANIM STDERR ---\n{stderr_log}"
        recovered_path = _stitch_partial_chunks(stdout_log + "\n" + stderr_log, timestamp)
        if recovered_path:
            print(f"✅ Recovery Successful: {recovered_path}", flush=True)
            return recovered_path, f"⚠️ WARNING: Render Timed Out. Recovered partial video.\n\n{full_logs}", True
        print("❌ Recovery failed or no partial files found.", flush=True)

    # Look for the finished file anywhere under the scene's output tree.
    media_video_base = os.path.join("media", "videos", "scene")
    if os.path.exists(media_video_base):
        for root, _, files in os.walk(media_video_base):
            if output_filename in files:
                found_video_path = os.path.join(root, output_filename)
                print(f"✅ Video Render Success: {found_video_path}", flush=True)
                return found_video_path, f"✅ Rendering Successful\n\n{full_logs}", True
    return None, f"❌ Failure: Video file was not created.\n\n{full_logs}", False
# ---------------------------------------------------------
# 2. Main API Function
# ---------------------------------------------------------
def render_video_from_code(code, orientation, quality, timeout, preview_factor):
    """
    Validates, pre-checks and renders Manim code for the UI / API.

    Steps: reject empty code or code without ``from manim import``, run the
    fast pre-check, clear the ``media`` directory, then do the full render.
    For the "Preview (360p)" quality the animation times are scaled by
    ``preview_factor`` first (0.5 when it is missing or zero).

    Returns:
        ``(video_path_or_None, status_text, gr.Button)`` where the button is
        made visible whenever the pre-check or the render did not succeed.
    """
    try:
        # Validate first: the pre-check cannot handle missing code, and there
        # is no point spawning Manim for input that will be rejected anyway.
        if not code or "from manim import" not in code:
            return None, "Error: No valid code.", gr.Button(visible=False)
        is_valid, logs = run_manim_pre_check(code)
        if not is_valid:
            return None, logs, gr.Button(visible=True)
        cleanup_media_directory()
        if quality == "Preview (360p)":
            # Fall back to 0.5 when the factor is absent or evaluates to zero.
            factor = 0.5
            if preview_factor not in (None, ""):
                factor = float(preview_factor) or 0.5
            code_to_render = modify_animation_times(code, factor=factor)
        else:
            code_to_render = code
        video_path, logs, success = run_manim(code_to_render, orientation, quality, timeout)
        return video_path, logs, gr.Button(visible=not success)
    except Exception as e:
        # Top-level boundary of the request: report the failure to the user.
        return None, f"Rendering failed: {str(e)}", gr.Button(visible=True)
# ---------------------------------------------------------
# 3. Audio Merging Functions (API-Safe)
# ---------------------------------------------------------
def get_media_duration(file_path):
    """
    Uses ffprobe to get the duration of a media file.

    Returns:
        The duration in seconds as a float, or ``None`` when ffprobe is
        missing, exits with an error, or prints something that is not a
        number.
    """
    cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", file_path]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError):
        print(f"⚠️ Could not get duration for {file_path}. Is ffprobe installed?", flush=True)
        return None
    try:
        return float(result.stdout.strip())
    except ValueError:
        # ffprobe succeeded but its output was empty or non-numeric.
        print(f"⚠️ ffprobe reported no usable duration for {file_path}.", flush=True)
        return None
def build_atempo_filter(factor):
    """
    Builds a chained atempo filter for FFmpeg to handle extreme speed changes.

    The factor is split into stages of ``atempo=100.0`` (or ``atempo=0.5``)
    until the remainder lies within 0.5..100.0, and the remainder becomes the
    final stage.

    Args:
        factor: Desired overall tempo multiplier; must be positive and finite.

    Returns:
        Comma-separated filter string, e.g. ``"atempo=0.5,atempo=0.5"``.

    Raises:
        ValueError: If ``factor`` is zero, negative, NaN or infinite. Such
            values can never be reduced into the valid range, and the loops
            below would otherwise never terminate.
    """
    if not 0 < factor < float("inf"):
        raise ValueError(f"atempo factor must be a positive, finite number, got {factor!r}")
    filters = []
    while factor > 100.0:
        filters.append("atempo=100.0")
        factor /= 100.0
    while factor < 0.5:
        filters.append("atempo=0.5")
        factor /= 0.5
    # After both loops the remainder is guaranteed to be within 0.5..100.0.
    filters.append(f"atempo={factor}")
    return ",".join(filters)
def decode_base64_to_tempfile(data_obj):
    """
    Decodes a base64 string from a Gradio file object and saves to a temp file.

    ``data_obj['data']`` may be a data URI (``"data:video/mp4;base64,<payload>"``)
    or a bare base64 payload. For a data URI the file suffix is taken from the
    part of the header between ``/`` and ``;``; otherwise ``.bin`` is used.

    Returns:
        Path of the temporary file (the caller is responsible for deleting
        it), or ``None`` when ``data_obj`` is empty or carries no data.
    """
    if not data_obj or 'data' not in data_obj or not data_obj['data']:
        return None
    header, separator, encoded_data = data_obj['data'].partition(",")
    if not separator:
        # No "header,payload" structure: the whole string is the payload.
        header, encoded_data = "", header
    header_parts = header.split('/')
    subtype = header_parts[1].split(';')[0] if len(header_parts) > 1 else ""
    # Keep only characters that are safe inside a file suffix.
    file_extension = "".join(ch for ch in subtype if ch.isalnum() or ch in "-+.") or "bin"
    decoded_data = base64.b64decode(encoded_data)
    with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as tmp_file:
        tmp_file.write(decoded_data)
        return tmp_file.name
def merge_audio_to_video(video_input, audio_input):
    """
    Merges audio into a video. Handles both file paths (from UI) and
    base64 dictionaries (from API) as inputs.

    The audio tempo is changed by ``audio_duration / video_duration`` so the
    audio matches the video's length; the video stream is copied unchanged.
    The result is written to ``temp_outputs/merged_video_<timestamp>.mp4``.

    Returns:
        ``(output_path, status_message)``; ``output_path`` is ``None`` on any
        failure. Temporary files created for base64 inputs are always removed
        before returning.
    """
    temp_files_to_clean = []

    def resolve(media_input, label):
        # An existing path is used directly; a dict is decoded to a temp file
        # that is registered for cleanup. Anything else is "missing".
        if isinstance(media_input, str) and os.path.exists(media_input):
            return media_input
        if isinstance(media_input, dict):
            print(f"API call detected: Decoding {label} from base64.", flush=True)
            decoded_path = decode_base64_to_tempfile(media_input)
            if decoded_path:
                temp_files_to_clean.append(decoded_path)
            return decoded_path
        return None

    try:
        video_path = resolve(video_input, "video")
        audio_path = resolve(audio_input, "audio")
        if not video_path or not audio_path:
            return None, "Error: Missing video or audio file. Please provide both."
        gr.Info("Merging audio...")
        video_duration = get_media_duration(video_path)
        audio_duration = get_media_duration(audio_path)
        if video_duration is None or audio_duration is None:
            return None, "Error: Could not determine media durations."
        if video_duration <= 0:
            return None, "Error: Input video has zero duration."
        if audio_duration <= 0:
            # A zero-length audio would give a tempo factor of 0, which cannot
            # be expressed as an atempo chain.
            return None, "Error: Input audio has zero duration."
        speed_factor = audio_duration / video_duration
        atempo_filter = build_atempo_filter(speed_factor)
        output_dir = "temp_outputs"
        os.makedirs(output_dir, exist_ok=True)
        timestamp = int(time.time())
        output_path = os.path.join(output_dir, f"merged_video_{timestamp}.mp4")
        ffmpeg_cmd = [
            "ffmpeg", "-y",
            "-i", video_path,
            "-i", audio_path,
            "-c:v", "copy",
            "-filter:a", atempo_filter,
            "-map", "0:v:0",
            "-map", "1:a:0",
            "-shortest",
            output_path
        ]
        process = subprocess.run(ffmpeg_cmd, capture_output=True, text=True)
        if process.returncode != 0:
            error_message = f"FFmpeg Error:\n{process.stderr}"
            print(error_message, flush=True)
            return None, error_message
        print(f"✅ Audio merged successfully: {output_path}", flush=True)
        return output_path, "✅ Audio merged successfully!"
    finally:
        print(f"Cleaning up {len(temp_files_to_clean)} temporary files...", flush=True)
        for f in temp_files_to_clean:
            try:
                os.remove(f)
            except OSError as e:
                print(f"Error removing temp file {f}: {e}", flush=True)
# ---------------------------------------------------------
# 4. Gradio Interface
# ---------------------------------------------------------
# Initial value of the code editor. The scene class must be named GenScene,
# because that is the scene name passed to the manim commands in this file.
DEFAULT_CODE = """from manim import *
class GenScene(Scene):
    def construct(self):
        c = Circle(color=BLUE, fill_opacity=0.5)
        self.play(Create(c))
        self.wait(1)
"""
# Two-tab interface: one tab renders Manim code, the other merges audio into a video.
with gr.Blocks(title="Manim Render & Audio Tool") as demo:
    with gr.Tab("🎬 Manim Video Renderer"):
        with gr.Row():
            with gr.Column(scale=1):
                # All render inputs are created hidden (visible=False); only the
                # Render button is shown. Presumably the values are meant to be
                # supplied through the "render" API endpoint - confirm with the author.
                code_input = gr.Code(label="Python Code", language="python", value=DEFAULT_CODE, visible=False)
                orientation_opt = gr.Radio(choices=["Landscape (16:9)", "Portrait (9:16)"], value="Portrait (9:16)", label="Orientation", visible=False)
                quality_opt = gr.Dropdown(choices=["Preview (360p)", "480p", "720p", "1080p", "4k"], value="Preview (360p)", label="Quality", visible=False)
                timeout_input = gr.Number(label="Render Timeout (seconds)", value=60, visible=False)
                preview_speed_factor_input = gr.Number(label="Preview Speed Factor", value=0.5, visible=False)
                render_btn = gr.Button("Render", variant="primary")
            with gr.Column(scale=1):
                video_output = gr.Video(label="Result")
                status_output = gr.Textbox(label="Status/Logs")
                # Hidden by default; render_video_from_code returns a Button
                # update that shows it when the pre-check or render fails.
                fix_btn_output = gr.Button("Fix Error & Re-render", variant="stop", visible=False)
        # Exposed as the "render" API endpoint.
        render_btn.click(
            fn=render_video_from_code,
            inputs=[code_input, orientation_opt, quality_opt, timeout_input, preview_speed_factor_input],
            outputs=[video_output, status_output, fix_btn_output],
            api_name="render"
        )
    with gr.Tab("🎤 Add Audio to Video"):
        gr.Markdown("## Merge Audio into Video")
        gr.Markdown("Upload a video and an audio file. The audio will be automatically stretched or compressed to match the video's length, preserving original video quality.")
        with gr.Row():
            with gr.Column():
                video_input_audio_tab = gr.Video(label="Input Video (MP4)")
                audio_input_audio_tab = gr.Audio(label="Input Audio", type="filepath")
                merge_audio_btn = gr.Button("Merge Audio", variant="primary")
            with gr.Column():
                video_output_audio_tab = gr.Video(label="Merged Video")
                status_audio_tab = gr.Textbox(label="Status")
        # Exposed as the "add_audio" API endpoint.
        merge_audio_btn.click(
            fn=merge_audio_to_video,
            inputs=[video_input_audio_tab, audio_input_audio_tab],
            outputs=[video_output_audio_tab, status_audio_tab],
            api_name="add_audio"
        )
# Start the server only when run as a script; listens on all interfaces, port 7860.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)