# app.py — Flask service that renders narrated Manim videos.
# (Removed pasted repository-viewer chrome that preceded this file:
#  "sreepathi-ravikumar's picture / Update app.py / 8bf23c7 verified /
#   raw / history blame / 31.2 kB" — page residue, not Python source.)
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
import os
import subprocess
import tempfile
import shutil
from datetime import datetime
import traceback
import json
import ast
import re
import html
import unicodedata
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
import edge_tts
from pydub import AudioSegment
from pydub.effects import normalize
from mutagen.mp3 import MP3
# Flask application with CORS enabled so browser clients on other origins can call it.
app = Flask(__name__)
CORS(app)
# Configuration
# Working directories for generated artifacts, created up-front.
BASE_DIR = "/app"
MEDIA_DIR = os.path.join(BASE_DIR, "media")  # final rendered videos served to clients
TEMP_DIR = os.path.join(BASE_DIR, "temp")    # per-request Manim workspaces
# NOTE(review): this AUDIO_DIR is reassigned further down in the file
# (to <cwd>/audio), so the "/app/sound" value is effectively unused.
AUDIO_DIR = os.path.join(BASE_DIR, "sound")
os.makedirs(MEDIA_DIR, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(AUDIO_DIR, exist_ok=True)
# API Key for security (optional)
# NOTE(review): API_KEY is defined but no route in this file checks it — confirm
# whether auth is intended; also, a secret committed in source should move to an env var.
API_KEY = "rkmentormindzofficaltokenkey12345"
import os
import re
import html
import unicodedata
import asyncio
import tempfile
import traceback
import random
import hashlib
import json
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
from typing import List, Tuple, Optional, Dict
import edge_tts
from pydub import AudioSegment
from pydub.effects import normalize
from mutagen.mp3 import MP3
# Voice configuration
# Default English (India) neural voice used whenever no language-specific voice is chosen.
VOICE_EN = "en-IN-NeerjaNeural"
# NOTE(review): overrides the AUDIO_DIR defined earlier ("/app/sound") — this is
# the directory actually used for cached TTS chunks and output audio.
AUDIO_DIR = os.path.join(os.getcwd(), "audio")
os.makedirs(AUDIO_DIR, exist_ok=True)
# Pre-compiled regex patterns
URL_PATTERN = re.compile(r'https?://[^\s<>"\']+|www\.[^\s<>"\']+')  # http(s)/www URLs
TAG_PATTERN = re.compile(r'<[^>]*>')  # HTML/XML tags (content between tags is kept)
BRACKET_PATTERN = re.compile(r'[\{\}\[\]]')  # curly/square bracket characters
SPECIAL_CHAR_PATTERN = re.compile(r'[#@$%^&*_+=|\\`~]')  # symbols stripped before TTS
WHITESPACE_PATTERN = re.compile(r'\s+')  # runs of whitespace, collapsed to one space
# Conservative sentence splitting that doesn't break on abbreviations
SENTENCE_PATTERN = re.compile(r'(?<=[.!?])\s+(?=[A-Z])')
# Avoid splitting on commas inside numbers
SUB_PATTERN = re.compile(r'(?<!\d),(?!\d)\s*')
# Cache for chunking results
# NOTE(review): declared but never read or written elsewhere in this file.
_chunking_cache: Dict[str, Tuple[str, ...]] = {}
def clean_text_for_tts(text: str) -> str:
"""Cleans text while preserving Tamil/Indic characters and code-switched punctuation."""
if not text:
return ""
text = str(text).strip()
text = html.unescape(text)
# Remove URLs
text = URL_PATTERN.sub('', text)
# Remove HTML/XML tags but preserve content
text = TAG_PATTERN.sub('', text)
# Remove brackets
text = BRACKET_PATTERN.sub('', text)
# Remove special characters but preserve punctuation needed for TTS
text = SPECIAL_CHAR_PATTERN.sub('', text)
# Replace newlines/tabs with spaces
text = text.replace('\\n', ' ').replace('\\t', ' ').replace('\\r', ' ')
# Use NFC normalization to preserve Tamil/Indic characters
text = unicodedata.normalize('NFC', text)
# Collapse multiple whitespace
text = WHITESPACE_PATTERN.sub(' ', text)
return text.strip()
def split_by_word_boundary(text: str) -> List[str]:
    """Split text into segments at Tamil/English language boundaries.

    Latin letters and hyphens count as English; characters in the Tamil
    Unicode block (U+0B80–U+0BFF) count as Tamil; punctuation and spaces
    inherit the running language. A hyphen directly between a letter and a
    Tamil character (code-switched words like "simple-ஆ") does not split.
    """
    if not text:
        return []
    segments: List[str] = []
    buffer = ""
    lang = None  # 'en', 'ta', or None before any letter is seen
    last = len(text) - 1
    for pos, ch in enumerate(text):
        # Classify the character.
        if '\u0B80' <= ch <= '\u0BFF':  # Tamil block
            ch_lang = 'ta'
        elif ch.isalpha() or ch == '-':
            ch_lang = 'en'
        else:
            ch_lang = lang  # punctuation/space keeps the current language
        boundary = lang is not None and ch_lang is not None and lang != ch_lang
        if boundary and ch == '-' and 0 < pos < last:
            # Keep a code-switched hyphen glued to the current segment.
            if text[pos - 1].isalpha() and '\u0B80' <= text[pos + 1] <= '\u0BFF':
                buffer += ch
                continue
        if boundary:
            if buffer.strip():
                segments.append(buffer)
            buffer = ch
            lang = ch_lang
        else:
            buffer += ch
            lang = ch_lang or lang
    if buffer.strip():
        segments.append(buffer)
    return segments
def chunk_text_with_overlap(text: str, max_chars: int = 250) -> List[Tuple[str, int]]:
    """
    Creates chunks with overlap for smooth transitions.

    The text is cleaned, split on language boundaries, and greedily grouped
    into chunks of at most *max_chars* characters and 20 words. The last
    three words of each chunk are prepended to the next one (when they still
    fit) so the merged audio transitions sound continuous.

    Returns list of (chunk_text, chunk_index).

    Fixes vs. original: removed dead locals (current_words/temp_words were
    assigned but never read) and reset the working chunk after a long-segment
    split so an already-appended chunk can never be appended twice.
    """
    # Clean first
    cleaned = clean_text_for_tts(text)
    if not cleaned:
        return []
    # Split into segments by language boundary
    segments = split_by_word_boundary(cleaned)
    # Greedily group segments into chunks.
    chunks: List[str] = []
    current_chunk = ""
    for segment in segments:
        test_chunk = current_chunk + segment if current_chunk else segment
        if len(test_chunk) <= max_chars and len(test_chunk.split()) <= 20:
            current_chunk = test_chunk
        else:
            # Flush the working chunk and start a new one.
            if current_chunk:
                chunks.append(current_chunk)
            if len(segment) > max_chars:
                # A single over-long segment is split on word boundaries.
                temp_chunk = ""
                for word in segment.split():
                    test = temp_chunk + " " + word if temp_chunk else word
                    if len(test) <= max_chars:
                        temp_chunk = test
                    else:
                        if temp_chunk:
                            chunks.append(temp_chunk)
                        temp_chunk = word
                # The remainder (possibly empty) becomes the new working chunk.
                current_chunk = temp_chunk
            else:
                current_chunk = segment
    # Add final chunk
    if current_chunk:
        chunks.append(current_chunk)
    # Add overlap between chunks (last 3 words of chunk N become first 3 words of chunk N+1)
    overlapped_chunks: List[Tuple[str, int]] = []
    for i, chunk in enumerate(chunks):
        if i > 0:
            prev_words = chunks[i - 1].split()
            overlap_words = prev_words[-3:] if len(prev_words) >= 3 else prev_words
            if overlap_words:
                test_chunk = " ".join(overlap_words) + " " + chunk
                # Only keep the overlap if it stays within the size budget.
                if len(test_chunk) <= max_chars:
                    chunk = test_chunk
        overlapped_chunks.append((chunk, i))
    return overlapped_chunks
async def generate_safe_audio(text: str, voice: str, semaphore: asyncio.Semaphore,
                              chunk_index: int) -> Tuple[Optional[str], int]:
    """Generate one TTS audio chunk with rate limiting, disk caching, and retries.

    Returns (path_to_cached_mp3, chunk_index) on success or (None, chunk_index)
    on failure, so callers can reassemble chunks in order even when some fail.

    Fixes vs. original: temp_filename is initialized before the try block (it
    could be referenced unbound in the except handler if NamedTemporaryFile
    raised), the cleanup handler no longer uses a bare except, and a
    generation that produces an empty/missing file now deletes its temp file
    and backs off before retrying instead of leaking it and retrying at once.
    """
    if not text or len(text) < 2:
        return None, chunk_index
    # Deterministic cache key from text+voice so identical requests reuse audio.
    cache_key = f"{text}_{voice}"
    text_hash = hashlib.md5(cache_key.encode('utf-8')).hexdigest()
    cache_filename = os.path.join(AUDIO_DIR, f"cache_{text_hash}.mp3")
    # Disk-cache hit: anything above 1 KiB is treated as a valid MP3.
    if os.path.exists(cache_filename) and os.path.getsize(cache_filename) > 1024:
        return cache_filename, chunk_index
    async with semaphore:
        max_retries = 3
        base_delay = 2.0
        for attempt in range(max_retries):
            temp_filename = None  # defined before try so cleanup never sees an unbound name
            try:
                # Reserve a temp path; edge-tts writes into it below.
                with tempfile.NamedTemporaryFile(suffix='.mp3', delete=False) as tmp:
                    temp_filename = tmp.name
                comm = edge_tts.Communicate(text, voice=voice)
                await comm.save(temp_filename)
                # Verify successful generation before promoting to the cache.
                if os.path.exists(temp_filename) and os.path.getsize(temp_filename) > 1024:
                    os.replace(temp_filename, cache_filename)
                    return cache_filename, chunk_index
                # Empty/missing output: discard and fall through to the backoff below.
                if os.path.exists(temp_filename):
                    os.unlink(temp_filename)
            except Exception as e:
                # Clean up the temp file on error.
                if temp_filename and os.path.exists(temp_filename):
                    try:
                        os.unlink(temp_filename)
                    except OSError:
                        pass
                if attempt == max_retries - 1:
                    print(f"Failed to generate audio chunk {chunk_index} after {max_retries} attempts: {e}")
                    return None, chunk_index
            # Exponential backoff with jitter before the next attempt.
            if attempt < max_retries - 1:
                sleep_time = (base_delay * (2 ** attempt)) + random.uniform(0.1, 1.0)
                await asyncio.sleep(sleep_time)
    return None, chunk_index
def process_audio_segment_fast(audio_data: Tuple[str, int]) -> Tuple[Optional[AudioSegment], int]:
    """Load one generated chunk, pad it against edge clipping, and normalize it.

    Takes (audio_file_path, chunk_index); returns (AudioSegment, chunk_index),
    or (None, chunk_index) when the file is missing or decoding fails.
    """
    audio_file, chunk_index = audio_data
    if not audio_file or not os.path.exists(audio_file):
        return None, chunk_index
    try:
        segment = AudioSegment.from_file(audio_file)
        if len(segment) > 0:
            # 50 ms of silence on each side prevents clipped chunk edges.
            pad = AudioSegment.silent(duration=50)
            segment = normalize(pad + segment + pad)
        return segment, chunk_index
    except Exception as e:
        print(f"Warning: Error processing audio segment {chunk_index}: {e}")
        return None, chunk_index
async def bilingual_tts_optimized(text: str, output_file: str = "audio0.mp3",
                                  VOICE_TA: Optional[str] = None, max_concurrent: int = 5) -> Optional[str]:
    """Optimized bilingual TTS with proper ordering and smooth transitions.

    Chunks *text*, synthesizes each chunk concurrently (at most
    *max_concurrent* requests in flight), then merges the pieces in original
    order with a short crossfade, gentle compression, and normalization.

    Returns *output_file* on success, None on any failure.

    Fixes vs. original: the per-chunk Tamil-detection branch was dead code
    (both arms resolved to VOICE_TA whenever it was set), and the bare
    ``except:`` around compression is narrowed to ``except Exception``.
    """
    print("Starting bilingual TTS processing...")
    try:
        # Split text into chunks with overlap
        chunks_with_indices = chunk_text_with_overlap(text, max_chars=250)
        if not chunks_with_indices:
            print("Error: No valid text chunks after processing")
            return None
        print(f"Processing {len(chunks_with_indices)} text chunks...")
        # Voice selection: the requested voice, else the English default.
        voice = VOICE_TA or VOICE_EN
        # Semaphore for rate limiting concurrent edge-tts requests.
        semaphore = asyncio.Semaphore(max_concurrent)
        tasks = [generate_safe_audio(chunk_text, voice, semaphore, chunk_index)
                 for chunk_text, chunk_index in chunks_with_indices]
        # Generate all audio files concurrently.
        results = await asyncio.gather(*tasks, return_exceptions=False)
        # Keep only chunks whose files were actually written.
        audio_data = []
        for result in results:
            if isinstance(result, tuple) and result[0] and os.path.exists(result[0]):
                audio_data.append(result)
        if not audio_data:
            print("Error: No audio was successfully generated")
            return None
        # Sort by chunk index so narration order matches text order.
        audio_data.sort(key=lambda x: x[1])
        print(f"Successfully generated {len(audio_data)} audio segments")
        # Decode/normalize segments in parallel threads (disk/CPU bound work).
        with ThreadPoolExecutor(max_workers=min(len(audio_data), 8)) as executor:
            processed = list(executor.map(process_audio_segment_fast, audio_data))
        # Filter failures and restore order.
        processed = [(seg, idx) for seg, idx in processed if seg is not None]
        processed.sort(key=lambda x: x[1])
        audio_segments = [seg for seg, idx in processed]
        if not audio_segments:
            print("Error: No audio segments were successfully processed")
            return None
        print(f"Merging {len(audio_segments)} audio segments with crossfade...")
        # Merge with a 30 ms crossfade for smooth transitions.
        merged_audio = audio_segments[0]
        for segment in audio_segments[1:]:
            merged_audio = merged_audio.append(segment, crossfade=30)
        # Apply compression for consistent volume; best-effort only.
        try:
            merged_audio = merged_audio.compress_dynamic_range(
                threshold=-20.0,
                ratio=2.5,  # Gentler compression for more natural sound
                attack=5.0,
                release=50.0
            )
        except Exception:
            pass  # Skip if compression fails
        merged_audio = normalize(merged_audio)
        # Export
        merged_audio.export(output_file, format="mp3", bitrate="192k")
        if os.path.exists(output_file) and os.path.getsize(output_file) > 1024:
            print(f"✅ Audio successfully generated: {output_file}")
            return output_file
        else:
            print(f"Error: Generated file is empty or missing")
            return None
    except Exception as main_error:
        print(f"Main error in bilingual TTS: {main_error}")
        traceback.print_exc()
        return None
async def generate_tts_optimized(id: int, lines, lang: str) -> Tuple[Optional[float], Optional[str]]:
    """Resolve the narration text and voice, synthesize audio, and measure it.

    *lang* may be a plain language name, or a packed "text&&&language" string
    (in which case *lines* is ignored). Writes audio{id}.mp3 into AUDIO_DIR
    and returns (duration_seconds, audio_path), or (None, None) on failure.
    """
    voice_map = {
        "English": "en-US-JennyNeural",
        "Tamil": "ta-IN-PallaviNeural",
        "Hindi": "hi-IN-SwaraNeural",
        "Malayalam": "ml-IN-SobhanaNeural",
        "Kannada": "kn-IN-SapnaNeural",
        "Telugu": "te-IN-ShrutiNeural",
        "Bengali": "bn-IN-TanishaaNeural",
        "Marathi": "mr-IN-AarohiNeural",
        "Gujarati": "gu-IN-DhwaniNeural",
        "Punjabi": "pa-IN-VaaniNeural",
        "Urdu": "ur-IN-GulNeural",
        "French": "fr-FR-DeniseNeural",
        "German": "de-DE-KatjaNeural",
        "Spanish": "es-ES-ElviraNeural",
        "Italian": "it-IT-IsabellaNeural",
        "Russian": "ru-RU-SvetlanaNeural",
        "Japanese": "ja-JP-NanamiNeural",
        "Korean": "ko-KR-SunHiNeural",
        "Chinese": "zh-CN-XiaoxiaoNeural",
        "Arabic": "ar-SA-ZariyahNeural",
        "Portuguese": "pt-BR-FranciscaNeural",
        "Dutch": "nl-NL-FennaNeural",
        "Greek": "el-GR-AthinaNeural",
        "Hebrew": "he-IL-HilaNeural",
        "Turkish": "tr-TR-EmelNeural",
        "Polish": "pl-PL-AgnieszkaNeural",
        "Thai": "th-TH-AcharaNeural",
        "Vietnamese": "vi-VN-HoaiMyNeural",
        "Swedish": "sv-SE-SofieNeural",
        "Finnish": "fi-FI-NooraNeural",
        "Czech": "cs-CZ-VlastaNeural",
        "Hungarian": "hu-HU-NoemiNeural"
    }
    audio_path = os.path.join(AUDIO_DIR, f"audio{id}.mp3")
    if "&&&" in lang:
        # Packed form: "<text>&&&<language name>".
        parts = lang.split("&&&")
        text = parts[0].strip()
        lang_name = parts[1].strip() if len(parts) > 1 else "English"
        voice_to_use = voice_map.get(lang_name, VOICE_EN)
    else:
        # Plain form: pick the id-th line when a sequence was supplied.
        if isinstance(lines, (list, tuple)) and id < len(lines):
            text = lines[id]
        else:
            text = str(lines)
        voice_to_use = voice_map.get(lang, VOICE_EN)
    # Use max_concurrent=5 for better rate limit handling
    output = await bilingual_tts_optimized(text, audio_path, voice_to_use, max_concurrent=5)
    if not output or not os.path.exists(audio_path):
        return None, None
    try:
        # Read the duration back so the caller can pace the video to it.
        return MP3(audio_path).info.length, audio_path
    except Exception as e:
        print(f"Error reading audio file: {e}")
        return None, None
def audio_func(id: int, lines, lang: str) -> Tuple[Optional[float], Optional[str]]:
    """Synchronous wrapper for audio generation.

    Runs the async TTS pipeline on a fresh event loop and returns
    (duration_seconds, audio_path), or (None, None) on any failure.
    """
    try:
        # asyncio.run creates, runs, and closes a fresh event loop — the
        # idiomatic equivalent of the manual new_event_loop/close sequence.
        return asyncio.run(generate_tts_optimized(id, lines, lang))
    except Exception as e:
        print(f"Error in audio_func: {e}")
        traceback.print_exc()
        return None, None
def create_manim_script(problem_data, script_path, audio_path, scale=1):
    """Generate a Manim script from problem data with robust wrapping.

    Writes a self-contained Manim ``Scene`` subclass to *script_path* that
    plays *audio_path* as the soundtrack and renders each slide (title, text,
    or equation) with its configured animation, scaling every slide duration
    by *scale* so the video tracks the narration length.

    Raises ValueError when *problem_data* contains no slides; re-raises any
    I/O error from writing the script file.

    Fix vs. original: the generated equation-wrapping line used
    ``f"{{line1}} \\\\ {{line2}}"`` which, after both rounds of string
    processing, left a SINGLE backslash in the LaTeX source (a control space,
    not a line break). The generator now emits ``line1 + r" \\ " + line2`` so
    MathTex receives the ``\\\\`` row separator it needs.
    """
    # Defaults mirror the video_settings built by the /generate route.
    settings = problem_data.get("video_settings", {
        "background_color": "#0f0f23",
        "text_color": "WHITE",
        "highlight_color": "YELLOW",
        "font": "CMU Serif",
        "text_size": 36,
        "equation_size": 45,
        "title_size": 48,
        "wrap_width": 15.5
    })
    slides = problem_data.get("slides", [])
    if not slides:
        raise ValueError("No slides provided in input data")
    # repr() yields valid Python literals to embed in the generated source.
    slides_repr = repr(slides)
    audio_path_repr = repr(audio_path)
    wrap_width = float(settings.get("wrap_width", 15.5))
    background_color = settings.get("background_color", "#0f0f23")
    # NOTE(review): text_color/highlight_color are interpolated UNQUOTED below,
    # so they must be Manim constant names (WHITE, YELLOW, ...), not hex strings.
    text_color = settings.get("text_color", "WHITE")
    highlight_color = settings.get("highlight_color", "YELLOW")
    font = settings.get("font", "CMU Serif")
    text_size = settings.get("text_size", 36)
    equation_size = settings.get("equation_size", 50)
    title_size = settings.get("title_size", 48)
    manim_code = f"""from manim import *

class GeneratedMathScene(Scene):
    def construct(self):
        # Scene settings
        self.add_sound({audio_path_repr})
        self.camera.background_color = "{background_color}"
        default_color = {text_color}
        highlight_color = {highlight_color}
        default_font = "{font}"
        text_size = {text_size}
        equation_size = {equation_size}
        title_size = {title_size}
        wrap_width = {wrap_width}

        def make_inline_segments(content, color, font, text_size, equation_size):
            # Build a horizontal row of Text/MathTex parts split on '#'.
            if not content:
                return VGroup()
            segments = content.split("#")
            mobjects = []
            for segment in segments:
                segment = segment.strip()
                if not segment:
                    continue
                # Segments starting with '%' are LaTeX equations.
                if segment.startswith("%"):
                    latex_content = segment[1:]
                    mob = MathTex(latex_content, color=color, font_size=equation_size)
                else:
                    mob = Text(segment, color=color, font=font, font_size=text_size)
                mobjects.append(mob)
            if not mobjects:
                return VGroup()
            inline_group = VGroup(*mobjects).arrange(RIGHT, buff=0.1)
            return inline_group

        def make_wrapped_paragraph(content, color, font, font_size, line_spacing=0.2):
            # Greedy word-wrap against wrap_width, measured with rendered Text widths.
            lines = []
            words = content.split()
            current = ""
            for w in words:
                test = w if not current else current + " " + w
                test_obj = Text(test, color=color, font=font, font_size=font_size)
                if test_obj.width <= wrap_width * 0.9:
                    current = test
                else:
                    if current:
                        line_obj = Text(current, color=color, font=font, font_size=font_size)
                        lines.append(line_obj)
                    current = w
            if current:
                lines.append(Text(current, color=color, font=font, font_size=font_size))
            if not lines:
                return VGroup()
            first_line = lines[0]
            for ln in lines:
                ln.align_to(first_line, LEFT)
            para = VGroup(*lines).arrange(DOWN, aligned_edge=LEFT, buff=line_spacing)
            return para

        content_group = VGroup()
        current_y = 3.0
        line_spacing = 0.8
        slides = {slides_repr}
        for idx, slide in enumerate(slides):
            obj = None
            content = slide.get("content", "")
            animation = slide.get("animation", "write_left")
            scalelen = slide.get("duration", 1.0)
            duration = scalelen * {scale}
            slide_type = slide.get("type", "text")
            if slide_type == "title":
                # Titles are centered, faded in/out, and never added to the scroll group.
                obj = make_inline_segments(content, highlight_color, default_font, title_size, equation_size)
                if len(obj) == 0:
                    obj = Text(content, color=highlight_color, font=default_font, font_size=title_size)
                if obj.width > wrap_width:
                    obj.scale_to_fit_width(wrap_width)
                obj.move_to(ORIGIN)
                self.play(FadeIn(obj), run_time=duration * 0.8)
                self.wait(duration * 0.3)
                self.play(FadeOut(obj), run_time=duration * 0.3)
                continue
            elif slide_type == "text":
                obj = make_inline_segments(content, default_color, default_font, text_size, equation_size)
                if len(obj) == 0:
                    obj = make_wrapped_paragraph(content, default_color, default_font, text_size, line_spacing=0.25)
                if obj.width > wrap_width:
                    obj.scale_to_fit_width(wrap_width)
            elif slide_type == "equation":
                eq_content = content
                test = MathTex(eq_content, color=default_color, font_size=equation_size)
                if test.width > wrap_width:
                    # Too wide: break roughly in half at a space.
                    parts = eq_content.split(" ")
                    mid = len(parts) // 2
                    line1 = " ".join(parts[:mid])
                    line2 = " ".join(parts[mid:])
                    # r"..." keeps the double backslash MathTex needs as a line break.
                    wrapped_eq = line1 + r" \\\\ " + line2
                    obj = MathTex(wrapped_eq, color=default_color, font_size=equation_size)
                else:
                    obj = MathTex(eq_content, color=default_color, font_size=equation_size)
                if obj.width > wrap_width:
                    obj.scale_to_fit_width(wrap_width)
            if obj:
                obj.to_edge(LEFT, buff=0.3)
                obj.shift(UP * (current_y - obj.height / 2))
                # Scroll earlier content up when the new object would fall off-screen.
                obj_bottom = obj.get_bottom()[1]
                if obj_bottom < -3.5:
                    scroll_amount = abs(obj_bottom - (-3.5)) + 0.3
                    self.play(content_group.animate.shift(UP * scroll_amount), run_time=0.5)
                    current_y += scroll_amount
                    obj.shift(UP * scroll_amount)
                    obj.to_edge(LEFT, buff=0.3)
                if animation == "write_left":
                    self.play(Write(obj), run_time=duration)
                elif animation == "fade_in":
                    self.play(FadeIn(obj), run_time=duration)
                elif animation == "highlight_left":
                    self.play(Write(obj), run_time=duration * 0.6)
                    self.play(obj.animate.set_color(highlight_color), run_time=duration * 0.4)
                else:
                    self.play(Write(obj), run_time=duration)
                content_group.add(obj)
                current_y -= (getattr(obj, "height", 0) + line_spacing)
                self.wait(0.3)
        if len(content_group) > 0:
            # Box the final object as a closing highlight.
            final_box = SurroundingRectangle(content_group[-1], color=highlight_color, buff=0.2)
            self.play(Create(final_box), run_time=0.8)
            self.wait(1.5)
"""
    try:
        with open(script_path, 'w', encoding='utf-8') as f:
            f.write(manim_code)
        print(f"Generated script at {script_path}")
    except Exception as e:
        print(f"Error writing script: {e}")
        raise
@app.route("/")
def home():
    """Health-check endpoint confirming the service is up."""
    return "Flask Manim Video Generator is Running"
@app.route("/generate", methods=["POST"])
def generate_video():
    """Render a narrated Manim video from a packed request payload.

    Expects JSON of the form {"jsondata": "<slides>&&&&<narration>[&&&<lang>]"}
    where <slides> is a Python-literal list of [type, content, animation,
    duration] rows. Generates the TTS narration first, scales slide durations
    so the animation matches the audio length, renders the scene with Manim,
    and returns the MP4.

    NOTE(review): API_KEY defined at module level is never checked here —
    confirm whether this endpoint is meant to be authenticated.
    """
    temp_work_dir = None
    try:
        raw_data = request.get_json()
        if not raw_data:
            return jsonify({"error": "No JSON data provided"}), 400
        raw_body = raw_data.get("jsondata", '')
        if not raw_body:
            return jsonify({"error": "No jsondata field in request"}), 400
        # Payload layout: slide rows, then "&&&&", then narration (+ optional language).
        lst = raw_body.split("&&&&")
        if len(lst) < 2:
            return jsonify({"error": "Invalid data format, missing &&&&separator"}), 400
        # Re-join decimals that arrived with stray spaces, e.g. "1 . 5" -> "1.5".
        cleaned = re.sub(r'(\d)\s*\.\s*(\d)', r'\1.\2', lst[0])
        try:
            # literal_eval only accepts Python literals — no code execution.
            nlist = ast.literal_eval(cleaned)
        except Exception as e:
            return jsonify({"error": f"Failed to parse slide data: {str(e)}"}), 400
        datalst = []
        total = 0.0  # sum of nominal slide durations, used for audio scaling below
        for line in range(len(nlist)):
            try:
                total += float(nlist[line][3])
                datalst.append({
                    "type": nlist[line][0].strip(),
                    "content": nlist[line][1].strip(),
                    "animation": nlist[line][2].strip().replace(" ", ""),
                    "duration": float(nlist[line][3])
                })
            except (IndexError, ValueError) as e:
                return jsonify({"error": f"Invalid slide data at index {line}: {str(e)}"}), 400
        if total <= 0:
            total = 1.0  # avoid division by zero when computing the scale factor
        data = {
            "video_settings": {
                "background_color": "#0f0f23",
                "text_color": "WHITE",
                "highlight_color": "YELLOW",
                "font": "CMU Serif",
                "text_size": 36,
                "equation_size": 42,
                "title_size": 48
            },
            "slides": datalst
        }
        # Narration half of the payload: "<text>[&&&<language>]".
        best = lst[1].split("&&&")
        lines = best[0]
        try:
            lang = best[1] if len(best) > 1 else "English"
        except:
            lang = "English"
        # Generate the narration first; its length drives the animation pacing.
        length, audio_path = audio_func(0, lines, lang)
        if not length or not audio_path or not os.path.exists(audio_path):
            return jsonify({"error": "Failed to generate audio"}), 500
        # Scale factor stretching slide durations to the audio length.
        scale = float(length) / total if total > 0 else 1.0
        if "slides" not in data or not data["slides"]:
            return jsonify({"error": "No slides provided in request"}), 400
        print(f"Received request with {len(data['slides'])} slides")
        # Fresh timestamped workspace per request.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        temp_work_dir = os.path.join(TEMP_DIR, f"manim_{timestamp}")
        os.makedirs(temp_work_dir, exist_ok=True)
        script_path = os.path.join(temp_work_dir, "scene.py")
        create_manim_script(data, script_path, audio_path, scale)
        print(f"Created Manim script at {script_path}")
        quality = 'l'  # low quality (480p15) keeps render times short
        render_command = [
            "manim",
            f"-q{quality}",
            "--disable_caching",
            "--media_dir", temp_work_dir,
            script_path,
            "GeneratedMathScene"
        ]
        print(f"Running command: {' '.join(render_command)}")
        # List-form subprocess call (no shell) with a hard timeout.
        result = subprocess.run(
            render_command,
            capture_output=True,
            text=True,
            cwd=temp_work_dir,
            timeout=120
        )
        if result.returncode != 0:
            error_msg = result.stderr or result.stdout
            print(f"Manim rendering failed: {error_msg}")
            return jsonify({
                "error": "Manim rendering failed",
                "details": error_msg
            }), 500
        print("Manim rendering completed successfully")
        # Manim writes output under <media_dir>/videos/<script>/<quality>/.
        quality_map = {'l': '480p15', 'm': '720p30', 'h': '1080p60'}
        video_quality = quality_map.get(quality, '480p15')
        video_path = os.path.join(
            temp_work_dir,
            "videos",
            "scene",
            video_quality,
            "GeneratedMathScene.mp4"
        )
        if not os.path.exists(video_path):
            print(f"Video not found at expected path: {video_path}")
            return jsonify({
                "error": "Video file not found after rendering",
                "expected_path": video_path
            }), 500
        print(f"Video found at: {video_path}")
        # Copy the result out of the workspace before cleanup.
        output_filename = f"math_video_{timestamp}.mp4"
        output_path = os.path.join(MEDIA_DIR, output_filename)
        shutil.copy(video_path, output_path)
        print(f"Video copied to: {output_path}")
        # Best-effort cleanup; failure to clean is not fatal.
        try:
            if temp_work_dir and os.path.exists(temp_work_dir):
                shutil.rmtree(temp_work_dir)
                print("Cleaned up temp directory")
        except Exception as e:
            print(f"Failed to clean temp dir: {e}")
        return send_file(
            output_path,
            mimetype='video/mp4',
            as_attachment=False,
            download_name=output_filename
        )
    except subprocess.TimeoutExpired:
        print("Video rendering timeout")
        if temp_work_dir and os.path.exists(temp_work_dir):
            try:
                shutil.rmtree(temp_work_dir)
            except:
                pass
        return jsonify({"error": "Video rendering timeout (120s)"}), 504
    except Exception as e:
        print(f"Error: {str(e)}")
        traceback.print_exc()
        if temp_work_dir and os.path.exists(temp_work_dir):
            try:
                shutil.rmtree(temp_work_dir)
            except:
                pass
        return jsonify({
            "error": str(e),
            "traceback": traceback.format_exc()
        }), 500
if __name__ == '__main__':
    # Bind on all interfaces; the port is configurable via the PORT env var.
    listen_port = int(os.getenv('PORT', 7860))
    app.run(host='0.0.0.0', port=listen_port, debug=False)