from flask import Flask, request, jsonify, send_file from flask_cors import CORS import os import subprocess import tempfile import shutil from datetime import datetime import traceback import json import ast import re import html import unicodedata import asyncio from concurrent.futures import ThreadPoolExecutor from functools import lru_cache import edge_tts from pydub import AudioSegment from pydub.effects import normalize from mutagen.mp3 import MP3 app = Flask(__name__) CORS(app) # Configuration BASE_DIR = "/app" MEDIA_DIR = os.path.join(BASE_DIR, "media") TEMP_DIR = os.path.join(BASE_DIR, "temp") AUDIO_DIR = os.path.join(BASE_DIR, "sound") os.makedirs(MEDIA_DIR, exist_ok=True) os.makedirs(TEMP_DIR, exist_ok=True) os.makedirs(AUDIO_DIR, exist_ok=True) # API Key for security (optional) API_KEY = "rkmentormindzofficaltokenkey12345" import os import re import html import unicodedata import asyncio import tempfile import traceback import random import hashlib from concurrent.futures import ThreadPoolExecutor from typing import List, Tuple, Optional, Dict import edge_tts from pydub import AudioSegment from pydub.effects import normalize, compress_dynamic_range from mutagen.mp3 import MP3 # Voice configuration VOICE_EN = "en-IN-NeerjaNeural" AUDIO_DIR = os.path.join(os.getcwd(), "audio") os.makedirs(AUDIO_DIR, exist_ok=True) # Pre-compiled regex patterns URL_PATTERN = re.compile(r'https?://[^\s<>"\']+|www\.[^\s<>"\']+') TAG_PATTERN = re.compile(r'<[^>]*>') BRACKET_PATTERN = re.compile(r'[\{\}\[\]]') SPECIAL_CHAR_PATTERN = re.compile(r'[#@$%^&*_+=|\\`~]') WHITESPACE_PATTERN = re.compile(r'\s+') def clean_text_for_tts(text: str) -> str: """Cleans text while preserving ALL Tamil/Indic characters and punctuation.""" if not text: return "" text = str(text).strip() text = html.unescape(text) # Remove URLs text = URL_PATTERN.sub('', text) # Remove HTML/XML tags but preserve content text = TAG_PATTERN.sub('', text) # Remove brackets text = BRACKET_PATTERN.sub('', text) # Remove special characters but preserve punctuation needed for TTS text = SPECIAL_CHAR_PATTERN.sub('', text) # Replace newlines/tabs with spaces text = text.replace('\\n', ' ').replace('\\t', ' ').replace('\\r', ' ') # Use NFC normalization to preserve Tamil/Indic characters text = unicodedata.normalize('NFC', text) # Collapse multiple whitespace but preserve single spaces text = WHITESPACE_PATTERN.sub(' ', text) # IMPORTANT: Remove zero-width characters that might break TTS text = text.replace('\u200b', '') # Zero-width space text = text.replace('\u200c', '') # Zero-width non-joiner text = text.replace('\u200d', '') # Zero-width joiner return text.strip() def create_natural_chunks(text: str, max_chars: int = 300) -> List[Tuple[str, int, str]]: """ Create natural chunks that preserve language context and Tamil words. Returns list of (chunk_text, chunk_index, language) """ cleaned = clean_text_for_tts(text) if not cleaned or len(cleaned) < 5: # If text is very short, return as single chunk has_tamil = any('\u0B80' <= char <= '\u0BFF' for char in cleaned) if cleaned else False lang = 'ta' if has_tamil else 'en' return [(cleaned, 0, lang)] if cleaned else [] # First, preserve natural Tamil words by not breaking them # Protect Tamil words with spaces around them words = cleaned.split() chunks = [] current_chunk = "" current_lang = None chunk_index = 0 i = 0 while i < len(words): word = words[i] # Detect word language has_tamil = any('\u0B80' <= char <= '\u0BFF' for char in word) word_lang = 'ta' if has_tamil else 'en' # Handle single-character Tamil words like "ல" if has_tamil and len(word) == 1: # Attach to next word if possible if i + 1 < len(words): next_word = words[i + 1] # If next word is also Tamil or short, combine them if len(next_word) <= 3 or any('\u0B80' <= char <= '\u0BFF' for char in next_word): word = word + " " + next_word i += 1 # Skip next word word_lang = 'ta' # Test if adding this word would exceed max_chars test_chunk = f"{current_chunk} {word}" if current_chunk else word if len(test_chunk) <= max_chars: # Can add to current chunk if current_chunk: current_chunk = f"{current_chunk} {word}" else: current_chunk = word # Update language - if mixed, use language with most characters if current_lang != word_lang: # Count characters by language in current chunk tamil_chars = sum(1 for char in current_chunk if '\u0B80' <= char <= '\u0BFF') english_chars = sum(1 for char in current_chunk if char.isalpha() and not ('\u0B80' <= char <= '\u0BFF')) current_lang = 'ta' if tamil_chars >= english_chars else 'en' else: # Start new chunk if current_chunk: chunks.append((current_chunk, chunk_index, current_lang or word_lang)) chunk_index += 1 current_chunk = word current_lang = word_lang i += 1 # Add final chunk if current_chunk: chunks.append((current_chunk, chunk_index, current_lang or 'en')) # Ensure chunks aren't too small (merge small chunks) merged_chunks = [] i = 0 while i < len(chunks): chunk_text, chunk_idx, chunk_lang = chunks[i] # If chunk is very small (less than 20 chars), merge with next if len(chunk_text) < 20 and i + 1 < len(chunks): next_text, next_idx, next_lang = chunks[i + 1] # Merge if languages are compatible if chunk_lang == next_lang or len(next_text) < 30: merged_text = f"{chunk_text} {next_text}" merged_lang = chunk_lang if len(chunk_text) >= len(next_text) else next_lang merged_chunks.append((merged_text, len(merged_chunks), merged_lang)) i += 2 else: merged_chunks.append((chunk_text, len(merged_chunks), chunk_lang)) i += 1 else: merged_chunks.append((chunk_text, len(merged_chunks), chunk_lang)) i += 1 return merged_chunks async def generate_safe_audio(text: str, voice: str, semaphore: asyncio.Semaphore, chunk_index: int) -> Tuple[Optional[str], int]: """Generate audio with rate limiting, caching, and retry logic.""" if not text or len(text) < 1: return None, chunk_index # Create deterministic cache key cache_key = f"{text}_{voice}" text_hash = hashlib.md5(cache_key.encode('utf-8')).hexdigest() cache_filename = os.path.join(AUDIO_DIR, f"cache_{text_hash}.mp3") # Check disk cache if os.path.exists(cache_filename) and os.path.getsize(cache_filename) > 512: return cache_filename, chunk_index async with semaphore: max_retries = 3 base_delay = 1.5 for attempt in range(max_retries): temp_filename = None try: # Create temp file with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as tmp: temp_filename = tmp.name # Use slower rate for Tamil to ensure quality rate = "-10%" if "ta-IN" in voice else "0%" # Generate with edge_tts comm = edge_tts.Communicate(text, voice=voice, rate=rate) await comm.save(temp_filename) # Verify successful generation if os.path.exists(temp_filename) and os.path.getsize(temp_filename) > 512: # Move to cache location os.replace(temp_filename, cache_filename) return cache_filename, chunk_index except Exception as e: # Clean up temp file on error if temp_filename and os.path.exists(temp_filename): try: os.unlink(temp_filename) except: pass if attempt == max_retries - 1: print(f"Failed to generate audio chunk {chunk_index}: {e}") return None, chunk_index # Exponential backoff with jitter sleep_time = (base_delay * (2 ** attempt)) + random.uniform(0.1, 0.5) await asyncio.sleep(sleep_time) finally: # Ensure temp file is cleaned up if temp_filename and os.path.exists(temp_filename) and temp_filename != cache_filename: try: os.unlink(temp_filename) except: pass return None, chunk_index def process_audio_segment_fast(audio_data: Tuple[str, int]) -> Tuple[Optional[AudioSegment], int]: """Process audio segment with minimal silence.""" audio_file, chunk_index = audio_data try: if not audio_file or not os.path.exists(audio_file): return None, chunk_index segment = AudioSegment.from_file(audio_file) # REDUCED SILENCE: Only add minimal padding if len(segment) > 0: # Just 10ms padding instead of 50ms segment = AudioSegment.silent(duration=10) + segment + AudioSegment.silent(duration=10) # Gentle normalization (don't over-process) segment = normalize(segment, headroom=0.1) # Remove excessive silence (but be careful not to cut words) if len(segment) > 1000: # Only for longer segments try: # Only strip if there's clear silence at ends segment = segment.strip_silence(silence_thresh=-40, padding=25) except: pass return segment, chunk_index except Exception as e: print(f"Warning: Error processing audio segment {chunk_index}: {e}") return None, chunk_index async def bilingual_tts_optimized(text: str, output_file: str = "audio0.mp3", VOICE_TA: Optional[str] = None, max_concurrent: int = 4) -> Optional[str]: """Optimized bilingual TTS with minimal silence and preserved words.""" print("Starting bilingual TTS processing...") try: # Create natural chunks that preserve Tamil words chunks_info = create_natural_chunks(text, max_chars=300) if not chunks_info: print("Error: No valid text chunks after processing") return None print(f"Processing {len(chunks_info)} text chunks...") # Prepare tasks tasks = [] semaphore = asyncio.Semaphore(max_concurrent) for chunk_text, chunk_index, chunk_lang in chunks_info: if not chunk_text or len(chunk_text.strip()) < 1: continue # Determine voice for this chunk if VOICE_TA and chunk_lang == 'ta': voice = VOICE_TA else: voice = VOICE_TA or VOICE_EN tasks.append(generate_safe_audio(chunk_text, voice, semaphore, chunk_index)) if not tasks: print("Error: No tasks to process") return None # Generate all audio files results = await asyncio.gather(*tasks, return_exceptions=False) # Filter successful results audio_data = [] for result in results: if isinstance(result, tuple) and result[0] and os.path.exists(result[0]): audio_data.append(result) if not audio_data: print("Error: No audio was successfully generated") return None # Sort by chunk index audio_data.sort(key=lambda x: x[1]) print(f"Successfully generated {len(audio_data)} audio segments") # Process audio segments processed_segments = [] for audio_file, chunk_index in audio_data: segment_result = process_audio_segment_fast((audio_file, chunk_index)) if segment_result[0] is not None: processed_segments.append(segment_result) # Sort by index processed_segments.sort(key=lambda x: x[1]) audio_segments = [seg for seg, idx in processed_segments] if not audio_segments: print("Error: No audio segments were successfully processed") return None print(f"Merging {len(audio_segments)} audio segments...") # Merge with MINIMAL gaps - only 30ms between segments merged_audio = audio_segments[0] for i in range(1, len(audio_segments)): # Only add tiny pause if needed current_end = merged_audio[-50:] if len(merged_audio) > 50 else merged_audio next_start = audio_segments[i][:50] if len(audio_segments[i]) > 50 else audio_segments[i] # Check if we need a pause (if both segments end/start with sound) add_pause = 20 # Only 20ms pause merged_audio = merged_audio + AudioSegment.silent(duration=add_pause) + audio_segments[i] # Gentle processing for natural sound try: # Very light compression to reduce volume variations merged_audio = compress_dynamic_range( merged_audio, threshold=-25.0, # Higher threshold = less compression ratio=1.8, # Lower ratio = more natural attack=10.0, release=100.0 ) except: pass # Final normalization with headroom merged_audio = normalize(merged_audio, headroom=0.5) # Export merged_audio.export(output_file, format="mp3", bitrate="192k") if os.path.exists(output_file) and os.path.getsize(output_file) > 1024: print(f"✅ Audio successfully generated: {output_file}") # Verify all words are present by checking file properties try: audio = MP3(output_file) duration = audio.info.length print(f"Audio duration: {duration:.2f} seconds") except: pass return output_file else: print("Error: Generated file is empty or missing") return None except Exception as main_error: print(f"Main error in bilingual TTS: {main_error}") traceback.print_exc() return None async def generate_tts_optimized(id: int, lines, lang: str) -> Tuple[Optional[float], Optional[str]]: """Optimized TTS generation function.""" voice_map = { "English": "en-US-JennyNeural", "Tamil": "ta-IN-PallaviNeural", "Hindi": "hi-IN-SwaraNeural", "Malayalam": "ml-IN-SobhanaNeural", "Kannada": "kn-IN-SapnaNeural", "Telugu": "te-IN-ShrutiNeural", "Bengali": "bn-IN-TanishaaNeural", "Marathi": "mr-IN-AarohiNeural", "Gujarati": "gu-IN-DhwaniNeural", "Punjabi": "pa-IN-VaaniNeural", "Urdu": "ur-IN-GulNeural", "French": "fr-FR-DeniseNeural", "German": "de-DE-KatjaNeural", "Spanish": "es-ES-ElviraNeural", "Italian": "it-IT-IsabellaNeural", "Russian": "ru-RU-SvetlanaNeural", "Japanese": "ja-JP-NanamiNeural", "Korean": "ko-KR-SunHiNeural", "Chinese": "zh-CN-XiaoxiaoNeural", "Arabic": "ar-SA-ZariyahNeural", "Portuguese": "pt-BR-FranciscaNeural", "Dutch": "nl-NL-FennaNeural", "Greek": "el-GR-AthinaNeural", "Hebrew": "he-IL-HilaNeural", "Turkish": "tr-TR-EmelNeural", "Polish": "pl-PL-AgnieszkaNeural", "Thai": "th-TH-AcharaNeural", "Vietnamese": "vi-VN-HoaiMyNeural", "Swedish": "sv-SE-SofieNeural", "Finnish": "fi-FI-NooraNeural", "Czech": "cs-CZ-VlastaNeural", "Hungarian": "hu-HU-NoemiNeural" } audio_name = f"audio{id}.mp3" audio_path = os.path.join(AUDIO_DIR, audio_name) if "&&&" in lang: listf = lang.split("&&&") text = listf[0].strip() lang_name = listf[1].strip() if len(listf) > 1 else "English" voice_to_use = voice_map.get(lang_name, VOICE_EN) else: text = lines[id] if isinstance(lines, (list, tuple)) and id < len(lines) else str(lines) voice_to_use = voice_map.get(lang, VOICE_EN) # Reduced concurrency for better quality output = await bilingual_tts_optimized(text, audio_path, voice_to_use, max_concurrent=3) if output and os.path.exists(audio_path): try: audio = MP3(audio_path) duration = audio.info.length return duration, audio_path except Exception as e: print(f"Error reading audio file: {e}") return None, None return None, None def audio_func(id: int, lines, lang: str) -> Tuple[Optional[float], Optional[str]]: """Synchronous wrapper for audio generation.""" try: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete(generate_tts_optimized(id, lines, lang)) finally: loop.close() except Exception as e: print(f"Error in audio_func: {e}") traceback.print_exc() return None, None def create_manim_script(problem_data, script_path, audio_path, audio_length): """Generate Manim script with proper wrapping and audio-video sync.""" settings = problem_data.get("video_settings", { "background_color": "#0f0f23", "text_color": "WHITE", "highlight_color": "YELLOW", "font": "CMU Serif", "text_size": 36, "equation_size": 45, "title_size": 48, "wrap_width": 15.5 }) slides = problem_data.get("slides", []) if not slides: raise ValueError("No slides provided in input data") # FIX #2: Calculate timing overhead for accurate audio-video sync num_slides = len(slides) num_titles = sum(1 for s in slides if s.get("type") == "title") overhead_time = (num_slides - num_titles) * 0.3 # wait after each content slide overhead_time += num_titles * 0.4 # title animation overhead overhead_time += 2.3 # final highlight + wait overhead_time += (num_slides / 3) * 0.5 # estimated scroll overhead # Calculate separate durations for different slide types equation_duration = 0.0 text_title_duration = 0.0 for slide in slides: slide_duration = float(slide.get("duration", 1.0)) if slide.get("type") == "equation": equation_duration += slide_duration else: text_title_duration += slide_duration # FIX #2: Subtract overhead from available time before calculating scale available_time = audio_length - text_title_duration - overhead_time if equation_duration > 0 and available_time > 0: equation_scale = available_time / equation_duration equation_scale = max(0.5, min(2.5, equation_scale)) else: equation_scale = 1.0 slides_repr = repr(slides) audio_path_repr = repr(audio_path) wrap_width = float(settings.get("wrap_width", 15.5)) background_color = settings.get("background_color", "#0f0f23") text_color = settings.get("text_color", "WHITE") highlight_color = settings.get("highlight_color", "YELLOW") font = settings.get("font", "CMU Serif") text_size = settings.get("text_size", 36) equation_size = settings.get("equation_size", 50) title_size = settings.get("title_size", 48) manim_code = f"""from manim import * class GeneratedMathScene(Scene): def construct(self): # Scene settings self.add_sound({audio_path_repr}) self.camera.background_color = "{background_color}" default_color = {text_color} highlight_color = {highlight_color} default_font = "{font}" text_size = {text_size} equation_size = {equation_size} title_size = {title_size} wrap_width = {wrap_width} equation_scale = {equation_scale} # FIX #1: Improved wrapping function - check width BEFORE arranging def make_inline_segments(content, color, font, text_size, equation_size): if not content: return VGroup() segments = content.split("#") all_lines = [] current_line = [] current_width = 0.0 for segment in segments: segment = segment.strip() if not segment: continue # Create mobject if segment.startswith("%"): latex_content = segment[1:] mob = MathTex(latex_content, color=color, font_size=equation_size) else: mob = Text(segment, color=color, font=font, font_size=text_size) # FIX #1: Check width BEFORE adding to line mob_width = mob.width potential_width = current_width + mob_width + (0.05 * len(current_line)) if potential_width > wrap_width and len(current_line) > 0: # Line is full, save it and start new line line_group = VGroup(*current_line).arrange(RIGHT, buff=0.05) all_lines.append(line_group) current_line = [mob] current_width = mob_width else: # Add to current line current_line.append(mob) current_width = potential_width # Safety: If single item exceeds width, scale it down if len(current_line) == 1 and mob.width > wrap_width: mob.scale_to_fit_width(wrap_width * 0.95) current_width = mob.width # Add final line if current_line: line_group = VGroup(*current_line).arrange(RIGHT, buff=0.05) all_lines.append(line_group) if not all_lines: return VGroup() final_group = VGroup(*all_lines).arrange(DOWN, aligned_edge=LEFT, buff=0.2) return final_group def make_wrapped_paragraph(content, color, font, font_size, line_spacing=0.2): lines = [] words = content.split() current = "" for w in words: test = w if not current else current + " " + w test_obj = Text(test, color=color, font=font, font_size=font_size) if test_obj.width <= wrap_width * 0.95: current = test else: if current: line_obj = Text(current, color=color, font=font, font_size=font_size) lines.append(line_obj) current = w if current: lines.append(Text(current, color=color, font=font, font_size=font_size)) if not lines: return VGroup() first_line = lines[0] for ln in lines: ln.align_to(first_line, LEFT) para = VGroup(*lines).arrange(DOWN, aligned_edge=LEFT, buff=line_spacing) return para content_group = VGroup() current_y = 3.0 line_spacing = 0.8 slides = {slides_repr} for idx, slide in enumerate(slides): obj = None content = slide.get("content", "") animation = slide.get("animation", "write_left") base_duration = slide.get("duration", 1.0) slide_type = slide.get("type", "text") # Apply scale ONLY to equations if slide_type == "equation": duration = base_duration * equation_scale else: duration = base_duration if slide_type == "title": obj = make_inline_segments(content, highlight_color, default_font, title_size, equation_size) if len(obj) == 0: obj = Text(content, color=highlight_color, font=default_font, font_size=title_size) # FIX #1: Ensure title fits within screen if obj.width > wrap_width: obj.scale_to_fit_width(wrap_width * 0.95) obj.move_to(ORIGIN) self.play(FadeIn(obj), run_time=duration * 0.8) self.wait(duration * 0.3) self.play(FadeOut(obj), run_time=duration * 0.3) continue elif slide_type == "text": obj = make_inline_segments(content, default_color, default_font, text_size, equation_size) if len(obj) == 0: obj = make_wrapped_paragraph(content, default_color, default_font, text_size, line_spacing=0.25) # FIX #1: Safety check for text overflow if obj.width > wrap_width: obj.scale_to_fit_width(wrap_width * 0.95) elif slide_type == "equation": eq_content = content obj = MathTex(eq_content, color=default_color, font_size=equation_size) # FIX #1: Scale equation instead of splitting by spaces if obj.width > wrap_width: obj.scale_to_fit_width(wrap_width * 0.95) if obj: obj.to_edge(LEFT, buff=0.3) obj.shift(UP * (current_y - obj.height / 2)) obj_bottom = obj.get_bottom()[1] if obj_bottom < -3.5: scroll_amount = abs(obj_bottom - (-3.5)) + 0.3 self.play(content_group.animate.shift(UP * scroll_amount), run_time=0.5) current_y += scroll_amount obj.shift(UP * scroll_amount) obj.to_edge(LEFT, buff=0.3) if animation == "write_left": self.play(Write(obj), run_time=duration) elif animation == "fade_in": self.play(FadeIn(obj), run_time=duration) elif animation == "highlight_left": self.play(Write(obj), run_time=duration * 0.6) self.play(obj.animate.set_color(highlight_color), run_time=duration * 0.4) else: self.play(Write(obj), run_time=duration) content_group.add(obj) current_y -= (getattr(obj, "height", 0) + line_spacing) self.wait(0.3) if len(content_group) > 0: final_box = SurroundingRectangle(content_group[-1], color=highlight_color, buff=0.2) self.play(Create(final_box), run_time=0.8) self.wait(1.5) """ try: with open(script_path, 'w', encoding='utf-8') as f: f.write(manim_code) print(f"Generated script at {script_path}") print(f"Audio length: {audio_length:.2f}s") print(f"Overhead time: {overhead_time:.2f}s") print(f"Equation scale factor: {equation_scale:.2f}x") print(f"Text/Title duration: {text_title_duration:.2f}s (unchanged)") print(f"Equation duration: {equation_duration:.2f}s -> {equation_duration * equation_scale:.2f}s") print(f"Expected total: {text_title_duration + (equation_duration * equation_scale) + overhead_time:.2f}s") except Exception as e: print(f"Error writing script: {e}") raise @app.route("/generate", methods=["POST"]) def generate_video(): temp_work_dir = None try: raw_data = request.get_json() if not raw_data: return jsonify({"error": "No JSON data provided"}), 400 raw_body = raw_data.get("jsondata", '') if not raw_body: return jsonify({"error": "No jsondata field in request"}), 400 lst = raw_body.split("&&&&") if len(lst) < 2: return jsonify({"error": "Invalid data format, missing &&&&separator"}), 400 cleaned = re.sub(r'(\d)\s*\.\s*(\d)', r'\1.\2', lst[0]) try: nlist = ast.literal_eval(cleaned) except Exception as e: return jsonify({"error": f"Failed to parse slide data: {str(e)}"}), 400 datalst = [] for line in range(len(nlist)): try: datalst.append({ "type": nlist[line][0].strip(), "content": nlist[line][1].strip(), "animation": nlist[line][2].strip().replace(" ", ""), "duration": float(nlist[line][3]) }) except (IndexError, ValueError) as e: return jsonify({"error": f"Invalid slide data at index {line}: {str(e)}"}), 400 data = { "video_settings": { "background_color": "#0f0f23", "text_color": "WHITE", "highlight_color": "YELLOW", "font": "CMU Serif", "text_size": 36, "equation_size": 42, "title_size": 48 }, "slides": datalst } best = lst[1].split("&&&") lines = best[0] try: lang = best[1] if len(best) > 1 else "English" except: lang = "English" audio_length, audio_path = audio_func(0, lines, lang) if not audio_length or not audio_path or not os.path.exists(audio_path): return jsonify({"error": "Failed to generate audio"}), 500 if "slides" not in data or not data["slides"]: return jsonify({"error": "No slides provided in request"}), 400 print(f"Received request with {len(data['slides'])} slides") print(f"Audio length: {audio_length}s") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") temp_work_dir = os.path.join(TEMP_DIR, f"manim_{timestamp}") os.makedirs(temp_work_dir, exist_ok=True) script_path = os.path.join(temp_work_dir, "scene.py") # Pass audio_length instead of scale create_manim_script(data, script_path, audio_path, audio_length) print(f"Created Manim script at {script_path}") quality = 'l' render_command = [ "manim", f"-q{quality}", "--disable_caching", "--media_dir", temp_work_dir, script_path, "GeneratedMathScene" ] print(f"Running command: {' '.join(render_command)}") result = subprocess.run( render_command, capture_output=True, text=True, cwd=temp_work_dir, timeout=120 ) if result.returncode != 0: error_msg = result.stderr or result.stdout print(f"Manim rendering failed: {error_msg}") return jsonify({ "error": "Manim rendering failed", "details": error_msg }), 500 print("Manim rendering completed successfully") quality_map = {'l': '480p15', 'm': '720p30', 'h': '1080p60'} video_quality = quality_map.get(quality, '480p15') video_path = os.path.join( temp_work_dir, "videos", "scene", video_quality, "GeneratedMathScene.mp4" ) if not os.path.exists(video_path): print(f"Video not found at expected path: {video_path}") return jsonify({ "error": "Video file not found after rendering", "expected_path": video_path }), 500 print(f"Video found at: {video_path}") output_filename = f"math_video_{timestamp}.mp4" output_path = os.path.join(MEDIA_DIR, output_filename) shutil.copy(video_path, output_path) print(f"Video copied to: {output_path}") try: if temp_work_dir and os.path.exists(temp_work_dir): shutil.rmtree(temp_work_dir) print("Cleaned up temp directory") except Exception as e: print(f"Failed to clean temp dir: {e}") return send_file( output_path, mimetype='video/mp4', as_attachment=False, download_name=output_filename ) except subprocess.TimeoutExpired: print("Video rendering timeout") if temp_work_dir and os.path.exists(temp_work_dir): try: shutil.rmtree(temp_work_dir) except: pass return jsonify({"error": "Video rendering timeout (120s)"}), 504 except Exception as e: print(f"Error: {str(e)}") traceback.print_exc() if temp_work_dir and os.path.exists(temp_work_dir): try: shutil.rmtree(temp_work_dir) except: pass return jsonify({ "error": str(e), "traceback": traceback.format_exc() }), 500 if __name__ == '__main__': port = int(os.environ.get('PORT', 7860)) app.run(host='0.0.0.0', port=port, debug=False)