| from flask import Flask, request, jsonify, send_file |
| from flask_cors import CORS |
| import os |
| import subprocess |
| import tempfile |
| import shutil |
| from datetime import datetime |
| import traceback |
| import json |
| import ast |
| import re |
| import textwrap |
| from manim import * |
|
|
# Flask app with permissive CORS so a browser frontend can call it directly.
app = Flask(__name__)
CORS(app)


# Working directories, created eagerly so later writes never fail on a
# missing directory: final videos, per-request scratch space, narration audio.
BASE_DIR = "/app"
MEDIA_DIR = os.path.join(BASE_DIR, "media")
TEMP_DIR = os.path.join(BASE_DIR, "temp")
AUDIO_DIR = os.path.join(BASE_DIR, "sound")
os.makedirs(MEDIA_DIR, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(AUDIO_DIR, exist_ok=True)

# NOTE(review): hard-coded secret committed to source, and no route in this
# file appears to check it — confirm whether it is still used anywhere and,
# if so, move it to an environment variable.
API_KEY = "rkmentormindzofficaltokenkey12345"
|
|
|
|
| |
| import re |
| import html |
| import unicodedata |
| import tempfile |
| import os |
| import asyncio |
| from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor |
| from functools import lru_cache |
| import edge_tts |
| from pydub import AudioSegment |
| from pydub.effects import normalize |
| from mutagen.mp3 import MP3 |
|
|
# Default narration voice when no language-specific voice applies.
VOICE_EN = "en-IN-NeerjaNeural"


# Precompiled patterns used by the text-cleaning pipeline (hoisted so they
# compile once, not per call).
URL_PATTERN = re.compile(r'https?://[^\s<>"\']+|www\.[^\s<>"\']+')
TAG_PATTERN = re.compile(r'<[^>]*>|[<>]')
BRACKET_PATTERN = re.compile(r'[\{\}\[\]]')
SPECIAL_CHAR_PATTERN = re.compile(r'[#@$%^&*_+=|\\`~]')
WHITESPACE_PATTERN = re.compile(r'\s+')
SENTENCE_PATTERN = re.compile(r'(?<=[.!?])\s+')
SUB_PATTERN = re.compile(r'(?<=[,;:])\s+')

# SSML-related keywords stripped to avoid markup injection into the TTS
# engine. Word boundaries keep ordinary words ("speaker", "speaking") intact.
SSML_KEYWORD_PATTERN = re.compile(r'\b(?:voice|speak|prosody|ssml|xmlns)\b',
                                  re.IGNORECASE)


@lru_cache(maxsize=1024)
def clean_text_for_tts(text):
    """Clean raw text so it is safe to feed to the TTS engine.

    Strips URLs, markup, brackets, special characters and SSML-related
    keywords, normalizes unicode (NFKD), and collapses whitespace.
    Results are memoized because identical snippets are cleaned repeatedly.

    Args:
        text: Arbitrary input; anything falsy yields "".

    Returns:
        The cleaned string.
    """
    if not text:
        return ""
    text = str(text).strip()
    text = html.unescape(text)

    text = URL_PATTERN.sub('', text)
    text = TAG_PATTERN.sub('', text)
    text = BRACKET_PATTERN.sub('', text)
    text = SPECIAL_CHAR_PATTERN.sub('', text)
    # These are literal backslash sequences (e.g. from JSON-escaped payloads),
    # not real control characters.
    text = text.replace('\\n', ' ').replace('\\t', ' ').replace('\\r', ' ')

    # BUG FIX: the previous substring str.replace() loop mangled ordinary
    # words (e.g. "speaker" -> "er") and missed mixed-case variants; the
    # word-boundary, case-insensitive pattern removes only whole keywords.
    text = SSML_KEYWORD_PATTERN.sub('', text)

    text = unicodedata.normalize('NFKD', text)
    text = WHITESPACE_PATTERN.sub(' ', text)
    return text.strip()
|
|
async def generate_safe_audio(text, voice, semaphore):
    """Synthesize *text* with edge-tts into a temp .mp3, throttled by *semaphore*.

    Returns the temp file path on success, or None when the cleaned text is
    empty or synthesis fails (the temp file is removed on failure).
    """
    async with semaphore:
        spoken = clean_text_for_tts(text)
        if not spoken:
            return None

        # Reserve a temp file name up front; edge-tts writes into it below.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as handle:
            out_path = handle.name

        try:
            await edge_tts.Communicate(spoken, voice=voice).save(out_path)
        except Exception as err:
            print(f"Error generating audio: {err}")
            if os.path.exists(out_path):
                os.unlink(out_path)
            return None
        return out_path
|
|
@lru_cache(maxsize=256)
def smart_text_chunking(text, max_chars=80):
    """Split cleaned text into TTS-sized chunks (memoized).

    Splits on sentence boundaries first, then on comma/semicolon/colon
    boundaries, and finally packs words greedily so no emitted chunk
    exceeds *max_chars* (a single word longer than the limit still becomes
    its own chunk). Returns a tuple so the result is hashable for the cache.
    """
    text = clean_text_for_tts(text)
    if not text:
        return tuple()

    pieces = []

    def pack_words(fragment):
        # Greedy packing: flush the buffer whenever the next word would
        # push it past the limit.
        buffer = ""
        for token in fragment.split():
            candidate = f"{buffer} {token}" if buffer else token
            if len(candidate) <= max_chars:
                buffer = candidate
            else:
                if buffer:
                    pieces.append(buffer.strip())
                buffer = token
        if buffer:
            pieces.append(buffer.strip())

    for raw_sentence in SENTENCE_PATTERN.split(text):
        sentence = raw_sentence.strip()
        if not sentence:
            continue
        if len(sentence) <= max_chars:
            pieces.append(sentence)
            continue
        for raw_part in SUB_PATTERN.split(sentence):
            part = raw_part.strip()
            if not part:
                continue
            if len(part) <= max_chars:
                pieces.append(part)
            else:
                pack_words(part)

    return tuple(p for p in pieces if p.strip())
|
|
def process_audio_segment_fast(audio_file):
    """Load, normalize and lightly trim one temporary audio file.

    The temp file is always deleted afterwards (finally block), whether or
    not processing succeeded.

    Args:
        audio_file: Path to an audio file readable by pydub.

    Returns:
        A normalized AudioSegment, or None if loading/processing failed.
    """
    try:
        segment = AudioSegment.from_file(audio_file)
        segment = normalize(segment)

        # Silence-stripping very short clips can yield empty audio, so only
        # attempt it above 200 ms; keep the untrimmed segment on any error.
        if len(segment) > 200:
            try:
                segment = segment.strip_silence(silence_len=50, silence_thresh=-40)
            except Exception:
                # BUG FIX: was a bare `except:` which also swallowed
                # SystemExit/KeyboardInterrupt.
                pass

        return segment
    except Exception as e:
        print(f"Warning: Error processing audio segment: {e}")
        return None
    finally:
        # Best-effort cleanup of the temp file; ignore filesystem races.
        try:
            if os.path.exists(audio_file):
                os.unlink(audio_file)
        except OSError:
            pass
|
|
async def bilingual_tts_optimized(text, output_file="audio0.mp3", VOICE_TA=None, max_concurrent=10):
    """Synthesize *text* to one MP3 using parallel chunked TTS requests.

    Pipeline: chunk the text -> synthesize chunks concurrently (bounded by
    *max_concurrent*) -> normalize/trim each chunk in a thread pool ->
    concatenate with short pauses -> compress, normalize and export.

    Args:
        text: Narration text (cleaned/chunked internally).
        output_file: Destination MP3 path.
        VOICE_TA: Optional secondary voice. When it is a "ta-IN" voice,
            Tamil-script chunks use it while other chunks fall back to it
            or VOICE_EN; otherwise it overrides the voice for all chunks.
        max_concurrent: Cap on simultaneous edge-tts requests.

    Returns:
        *output_file* on success, None on any failure (errors are printed,
        never raised — callers check for None).
    """
    print("Starting optimized bilingual TTS processing...")

    try:
        chunks = smart_text_chunking(text)
        if not chunks:
            print("Error: No valid text chunks after cleaning")
            return None

        print(f"Processing {len(chunks)} text chunks with max {max_concurrent} concurrent requests...")

        is_bilingual_tamil = VOICE_TA is not None and "ta-IN" in VOICE_TA

        # Bound the number of simultaneous TTS requests.
        semaphore = asyncio.Semaphore(max_concurrent)

        # Pick a voice per chunk: Tamil-script chunks (U+0B80–U+0BFF) get the
        # Tamil voice when one was supplied; everything else uses VOICE_TA or
        # the English default.
        tasks = []
        for i, chunk in enumerate(chunks):
            is_tamil = any('\u0B80' <= char <= '\u0BFF' for char in chunk)
            voice = VOICE_TA if (is_bilingual_tamil and is_tamil) else (VOICE_TA or VOICE_EN)
            tasks.append(generate_safe_audio(chunk, voice, semaphore))

        # return_exceptions=True: one failed chunk must not sink the batch.
        audio_files = await asyncio.gather(*tasks, return_exceptions=True)

        # Keep only real file paths (drops None results and raised exceptions).
        processed_audio_files = [f for f in audio_files if isinstance(f, str) and f]

        if not processed_audio_files:
            print("Error: No audio was successfully generated")
            return None

        print(f"Successfully generated {len(processed_audio_files)} audio segments")

        # Decode/normalize the chunk files in parallel threads; each call
        # also deletes its temp file when done.
        with ThreadPoolExecutor(max_workers=min(len(processed_audio_files), 8)) as executor:
            audio_segments = list(executor.map(process_audio_segment_fast, processed_audio_files))

        audio_segments = [seg for seg in audio_segments if seg is not None]

        if not audio_segments:
            print("Error: No audio segments were successfully processed")
            return None

        # Concatenate with a 200 ms pause between chunks.
        print("Merging audio segments...")
        merged_audio = audio_segments[0]
        pause = AudioSegment.silent(duration=200)

        for segment in audio_segments[1:]:
            merged_audio += pause + segment

        # Light mastering pass: tame peaks, then normalize overall level.
        print("Applying final audio processing...")
        merged_audio = merged_audio.compress_dynamic_range(
            threshold=-20.0,
            ratio=4.0,
            attack=5.0,
            release=50.0
        )
        merged_audio = normalize(merged_audio)

        merged_audio.export(output_file, format="mp3", bitrate="192k")
        print(f"✅ Audio successfully generated: {output_file}")

        return output_file

    except Exception as main_error:
        print(f"Main error in bilingual TTS: {main_error}")
        return None
|
|
async def generate_tts_optimized(id, lines, lang):
    """Generate narration audio for one request and report its duration.

    Args:
        id: Numeric suffix for the output file (audio{id}.mp3) and, when
            *lines* is a list, the index of the line to narrate.
        lines: Narration text — either a plain string (used as-is) or a
            list/tuple of strings indexed by *id*.
        lang: Either a language name ("Tamil", "English", ...) or a combined
            "text&&&language" payload that overrides *lines*.

    Returns:
        (duration_seconds, audio_path) on success, (None, None) on failure.
    """
    # Neural voice per supported language; unknown names fall back to VOICE_EN.
    voice = {
        "English": "en-US-JennyNeural",
        "Tamil": "ta-IN-PallaviNeural",
        "Hindi": "hi-IN-SwaraNeural",
        "Malayalam": "ml-IN-SobhanaNeural",
        "Kannada": "kn-IN-SapnaNeural",
        "Telugu": "te-IN-ShrutiNeural",
        "Bengali": "bn-IN-TanishaaNeural",
        "Marathi": "mr-IN-AarohiNeural",
        "Gujarati": "gu-IN-DhwaniNeural",
        "Punjabi": "pa-IN-VaaniNeural",
        "Urdu": "ur-IN-GulNeural",
        "French": "fr-FR-DeniseNeural",
        "German": "de-DE-KatjaNeural",
        "Spanish": "es-ES-ElviraNeural",
        "Italian": "it-IT-IsabellaNeural",
        "Russian": "ru-RU-SvetlanaNeural",
        "Japanese": "ja-JP-NanamiNeural",
        "Korean": "ko-KR-SunHiNeural",
        "Chinese": "zh-CN-XiaoxiaoNeural",
        "Arabic": "ar-SA-ZariyahNeural",
        "Portuguese": "pt-BR-FranciscaNeural",
        "Dutch": "nl-NL-FennaNeural",
        "Greek": "el-GR-AthinaNeural",
        "Hebrew": "he-IL-HilaNeural",
        "Turkish": "tr-TR-EmelNeural",
        "Polish": "pl-PL-AgnieszkaNeural",
        "Thai": "th-TH-AcharaNeural",
        "Vietnamese": "vi-VN-HoaiMyNeural",
        "Swedish": "sv-SE-SofieNeural",
        "Finnish": "fi-FI-NooraNeural",
        "Czech": "cs-CZ-VlastaNeural",
        "Hungarian": "hu-HU-NoemiNeural"
    }

    audio_name = f"audio{id}.mp3"
    audio_path = os.path.join(AUDIO_DIR, audio_name)

    if "&&&" in lang:
        # Combined payload: "<text>&&&<language name>" overrides *lines*.
        listf = lang.split("&&&")
        text = listf[0].strip()
        lang_name = listf[1].strip()
        voice_to_use = voice.get(lang_name, VOICE_EN)
    else:
        # BUG FIX: the /generate route passes the narration as a plain
        # string, so `lines[id]` narrated only a single character. Use the
        # whole string for str inputs, keep index lookup for sequences.
        text = lines if isinstance(lines, str) else lines[id]
        voice_to_use = voice.get(lang, VOICE_EN)

    output = await bilingual_tts_optimized(text, audio_path, voice_to_use, max_concurrent=15)

    if output and os.path.exists(audio_path):
        audio = MP3(audio_path)
        duration = audio.info.length
        return duration, audio_path

    return None, None
|
|
def audio_func(id, lines, lang):
    """Blocking entry point: run the async TTS pipeline to completion.

    Returns (duration_seconds, audio_path), or (None, None) on failure.
    """
    result = asyncio.run(generate_tts_optimized(id, lines, lang))
    return result
|
|
def make_wrapped_paragraph(content, max_width, color, font, font_size, line_spacing, align_left=True):
    """
    Build a vertically stacked VGroup of Text lines that form a paragraph.

    Words are greedily packed into lines whose *rendered* width stays within
    max_width (each candidate line is measured by actually constructing a
    Text mobject); each final line is its own Text object.

    Args:
        content: Paragraph text to wrap.
        max_width: Maximum rendered line width, in scene units.
        color, font, font_size: Passed through to every Text line.
        line_spacing: Vertical buff between stacked lines.
        align_left: When True, lines are arranged flush-left.

    Returns:
        A VGroup of Text lines (an empty VGroup for empty content).
    """
    words = content.split()
    lines = []
    current = ""

    for w in words:
        test = w if not current else current + " " + w
        # Width depends on font metrics, so measure a real render of the
        # candidate line rather than counting characters.
        test_obj = Text(test, color=color, font=font, font_size=font_size)
        if test_obj.width <= max_width:
            current = test
        else:
            lines.append(Text(current, color=color, font=font, font_size=font_size))
            current = w
    if current:
        lines.append(Text(current, color=color, font=font, font_size=font_size))

    if not lines:
        return VGroup()

    para = VGroup(*lines)
    if align_left:
        # BUG FIX: `para.align_to(LEFT)` aligned the group to the point
        # (-1, 0, 0) rather than left-aligning the lines; arranging with
        # aligned_edge=LEFT is the supported way to keep lines flush-left.
        para.arrange(DOWN, aligned_edge=LEFT, buff=line_spacing)
    else:
        para.arrange(DOWN, buff=line_spacing)
    # BUG FIX: the old `return para.strip()` raised AttributeError —
    # VGroup has no strip() method. Return the group itself.
    return para
|
|
def create_manim_script(problem_data, script_path, audio_path, scale=1):
    """Write a standalone Manim scene script built from slide data.

    The generated file defines GeneratedMathScene, which plays *audio_path*
    as background narration and animates each slide (title/text/equation)
    with wrapped, left-aligned layout and auto-scrolling.

    Args:
        problem_data: Dict with "slides" (required) and optional
            "video_settings" overrides.
        script_path: Destination path for the generated .py file.
        audio_path: Narration MP3 embedded via self.add_sound().
        scale: Multiplier applied to every slide duration so the video
            length matches the narration length.

    Raises:
        ValueError: If no slides are provided.
    """
    settings = problem_data.get("video_settings", {
        "background_color": "#0f0f23",
        "text_color": "WHITE",
        "highlight_color": "YELLOW",
        "font": "",
        "text_size": 36,
        "equation_size": 45,
        "title_size": 48,
        "wrap_width": 15.5
    })

    slides = problem_data.get("slides", [])
    if not slides:
        raise ValueError("No slides provided in input data")

    # Embed the slide list as a Python literal inside the generated script.
    slides_repr = repr(slides)

    wrap_width = float(settings.get("wrap_width", 15.5))

    # NOTE: text_color/highlight_color are interpolated unquoted on purpose —
    # they are Manim color constant names (WHITE, YELLOW) in the generated code.
    manim_code = f'''
from manim import *
import textwrap
class GeneratedMathScene(Scene):
    def construct(self):
        # Scene settings
        # BUG FIX: the audio path must be emitted as a (raw) string literal;
        # it was previously interpolated bare, producing a SyntaxError.
        self.add_sound(r"{audio_path}")
        self.camera.background_color = "{settings.get('background_color', '#0f0f23')}"
        default_color = {settings.get('text_color', 'WHITE')}
        highlight_color = {settings.get('highlight_color', 'YELLOW')}
        default_font = "{settings.get('font', 'CMU Serif')}"
        text_size = {settings.get('text_size', 36)}
        equation_size = {settings.get('equation_size', 45)}
        title_size = {settings.get('title_size', 48)}
        wrap_width = {wrap_width}

        # Helper to wrap text into lines that fit within max width
        def make_wrapped_paragraph(content, color, font, font_size, line_spacing=0.2):
            lines = []
            words = content.split()
            current = ""

            for w in words:
                test = w if not current else current + " " + w
                test_obj = Text(test, color=color, font=font, font_size=font_size)

                if test_obj.width <= wrap_width * 0.9:
                    current = test
                else:
                    line_obj = Text(current, color=color, font=font, font_size=font_size)
                    lines.append(line_obj)
                    current = w

            if current:
                lines.append(Text(current, color=color, font=font, font_size=font_size))

            if not lines:
                return VGroup()

            # Force every line to align to LEFT like line 1
            first_line = lines[0]
            for ln in lines:
                ln.align_to(first_line, LEFT)

            para = VGroup(*lines).arrange(DOWN, aligned_edge=LEFT, buff=line_spacing)
            return para

        content_group = VGroup()
        current_y = 3.0
        line_spacing = 0.8
        slides = {slides_repr}

        # Build each slide
        for idx, slide in enumerate(slides):
            obj = None
            content = slide.get("content", "")
            animation = slide.get("animation", "write_left")
            scalelen = slide.get("duration", 1.0)
            duration = scalelen * {scale}
            slide_type = slide.get("type", "text")

            if slide_type == "title":
                # Titles are shown centered, then faded out before the body.
                if content:
                    lines_group = make_wrapped_paragraph(content, highlight_color, default_font, title_size, line_spacing=0.2)
                    obj = lines_group if len(lines_group) > 0 else Text(content, color=highlight_color, font=default_font, font_size=title_size)
                else:
                    obj = Text("", color=highlight_color, font=default_font, font_size=title_size)
                if obj.width > wrap_width:
                    obj.scale_to_fit_width(wrap_width)

                obj.move_to(ORIGIN)
                self.play(FadeIn(obj), run_time=duration * 0.8)
                self.wait(duration * 0.3)
                self.play(FadeOut(obj), run_time=duration * 0.3)
                continue

            elif slide_type == "text":
                obj = make_wrapped_paragraph(content, default_color, default_font, text_size, line_spacing=0.25)

            elif slide_type == "equation":
                # Render once to measure; if too wide, split roughly in half
                # at a space and join with a LaTeX line break.
                eq_content = content
                test = MathTex(eq_content, color=default_color, font_size=equation_size)
                if test.width > wrap_width:
                    parts = eq_content.split(" ")
                    mid = len(parts)//2
                    line1 = " ".join(parts[:mid])
                    line2 = " ".join(parts[mid:])
                    wrapped_eq = f"{{line1}} \\\\\\\\ {{line2}}"
                    obj = MathTex(wrapped_eq, color=default_color, font_size=equation_size)
                else:
                    obj = MathTex(eq_content, color=default_color, font_size=equation_size)

                if obj.width > wrap_width:
                    obj.scale_to_fit_width(wrap_width)

            if obj:
                # Position below previous content; scroll everything up if the
                # new object would fall off the bottom of the frame.
                obj.to_edge(LEFT, buff=0.3)
                obj.shift(UP * (current_y - obj.height/2))

                obj_bottom = obj.get_bottom()[1]
                if obj_bottom < -3.5:
                    scroll_amount = abs(obj_bottom - (-3.5)) + 0.3
                    self.play(content_group.animate.shift(UP * scroll_amount), run_time=0.5)
                    current_y += scroll_amount
                    obj.shift(UP * scroll_amount)
                    obj.to_edge(LEFT, buff=0.3)

                if animation == "write_left":
                    self.play(Write(obj), run_time=duration)
                elif animation == "fade_in":
                    self.play(FadeIn(obj), run_time=duration)
                elif animation == "highlight_left":
                    self.play(Write(obj), run_time=duration * 0.6)
                    self.play(obj.animate.set_color(highlight_color), run_time=duration * 0.4)
                else:
                    self.play(Write(obj), run_time=duration)

                content_group.add(obj)
                # Decrease y for next item
                current_y -= (getattr(obj, "height", 0) + line_spacing)
                self.wait(0.3)

        if len(content_group) > 0:
            final_box = SurroundingRectangle(content_group[-1], color=highlight_color, buff=0.2)
            self.play(Create(final_box), run_time=0.8)
            self.wait(1.5)
'''

    with open(script_path, 'w', encoding='utf-8') as f:
        f.write(manim_code)

    print(f"Generated script preview (first 500 chars):{manim_code[:500]}...")
|
|
| @app.route("/") |
| def home(): |
| return "Flask Manim Video Generator is Running" |
|
|
| @app.route("/generate", methods=["POST"]) |
| def generate_video(): |
| try: |
| raw_data = request.get_json() |
| raw_body=raw_data.get("jsondata" , '') |
| |
| |
| lst = raw_body.split("&&&&") |
| cleaned = re.sub(r'(\d)\s*\.\s*(\d)', r'\1.\2', lst[0]) |
| nlist = ast.literal_eval(cleaned) |
| datalst=[] |
| total=0 |
| scale=1 |
| for line in range(len(nlist)): |
| total=total+float(nlist[line][3]) |
| datalst.append({ |
| "type": nlist[line][0].strip(), |
| "content": nlist[line][1].strip(), |
| "animation": nlist[line][2].strip().replace(" ",""), |
| "duration": nlist[line][3] |
| }) |
| |
| data={ |
| "video_settings": { |
| "background_color": "#0f0f23", |
| "text_color": "WHITE", |
| "highlight_color": "YELLOW", |
| "font": "CMU Serif", |
| "text_size": 36, |
| "equation_size": 42, |
| "title_size": 48 |
| }, |
| "slides":datalst} |
| |
| best=lst[1].split("&&&") |
| lines=best[0] |
| lang=best[1] |
| length, audio_path = audio_func(0, lines, lang) |
| if not length or not audio_path: |
| print("Failed to generate audio.") |
|
|
| scale=length/total |
| |
| |
| print(json.dumps(data, indent=2)) |
| |
| if "slides" not in data or not data["slides"]: |
| return jsonify({"error": "No slides provided in request"}), 400 |
| |
| print(f"✅ Parsed {len(data['slides'])} slides successfully.") |
| |
| |
| if "slides" not in data or not data["slides"]: |
| return jsonify({"error": "No slides provided in request"}), 400 |
| |
| print(f"Received request with {len(data['slides'])} slides") |
| |
| |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| temp_work_dir = os.path.join(TEMP_DIR, f"manim_{timestamp}") |
| os.makedirs(temp_work_dir, exist_ok=True) |
| |
| |
| script_path = os.path.join(temp_work_dir, "scene.py") |
| create_manim_script(data, script_path,audio_path,scale) |
| print(f"Created Manim script at {script_path}") |
| |
| |
| quality = 'l' |
| render_command = [ |
| "manim", |
| f"-q{quality}", |
| "--disable_caching", |
| "--media_dir", temp_work_dir, |
| script_path, |
| "GeneratedMathScene" |
| ] |
| |
| print(f"Running command: {' '.join(render_command)}") |
| |
| result = subprocess.run( |
| render_command, |
| capture_output=True, |
| text=True, |
| cwd=temp_work_dir, |
| timeout=120 |
| ) |
| |
| if result.returncode != 0: |
| error_msg = result.stderr or result.stdout |
| print(f"Manim rendering failed: {error_msg}") |
| return jsonify({ |
| "error": "Manim rendering failed", |
| "details": error_msg |
| }), 500 |
| |
| print("Manim rendering completed successfully") |
| |
| |
| quality_map = {'l': '480p15', 'm': '720p30', 'h': '1080p60'} |
| video_quality = quality_map.get(quality, '480p15') |
| |
| video_path = os.path.join( |
| temp_work_dir, |
| "videos", |
| "scene", |
| video_quality, |
| "GeneratedMathScene.mp4" |
| ) |
| |
| if not os.path.exists(video_path): |
| print(f"Video not found at expected path: {video_path}") |
| return jsonify({ |
| "error": "Video file not found after rendering", |
| "expected_path": video_path |
| }), 500 |
| |
| print(f"Video found at: {video_path}") |
| |
| |
| output_filename = f"math_video_{timestamp}.mp4" |
| output_path = os.path.join(MEDIA_DIR, output_filename) |
| shutil.copy(video_path, output_path) |
| print(f"Video copied to: {output_path}") |
| |
| |
| try: |
| shutil.rmtree(temp_work_dir) |
| print("Cleaned up temp directory") |
| except Exception as e: |
| print(f"Failed to clean temp dir: {e}") |
|
|
| return send_file( |
| output_path, |
| mimetype='video/mp4', |
| as_attachment=False, |
| download_name=output_filename |
| ) |
| |
| except subprocess.TimeoutExpired: |
| print("Video rendering timeout") |
| return jsonify({"error": "Video rendering timeout (120s)"}), 504 |
| except Exception as e: |
| print(f"Error: {str(e)}") |
| traceback.print_exc() |
| return jsonify({ |
| "error": str(e), |
| "traceback": traceback.format_exc() |
| }), 500 |
|
|
if __name__ == '__main__':
    # Bind on all interfaces; port from $PORT when set, else 7860.
    # NOTE(review): 7860 is the Hugging Face Spaces convention — confirm the
    # deployment target before changing.
    port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=port, debug=False)
|
|