from flask import Flask, request, jsonify, send_file
from flask_cors import CORS

import os
import subprocess
import tempfile
import shutil
import traceback
import json
import ast
import re
import html
import unicodedata
import asyncio
import random
import hashlib
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
from typing import List, Tuple, Optional, Dict

import edge_tts
from pydub import AudioSegment
from pydub.effects import normalize, compress_dynamic_range
from mutagen.mp3 import MP3

app = Flask(__name__)
CORS(app)

BASE_DIR = "/app"
MEDIA_DIR = os.path.join(BASE_DIR, "media")
TEMP_DIR = os.path.join(BASE_DIR, "temp")
AUDIO_DIR = os.path.join(BASE_DIR, "sound")
os.makedirs(MEDIA_DIR, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(AUDIO_DIR, exist_ok=True)

# Placeholder API key (not referenced elsewhere in this file).
API_KEY = "rkmentormindzofficaltokenkey12345"

VOICE_EN = "en-IN-NeerjaNeural"

# Overrides the AUDIO_DIR defined above; generated narration files are written here.
AUDIO_DIR = os.path.join(os.getcwd(), "audio")
os.makedirs(AUDIO_DIR, exist_ok=True)

# Precompiled patterns used by clean_text_for_tts().
URL_PATTERN = re.compile(r'https?://[^\s<>"\']+|www\.[^\s<>"\']+')
TAG_PATTERN = re.compile(r'<[^>]*>')
BRACKET_PATTERN = re.compile(r'[\{\}\[\]]')
SPECIAL_CHAR_PATTERN = re.compile(r'[#@$%^&*_+=|\\`~]')
WHITESPACE_PATTERN = re.compile(r'\s+')
|
|
|
|
|
def clean_text_for_tts(text: str) -> str: |
|
|
"""Cleans text while preserving ALL Tamil/Indic characters and punctuation.""" |
|
|
if not text: |
|
|
return "" |
|
|
|
|
|
text = str(text).strip() |
|
|
text = html.unescape(text) |
|
|
|
|
|
|
|
|
text = URL_PATTERN.sub('', text) |
|
|
|
|
|
|
|
|
text = TAG_PATTERN.sub('', text) |
|
|
|
|
|
|
|
|
text = BRACKET_PATTERN.sub('', text) |
|
|
|
|
|
|
|
|
text = SPECIAL_CHAR_PATTERN.sub('', text) |
|
|
|
|
|
|
|
|
text = text.replace('\\n', ' ').replace('\\t', ' ').replace('\\r', ' ') |
|
|
|
|
|
|
|
|
text = unicodedata.normalize('NFC', text) |
|
|
|
|
|
|
|
|
text = WHITESPACE_PATTERN.sub(' ', text) |
|
|
|
|
|
|
|
|
text = text.replace('\u200b', '') |
|
|
text = text.replace('\u200c', '') |
|
|
text = text.replace('\u200d', '') |
|
|
|
|
|
return text.strip() |
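
# Illustrative example (not part of the pipeline): the cleaner drops URLs, HTML tags,
# brackets and the listed special characters, then collapses whitespace, e.g.
#   clean_text_for_tts('<b>Hello</b> see https://example.com {now}')
#   -> 'Hello see now'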
|
|
|
|
|
def create_natural_chunks(text: str, max_chars: int = 300) -> List[Tuple[str, int, str]]: |
|
|
""" |
|
|
Create natural chunks that preserve language context and Tamil words. |
|
|
Returns list of (chunk_text, chunk_index, language) |
|
|
""" |
|
|
cleaned = clean_text_for_tts(text) |
|
|
if not cleaned or len(cleaned) < 5: |
|
|
|
|
|
has_tamil = any('\u0B80' <= char <= '\u0BFF' for char in cleaned) if cleaned else False |
|
|
lang = 'ta' if has_tamil else 'en' |
|
|
return [(cleaned, 0, lang)] if cleaned else [] |
|
|
|
|
|
|
|
|
|
|
|
words = cleaned.split() |
|
|
chunks = [] |
|
|
current_chunk = "" |
|
|
current_lang = None |
|
|
chunk_index = 0 |
|
|
|
|
|
i = 0 |
|
|
while i < len(words): |
|
|
word = words[i] |
|
|
|
|
|
|
|
|
has_tamil = any('\u0B80' <= char <= '\u0BFF' for char in word) |
|
|
word_lang = 'ta' if has_tamil else 'en' |
|
|
|
|
|
|
|
|
if has_tamil and len(word) == 1: |
|
|
|
|
|
if i + 1 < len(words): |
|
|
next_word = words[i + 1] |
|
|
|
|
|
if len(next_word) <= 3 or any('\u0B80' <= char <= '\u0BFF' for char in next_word): |
|
|
word = word + " " + next_word |
|
|
i += 1 |
|
|
word_lang = 'ta' |
|
|
|
|
|
|
|
|
test_chunk = f"{current_chunk} {word}" if current_chunk else word |
|
|
|
|
|
if len(test_chunk) <= max_chars: |
|
|
|
|
|
if current_chunk: |
|
|
current_chunk = f"{current_chunk} {word}" |
|
|
else: |
|
|
current_chunk = word |
|
|
|
|
|
|
|
|
if current_lang != word_lang: |
|
|
|
|
|
tamil_chars = sum(1 for char in current_chunk if '\u0B80' <= char <= '\u0BFF') |
|
|
english_chars = sum(1 for char in current_chunk if char.isalpha() and not ('\u0B80' <= char <= '\u0BFF')) |
|
|
current_lang = 'ta' if tamil_chars >= english_chars else 'en' |
|
|
else: |
|
|
|
|
|
if current_chunk: |
|
|
chunks.append((current_chunk, chunk_index, current_lang or word_lang)) |
|
|
chunk_index += 1 |
|
|
|
|
|
current_chunk = word |
|
|
current_lang = word_lang |
|
|
|
|
|
i += 1 |
|
|
|
|
|
|
|
|
if current_chunk: |
|
|
chunks.append((current_chunk, chunk_index, current_lang or 'en')) |
|
|
|
|
|
|
|
|
merged_chunks = [] |
|
|
i = 0 |
|
|
while i < len(chunks): |
|
|
chunk_text, chunk_idx, chunk_lang = chunks[i] |
|
|
|
|
|
|
|
|
if len(chunk_text) < 20 and i + 1 < len(chunks): |
|
|
next_text, next_idx, next_lang = chunks[i + 1] |
|
|
|
|
|
if chunk_lang == next_lang or len(next_text) < 30: |
|
|
merged_text = f"{chunk_text} {next_text}" |
|
|
merged_lang = chunk_lang if len(chunk_text) >= len(next_text) else next_lang |
|
|
merged_chunks.append((merged_text, len(merged_chunks), merged_lang)) |
|
|
i += 2 |
|
|
else: |
|
|
merged_chunks.append((chunk_text, len(merged_chunks), chunk_lang)) |
|
|
i += 1 |
|
|
else: |
|
|
merged_chunks.append((chunk_text, len(merged_chunks), chunk_lang)) |
|
|
i += 1 |
|
|
|
|
|
return merged_chunks |
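
# Illustrative behaviour (not used by the pipeline itself): chunks are cut only when
# adding a word would exceed max_chars, and each chunk is tagged with its dominant
# script ('ta' when it has at least as many Tamil characters as other letters, else 'en'):
#   create_natural_chunks("Hello world")  ->  [("Hello world", 0, 'en')]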
|
|
|
|
|
async def generate_safe_audio(text: str, voice: str, semaphore: asyncio.Semaphore, |
|
|
chunk_index: int) -> Tuple[Optional[str], int]: |
|
|
"""Generate audio with rate limiting, caching, and retry logic.""" |
|
|
if not text or len(text) < 1: |
|
|
return None, chunk_index |
|
|
|
|
|
|
|
|
cache_key = f"{text}_{voice}" |
|
|
text_hash = hashlib.md5(cache_key.encode('utf-8')).hexdigest() |
|
|
cache_filename = os.path.join(AUDIO_DIR, f"cache_{text_hash}.mp3") |
|
|
|
|
|
|
|
|
if os.path.exists(cache_filename) and os.path.getsize(cache_filename) > 512: |
|
|
return cache_filename, chunk_index |
|
|
|
|
|
async with semaphore: |
|
|
max_retries = 3 |
|
|
base_delay = 1.5 |
|
|
|
|
|
for attempt in range(max_retries): |
|
|
temp_filename = None |
|
|
try: |
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as tmp: |
|
|
temp_filename = tmp.name |
|
|
|
|
|
|
|
|
rate = "-10%" if "ta-IN" in voice else "0%" |
|
|
|
|
|
|
|
|
comm = edge_tts.Communicate(text, voice=voice, rate=rate) |
|
|
await comm.save(temp_filename) |
|
|
|
|
|
|
|
|
if os.path.exists(temp_filename) and os.path.getsize(temp_filename) > 512: |
|
|
|
|
|
os.replace(temp_filename, cache_filename) |
|
|
return cache_filename, chunk_index |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
if temp_filename and os.path.exists(temp_filename): |
|
|
try: |
|
|
os.unlink(temp_filename) |
|
|
except: |
|
|
pass |
|
|
|
|
|
if attempt == max_retries - 1: |
|
|
print(f"Failed to generate audio chunk {chunk_index}: {e}") |
|
|
return None, chunk_index |
|
|
|
|
|
|
|
|
sleep_time = (base_delay * (2 ** attempt)) + random.uniform(0.1, 0.5) |
|
|
await asyncio.sleep(sleep_time) |
|
|
finally: |
|
|
|
|
|
if temp_filename and os.path.exists(temp_filename) and temp_filename != cache_filename: |
|
|
try: |
|
|
os.unlink(temp_filename) |
|
|
except: |
|
|
pass |
|
|
|
|
|
return None, chunk_index |
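
# Minimal standalone usage sketch (illustrative; assumes the module-level VOICE_EN):
#   async def demo():
#       sem = asyncio.Semaphore(2)
#       return await generate_safe_audio("Hello there", VOICE_EN, sem, 0)
#   path, idx = asyncio.run(demo())
# Results are cached in AUDIO_DIR keyed by an MD5 hash of (text, voice), so repeated
# calls with the same text and voice reuse the existing MP3.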
|
|
|
|
|
def process_audio_segment_fast(audio_data: Tuple[str, int]) -> Tuple[Optional[AudioSegment], int]: |
|
|
"""Process audio segment with minimal silence.""" |
|
|
audio_file, chunk_index = audio_data |
|
|
|
|
|
try: |
|
|
if not audio_file or not os.path.exists(audio_file): |
|
|
return None, chunk_index |
|
|
|
|
|
segment = AudioSegment.from_file(audio_file) |
|
|
|
|
|
|
|
|
if len(segment) > 0: |
|
|
|
|
|
segment = AudioSegment.silent(duration=10) + segment + AudioSegment.silent(duration=10) |
|
|
|
|
|
|
|
|
segment = normalize(segment, headroom=0.1) |
|
|
|
|
|
|
|
|
if len(segment) > 1000: |
|
|
try: |
|
|
|
|
|
segment = segment.strip_silence(silence_thresh=-40, padding=25) |
|
|
except: |
|
|
pass |
|
|
|
|
|
return segment, chunk_index |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Warning: Error processing audio segment {chunk_index}: {e}") |
|
|
return None, chunk_index |
|
|
|
|
|
async def bilingual_tts_optimized(text: str, output_file: str = "audio0.mp3", |
|
|
VOICE_TA: Optional[str] = None, max_concurrent: int = 4) -> Optional[str]: |
|
|
"""Optimized bilingual TTS with minimal silence and preserved words.""" |
|
|
print("Starting bilingual TTS processing...") |
|
|
|
|
|
try: |
|
|
|
|
|
chunks_info = create_natural_chunks(text, max_chars=300) |
|
|
if not chunks_info: |
|
|
print("Error: No valid text chunks after processing") |
|
|
return None |
|
|
|
|
|
print(f"Processing {len(chunks_info)} text chunks...") |
|
|
|
|
|
|
|
|
tasks = [] |
|
|
semaphore = asyncio.Semaphore(max_concurrent) |
|
|
|
|
|
for chunk_text, chunk_index, chunk_lang in chunks_info: |
|
|
if not chunk_text or len(chunk_text.strip()) < 1: |
|
|
continue |
|
|
|
|
|
|
|
|
if VOICE_TA and chunk_lang == 'ta': |
|
|
voice = VOICE_TA |
|
|
else: |
|
|
voice = VOICE_TA or VOICE_EN |
|
|
|
|
|
tasks.append(generate_safe_audio(chunk_text, voice, semaphore, chunk_index)) |
|
|
|
|
|
if not tasks: |
|
|
print("Error: No tasks to process") |
|
|
return None |
|
|
|
|
|
|
|
|
results = await asyncio.gather(*tasks, return_exceptions=False) |
|
|
|
|
|
|
|
|
audio_data = [] |
|
|
for result in results: |
|
|
if isinstance(result, tuple) and result[0] and os.path.exists(result[0]): |
|
|
audio_data.append(result) |
|
|
|
|
|
if not audio_data: |
|
|
print("Error: No audio was successfully generated") |
|
|
return None |
|
|
|
|
|
|
|
|
audio_data.sort(key=lambda x: x[1]) |
|
|
|
|
|
print(f"Successfully generated {len(audio_data)} audio segments") |
|
|
|
|
|
|
|
|
processed_segments = [] |
|
|
for audio_file, chunk_index in audio_data: |
|
|
segment_result = process_audio_segment_fast((audio_file, chunk_index)) |
|
|
if segment_result[0] is not None: |
|
|
processed_segments.append(segment_result) |
|
|
|
|
|
|
|
|
processed_segments.sort(key=lambda x: x[1]) |
|
|
audio_segments = [seg for seg, idx in processed_segments] |
|
|
|
|
|
if not audio_segments: |
|
|
print("Error: No audio segments were successfully processed") |
|
|
return None |
|
|
|
|
|
print(f"Merging {len(audio_segments)} audio segments...") |
|
|
|
|
|
|
|
|
merged_audio = audio_segments[0] |
|
|
|
|
|
for i in range(1, len(audio_segments)): |
|
|
|
|
|
current_end = merged_audio[-50:] if len(merged_audio) > 50 else merged_audio |
|
|
next_start = audio_segments[i][:50] if len(audio_segments[i]) > 50 else audio_segments[i] |
|
|
|
|
|
|
|
|
add_pause = 20 |
|
|
|
|
|
merged_audio = merged_audio + AudioSegment.silent(duration=add_pause) + audio_segments[i] |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
merged_audio = compress_dynamic_range( |
|
|
merged_audio, |
|
|
threshold=-25.0, |
|
|
ratio=1.8, |
|
|
attack=10.0, |
|
|
release=100.0 |
|
|
) |
|
|
except: |
|
|
pass |
|
|
|
|
|
|
|
|
merged_audio = normalize(merged_audio, headroom=0.5) |
|
|
|
|
|
|
|
|
merged_audio.export(output_file, format="mp3", bitrate="192k") |
|
|
|
|
|
if os.path.exists(output_file) and os.path.getsize(output_file) > 1024: |
|
|
print(f"✅ Audio successfully generated: {output_file}") |
|
|
|
|
|
|
|
|
try: |
|
|
audio = MP3(output_file) |
|
|
duration = audio.info.length |
|
|
print(f"Audio duration: {duration:.2f} seconds") |
|
|
except: |
|
|
pass |
|
|
|
|
|
return output_file |
|
|
else: |
|
|
print("Error: Generated file is empty or missing") |
|
|
return None |
|
|
|
|
|
except Exception as main_error: |
|
|
print(f"Main error in bilingual TTS: {main_error}") |
|
|
traceback.print_exc() |
|
|
return None |
|
|
|
|
|
async def generate_tts_optimized(id: int, lines, lang: str) -> Tuple[Optional[float], Optional[str]]: |
|
|
"""Optimized TTS generation function.""" |
|
|
voice_map = { |
|
|
"English": "en-US-JennyNeural", |
|
|
"Tamil": "ta-IN-PallaviNeural", |
|
|
"Hindi": "hi-IN-SwaraNeural", |
|
|
"Malayalam": "ml-IN-SobhanaNeural", |
|
|
"Kannada": "kn-IN-SapnaNeural", |
|
|
"Telugu": "te-IN-ShrutiNeural", |
|
|
"Bengali": "bn-IN-TanishaaNeural", |
|
|
"Marathi": "mr-IN-AarohiNeural", |
|
|
"Gujarati": "gu-IN-DhwaniNeural", |
|
|
"Punjabi": "pa-IN-VaaniNeural", |
|
|
"Urdu": "ur-IN-GulNeural", |
|
|
"French": "fr-FR-DeniseNeural", |
|
|
"German": "de-DE-KatjaNeural", |
|
|
"Spanish": "es-ES-ElviraNeural", |
|
|
"Italian": "it-IT-IsabellaNeural", |
|
|
"Russian": "ru-RU-SvetlanaNeural", |
|
|
"Japanese": "ja-JP-NanamiNeural", |
|
|
"Korean": "ko-KR-SunHiNeural", |
|
|
"Chinese": "zh-CN-XiaoxiaoNeural", |
|
|
"Arabic": "ar-SA-ZariyahNeural", |
|
|
"Portuguese": "pt-BR-FranciscaNeural", |
|
|
"Dutch": "nl-NL-FennaNeural", |
|
|
"Greek": "el-GR-AthinaNeural", |
|
|
"Hebrew": "he-IL-HilaNeural", |
|
|
"Turkish": "tr-TR-EmelNeural", |
|
|
"Polish": "pl-PL-AgnieszkaNeural", |
|
|
"Thai": "th-TH-AcharaNeural", |
|
|
"Vietnamese": "vi-VN-HoaiMyNeural", |
|
|
"Swedish": "sv-SE-SofieNeural", |
|
|
"Finnish": "fi-FI-NooraNeural", |
|
|
"Czech": "cs-CZ-VlastaNeural", |
|
|
"Hungarian": "hu-HU-NoemiNeural" |
|
|
} |
|
|
|
|
|
audio_name = f"audio{id}.mp3" |
|
|
audio_path = os.path.join(AUDIO_DIR, audio_name) |
|
|
|
|
|
if "&&&" in lang: |
|
|
listf = lang.split("&&&") |
|
|
text = listf[0].strip() |
|
|
lang_name = listf[1].strip() if len(listf) > 1 else "English" |
|
|
voice_to_use = voice_map.get(lang_name, VOICE_EN) |
|
|
else: |
|
|
text = lines[id] if isinstance(lines, (list, tuple)) and id < len(lines) else str(lines) |
|
|
voice_to_use = voice_map.get(lang, VOICE_EN) |
|
|
|
|
|
|
|
|
output = await bilingual_tts_optimized(text, audio_path, voice_to_use, max_concurrent=3) |
|
|
|
|
|
if output and os.path.exists(audio_path): |
|
|
try: |
|
|
audio = MP3(audio_path) |
|
|
duration = audio.info.length |
|
|
return duration, audio_path |
|
|
except Exception as e: |
|
|
print(f"Error reading audio file: {e}") |
|
|
return None, None |
|
|
|
|
|
return None, None |
|
|
|
|
|
def audio_func(id: int, lines, lang: str) -> Tuple[Optional[float], Optional[str]]: |
|
|
"""Synchronous wrapper for audio generation.""" |
|
|
try: |
|
|
loop = asyncio.new_event_loop() |
|
|
asyncio.set_event_loop(loop) |
|
|
try: |
|
|
return loop.run_until_complete(generate_tts_optimized(id, lines, lang)) |
|
|
finally: |
|
|
loop.close() |
|
|
except Exception as e: |
|
|
print(f"Error in audio_func: {e}") |
|
|
traceback.print_exc() |
|
|
return None, None |
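
# Example (illustrative): synchronously generate narration for a plain string.
#   duration, path = audio_func(0, "Welcome to the lesson", "English")
#   # duration is the MP3 length in seconds; path points to audio0.mp3 inside AUDIO_DIR.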
|
|
def create_manim_script(problem_data, script_path, audio_path, audio_length): |
|
|
"""Generate Manim script with proper wrapping and audio-video sync.""" |
|
|
|
|
|
settings = problem_data.get("video_settings", { |
|
|
"background_color": "#0f0f23", |
|
|
"text_color": "WHITE", |
|
|
"highlight_color": "YELLOW", |
|
|
"font": "CMU Serif", |
|
|
"text_size": 36, |
|
|
"equation_size": 45, |
|
|
"title_size": 48, |
|
|
"wrap_width": 15.5 |
|
|
}) |
|
|
|
|
|
slides = problem_data.get("slides", []) |
|
|
if not slides: |
|
|
raise ValueError("No slides provided in input data") |
|
|
|
|
|
|
|
|
num_slides = len(slides) |
|
|
num_titles = sum(1 for s in slides if s.get("type") == "title") |
|
|
|
|
|
overhead_time = (num_slides - num_titles) * 0.3 |
|
|
overhead_time += num_titles * 0.4 |
|
|
overhead_time += 2.3 |
|
|
overhead_time += (num_slides / 3) * 0.5 |
|
|
|
|
|
|
|
|
equation_duration = 0.0 |
|
|
text_title_duration = 0.0 |
|
|
|
|
|
for slide in slides: |
|
|
slide_duration = float(slide.get("duration", 1.0)) |
|
|
if slide.get("type") == "equation": |
|
|
equation_duration += slide_duration |
|
|
else: |
|
|
text_title_duration += slide_duration |
|
|
|
|
|
|
|
|
available_time = audio_length - text_title_duration - overhead_time |
|
|
|
|
|
if equation_duration > 0 and available_time > 0: |
|
|
equation_scale = available_time / equation_duration |
|
|
equation_scale = max(0.5, min(2.5, equation_scale)) |
|
|
else: |
|
|
equation_scale = 1.0 |
|
|
|
|
|
slides_repr = repr(slides) |
|
|
audio_path_repr = repr(audio_path) |
|
|
|
|
|
wrap_width = float(settings.get("wrap_width", 15.5)) |
|
|
background_color = settings.get("background_color", "#0f0f23") |
|
|
text_color = settings.get("text_color", "WHITE") |
|
|
highlight_color = settings.get("highlight_color", "YELLOW") |
|
|
font = settings.get("font", "CMU Serif") |
|
|
text_size = settings.get("text_size", 36) |
|
|
    equation_size = settings.get("equation_size", 45)
|
|
title_size = settings.get("title_size", 48) |
|
|
|
|
|
manim_code = f"""from manim import * |
|
|
class GeneratedMathScene(Scene): |
|
|
def construct(self): |
|
|
# Scene settings |
|
|
self.add_sound({audio_path_repr}) |
|
|
self.camera.background_color = "{background_color}" |
|
|
default_color = {text_color} |
|
|
highlight_color = {highlight_color} |
|
|
default_font = "{font}" |
|
|
text_size = {text_size} |
|
|
equation_size = {equation_size} |
|
|
title_size = {title_size} |
|
|
wrap_width = {wrap_width} |
|
|
equation_scale = {equation_scale} |
|
|
|
|
|
# FIX #1: Improved wrapping function - check width BEFORE arranging |
|
|
def make_inline_segments(content, color, font, text_size, equation_size): |
|
|
if not content: |
|
|
return VGroup() |
|
|
|
|
|
segments = content.split("#") |
|
|
all_lines = [] |
|
|
current_line = [] |
|
|
current_width = 0.0 |
|
|
|
|
|
for segment in segments: |
|
|
segment = segment.strip() |
|
|
if not segment: |
|
|
continue |
|
|
|
|
|
# Create mobject |
|
|
if segment.startswith("%"): |
|
|
latex_content = segment[1:] |
|
|
mob = MathTex(latex_content, color=color, font_size=equation_size) |
|
|
else: |
|
|
mob = Text(segment, color=color, font=font, font_size=text_size) |
|
|
|
|
|
# FIX #1: Check width BEFORE adding to line |
|
|
mob_width = mob.width |
|
|
potential_width = current_width + mob_width + (0.05 * len(current_line)) |
|
|
|
|
|
if potential_width > wrap_width and len(current_line) > 0: |
|
|
# Line is full, save it and start new line |
|
|
line_group = VGroup(*current_line).arrange(RIGHT, buff=0.05) |
|
|
all_lines.append(line_group) |
|
|
current_line = [mob] |
|
|
current_width = mob_width |
|
|
else: |
|
|
# Add to current line |
|
|
current_line.append(mob) |
|
|
current_width = potential_width |
|
|
|
|
|
# Safety: If single item exceeds width, scale it down |
|
|
if len(current_line) == 1 and mob.width > wrap_width: |
|
|
mob.scale_to_fit_width(wrap_width * 0.95) |
|
|
current_width = mob.width |
|
|
|
|
|
# Add final line |
|
|
if current_line: |
|
|
line_group = VGroup(*current_line).arrange(RIGHT, buff=0.05) |
|
|
all_lines.append(line_group) |
|
|
|
|
|
if not all_lines: |
|
|
return VGroup() |
|
|
|
|
|
final_group = VGroup(*all_lines).arrange(DOWN, aligned_edge=LEFT, buff=0.2) |
|
|
return final_group |
|
|
|
|
|
def make_wrapped_paragraph(content, color, font, font_size, line_spacing=0.2): |
|
|
lines = [] |
|
|
words = content.split() |
|
|
current = "" |
|
|
|
|
|
for w in words: |
|
|
test = w if not current else current + " " + w |
|
|
test_obj = Text(test, color=color, font=font, font_size=font_size) |
|
|
|
|
|
if test_obj.width <= wrap_width * 0.95: |
|
|
current = test |
|
|
else: |
|
|
if current: |
|
|
line_obj = Text(current, color=color, font=font, font_size=font_size) |
|
|
lines.append(line_obj) |
|
|
current = w |
|
|
|
|
|
if current: |
|
|
lines.append(Text(current, color=color, font=font, font_size=font_size)) |
|
|
|
|
|
if not lines: |
|
|
return VGroup() |
|
|
|
|
|
first_line = lines[0] |
|
|
for ln in lines: |
|
|
ln.align_to(first_line, LEFT) |
|
|
|
|
|
para = VGroup(*lines).arrange(DOWN, aligned_edge=LEFT, buff=line_spacing) |
|
|
return para |
|
|
|
|
|
content_group = VGroup() |
|
|
current_y = 3.0 |
|
|
line_spacing = 0.8 |
|
|
slides = {slides_repr} |
|
|
|
|
|
for idx, slide in enumerate(slides): |
|
|
obj = None |
|
|
content = slide.get("content", "") |
|
|
animation = slide.get("animation", "write_left") |
|
|
base_duration = slide.get("duration", 1.0) |
|
|
slide_type = slide.get("type", "text") |
|
|
|
|
|
# Apply scale ONLY to equations |
|
|
if slide_type == "equation": |
|
|
duration = base_duration * equation_scale |
|
|
else: |
|
|
duration = base_duration |
|
|
|
|
|
if slide_type == "title": |
|
|
obj = make_inline_segments(content, highlight_color, default_font, title_size, equation_size) |
|
|
if len(obj) == 0: |
|
|
obj = Text(content, color=highlight_color, font=default_font, font_size=title_size) |
|
|
|
|
|
# FIX #1: Ensure title fits within screen |
|
|
if obj.width > wrap_width: |
|
|
obj.scale_to_fit_width(wrap_width * 0.95) |
|
|
|
|
|
obj.move_to(ORIGIN) |
|
|
self.play(FadeIn(obj), run_time=duration * 0.8) |
|
|
self.wait(duration * 0.3) |
|
|
self.play(FadeOut(obj), run_time=duration * 0.3) |
|
|
continue |
|
|
|
|
|
elif slide_type == "text": |
|
|
obj = make_inline_segments(content, default_color, default_font, text_size, equation_size) |
|
|
if len(obj) == 0: |
|
|
obj = make_wrapped_paragraph(content, default_color, default_font, text_size, line_spacing=0.25) |
|
|
|
|
|
# FIX #1: Safety check for text overflow |
|
|
if obj.width > wrap_width: |
|
|
obj.scale_to_fit_width(wrap_width * 0.95) |
|
|
|
|
|
elif slide_type == "equation": |
|
|
eq_content = content |
|
|
obj = MathTex(eq_content, color=default_color, font_size=equation_size) |
|
|
|
|
|
# FIX #1: Scale equation instead of splitting by spaces |
|
|
if obj.width > wrap_width: |
|
|
obj.scale_to_fit_width(wrap_width * 0.95) |
|
|
|
|
|
if obj: |
|
|
obj.to_edge(LEFT, buff=0.3) |
|
|
obj.shift(UP * (current_y - obj.height / 2)) |
|
|
|
|
|
obj_bottom = obj.get_bottom()[1] |
|
|
if obj_bottom < -3.5: |
|
|
scroll_amount = abs(obj_bottom - (-3.5)) + 0.3 |
|
|
self.play(content_group.animate.shift(UP * scroll_amount), run_time=0.5) |
|
|
current_y += scroll_amount |
|
|
obj.shift(UP * scroll_amount) |
|
|
obj.to_edge(LEFT, buff=0.3) |
|
|
|
|
|
if animation == "write_left": |
|
|
self.play(Write(obj), run_time=duration) |
|
|
elif animation == "fade_in": |
|
|
self.play(FadeIn(obj), run_time=duration) |
|
|
elif animation == "highlight_left": |
|
|
self.play(Write(obj), run_time=duration * 0.6) |
|
|
self.play(obj.animate.set_color(highlight_color), run_time=duration * 0.4) |
|
|
else: |
|
|
self.play(Write(obj), run_time=duration) |
|
|
|
|
|
content_group.add(obj) |
|
|
current_y -= (getattr(obj, "height", 0) + line_spacing) |
|
|
self.wait(0.3) |
|
|
|
|
|
if len(content_group) > 0: |
|
|
final_box = SurroundingRectangle(content_group[-1], color=highlight_color, buff=0.2) |
|
|
self.play(Create(final_box), run_time=0.8) |
|
|
self.wait(1.5) |
|
|
""" |
|
|
|
|
|
try: |
|
|
with open(script_path, 'w', encoding='utf-8') as f: |
|
|
f.write(manim_code) |
|
|
print(f"Generated script at {script_path}") |
|
|
print(f"Audio length: {audio_length:.2f}s") |
|
|
print(f"Overhead time: {overhead_time:.2f}s") |
|
|
print(f"Equation scale factor: {equation_scale:.2f}x") |
|
|
print(f"Text/Title duration: {text_title_duration:.2f}s (unchanged)") |
|
|
print(f"Equation duration: {equation_duration:.2f}s -> {equation_duration * equation_scale:.2f}s") |
|
|
print(f"Expected total: {text_title_duration + (equation_duration * equation_scale) + overhead_time:.2f}s") |
|
|
except Exception as e: |
|
|
print(f"Error writing script: {e}") |
|
|
raise |
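
# Worked timing example (illustrative numbers): with 6 slides of which 1 is a title,
# the fixed overhead above is 5*0.3 + 1*0.4 + 2.3 + (6/3)*0.5 = 5.2 s. If the narration
# is 30 s long and text/title slides account for 10 s, the time left for equations is
# 30 - 10 - 5.2 = 14.8 s; equation durations totalling 10 s are therefore stretched by
# a factor of 14.8 / 10 = 1.48 (clamped to the [0.5, 2.5] range).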
|
|
|
|
|
|
|
|
@app.route("/generate", methods=["POST"]) |
|
|
def generate_video(): |
|
|
temp_work_dir = None |
|
|
try: |
|
|
raw_data = request.get_json() |
|
|
if not raw_data: |
|
|
return jsonify({"error": "No JSON data provided"}), 400 |
|
|
|
|
|
raw_body = raw_data.get("jsondata", '') |
|
|
if not raw_body: |
|
|
return jsonify({"error": "No jsondata field in request"}), 400 |
|
|
|
|
|
lst = raw_body.split("&&&&") |
|
|
if len(lst) < 2: |
|
|
return jsonify({"error": "Invalid data format, missing &&&&separator"}), 400 |
|
|
|
|
|
cleaned = re.sub(r'(\d)\s*\.\s*(\d)', r'\1.\2', lst[0]) |
|
|
|
|
|
try: |
|
|
nlist = ast.literal_eval(cleaned) |
|
|
except Exception as e: |
|
|
return jsonify({"error": f"Failed to parse slide data: {str(e)}"}), 400 |
|
|
|
|
|
datalst = [] |
|
|
|
|
|
for line in range(len(nlist)): |
|
|
try: |
|
|
datalst.append({ |
|
|
"type": nlist[line][0].strip(), |
|
|
"content": nlist[line][1].strip(), |
|
|
"animation": nlist[line][2].strip().replace(" ", ""), |
|
|
"duration": float(nlist[line][3]) |
|
|
}) |
|
|
except (IndexError, ValueError) as e: |
|
|
return jsonify({"error": f"Invalid slide data at index {line}: {str(e)}"}), 400 |
|
|
|
|
|
data = { |
|
|
"video_settings": { |
|
|
"background_color": "#0f0f23", |
|
|
"text_color": "WHITE", |
|
|
"highlight_color": "YELLOW", |
|
|
"font": "CMU Serif", |
|
|
"text_size": 36, |
|
|
"equation_size": 42, |
|
|
"title_size": 48 |
|
|
}, |
|
|
"slides": datalst |
|
|
} |
|
|
|
|
|
best = lst[1].split("&&&") |
|
|
lines = best[0] |
|
|
        lang = best[1] if len(best) > 1 else "English"
|
|
|
|
|
audio_length, audio_path = audio_func(0, lines, lang) |
|
|
|
|
|
if not audio_length or not audio_path or not os.path.exists(audio_path): |
|
|
return jsonify({"error": "Failed to generate audio"}), 500 |
|
|
|
|
|
if "slides" not in data or not data["slides"]: |
|
|
return jsonify({"error": "No slides provided in request"}), 400 |
|
|
|
|
|
print(f"Received request with {len(data['slides'])} slides") |
|
|
print(f"Audio length: {audio_length}s") |
|
|
|
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
|
temp_work_dir = os.path.join(TEMP_DIR, f"manim_{timestamp}") |
|
|
os.makedirs(temp_work_dir, exist_ok=True) |
|
|
|
|
|
script_path = os.path.join(temp_work_dir, "scene.py") |
|
|
|
|
|
|
|
|
create_manim_script(data, script_path, audio_path, audio_length) |
|
|
print(f"Created Manim script at {script_path}") |
|
|
|
|
|
quality = 'l' |
|
|
render_command = [ |
|
|
"manim", |
|
|
f"-q{quality}", |
|
|
"--disable_caching", |
|
|
"--media_dir", temp_work_dir, |
|
|
script_path, |
|
|
"GeneratedMathScene" |
|
|
] |
|
|
|
|
|
print(f"Running command: {' '.join(render_command)}") |
|
|
|
|
|
result = subprocess.run( |
|
|
render_command, |
|
|
capture_output=True, |
|
|
text=True, |
|
|
cwd=temp_work_dir, |
|
|
timeout=120 |
|
|
) |
|
|
|
|
|
if result.returncode != 0: |
|
|
error_msg = result.stderr or result.stdout |
|
|
print(f"Manim rendering failed: {error_msg}") |
|
|
return jsonify({ |
|
|
"error": "Manim rendering failed", |
|
|
"details": error_msg |
|
|
}), 500 |
|
|
|
|
|
print("Manim rendering completed successfully") |
|
|
|
|
|
quality_map = {'l': '480p15', 'm': '720p30', 'h': '1080p60'} |
|
|
video_quality = quality_map.get(quality, '480p15') |
|
|
|
|
|
video_path = os.path.join( |
|
|
temp_work_dir, |
|
|
"videos", |
|
|
"scene", |
|
|
video_quality, |
|
|
"GeneratedMathScene.mp4" |
|
|
) |
|
|
|
|
|
if not os.path.exists(video_path): |
|
|
print(f"Video not found at expected path: {video_path}") |
|
|
return jsonify({ |
|
|
"error": "Video file not found after rendering", |
|
|
"expected_path": video_path |
|
|
}), 500 |
|
|
|
|
|
print(f"Video found at: {video_path}") |
|
|
|
|
|
output_filename = f"math_video_{timestamp}.mp4" |
|
|
output_path = os.path.join(MEDIA_DIR, output_filename) |
|
|
shutil.copy(video_path, output_path) |
|
|
print(f"Video copied to: {output_path}") |
|
|
|
|
|
try: |
|
|
if temp_work_dir and os.path.exists(temp_work_dir): |
|
|
shutil.rmtree(temp_work_dir) |
|
|
print("Cleaned up temp directory") |
|
|
except Exception as e: |
|
|
print(f"Failed to clean temp dir: {e}") |
|
|
|
|
|
return send_file( |
|
|
output_path, |
|
|
mimetype='video/mp4', |
|
|
as_attachment=False, |
|
|
download_name=output_filename |
|
|
) |
|
|
|
|
|
except subprocess.TimeoutExpired: |
|
|
print("Video rendering timeout") |
|
|
if temp_work_dir and os.path.exists(temp_work_dir): |
|
|
try: |
|
|
shutil.rmtree(temp_work_dir) |
|
|
except: |
|
|
pass |
|
|
return jsonify({"error": "Video rendering timeout (120s)"}), 504 |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error: {str(e)}") |
|
|
traceback.print_exc() |
|
|
if temp_work_dir and os.path.exists(temp_work_dir): |
|
|
try: |
|
|
shutil.rmtree(temp_work_dir) |
|
|
except: |
|
|
pass |
|
|
return jsonify({ |
|
|
"error": str(e), |
|
|
"traceback": traceback.format_exc() |
|
|
}), 500 |
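
# Example request (illustrative; host/port and the requests package are assumptions,
# the field layout follows the parsing above: "<slide list>&&&&<narration>&&&<language>",
# where each slide is [type, content, animation, duration]):
#
#   import requests
#   slides = ("[['title', 'Pythagoras', 'fade_in', 2.0], "
#             "['text', 'a squared plus b squared # %a^2 + b^2 = c^2', 'write_left', 3.0]]")
#   narration = "In a right triangle the squares of the legs add up.&&&English"
#   resp = requests.post("http://localhost:7860/generate",
#                        json={"jsondata": slides + "&&&&" + narration})
#   open("math_video.mp4", "wb").write(resp.content)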
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
port = int(os.environ.get('PORT', 7860)) |
|
|
app.run(host='0.0.0.0', port=port, debug=False) |