| | |
| | |
| |
|
| | import modal |
| | import os |
| |
|
| | |
# Modal application handle; every function below is registered on it.
app = modal.App("aiquoteclipgenerator")


# Container image for all functions: slim Debian with Python 3.11 plus the
# video/imaging stack. moviepy and imageio are version-pinned — presumably
# because moviepy 1.x breaks with newer imageio releases (NOTE(review):
# confirm pin rationale). fastapi is needed by modal.web_endpoint.
image = modal.Image.debian_slim(python_version="3.11").pip_install(
    "moviepy==1.0.3",
    "pillow",
    "numpy",
    "imageio==2.31.1",
    "imageio-ffmpeg",
    "requests",
    "fastapi",
)
| |
|
| |
|
@app.function(
    image=image,
    cpu=2,
    memory=2048,
    timeout=180,
    concurrency_limit=10,
    allow_concurrent_inputs=10,
    container_idle_timeout=120,
)
def process_quote_video(
    video_url: str,
    quote_text: str,
    audio_b64: str | None = None,
    text_style: str = "classic_center",
) -> bytes:
    """
    Render a quote video on Modal and return the encoded MP4 bytes.

    - Downloads a background video from ``video_url``.
    - Overlays ``quote_text`` (word-wrapped, black-outlined white text)
      positioned according to ``text_style``: "classic_center" (default),
      "lower_third_serif", or "typewriter_top".
    - If ``audio_b64`` is provided, decodes it, attaches it as the audio
      track, and loops/trims the video to roughly match the audio length.

    Duration rules:
        - With audio:    target = clamp(audio_duration + 0.5, 7.0, 20.0)
        - Without audio: target = min(original_video_duration, 15.0)

    Returns:
        Raw bytes of the final MP4 video.
    """
    import base64
    import tempfile
    import time

    import numpy as np
    import requests
    from moviepy.editor import (
        AudioFileClip,
        CompositeVideoClip,
        ImageClip,
        VideoFileClip,
    )
    from moviepy.video.fx.all import loop as vfx_loop
    from PIL import Image, ImageDraw, ImageFont

    start_time = time.time()

    # --- Download the source video to a temp file -------------------------
    resp = requests.get(video_url, stream=True, timeout=30)
    resp.raise_for_status()

    # Close the NamedTemporaryFile handle before re-opening the path by
    # name (bug fix: the handle was previously left open while the same
    # path was opened again, leaking an fd and failing on platforms that
    # forbid double-opens).
    temp_video = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
    temp_video_path = temp_video.name
    temp_video.close()
    with open(temp_video_path, "wb") as f:
        for chunk in resp.iter_content(chunk_size=1024 * 1024):
            f.write(chunk)

    video = VideoFileClip(temp_video_path)
    orig_duration = video.duration

    audio_clip = None
    temp_audio_path = None
    target_duration = orig_duration

    # --- Optional audio: decode, attach, derive target duration -----------
    if audio_b64:
        try:
            temp_audio = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
            temp_audio_path = temp_audio.name
            temp_audio.close()

            with open(temp_audio_path, "wb") as f:
                f.write(base64.b64decode(audio_b64))

            audio_clip = AudioFileClip(temp_audio_path)

            # Leave a short 0.5s tail after the narration, clamped to bounds.
            MIN_DUR = 7.0
            MAX_DUR = 20.0
            target_duration = min(max(audio_clip.duration + 0.5, MIN_DUR), MAX_DUR)

            # Loop short clips out to the target; trim long ones down.
            if target_duration > video.duration:
                video = vfx_loop(video, duration=target_duration)
            elif target_duration < video.duration:
                video = video.subclip(0, target_duration)
        except Exception as e:
            # Best effort: fall back to the silent path. Re-open the source
            # video so a partially looped/trimmed clip can't leak into the
            # no-audio duration logic (bug fix: the old code could subclip
            # an already-trimmed clip past its new duration).
            print(f"⚠️ Audio handling error: {e}")
            audio_clip = None
            video.close()
            video = VideoFileClip(temp_video_path)

    # --- No (usable) audio: just cap the original video's length ----------
    if audio_clip is None:
        MAX_NO_AUDIO = 15.0
        if orig_duration > MAX_NO_AUDIO:
            target_duration = MAX_NO_AUDIO
            video = video.subclip(0, target_duration)
        else:
            target_duration = orig_duration

    w, h = video.size

    def make_text_frame(t):
        """Render the word-wrapped, outlined quote as a transparent RGBA frame."""
        img = Image.new("RGBA", (w, h), (0, 0, 0, 0))
        draw = ImageDraw.Draw(img)

        # Font size scales with frame height; fall back to PIL's builtin
        # bitmap font when DejaVu is not available in the container.
        font_size = int(h * 0.025)
        try:
            font = ImageFont.truetype(
                "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size
            )
        except Exception:
            font = ImageFont.load_default()

        # Greedy word-wrap to at most 60% of the frame width.
        max_width = int(w * 0.6)
        words = quote_text.split()
        lines = []
        current_line = []
        for word in words:
            test_line = " ".join(current_line + [word])
            bbox = draw.textbbox((0, 0), test_line, font=font)
            if bbox[2] - bbox[0] <= max_width:
                current_line.append(word)
            elif current_line:
                lines.append(" ".join(current_line))
                current_line = [word]
            else:
                # A single word wider than the limit goes on its own line.
                lines.append(word)
        if current_line:
            lines.append(" ".join(current_line))

        line_spacing = int(font_size * 0.4)
        text_block_height = len(lines) * (font_size + line_spacing)

        # Vertical placement per style; unknown styles fall back to center.
        style = (text_style or "classic_center").lower().strip()
        if style == "lower_third_serif":
            y_start = int(h * 0.60) - text_block_height // 2
        elif style == "typewriter_top":
            y_start = int(h * 0.20)
        else:
            y_start = (h - text_block_height) // 2

        y = y_start
        for line in lines:
            bbox = draw.textbbox((0, 0), line, font=font)
            x = (w - (bbox[2] - bbox[0])) // 2

            # Cheap outline: stamp the line in black at every offset within
            # the outline radius, then draw the white fill on top.
            outline_width = max(2, int(font_size * 0.08))
            for adj_x in range(-outline_width, outline_width + 1):
                for adj_y in range(-outline_width, outline_width + 1):
                    draw.text((x + adj_x, y + adj_y), line, font=font, fill="black")
            draw.text((x, y), line, font=font, fill="white")
            y += font_size + line_spacing

        return np.array(img)

    # The overlay is static, so render one frame and hold it for the whole clip.
    text_clip = ImageClip(make_text_frame(0), duration=video.duration)
    final_video = CompositeVideoClip([video, text_clip])

    if audio_clip is not None:
        try:
            final_video = final_video.set_audio(audio_clip)
        except Exception as e:
            print(f"⚠️ Could not attach audio: {e}")

    output_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
    output_path = output_file.name
    output_file.close()

    try:
        # Low fps/bitrate + ultrafast preset: tuned for speed over quality.
        final_video.write_videofile(
            output_path,
            codec="libx264",
            audio_codec="aac",
            fps=10,
            preset="ultrafast",
            threads=2,
            verbose=False,
            logger=None,
            bitrate="400k",
            ffmpeg_params=["-crf", "30", "-g", "30"],
        )

        with open(output_path, "rb") as f:
            video_bytes = f.read()
    finally:
        # Always release clips and temp files, even when encoding fails
        # (bug fix: an encode error previously leaked every temp file).
        for clip in (final_video, video, audio_clip):
            if clip is not None:
                try:
                    clip.close()
                except Exception:
                    pass
        for path in (temp_video_path, temp_audio_path, output_path):
            if path and os.path.exists(path):
                try:
                    os.unlink(path)
                except Exception:
                    pass

    total_time = time.time() - start_time
    print(
        f"🎉 Total: {total_time:.1f}s, Size: {len(video_bytes) / 1024 / 1024:.2f}MB, "
        f"text_style={text_style}, target_duration≈{target_duration:.1f}s"
    )

    return video_bytes
