| from flask import Flask, request, jsonify, send_file |
| from flask_cors import CORS |
| import os |
| import subprocess |
| import tempfile |
| import shutil |
| from datetime import datetime |
| import traceback |
| import json |
| import ast |
| import re |
| import html |
| import unicodedata |
| import asyncio |
| from concurrent.futures import ThreadPoolExecutor |
| from functools import lru_cache |
| import edge_tts |
| from pydub import AudioSegment |
| from pydub.effects import normalize |
| from mutagen.mp3 import MP3 |
|
|
# Flask application with CORS enabled so a browser frontend served from a
# different origin can call these endpoints directly.
app = Flask(__name__)
CORS(app)


# Working directories, created eagerly so later file writes cannot fail on a
# missing directory.
BASE_DIR = "/app"
MEDIA_DIR = os.path.join(BASE_DIR, "media")  # final rendered videos
TEMP_DIR = os.path.join(BASE_DIR, "temp")    # per-request Manim scratch dirs
AUDIO_DIR = os.path.join(BASE_DIR, "sound")  # NOTE(review): re-bound to <cwd>/audio further down — confirm which location is intended
os.makedirs(MEDIA_DIR, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(AUDIO_DIR, exist_ok=True)


# SECURITY(review): hard-coded credential committed to source, and it is not
# referenced anywhere else in this file (no route checks it) — move it to an
# environment variable and actually enforce it, or remove it.
API_KEY = "rkmentormindzofficaltokenkey12345"
|
|
| import os |
| import re |
| import html |
| import unicodedata |
| import asyncio |
| import tempfile |
| import traceback |
| import random |
| import hashlib |
| import json |
| from concurrent.futures import ThreadPoolExecutor |
| from functools import lru_cache |
| from typing import List, Tuple, Optional, Dict |
|
|
| import edge_tts |
| from pydub import AudioSegment |
| from pydub.effects import normalize |
| from mutagen.mp3 import MP3 |
|
|
| |
# Default edge-tts neural voice (English, Indian accent) used whenever no
# language-specific voice is resolved.
VOICE_EN = "en-IN-NeerjaNeural"
# NOTE(review): this re-binds the AUDIO_DIR defined earlier (/app/sound);
# from here on all audio is written under <cwd>/audio — confirm intended.
AUDIO_DIR = os.path.join(os.getcwd(), "audio")
os.makedirs(AUDIO_DIR, exist_ok=True)


# Pre-compiled patterns used by clean_text_for_tts (hoisted out of the hot path).
URL_PATTERN = re.compile(r'https?://[^\s<>"\']+|www\.[^\s<>"\']+')  # http(s)/www links
TAG_PATTERN = re.compile(r'<[^>]*>')                                 # HTML/XML tags
BRACKET_PATTERN = re.compile(r'[\{\}\[\]]')                          # brace/bracket characters
SPECIAL_CHAR_PATTERN = re.compile(r'[#@$%^&*_+=|\\`~]')              # symbols never spoken aloud
WHITESPACE_PATTERN = re.compile(r'\s+')                              # any whitespace run

# Sentence boundary: terminal punctuation, whitespace, then a capital letter.
# NOTE(review): not referenced anywhere in this file.
SENTENCE_PATTERN = re.compile(r'(?<=[.!?])\s+(?=[A-Z])')

# Comma not inside a number (keeps "1,000" intact). NOTE(review): unused here.
SUB_PATTERN = re.compile(r'(?<!\d),(?!\d)\s*')


# NOTE(review): declared but never read in this file — chunk results are
# cached on disk per chunk instead; remove this or wire it up.
_chunking_cache: Dict[str, Tuple[str, ...]] = {}
|
|
| def clean_text_for_tts(text: str) -> str: |
| """Cleans text while preserving Tamil/Indic characters and code-switched punctuation.""" |
| if not text: |
| return "" |
| |
| text = str(text).strip() |
| text = html.unescape(text) |
| |
| |
| text = URL_PATTERN.sub('', text) |
| |
| |
| text = TAG_PATTERN.sub('', text) |
| |
| |
| text = BRACKET_PATTERN.sub('', text) |
| |
| |
| text = SPECIAL_CHAR_PATTERN.sub('', text) |
| |
| |
| text = text.replace('\\n', ' ').replace('\\t', ' ').replace('\\r', ' ') |
| |
| |
| text = unicodedata.normalize('NFC', text) |
| |
| |
| text = WHITESPACE_PATTERN.sub(' ', text) |
| |
| return text.strip() |
|
|
def split_by_word_boundary(text: str) -> List[str]:
    """
    Intelligently splits text by language boundaries while preserving code-switched words.
    Example: "Voltage னு" → ["Voltage", " னு"]

    Each character is classified as Tamil (U+0B80–U+0BFF), English (other
    alphabetic characters and '-'), or neutral; neutral characters (spaces,
    digits, punctuation) inherit the language of the segment in progress.
    A new segment starts whenever the classification flips, except for a
    hyphen joining an English word to a Tamil suffix, which stays inside
    the current segment.
    """
    if not text:
        return []

    segments = []
    current_segment = ""
    current_lang = None  # language of the segment under construction ('ta'/'en'/None)

    i = 0
    while i < len(text):
        char = text[i]

        # Classify this character; neutral chars keep the current language.
        if '\u0B80' <= char <= '\u0BFF':
            char_lang = 'ta'
        elif char.isalpha() or char in '-':
            char_lang = 'en'
        else:
            char_lang = current_lang

        # Language flipped: normally close the current segment here.
        if current_lang and char_lang and current_lang != char_lang:
            # Special case: keep "word-<tamil>" hyphen compounds together.
            if char == '-' and i > 0 and i < len(text) - 1:
                prev_char = text[i-1]
                next_char = text[i+1]
                if prev_char.isalpha() and ('\u0B80' <= next_char <= '\u0BFF'):
                    # Hyphen bridges English word → Tamil suffix: don't split.
                    current_segment += char
                    i += 1
                    continue

            if current_segment.strip():
                segments.append(current_segment)
            current_segment = char
            current_lang = char_lang
        else:
            current_segment += char
            current_lang = char_lang or current_lang

        i += 1

    # Flush the trailing segment (whitespace-only remainders are dropped).
    if current_segment.strip():
        segments.append(current_segment)

    return segments
|
|
def chunk_text_with_overlap(text: str, max_chars: int = 250) -> List[Tuple[str, int]]:
    """
    Split *text* into TTS-sized chunks with a small word overlap between
    consecutive chunks for smooth audio transitions.

    The text is cleaned (clean_text_for_tts) and segmented on language
    boundaries (split_by_word_boundary); segments are packed into chunks of
    at most *max_chars* characters (and at most 20 words per growth step).
    The last up-to-3 words of each chunk are prepended to the next chunk
    when that still fits within *max_chars*.

    Returns a list of (chunk_text, chunk_index) pairs in reading order.
    """
    cleaned = clean_text_for_tts(text)
    if not cleaned:
        return []

    segments = split_by_word_boundary(cleaned)

    chunks: List[str] = []
    current_chunk = ""

    for segment in segments:
        test_chunk = current_chunk + segment if current_chunk else segment

        if len(test_chunk) <= max_chars and len(test_chunk.split()) <= 20:
            current_chunk = test_chunk
        else:
            # Flush the chunk under construction before starting a new one.
            if current_chunk:
                chunks.append(current_chunk)

            if len(segment) > max_chars:
                # Oversized single segment: fall back to plain word packing.
                temp_chunk = ""
                for word in segment.split():
                    test = temp_chunk + " " + word if temp_chunk else word
                    if len(test) <= max_chars:
                        temp_chunk = test
                    else:
                        if temp_chunk:
                            chunks.append(temp_chunk)
                        temp_chunk = word

                # The tail of the oversized segment seeds the next chunk.
                current_chunk = temp_chunk
            else:
                current_chunk = segment

    if current_chunk:
        chunks.append(current_chunk)

    # Prepend the previous chunk's last few words (max 3) to each chunk so
    # the synthesized audio overlaps slightly at the seams.
    overlapped_chunks: List[Tuple[str, int]] = []
    for i, chunk in enumerate(chunks):
        if i > 0:
            prev_words = chunks[i - 1].split()
            overlap_words = prev_words[-3:] if len(prev_words) >= 3 else prev_words

            if overlap_words:
                candidate = " ".join(overlap_words) + " " + chunk
                if len(candidate) <= max_chars:
                    chunk = candidate

        overlapped_chunks.append((chunk, i))

    return overlapped_chunks
|
|
async def generate_safe_audio(text: str, voice: str, semaphore: asyncio.Semaphore,
                              chunk_index: int) -> Tuple[Optional[str], int]:
    """Generate (or fetch cached) TTS audio for one text chunk.

    Rate-limited by *semaphore*, cached on disk under AUDIO_DIR keyed by an
    MD5 of (text, voice), and retried up to 3 times with exponential
    backoff plus jitter on failure.

    Returns (path_to_mp3 | None, chunk_index); the index is passed through
    so callers can restore ordering after asyncio.gather.
    """
    if not text or len(text) < 2:
        return None, chunk_index

    # Disk cache key: same (text, voice) pair always maps to the same file.
    cache_key = f"{text}_{voice}"
    text_hash = hashlib.md5(cache_key.encode('utf-8')).hexdigest()
    cache_filename = os.path.join(AUDIO_DIR, f"cache_{text_hash}.mp3")

    # Cache hit; files of 1 KiB or less are treated as failed/corrupt output.
    if os.path.exists(cache_filename) and os.path.getsize(cache_filename) > 1024:
        return cache_filename, chunk_index

    async with semaphore:
        max_retries = 3
        base_delay = 2.0

        for attempt in range(max_retries):
            # Defined before the try so the cleanup path can never hit an
            # unbound name if NamedTemporaryFile itself raises.
            temp_filename = None
            try:
                with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as tmp:
                    temp_filename = tmp.name

                comm = edge_tts.Communicate(text, voice=voice)
                await comm.save(temp_filename)

                if os.path.exists(temp_filename) and os.path.getsize(temp_filename) > 1024:
                    # Atomically publish into the cache.
                    os.replace(temp_filename, cache_filename)
                    return cache_filename, chunk_index

                # Output too small to be real audio: discard it (this also
                # fixes a temp-file leak) and retry below.
                os.unlink(temp_filename)

            except Exception as e:
                # Best-effort cleanup of the partial temp file.
                try:
                    if temp_filename and os.path.exists(temp_filename):
                        os.unlink(temp_filename)
                except OSError:
                    pass

                if attempt == max_retries - 1:
                    print(f"Failed to generate audio chunk {chunk_index} after {max_retries} attempts: {e}")
                    return None, chunk_index

            # Exponential backoff with jitter before the next attempt
            # (skipped after the final attempt).
            if attempt < max_retries - 1:
                sleep_time = (base_delay * (2 ** attempt)) + random.uniform(0.1, 1.0)
                await asyncio.sleep(sleep_time)

    return None, chunk_index
|
|
def process_audio_segment_fast(audio_data: Tuple[str, int]) -> Tuple[Optional[AudioSegment], int]:
    """Load one generated MP3, pad it with 50 ms of silence on each side,
    normalize its volume, and return it with its ordering index.

    Returns (segment | None, chunk_index); None tells the caller to drop
    this chunk.
    """
    audio_file, chunk_index = audio_data

    try:
        # Missing or never-generated file: nothing to process.
        if not audio_file or not os.path.exists(audio_file):
            return None, chunk_index

        segment = AudioSegment.from_file(audio_file)

        # Tiny silence pads prevent clicks at chunk boundaries.
        if len(segment) > 0:
            pad = AudioSegment.silent(duration=50)
            segment = pad + segment + pad

        return normalize(segment), chunk_index

    except Exception as e:
        print(f"Warning: Error processing audio segment {chunk_index}: {e}")
        return None, chunk_index
|
|
async def bilingual_tts_optimized(text: str, output_file: str = "audio0.mp3",
                                  VOICE_TA: Optional[str] = None, max_concurrent: int = 5) -> Optional[str]:
    """Synthesize *text* to *output_file* as a single MP3.

    Pipeline: chunk the text (chunk_text_with_overlap) → synthesize each
    chunk concurrently (generate_safe_audio, capped at *max_concurrent*) →
    decode/pad/normalize chunks in a thread pool → merge with a 30 ms
    crossfade, light dynamic-range compression and a final normalize.

    VOICE_TA, when given, is used for every chunk; otherwise VOICE_EN.
    Returns *output_file* on success, None on any failure.
    """
    print("Starting bilingual TTS processing...")

    try:
        chunks_with_indices = chunk_text_with_overlap(text, max_chars=250)
        if not chunks_with_indices:
            print("Error: No valid text chunks after processing")
            return None

        print(f"Processing {len(chunks_with_indices)} text chunks...")

        # One voice for the whole request: the caller-supplied voice when
        # present, else the English default. (The previous per-chunk Tamil
        # check was dead logic — both branches resolved to this value.)
        voice = VOICE_TA or VOICE_EN

        # Bounded concurrency so we do not hammer the TTS service.
        semaphore = asyncio.Semaphore(max_concurrent)

        tasks = [
            generate_safe_audio(chunk_text, voice, semaphore, chunk_index)
            for chunk_text, chunk_index in chunks_with_indices
        ]

        results = await asyncio.gather(*tasks, return_exceptions=False)

        # Keep only chunks whose audio file actually materialized.
        audio_data = [
            result for result in results
            if isinstance(result, tuple) and result[0] and os.path.exists(result[0])
        ]

        if not audio_data:
            print("Error: No audio was successfully generated")
            return None

        # Restore reading order by chunk index (explicit, even though
        # gather preserves task order).
        audio_data.sort(key=lambda x: x[1])

        print(f"Successfully generated {len(audio_data)} audio segments")

        # Decode + pad + normalize each chunk in parallel threads.
        with ThreadPoolExecutor(max_workers=min(len(audio_data), 8)) as executor:
            processed = list(executor.map(process_audio_segment_fast, audio_data))

        processed = [(seg, idx) for seg, idx in processed if seg is not None]
        processed.sort(key=lambda x: x[1])

        audio_segments = [seg for seg, idx in processed]

        if not audio_segments:
            print("Error: No audio segments were successfully processed")
            return None

        print(f"Merging {len(audio_segments)} audio segments with crossfade...")

        # A 30 ms crossfade hides the seams between chunks.
        merged_audio = audio_segments[0]
        for segment in audio_segments[1:]:
            merged_audio = merged_audio.append(segment, crossfade=30)

        # Gentle compression is best-effort: deliberately ignored on failure
        # (was a bare `except:` — narrowed so KeyboardInterrupt etc. escape).
        try:
            merged_audio = merged_audio.compress_dynamic_range(
                threshold=-20.0,
                ratio=2.5,
                attack=5.0,
                release=50.0
            )
        except Exception:
            pass

        merged_audio = normalize(merged_audio)

        merged_audio.export(output_file, format="mp3", bitrate="192k")

        # Anything at or below 1 KiB is treated as a failed export.
        if os.path.exists(output_file) and os.path.getsize(output_file) > 1024:
            print(f"✅ Audio successfully generated: {output_file}")
            return output_file
        else:
            print(f"Error: Generated file is empty or missing")
            return None

    except Exception as main_error:
        print(f"Main error in bilingual TTS: {main_error}")
        traceback.print_exc()
        return None
|
|
async def generate_tts_optimized(id: int, lines, lang: str) -> Tuple[Optional[float], Optional[str]]:
    """Optimized TTS generation function.

    Resolves the neural voice for *lang*, synthesizes the narration with
    bilingual_tts_optimized, and measures the resulting MP3's duration.

    Args:
        id: index of this narration; names the output file (audio{id}.mp3)
            and selects lines[id] when *lines* is a list/tuple.
        lines: sequence of narration texts, or a single value (str()-ed).
        lang: language name looked up in voice_map, or a string containing
            "&&&" (see NOTE below).

    Returns:
        (duration_seconds, audio_path) on success, (None, None) on failure.
    """
    # Human-readable language name -> edge-tts neural voice id.
    voice_map = {
        "English": "en-US-JennyNeural",
        "Tamil": "ta-IN-PallaviNeural",
        "Hindi": "hi-IN-SwaraNeural",
        "Malayalam": "ml-IN-SobhanaNeural",
        "Kannada": "kn-IN-SapnaNeural",
        "Telugu": "te-IN-ShrutiNeural",
        "Bengali": "bn-IN-TanishaaNeural",
        "Marathi": "mr-IN-AarohiNeural",
        "Gujarati": "gu-IN-DhwaniNeural",
        "Punjabi": "pa-IN-VaaniNeural",
        "Urdu": "ur-IN-GulNeural",
        "French": "fr-FR-DeniseNeural",
        "German": "de-DE-KatjaNeural",
        "Spanish": "es-ES-ElviraNeural",
        "Italian": "it-IT-IsabellaNeural",
        "Russian": "ru-RU-SvetlanaNeural",
        "Japanese": "ja-JP-NanamiNeural",
        "Korean": "ko-KR-SunHiNeural",
        "Chinese": "zh-CN-XiaoxiaoNeural",
        "Arabic": "ar-SA-ZariyahNeural",
        "Portuguese": "pt-BR-FranciscaNeural",
        "Dutch": "nl-NL-FennaNeural",
        "Greek": "el-GR-AthinaNeural",
        "Hebrew": "he-IL-HilaNeural",
        "Turkish": "tr-TR-EmelNeural",
        "Polish": "pl-PL-AgnieszkaNeural",
        "Thai": "th-TH-AcharaNeural",
        "Vietnamese": "vi-VN-HoaiMyNeural",
        "Swedish": "sv-SE-SofieNeural",
        "Finnish": "fi-FI-NooraNeural",
        "Czech": "cs-CZ-VlastaNeural",
        "Hungarian": "hu-HU-NoemiNeural"
    }

    audio_name = f"audio{id}.mp3"
    audio_path = os.path.join(AUDIO_DIR, audio_name)

    # NOTE(review): when *lang* itself contains "&&&", the spoken text is
    # taken from *lang* and *lines* is ignored entirely — confirm this
    # protocol with callers (generate_video passes the text via *lines*).
    if "&&&" in lang:
        listf = lang.split("&&&")
        text = listf[0].strip()
        lang_name = listf[1].strip() if len(listf) > 1 else "English"
        voice_to_use = voice_map.get(lang_name, VOICE_EN)
    else:
        text = lines[id] if isinstance(lines, (list, tuple)) and id < len(lines) else str(lines)
        voice_to_use = voice_map.get(lang, VOICE_EN)

    output = await bilingual_tts_optimized(text, audio_path, voice_to_use, max_concurrent=5)

    if output and os.path.exists(audio_path):
        try:
            # mutagen reads the MP3 header for the exact duration in seconds.
            audio = MP3(audio_path)
            duration = audio.info.length
            return duration, audio_path
        except Exception as e:
            print(f"Error reading audio file: {e}")
            return None, None

    return None, None
|
|
def audio_func(id: int, lines, lang: str) -> Tuple[Optional[float], Optional[str]]:
    """Synchronous wrapper for audio generation.

    Runs generate_tts_optimized on a fresh event loop so it can be called
    from Flask's synchronous request handlers.

    Returns (duration_seconds, audio_path), or (None, None) on any error
    (errors are printed with a traceback, never raised to the caller).
    """
    try:
        # asyncio.run creates, runs and fully tears down a fresh loop,
        # replacing the manual new_event_loop/run_until_complete/close dance
        # (and, unlike it, does not leave a closed loop installed as current).
        return asyncio.run(generate_tts_optimized(id, lines, lang))
    except Exception as e:
        print(f"Error in audio_func: {e}")
        traceback.print_exc()
        return None, None
|
|
def create_manim_script(problem_data, script_path, audio_path, scale=1):
    """Generate a standalone Manim scene script from slide data.

    Writes a `GeneratedMathScene` Scene to *script_path* that plays
    *audio_path* as its soundtrack and animates each slide (title / text /
    equation) in order, multiplying every slide's declared duration by
    *scale* so the video tracks the narration length.

    Raises ValueError when *problem_data* has no slides, and re-raises any
    error from writing the script file.
    """

    # Visual defaults; the whole dict is replaced (not merged) when the
    # request supplies its own "video_settings".
    settings = problem_data.get("video_settings", {
        "background_color": "#0f0f23",
        "text_color": "WHITE",
        "highlight_color": "YELLOW",
        "font": "CMU Serif",
        "text_size": 36,
        "equation_size": 45,
        "title_size": 48,
        "wrap_width": 15.5
    })

    slides = problem_data.get("slides", [])
    if not slides:
        raise ValueError("No slides provided in input data")

    # repr() embeds the Python literals directly into the generated source.
    slides_repr = repr(slides)
    audio_path_repr = repr(audio_path)

    wrap_width = float(settings.get("wrap_width", 15.5))
    background_color = settings.get("background_color", "#0f0f23")
    text_color = settings.get("text_color", "WHITE")
    highlight_color = settings.get("highlight_color", "YELLOW")
    font = settings.get("font", "CMU Serif")
    text_size = settings.get("text_size", 36)
    # NOTE(review): fallback default here is 50 while the settings dict above
    # defaults equation_size to 45 — confirm which value is intended.
    equation_size = settings.get("equation_size", 50)
    title_size = settings.get("title_size", 48)

    # The f-string below is the generated-script template: {placeholders}
    # are filled now; {{...}} remain literal braces in the output file.
    # NOTE(review): in the emitted equation-wrap line, "\\\\" here becomes
    # "\\" in the generated file's f-string, i.e. a SINGLE backslash at
    # MathTex runtime — the intended LaTeX line break "\\" may be lost;
    # verify with a long equation.
    manim_code = f"""from manim import *
class GeneratedMathScene(Scene):
    def construct(self):
        # Scene settings
        self.add_sound({audio_path_repr})
        self.camera.background_color = "{background_color}"
        default_color = {text_color}
        highlight_color = {highlight_color}
        default_font = "{font}"
        text_size = {text_size}
        equation_size = {equation_size}
        title_size = {title_size}
        wrap_width = {wrap_width}

        def make_inline_segments(content, color, font, text_size, equation_size):

            if not content:
                return VGroup()

            # Split by # separator
            segments = content.split("#")
            mobjects = []

            for segment in segments:
                segment = segment.strip()
                if not segment:
                    continue

                # Check if it's a LaTeX equation (starts with %$ and ends with $)
                if segment.startswith("%"):
                    # Remove %$ from start and $ from end
                    latex_content = segment[1:]
                    mob = MathTex(latex_content, color=color, font_size=equation_size)
                else:
                    # Regular text
                    mob = Text(segment, color=color, font=font, font_size=text_size)

                mobjects.append(mob)

            if not mobjects:
                return VGroup()

            # Arrange horizontally with minimal spacing
            inline_group = VGroup(*mobjects).arrange(RIGHT, buff=0.1)
            return inline_group

        def make_wrapped_paragraph(content, color, font, font_size, line_spacing=0.2):
            lines = []
            words = content.split()
            current = ""
            for w in words:
                test = w if not current else current + " " + w
                test_obj = Text(test, color=color, font=font, font_size=font_size)
                if test_obj.width <= wrap_width * 0.9:
                    current = test
                else:
                    if current:
                        line_obj = Text(current, color=color, font=font, font_size=font_size)
                        lines.append(line_obj)
                    current = w
            if current:
                lines.append(Text(current, color=color, font=font, font_size=font_size))
            if not lines:
                return VGroup()
            first_line = lines[0]
            for ln in lines:
                ln.align_to(first_line, LEFT)
            para = VGroup(*lines).arrange(DOWN, aligned_edge=LEFT, buff=line_spacing)
            return para

        content_group = VGroup()
        current_y = 3.0
        line_spacing = 0.8
        slides = {slides_repr}

        for idx, slide in enumerate(slides):
            obj = None
            content = slide.get("content", "")
            animation = slide.get("animation", "write_left")
            scalelen = slide.get("duration", 1.0)
            duration = scalelen * {scale}
            slide_type = slide.get("type", "text")

            if slide_type == "title":
                # Use inline segments for title
                obj = make_inline_segments(content, highlight_color, default_font, title_size, equation_size)

                # Fallback to simple text if no inline segments
                if len(obj) == 0:
                    obj = Text(content, color=highlight_color, font=default_font, font_size=title_size)

                if obj.width > wrap_width:
                    obj.scale_to_fit_width(wrap_width)
                obj.move_to(ORIGIN)
                self.play(FadeIn(obj), run_time=duration * 0.8)
                self.wait(duration * 0.3)
                self.play(FadeOut(obj), run_time=duration * 0.3)
                continue

            elif slide_type == "text":
                # Use inline segments for text
                obj = make_inline_segments(content, default_color, default_font, text_size, equation_size)

                # Fallback if no inline segments detected
                if len(obj) == 0:
                    obj = make_wrapped_paragraph(content, default_color, default_font, text_size, line_spacing=0.25)

                # Handle width overflow
                if obj.width > wrap_width:
                    obj.scale_to_fit_width(wrap_width)

            elif slide_type == "equation":
                eq_content = content
                test = MathTex(eq_content, color=default_color, font_size=equation_size)
                if test.width > wrap_width:
                    parts = eq_content.split(" ")
                    mid = len(parts) // 2
                    line1 = " ".join(parts[:mid])
                    line2 = " ".join(parts[mid:])
                    wrapped_eq = f"{{line1}} \\\\ {{line2}}"
                    obj = MathTex(wrapped_eq, color=default_color, font_size=equation_size)
                else:
                    obj = MathTex(eq_content, color=default_color, font_size=equation_size)
                if obj.width > wrap_width:
                    obj.scale_to_fit_width(wrap_width)

            if obj:
                obj.to_edge(LEFT, buff=0.3)
                obj.shift(UP * (current_y - obj.height / 2))
                obj_bottom = obj.get_bottom()[1]

                if obj_bottom < -3.5:
                    scroll_amount = abs(obj_bottom - (-3.5)) + 0.3
                    self.play(content_group.animate.shift(UP * scroll_amount), run_time=0.5)
                    current_y += scroll_amount
                    obj.shift(UP * scroll_amount)
                    obj.to_edge(LEFT, buff=0.3)

                if animation == "write_left":
                    self.play(Write(obj), run_time=duration)
                elif animation == "fade_in":
                    self.play(FadeIn(obj), run_time=duration)
                elif animation == "highlight_left":
                    self.play(Write(obj), run_time=duration * 0.6)
                    self.play(obj.animate.set_color(highlight_color), run_time=duration * 0.4)
                else:
                    self.play(Write(obj), run_time=duration)

                content_group.add(obj)
                current_y -= (getattr(obj, "height", 0) + line_spacing)
                self.wait(0.3)

        if len(content_group) > 0:
            final_box = SurroundingRectangle(content_group[-1], color=highlight_color, buff=0.2)
            self.play(Create(final_box), run_time=0.8)
            self.wait(1.5)
"""

    # Write the scene; failures are logged and re-raised so the caller
    # returns an error instead of rendering a missing file.
    try:
        with open(script_path, 'w', encoding='utf-8') as f:
            f.write(manim_code)
        print(f"Generated script at {script_path}")
    except Exception as e:
        print(f"Error writing script: {e}")
        raise
|
|
|
|
@app.route("/")
def home():
    """Health-check endpoint: confirms the service is up."""
    status_message = "Flask Manim Video Generator is Running"
    return status_message
|
|
|
|
@app.route("/generate", methods=["POST"])
def generate_video():
    """Render a narrated math video from a slide description.

    Expects JSON {"jsondata": "<slide-list>&&&&<narration>[&&&<language>]"}:
    the part before "&&&&" is a Python-literal list of
    [type, content, animation, duration] rows; the part after it is the
    narration text, optionally followed by "&&&<language name>".

    Pipeline: parse slides → synthesize narration (audio_func) → scale
    slide durations to match the audio length → emit a Manim script →
    render via the `manim` CLI → copy the MP4 into MEDIA_DIR and stream it
    back. Errors return a JSON body with a 4xx/5xx status.
    """
    temp_work_dir = None
    try:
        raw_data = request.get_json()
        if not raw_data:
            return jsonify({"error": "No JSON data provided"}), 400

        raw_body = raw_data.get("jsondata", '')
        if not raw_body:
            return jsonify({"error": "No jsondata field in request"}), 400

        # "&&&&" separates the slide list from the narration payload.
        lst = raw_body.split("&&&&")
        if len(lst) < 2:
            return jsonify({"error": "Invalid data format, missing &&&&separator"}), 400

        # Re-join decimals that arrived split around the dot ("1 . 5" → "1.5").
        cleaned = re.sub(r'(\d)\s*\.\s*(\d)', r'\1.\2', lst[0])

        # literal_eval (not eval) keeps parsing of client-supplied data safe.
        try:
            nlist = ast.literal_eval(cleaned)
        except Exception as e:
            return jsonify({"error": f"Failed to parse slide data: {str(e)}"}), 400

        datalst = []
        total = 0.0  # sum of declared slide durations (seconds)

        # Each row is [type, content, animation, duration].
        for line in range(len(nlist)):
            try:
                total += float(nlist[line][3])
                datalst.append({
                    "type": nlist[line][0].strip(),
                    "content": nlist[line][1].strip(),
                    "animation": nlist[line][2].strip().replace(" ", ""),
                    "duration": float(nlist[line][3])
                })
            except (IndexError, ValueError) as e:
                return jsonify({"error": f"Invalid slide data at index {line}: {str(e)}"}), 400

        # Guard against division by zero when computing the scale factor.
        if total <= 0:
            total = 1.0

        data = {
            "video_settings": {
                "background_color": "#0f0f23",
                "text_color": "WHITE",
                "highlight_color": "YELLOW",
                "font": "CMU Serif",
                "text_size": 36,
                "equation_size": 42,
                "title_size": 48
            },
            "slides": datalst
        }

        # Narration payload: "<text>&&&<language>", language defaults to English.
        best = lst[1].split("&&&")
        lines = best[0]
        try:
            lang = best[1] if len(best) > 1 else "English"
        except:
            lang = "English"

        length, audio_path = audio_func(0, lines, lang)

        if not length or not audio_path or not os.path.exists(audio_path):
            return jsonify({"error": "Failed to generate audio"}), 500

        # Stretch/compress slide durations so video length matches the audio.
        scale = float(length) / total if total > 0 else 1.0

        if "slides" not in data or not data["slides"]:
            return jsonify({"error": "No slides provided in request"}), 400

        print(f"Received request with {len(data['slides'])} slides")

        # Per-request scratch directory for Manim's media output.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        temp_work_dir = os.path.join(TEMP_DIR, f"manim_{timestamp}")
        os.makedirs(temp_work_dir, exist_ok=True)

        script_path = os.path.join(temp_work_dir, "scene.py")
        create_manim_script(data, script_path, audio_path, scale)
        print(f"Created Manim script at {script_path}")

        # 'l' = low quality (480p15) keeps render times inside the timeout.
        quality = 'l'
        render_command = [
            "manim",
            f"-q{quality}",
            "--disable_caching",
            "--media_dir", temp_work_dir,
            script_path,
            "GeneratedMathScene"
        ]

        print(f"Running command: {' '.join(render_command)}")

        # List argv (shell=False) — no shell-injection surface.
        result = subprocess.run(
            render_command,
            capture_output=True,
            text=True,
            cwd=temp_work_dir,
            timeout=120
        )

        if result.returncode != 0:
            error_msg = result.stderr or result.stdout
            print(f"Manim rendering failed: {error_msg}")
            return jsonify({
                "error": "Manim rendering failed",
                "details": error_msg
            }), 500

        print("Manim rendering completed successfully")

        # Manim writes <media_dir>/videos/<script stem>/<quality>/<Scene>.mp4.
        quality_map = {'l': '480p15', 'm': '720p30', 'h': '1080p60'}
        video_quality = quality_map.get(quality, '480p15')

        video_path = os.path.join(
            temp_work_dir,
            "videos",
            "scene",
            video_quality,
            "GeneratedMathScene.mp4"
        )

        if not os.path.exists(video_path):
            print(f"Video not found at expected path: {video_path}")
            return jsonify({
                "error": "Video file not found after rendering",
                "expected_path": video_path
            }), 500

        print(f"Video found at: {video_path}")

        # Persist the result outside the scratch dir before cleanup.
        output_filename = f"math_video_{timestamp}.mp4"
        output_path = os.path.join(MEDIA_DIR, output_filename)
        shutil.copy(video_path, output_path)
        print(f"Video copied to: {output_path}")

        # Best-effort cleanup; failure to delete is logged, not fatal.
        try:
            if temp_work_dir and os.path.exists(temp_work_dir):
                shutil.rmtree(temp_work_dir)
                print("Cleaned up temp directory")
        except Exception as e:
            print(f"Failed to clean temp dir: {e}")

        return send_file(
            output_path,
            mimetype='video/mp4',
            as_attachment=False,
            download_name=output_filename
        )

    except subprocess.TimeoutExpired:
        # Render exceeded the 120 s budget; clean up and report 504.
        print("Video rendering timeout")
        if temp_work_dir and os.path.exists(temp_work_dir):
            try:
                shutil.rmtree(temp_work_dir)
            except:
                pass
        return jsonify({"error": "Video rendering timeout (120s)"}), 504

    except Exception as e:
        # Catch-all boundary: log the traceback and return it to the client.
        print(f"Error: {str(e)}")
        traceback.print_exc()
        if temp_work_dir and os.path.exists(temp_work_dir):
            try:
                shutil.rmtree(temp_work_dir)
            except:
                pass
        return jsonify({
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
|
|
|
|
if __name__ == '__main__':
    # Bind on all interfaces; PORT env var overrides the default 7860
    # (presumably the Hugging Face Spaces convention — confirm deployment
    # target). Debug is off for production use.
    port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=port, debug=False)