sreepathi-ravikumar's picture
Update app.py
c2bcfbf verified
raw
history blame
24.8 kB
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
import os
import subprocess
import tempfile
import shutil
from datetime import datetime
import traceback
import json
import ast
import re
import textwrap
from manim import *
# Flask application; CORS is enabled so browser clients on other origins
# can call the video-generation API directly.
app = Flask(__name__)
CORS(app) # Enable CORS for all routes
# Configuration
# Working directories, created eagerly at import time:
#   MEDIA_DIR - final rendered videos copied here and served to clients
#   TEMP_DIR  - per-request Manim working directories (deleted after render)
#   AUDIO_DIR - generated TTS narration MP3s
BASE_DIR = "/app"
MEDIA_DIR = os.path.join(BASE_DIR, "media")
TEMP_DIR = os.path.join(BASE_DIR, "temp")
AUDIO_DIR = os.path.join(BASE_DIR, "sound")
os.makedirs(MEDIA_DIR, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(AUDIO_DIR, exist_ok=True)
# API Key for security (optional)
# NOTE(review): hard-coded credential in source, and no route in this file
# actually checks it -- confirm whether enforcement is intended.
API_KEY = "rkmentormindzofficaltokenkey12345"
# NOTE(review): re, tempfile and os are re-imported below (harmless no-ops);
# kept as-is to avoid touching code in a documentation-only pass.
import re
import html
import unicodedata
import tempfile
import os
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import lru_cache
import edge_tts
from pydub import AudioSegment
from pydub.effects import normalize
from mutagen.mp3 import MP3
VOICE_EN = "en-IN-NeerjaNeural"
# Pre-compiled regex patterns for speed (compiled once, reused many times)
URL_PATTERN = re.compile(r'https?://[^\s<>"\']+|www\.[^\s<>"\']+')
TAG_PATTERN = re.compile(r'<[^>]*>|[<>]')
BRACKET_PATTERN = re.compile(r'[\{\}\[\]]')
SPECIAL_CHAR_PATTERN = re.compile(r'[#@$%^&*_+=|\\`~]')
WHITESPACE_PATTERN = re.compile(r'\s+')
SENTENCE_PATTERN = re.compile(r'(?<=[.!?])\s+')
SUB_PATTERN = re.compile(r'(?<=[,;:])\s+')
@lru_cache(maxsize=1024) # Cache cleaned text to avoid re-processing
def clean_text_for_tts(text):
"""Cleans text before TTS with optimized regex and caching."""
if not text:
return ""
text = str(text).strip()
text = html.unescape(text)
# Use pre-compiled patterns (much faster)
text = URL_PATTERN.sub('', text)
text = TAG_PATTERN.sub('', text)
text = BRACKET_PATTERN.sub('', text)
text = SPECIAL_CHAR_PATTERN.sub('', text)
text = text.replace('\\n', ' ').replace('\\t', ' ').replace('\\r', ' ')
# Batch remove keywords (faster than multiple re.sub calls)
for keyword in ['voice', 'speak', 'prosody', 'ssml', 'xmlns']:
text = text.replace(keyword, '').replace(keyword.upper(), '')
text = unicodedata.normalize('NFKD', text)
text = WHITESPACE_PATTERN.sub(' ', text)
return text.strip()
async def generate_safe_audio(text, voice, semaphore):
    """Synthesize *text* with edge-tts into a temporary MP3.

    Acquires *semaphore* first so the number of in-flight TTS requests is
    bounded. Returns the temp file path on success, or None when the
    cleaned text is empty or synthesis fails (the temp file is deleted on
    failure).
    """
    async with semaphore:  # throttle concurrent edge-tts requests
        cleaned = clean_text_for_tts(text)
        if not cleaned:
            return None
        # Reserve a filename up front; edge-tts writes into it via save().
        handle = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3')
        path = handle.name
        handle.close()
        try:
            await edge_tts.Communicate(cleaned, voice=voice).save(path)
        except Exception as e:
            print(f"Error generating audio: {e}")
            if os.path.exists(path):
                os.unlink(path)
            return None
        return path
@lru_cache(maxsize=256)
def smart_text_chunking(text, max_chars=80):
    """Split cleaned text into TTS-sized chunks (result is cached).

    Splitting is hierarchical: sentence boundaries first, then sub-clause
    punctuation, and finally greedy word packing so no chunk exceeds
    *max_chars* (a single word longer than the limit becomes its own
    chunk). Returns a tuple so the value is hashable for lru_cache.
    """
    text = clean_text_for_tts(text)
    if not text:
        return tuple()  # tuple keeps the cached value hashable

    def _pack_words(fragment):
        # Greedy packing: flush the running chunk whenever the next word
        # would push it past the limit.
        pieces = []
        current = ""
        for word in fragment.split():
            candidate = f"{current} {word}" if current else word
            if len(candidate) <= max_chars:
                current = candidate
            else:
                if current:
                    pieces.append(current.strip())
                current = word
        if current:
            pieces.append(current.strip())
        return pieces

    chunks = []
    for sentence in SENTENCE_PATTERN.split(text):
        sentence = sentence.strip()
        if not sentence:
            continue
        if len(sentence) <= max_chars:
            chunks.append(sentence)
            continue
        for part in SUB_PATTERN.split(sentence):
            part = part.strip()
            if not part:
                continue
            if len(part) <= max_chars:
                chunks.append(part)
            else:
                chunks.extend(_pack_words(part))
    return tuple(chunk for chunk in chunks if chunk.strip())
def process_audio_segment_fast(audio_file):
    """Load, normalize and (best-effort) silence-trim one TTS segment.

    Runs inside a worker thread. Returns a pydub AudioSegment, or None
    when the file cannot be decoded/processed. The source temp file is
    always deleted, success or failure.
    """
    try:
        segment = AudioSegment.from_file(audio_file)
        segment = normalize(segment)
        # Only strip silence for longer segments; very short clips could be
        # consumed entirely by the trim.
        if len(segment) > 200:
            try:
                segment = segment.strip_silence(silence_len=50, silence_thresh=-40)
            except Exception:
                # BUG FIX: was a bare `except:` which also swallowed
                # SystemExit/KeyboardInterrupt. Best-effort: keep the
                # un-trimmed segment on any processing error.
                pass
        return segment
    except Exception as e:
        print(f"Warning: Error processing audio segment: {e}")
        return None
    finally:
        # Cleanup temp file immediately to bound disk usage.
        try:
            if os.path.exists(audio_file):
                os.unlink(audio_file)
        except OSError:
            # BUG FIX: narrowed from bare `except:`; unlink only raises
            # OS-level errors.
            pass
async def bilingual_tts_optimized(text, output_file="audio0.mp3", VOICE_TA=None, max_concurrent=10):
    """Ultra-optimized bilingual TTS with parallel processing.

    Pipeline: chunk *text*, synthesize every chunk concurrently (bounded
    by *max_concurrent* via a semaphore), normalize/trim the segments in
    a thread pool, then concatenate them with short pauses and export one
    MP3 to *output_file*.

    Returns *output_file* on success, or None on any failure (errors are
    printed, not raised).
    """
    print("Starting optimized bilingual TTS processing...")
    try:
        chunks = smart_text_chunking(text)
        if not chunks:
            print("Error: No valid text chunks after cleaning")
            return None
        print(f"Processing {len(chunks)} text chunks with max {max_concurrent} concurrent requests...")
        # Tamil-specific voice routing only applies when a ta-IN voice was
        # explicitly supplied by the caller.
        is_bilingual_tamil = VOICE_TA is not None and "ta-IN" in VOICE_TA
        # Semaphore to limit concurrent TTS requests (prevents rate limiting)
        semaphore = asyncio.Semaphore(max_concurrent)
        # Prepare all synthesis tasks up front, one per chunk.
        tasks = []
        for i, chunk in enumerate(chunks):
            # Detect Tamil script via its Unicode block (U+0B80..U+0BFF).
            is_tamil = any('\u0B80' <= char <= '\u0BFF' for char in chunk)
            voice = VOICE_TA if (is_bilingual_tamil and is_tamil) else (VOICE_TA or VOICE_EN)
            tasks.append(generate_safe_audio(chunk, voice, semaphore))
        # Generate all audio files concurrently; exceptions are returned,
        # not raised, so one bad chunk doesn't abort the batch.
        audio_files = await asyncio.gather(*tasks, return_exceptions=True)
        # Keep only successful results (str paths); drop None and exceptions.
        processed_audio_files = [f for f in audio_files if isinstance(f, str) and f]
        if not processed_audio_files:
            print("Error: No audio was successfully generated")
            return None
        print(f"Successfully generated {len(processed_audio_files)} audio segments")
        # Decode/normalize segments in parallel; the helper also deletes
        # each temp file as it finishes.
        with ThreadPoolExecutor(max_workers=min(len(processed_audio_files), 8)) as executor:
            audio_segments = list(executor.map(process_audio_segment_fast, processed_audio_files))
        # Filter out segments that failed to decode.
        audio_segments = [seg for seg in audio_segments if seg is not None]
        if not audio_segments:
            print("Error: No audio segments were successfully processed")
            return None
        # Merge audio segments with a 200 ms pause between chunks.
        print("Merging audio segments...")
        merged_audio = audio_segments[0]
        pause = AudioSegment.silent(duration=200)
        for segment in audio_segments[1:]:
            merged_audio += pause + segment
        # Apply final processing (compression and normalization)
        print("Applying final audio processing...")
        merged_audio = merged_audio.compress_dynamic_range(
            threshold=-20.0,
            ratio=4.0,
            attack=5.0,
            release=50.0
        )
        merged_audio = normalize(merged_audio)
        # Export with high quality
        merged_audio.export(output_file, format="mp3", bitrate="192k")
        print(f"✅ Audio successfully generated: {output_file}")
        return output_file
    except Exception as main_error:
        print(f"Main error in bilingual TTS: {main_error}")
        return None
async def generate_tts_optimized(id, lines, lang):
    """Optimized TTS generation function.

    Resolves a neural voice for the requested language, synthesizes the
    narration into AUDIO_DIR/audio{id}.mp3, and returns
    (duration_seconds, audio_path) or (None, None) on failure.

    NOTE(review): the parameter name `id` shadows the builtin; kept as-is
    for interface stability.
    """
    # Language name -> edge-tts neural voice identifier.
    voice = {
        "English": "en-US-JennyNeural",
        "Tamil": "ta-IN-PallaviNeural",
        "Hindi": "hi-IN-SwaraNeural",
        "Malayalam": "ml-IN-SobhanaNeural",
        "Kannada": "kn-IN-SapnaNeural",
        "Telugu": "te-IN-ShrutiNeural",
        "Bengali": "bn-IN-TanishaaNeural",
        "Marathi": "mr-IN-AarohiNeural",
        "Gujarati": "gu-IN-DhwaniNeural",
        "Punjabi": "pa-IN-VaaniNeural",
        "Urdu": "ur-IN-GulNeural",
        "French": "fr-FR-DeniseNeural",
        "German": "de-DE-KatjaNeural",
        "Spanish": "es-ES-ElviraNeural",
        "Italian": "it-IT-IsabellaNeural",
        "Russian": "ru-RU-SvetlanaNeural",
        "Japanese": "ja-JP-NanamiNeural",
        "Korean": "ko-KR-SunHiNeural",
        "Chinese": "zh-CN-XiaoxiaoNeural",
        "Arabic": "ar-SA-ZariyahNeural",
        "Portuguese": "pt-BR-FranciscaNeural",
        "Dutch": "nl-NL-FennaNeural",
        "Greek": "el-GR-AthinaNeural",
        "Hebrew": "he-IL-HilaNeural",
        "Turkish": "tr-TR-EmelNeural",
        "Polish": "pl-PL-AgnieszkaNeural",
        "Thai": "th-TH-AcharaNeural",
        "Vietnamese": "vi-VN-HoaiMyNeural",
        "Swedish": "sv-SE-SofieNeural",
        "Finnish": "fi-FI-NooraNeural",
        "Czech": "cs-CZ-VlastaNeural",
        "Hungarian": "hu-HU-NoemiNeural"
    }
    audio_name = f"audio{id}.mp3"
    audio_path = os.path.join(AUDIO_DIR, audio_name)
    if "&&&" in lang:
        # *lang* packs "text&&&language"; split it apart.
        listf = lang.split("&&&")
        text = listf[0].strip()
        lang_name = listf[1].strip()
        voice_to_use = voice.get(lang_name, VOICE_EN)
    else:
        # NOTE(review): the caller (generate_video -> audio_func) passes a
        # plain string as *lines*, so lines[id] selects a single character
        # when id=0. Presumably a list of narration lines was intended --
        # confirm against the caller before changing.
        text = lines[id]
        voice_to_use = voice.get(lang, VOICE_EN)
    # Increase max_concurrent for more speed (adjust based on your system)
    output = await bilingual_tts_optimized(text, audio_path, voice_to_use, max_concurrent=15)
    if output and os.path.exists(audio_path):
        # Read the MP3 header to report the narration duration in seconds.
        audio = MP3(audio_path)
        duration = audio.info.length
        return duration, audio_path
    return None, None
def audio_func(id, lines, lang):
    """Blocking wrapper: drive the async TTS pipeline to completion.

    Returns whatever generate_tts_optimized returns:
    (duration_seconds, audio_path) or (None, None).
    """
    coroutine = generate_tts_optimized(id, lines, lang)
    return asyncio.run(coroutine)
def make_wrapped_paragraph(content, max_width, color, font, font_size, line_spacing, align_left=True):
    """
    Build a vertically stacked VGroup of Text lines that form a paragraph.

    Words are greedily packed into lines whose *rendered* width stays within
    max_width (measured by instantiating Text with the final font/size).
    Lines are arranged downward with line_spacing between them; when
    align_left is True they share a common left edge.

    Returns an empty VGroup for empty content.
    """
    words = content.split()
    lines = []
    current = ""
    # (The original built an unused throwaway Text here for "measuring";
    # each candidate line is measured directly below, so it was removed.)
    for w in words:
        test = w if not current else current + " " + w
        test_obj = Text(test, color=color, font=font, font_size=font_size)
        if test_obj.width <= max_width:
            current = test
        else:
            # Flush the current line. BUG FIX: skip the flush when current
            # is empty (first word already wider than max_width), which
            # previously appended an empty Text line.
            if current:
                lines.append(Text(current, color=color, font=font, font_size=font_size))
            current = w
    if current:
        lines.append(Text(current, color=color, font=font, font_size=font_size))
    if not lines:
        return VGroup()
    para = VGroup(*lines)
    # Space lines vertically as a column; share the left edge when the
    # caller asked for left alignment (the original's align_to(LEFT) call
    # passed a direction vector as the target and had no aligning effect).
    if align_left:
        para.arrange(DOWN, aligned_edge=LEFT, buff=line_spacing)
    else:
        para.arrange(DOWN, buff=line_spacing)
    # BUG FIX: the original ended with `return para.strip()`; VGroup has no
    # strip() method, so every call raised AttributeError.
    return para
def create_manim_script(problem_data, script_path, audio_path, scale=1):
    """Generate a Manim scene script from problem data and write it to disk.

    Args:
        problem_data: dict with optional "video_settings" and required
            "slides" (each slide: type/content/animation/duration).
        script_path: filesystem path where the generated scene.py is written.
        audio_path: narration MP3 embedded via Scene.add_sound().
        scale: multiplier applied to every slide duration (used to stretch
            the video to match the narration length).

    Raises:
        ValueError: if no slides are provided.
    """
    # Defaults (used only when "video_settings" is absent entirely).
    settings = problem_data.get("video_settings", {
        "background_color": "#0f0f23",
        "text_color": "WHITE",
        "highlight_color": "YELLOW",
        "font": "",
        "text_size": 36,
        "equation_size": 45,
        "title_size": 48,
        "wrap_width": 15.5  # in scene width units; adjust to taste
    })
    slides = problem_data.get("slides", [])
    if not slides:
        raise ValueError("No slides provided in input data")
    # Embed the slide list as a Python literal in the generated script.
    slides_repr = repr(slides)
    wrap_width = float(settings.get("wrap_width", 15.5))
    # NOTE: doubled braces ({{ }}) below are f-string escapes that must
    # survive into the generated code.
    manim_code = f'''
from manim import *
import textwrap


class GeneratedMathScene(Scene):
    def construct(self):
        # Scene settings
        # BUG FIX: audio_path is embedded as a quoted string literal; the
        # original interpolated it bare, which made the generated script a
        # SyntaxError (self.add_sound(/app/sound/audio0.mp3)).
        self.add_sound({audio_path!r})
        self.camera.background_color = "{settings.get('background_color', '#0f0f23')}"
        default_color = {settings.get('text_color', 'WHITE')}
        highlight_color = {settings.get('highlight_color', 'YELLOW')}
        default_font = "{settings.get('font', 'CMU Serif')}"
        text_size = {settings.get('text_size', 36)}
        equation_size = {settings.get('equation_size', 45)}
        title_size = {settings.get('title_size', 48)}
        wrap_width = {wrap_width}

        # Helper to wrap text into lines that fit within the scene width.
        def make_wrapped_paragraph(content, color, font, font_size, line_spacing=0.2):
            lines = []
            words = content.split()
            current = ""
            for w in words:
                test = w if not current else current + " " + w
                test_obj = Text(test, color=color, font=font, font_size=font_size)
                if test_obj.width <= wrap_width * 0.9:
                    current = test
                else:
                    line_obj = Text(current, color=color, font=font, font_size=font_size)
                    lines.append(line_obj)
                    current = w
            if current:
                lines.append(Text(current, color=color, font=font, font_size=font_size))
            if not lines:
                return VGroup()
            # Force every line to share the first line's left edge.
            first_line = lines[0]
            for ln in lines:
                ln.align_to(first_line, LEFT)
            para = VGroup(*lines).arrange(DOWN, aligned_edge=LEFT, buff=line_spacing)
            return para

        content_group = VGroup()
        current_y = 3.0
        line_spacing = 0.8
        slides = {slides_repr}

        # Build each slide
        for idx, slide in enumerate(slides):
            obj = None
            content = slide.get("content", "")
            animation = slide.get("animation", "write_left")
            scalelen = slide.get("duration", 1.0)
            duration = scalelen * {scale}
            slide_type = slide.get("type", "text")
            if slide_type == "title":
                # Titles are shown centered on their own, then removed.
                title_text = content
                if title_text:
                    lines_group = make_wrapped_paragraph(title_text, highlight_color, default_font, title_size, line_spacing=0.2)
                    obj = lines_group if len(lines_group) > 0 else Text(title_text, color=highlight_color, font=default_font, font_size=title_size)
                else:
                    obj = Text("", color=highlight_color, font=default_font, font_size=title_size)
                if obj.width > wrap_width:
                    obj.scale_to_fit_width(wrap_width)
                obj.move_to(ORIGIN)
                self.play(FadeIn(obj), run_time=duration * 0.8)
                self.wait(duration * 0.3)
                self.play(FadeOut(obj), run_time=duration * 0.3)
                continue
            elif slide_type == "text":
                # Use wrapping for normal text
                obj = make_wrapped_paragraph(content, default_color, default_font, text_size, line_spacing=0.25)
            elif slide_type == "equation":
                # Wrap long equations: if the rendered TeX is too wide, split
                # the source at the word nearest the middle with a TeX break.
                eq_content = content
                test = MathTex(eq_content, color=default_color, font_size=equation_size)
                if test.width > wrap_width:
                    parts = eq_content.split(" ")
                    mid = len(parts) // 2
                    line1 = " ".join(parts[:mid])
                    line2 = " ".join(parts[mid:])
                    wrapped_eq = f"{{line1}} \\\\\\\\ {{line2}}"
                    obj = MathTex(wrapped_eq, color=default_color, font_size=equation_size)
                else:
                    obj = MathTex(eq_content, color=default_color, font_size=equation_size)
                if obj.width > wrap_width:
                    obj.scale_to_fit_width(wrap_width)
            if obj:
                # Position at the running y-cursor, scrolling earlier content
                # up when the new object would fall below the frame.
                obj.to_edge(LEFT, buff=0.3)
                obj.shift(UP * (current_y - obj.height / 2))
                obj_bottom = obj.get_bottom()[1]
                if obj_bottom < -3.5:
                    scroll_amount = abs(obj_bottom - (-3.5)) + 0.3
                    self.play(content_group.animate.shift(UP * scroll_amount), run_time=0.5)
                    current_y += scroll_amount
                    obj.shift(UP * scroll_amount)
                    obj.to_edge(LEFT, buff=0.3)
                if animation == "write_left":
                    self.play(Write(obj), run_time=duration)
                elif animation == "fade_in":
                    self.play(FadeIn(obj), run_time=duration)
                elif animation == "highlight_left":
                    self.play(Write(obj), run_time=duration * 0.6)
                    self.play(obj.animate.set_color(highlight_color), run_time=duration * 0.4)
                else:
                    self.play(Write(obj), run_time=duration)
                content_group.add(obj)
                # Decrease y for the next item.
                current_y -= (getattr(obj, "height", 0) + line_spacing)
                self.wait(0.3)

        # Box the final item as a visual conclusion.
        if len(content_group) > 0:
            final_box = SurroundingRectangle(content_group[-1], color=highlight_color, buff=0.2)
            self.play(Create(final_box), run_time=0.8)
            self.wait(1.5)
'''
    # (The original also emitted a dead `class GeneratedMathSceneInner(Scene):
    # pass` inside construct(); it was never used and has been dropped.)
    with open(script_path, 'w', encoding='utf-8') as f:
        f.write(manim_code)
    print(f"Generated script preview (first 500 chars):{manim_code[:500]}...")
@app.route("/")
def home():
return "Flask Manim Video Generator is Running"
@app.route("/generate", methods=["POST"])
def generate_video():
try:
raw_data = request.get_json()
raw_body=raw_data.get("jsondata" , '')
#print(f"Raw body length: {len(raw_body)}")
#print(f"First 200 chars: {raw_body[:200]}")
lst = raw_body.split("&&&&")
cleaned = re.sub(r'(\d)\s*\.\s*(\d)', r'\1.\2', lst[0])
nlist = ast.literal_eval(cleaned)
datalst=[]
total=0
scale=1
for line in range(len(nlist)):
total=total+float(nlist[line][3])
datalst.append({
"type": nlist[line][0].strip(),
"content": nlist[line][1].strip(),
"animation": nlist[line][2].strip().replace(" ",""),
"duration": nlist[line][3]
})
data={
"video_settings": {
"background_color": "#0f0f23",
"text_color": "WHITE",
"highlight_color": "YELLOW",
"font": "CMU Serif",
"text_size": 36,
"equation_size": 42,
"title_size": 48
},
"slides":datalst}
#audio generating code here
best=lst[1].split("&&&")
lines=best[0]
lang=best[1]
length, audio_path = audio_func(0, lines, lang)
if not length or not audio_path:
print("Failed to generate audio.")
scale=length/total
# Now proceed with video generation using 'data'
print(json.dumps(data, indent=2)) # For debugging
# ✅ Final validation
if "slides" not in data or not data["slides"]:
return jsonify({"error": "No slides provided in request"}), 400
print(f"✅ Parsed {len(data['slides'])} slides successfully.")
# Validate input
if "slides" not in data or not data["slides"]:
return jsonify({"error": "No slides provided in request"}), 400
print(f"Received request with {len(data['slides'])} slides")
# Create unique temporary directory
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
temp_work_dir = os.path.join(TEMP_DIR, f"manim_{timestamp}")
os.makedirs(temp_work_dir, exist_ok=True)
# Generate Manim script
script_path = os.path.join(temp_work_dir, "scene.py")
create_manim_script(data, script_path,audio_path,scale)
print(f"Created Manim script at {script_path}")
# Render video using subprocess
quality = 'l' # l=low, m=medium, h=high
render_command = [
"manim",
f"-q{quality}",
"--disable_caching",
"--media_dir", temp_work_dir,
script_path,
"GeneratedMathScene"
]
print(f"Running command: {' '.join(render_command)}")
result = subprocess.run(
render_command,
capture_output=True,
text=True,
cwd=temp_work_dir,
timeout=120
)
if result.returncode != 0:
error_msg = result.stderr or result.stdout
print(f"Manim rendering failed: {error_msg}")
return jsonify({
"error": "Manim rendering failed",
"details": error_msg
}), 500
print("Manim rendering completed successfully")
# Find generated video
quality_map = {'l': '480p15', 'm': '720p30', 'h': '1080p60'}
video_quality = quality_map.get(quality, '480p15')
video_path = os.path.join(
temp_work_dir,
"videos",
"scene",
video_quality,
"GeneratedMathScene.mp4"
)
if not os.path.exists(video_path):
print(f"Video not found at expected path: {video_path}")
return jsonify({
"error": "Video file not found after rendering",
"expected_path": video_path
}), 500
print(f"Video found at: {video_path}")
# Copy to media directory
output_filename = f"math_video_{timestamp}.mp4"
output_path = os.path.join(MEDIA_DIR, output_filename)
shutil.copy(video_path, output_path)
print(f"Video copied to: {output_path}")
# Clean up temp directory
try:
shutil.rmtree(temp_work_dir)
print("Cleaned up temp directory")
except Exception as e:
print(f"Failed to clean temp dir: {e}")
return send_file(
output_path,
mimetype='video/mp4',
as_attachment=False,
download_name=output_filename
)
except subprocess.TimeoutExpired:
print("Video rendering timeout")
return jsonify({"error": "Video rendering timeout (120s)"}), 504
except Exception as e:
print(f"Error: {str(e)}")
traceback.print_exc()
return jsonify({
"error": str(e),
"traceback": traceback.format_exc()
}), 500
if __name__ == '__main__':
    # Port 7860 is the Hugging Face Spaces default; PORT overrides it.
    listen_port = int(os.environ.get('PORT', 7860))
    app.run(host='0.0.0.0', port=listen_port, debug=False)