| |
|
| |
|
@app.function(image=image)
@modal.web_endpoint(method="POST")
def process_video_endpoint(data: dict):
    """
    Single-video HTTP endpoint.

    Expected JSON body:
        {
            "video_url": "...",
            "quote_text": "...",
            "audio_b64": "....",  # optional
            "text_style": "classic_center" | "lower_third_serif" | "typewriter_top"  # optional
        }

    Returns a JSON object with the base64-encoded MP4 on success, or a
    JSON error body with a real 400/500 HTTP status on failure.
    """
    # modal.web_endpoint is FastAPI-backed: a Flask-style `(body, code)`
    # tuple is serialized as a JSON array with status 200. Use JSONResponse
    # so clients actually receive the error status (bug fix). fastapi is
    # installed in `image` and required by web_endpoint itself.
    import base64

    from fastapi.responses import JSONResponse

    video_url = data.get("video_url")
    quote_text = data.get("quote_text")
    audio_b64 = data.get("audio_b64")
    text_style = data.get("text_style", "classic_center")

    if not video_url or not quote_text:
        return JSONResponse(
            status_code=400, content={"error": "Missing video_url or quote_text"}
        )

    try:
        video_bytes = process_quote_video.remote(
            video_url=video_url,
            quote_text=quote_text,
            audio_b64=audio_b64,
            text_style=text_style,
        )

        return {
            "success": True,
            "video": base64.b64encode(video_bytes).decode(),
            "size_mb": len(video_bytes) / 1024 / 1024,
        }
    except Exception as e:
        # Boundary handler: surface any processing failure as a 500.
        return JSONResponse(status_code=500, content={"error": str(e)})
| |
|
| |
|
@app.function(image=image)
@modal.web_endpoint(method="POST")
def process_batch_endpoint(data: dict):
    """
    Batch endpoint - process multiple videos in PARALLEL via Function.map.

    Expected JSON body:
        {
            "videos": [
                {
                    "video_url": "...",
                    "quote_text": "...",
                    "audio_b64": "...",  # optional
                    "text_style": "..."  # optional
                },
                ...
            ]
        }

    Returns one base64-encoded MP4 per input (in input order) on success.
    NOTE: a single failed video currently fails the whole batch with 500.
    """
    # As in process_video_endpoint: FastAPI serializes `(body, code)` tuples
    # as JSON arrays with status 200, so JSONResponse is required to return
    # a real 400/500 status (bug fix).
    import base64

    from fastapi.responses import JSONResponse

    videos_data = data.get("videos", [])
    if not videos_data:
        return JSONResponse(status_code=400, content={"error": "Missing videos array"})

    try:
        video_urls = [v.get("video_url") for v in videos_data]
        quote_texts = [v.get("quote_text") for v in videos_data]
        audio_list = [v.get("audio_b64") for v in videos_data]
        styles = [v.get("text_style", "classic_center") for v in videos_data]

        # Validate every item up front so a bad entry fails fast with 400
        # instead of wasting a partial fan-out.
        for i, (vu, qt) in enumerate(zip(video_urls, quote_texts)):
            if not vu or not qt:
                return JSONResponse(
                    status_code=400,
                    content={"error": f"Missing video_url or quote_text at index {i}"},
                )

        # Fan out across containers; .map yields results in input order.
        results = list(
            process_quote_video.map(
                video_urls,
                quote_texts,
                audio_list,
                styles,
            )
        )

        encoded_results = [
            {
                "success": True,
                "video": base64.b64encode(video_bytes).decode(),
                "size_mb": len(video_bytes) / 1024 / 1024,
            }
            for video_bytes in results
        ]

        return {
            "success": True,
            "videos": encoded_results,
            "count": len(encoded_results),
        }
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
| |
|