""" Modal backend for tricket — AI product-ad video studio. Three building blocks, all deployed in one Modal app ("tricket-flux"): * Model.generate — FLUX.1-schnell text-to-image (A100 GPU) * TTS.synth — Kokoro text-to-speech, EN + ZH (CPU) * assemble_video — ffmpeg: Ken Burns stills + burned captions + voiceover concatenated into a 9:16 MP4 (CPU) Deploy: modal deploy modal_app.py The Gradio frontend (app.py) orchestrates these via the Modal SDK. Weights/models are cached in a Modal Volume so cold starts stay fast. """ import io import modal MODEL_NAME = "black-forest-labs/FLUX.1-schnell" CACHE_DIR = "/cache" app = modal.App("tricket-flux") # Persistent cache for model weights (shared across cold starts). weights_volume = modal.Volume.from_name("tricket-flux-cache", create_if_missing=True) # FLUX.1-schnell is GATED. This Modal Secret (named "huggingface") holds HF_TOKEN. # modal secret create huggingface HF_TOKEN=hf_xxx hf_secret = modal.Secret.from_name("huggingface") # --------------------------------------------------------------------------- # Images # --------------------------------------------------------------------------- flux_image = ( modal.Image.debian_slim(python_version="3.12") .pip_install( "torch==2.5.1", "diffusers==0.32.1", "transformers==4.47.1", "accelerate==1.2.1", "sentencepiece==0.2.0", "protobuf==5.29.2", "pillow==11.0.0", ) .env({"HF_HOME": CACHE_DIR, "HF_HUB_ENABLE_HF_TRANSFER": "0"}) ) tts_image = ( modal.Image.debian_slim(python_version="3.12") .apt_install("espeak-ng") .pip_install("kokoro==0.9.4", "misaki[zh]==0.9.4", "soundfile==0.13.1", "numpy") .env({"HF_HOME": CACHE_DIR, "HF_HUB_ENABLE_HF_TRANSFER": "0"}) ) video_image = ( modal.Image.debian_slim(python_version="3.12") .apt_install("ffmpeg", "fonts-noto-cjk") .pip_install("pillow==11.0.0", "numpy") ) # LTX-Video (image-to-video) — fast diffusion video model. 
ltx_image = (
    modal.Image.debian_slim(python_version="3.12")
    .apt_install("ffmpeg")
    .pip_install(
        "torch==2.5.1",
        "diffusers==0.32.1",
        "transformers==4.47.1",
        "accelerate==1.2.1",
        "sentencepiece==0.2.0",
        "imageio==2.36.1",
        "imageio-ffmpeg==0.5.1",
        "pillow==11.0.0",
    )
    .env({"HF_HOME": CACHE_DIR, "HF_HUB_ENABLE_HF_TRANSFER": "0"})
)

LTX_MODEL = "Lightricks/LTX-Video"


# ---------------------------------------------------------------------------
# 1) Text-to-image (FLUX.1-schnell)
# ---------------------------------------------------------------------------
@app.cls(
    gpu="A100",
    image=flux_image,
    volumes={CACHE_DIR: weights_volume},
    secrets=[hf_secret],
    scaledown_window=300,
    timeout=600,
)
class Model:
    """FLUX.1-schnell text-to-image worker."""

    @modal.enter()
    def load(self):
        """Load the FLUX pipeline in bfloat16 and move it to the GPU."""
        import torch
        from diffusers import FluxPipeline

        self.pipe = FluxPipeline.from_pretrained(
            MODEL_NAME,
            torch_dtype=torch.bfloat16,
        )
        self.pipe.to("cuda")

    @modal.method()
    def generate(
        self,
        prompt: str,
        num_inference_steps: int = 4,
        guidance_scale: float = 0.0,
        width: int = 1024,
        height: int = 1024,
        seed: int = -1,
    ) -> bytes:
        """Generate one image and return it as PNG bytes.

        Args:
            prompt: text prompt; surrounding whitespace is stripped.
            num_inference_steps: denoising steps passed to the pipeline.
            guidance_scale: guidance scale passed to the pipeline.
            width, height: output size in pixels.
            seed: non-negative values seed a CUDA generator; a negative value
                or None leaves the generator unset.

        Raises:
            ValueError: if ``prompt`` is empty or whitespace only.
        """
        import torch

        if not prompt or not prompt.strip():
            raise ValueError("prompt is empty")

        generator = None
        if seed is not None and seed >= 0:
            generator = torch.Generator("cuda").manual_seed(int(seed))

        image = self.pipe(
            prompt=prompt.strip(),
            num_inference_steps=int(num_inference_steps),
            guidance_scale=float(guidance_scale),
            width=int(width),
            height=int(height),
            generator=generator,
        ).images[0]

        buf = io.BytesIO()
        image.save(buf, format="PNG")
        return buf.getvalue()


# ---------------------------------------------------------------------------
# 2) Text-to-speech (Kokoro, EN + ZH)
# ---------------------------------------------------------------------------
@app.cls(
    image=tts_image,
    volumes={CACHE_DIR: weights_volume},
    scaledown_window=240,
    timeout=300,
)
class TTS:
    """Kokoro text-to-speech worker with one cached pipeline per language code."""

    @modal.enter()
    def setup(self):
        """Create the empty per-language pipeline cache."""
        self._pipes = {}

    def _pipe(self, lang_code: str):
        """Return the KPipeline for ``lang_code``, building it on first use."""
        from kokoro import KPipeline

        if lang_code not in self._pipes:
            self._pipes[lang_code] = KPipeline(lang_code=lang_code)
        return self._pipes[lang_code]

    @modal.method()
    def synth(self, text: str, voice: str = "af_heart", lang_code: str = "a") -> bytes:
        """Synthesize speech, return 24kHz mono WAV bytes (PCM_16).

        Empty or whitespace-only text is replaced by "..." so the pipeline
        always receives something to speak. If the pipeline yields no audio at
        all, one second of silence is returned instead of raising.
        """
        import numpy as np
        import soundfile as sf

        text = (text or "").strip()
        if not text:
            text = "..."

        pipe = self._pipe(lang_code)
        chunks = []
        for _, _, audio in pipe(text, voice=voice):
            if audio is not None:
                chunks.append(np.asarray(audio, dtype="float32"))

        if not chunks:
            # produce ~1s of silence rather than failing the whole video
            chunks = [np.zeros(24000, dtype="float32")]

        full = np.concatenate(chunks)
        buf = io.BytesIO()
        sf.write(buf, full, 24000, format="WAV", subtype="PCM_16")
        return buf.getvalue()


# ---------------------------------------------------------------------------
# 2b) Image-to-video (LTX-Video) — turns a still scene into a moving clip
# ---------------------------------------------------------------------------
@app.cls(
    gpu="A100",
    image=ltx_image,
    volumes={CACHE_DIR: weights_volume},
    scaledown_window=300,
    timeout=900,
)
class Animate:
    """LTX-Video image-to-video worker."""

    @modal.enter()
    def load(self):
        """Load the LTX image-to-video pipeline in bfloat16 and move it to the GPU."""
        import torch
        from diffusers import LTXImageToVideoPipeline

        self.pipe = LTXImageToVideoPipeline.from_pretrained(
            LTX_MODEL, torch_dtype=torch.bfloat16
        )
        self.pipe.to("cuda")

    @modal.method()
    def animate(
        self,
        image_png: bytes,
        prompt: str = "",
        width: int = 704,
        height: int = 1216,
        num_frames: int = 97,
        num_inference_steps: int = 30,
        fps: int = 24,
    ) -> bytes:
        """Image -> short silent MP4 clip. Returns MP4 bytes.

        Args:
            image_png: encoded source image; it is converted to RGB and resized
                (not cropped) to ``width`` x ``height``.
            prompt: optional motion description; a fixed "subtle motion" style
                suffix is always appended (or used alone when empty).
            width, height: frame size in pixels.
            num_frames: number of frames to generate.
            num_inference_steps: denoising steps passed to the pipeline.
            fps: frame rate used when writing the clip.
        """
        import os
        import tempfile

        from diffusers.utils import export_to_video
        from PIL import Image

        # Cast once so every consumer (PIL, the pipeline, the exporter) sees ints,
        # matching how num_frames / num_inference_steps are treated.
        width, height, fps = int(width), int(height), int(fps)

        img = Image.open(io.BytesIO(image_png)).convert("RGB").resize((width, height))

        motion = (prompt or "").strip()
        full_prompt = (
            f"{motion}. Subtle natural motion, gentle camera movement, cinematic, high detail"
            if motion
            else "Subtle natural motion, gentle camera movement, cinematic, high detail"
        )

        frames = self.pipe(
            image=img,
            prompt=full_prompt,
            negative_prompt="worst quality, blurry, distorted, jittery, watermark, text",
            width=width,
            height=height,
            num_frames=int(num_frames),
            num_inference_steps=int(num_inference_steps),
        ).frames[0]

        # The container stays warm between calls, so the scratch directory must
        # be removed after each clip instead of accumulating on disk.
        with tempfile.TemporaryDirectory() as tmp:
            out = os.path.join(tmp, "clip.mp4")
            export_to_video(frames, out, fps=fps)
            with open(out, "rb") as f:
                return f.read()


# ---------------------------------------------------------------------------
# 3) Video assembly (ffmpeg: Ken Burns or LTX clips + captions + voiceover)
# ---------------------------------------------------------------------------
@app.function(image=video_image, timeout=600)
def assemble_video(
    scenes: list,
    width: int = 720,
    height: int = 1280,
    fps: int = 30,
) -> bytes:
    """Render every scene to a clip, join the clips, and return the MP4 bytes.

    Args:
        scenes: non-empty list of dicts, one per scene, with keys:
            "image"   — encoded image bytes (required; written out as a .png)
            "audio"   — voiceover bytes (optional; written out as a .wav)
            "caption" — caption text burned near the bottom (optional)
            "video"   — motion-clip bytes (optional; written out as a .mp4).
                        When present it is the visual base and its last frame
                        is frozen to cover the scene; otherwise the still
                        image gets a Ken Burns zoom.
        width, height: output frame size in pixels.
        fps: output frame rate.

    Each scene lasts ``max(audio duration, 2.0) + 0.4`` seconds.

    Returns:
        MP4 bytes (H.264 video, plus AAC audio when at least one scene has
        audio), ``width`` x ``height``.

    Raises:
        ValueError: if ``scenes`` is empty.
        subprocess.CalledProcessError: if an ffmpeg invocation fails.
    """
    import math
    import os
    import subprocess
    import tempfile

    from PIL import Image, ImageDraw, ImageFont

    FONT_PATH = "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc"
    # Silent stand-in for scenes without a voiceover; 24 kHz mono matches the
    # WAV format that TTS.synth in this file produces.
    SILENCE_SRC = "anullsrc=channel_layout=mono:sample_rate=24000"

    def probe_duration(path: str) -> float:
        """Return the media duration in seconds, or 0.0 if it cannot be probed."""
        try:
            out = subprocess.check_output(
                [
                    "ffprobe", "-v", "error",
                    "-show_entries", "format=duration",
                    "-of", "default=noprint_wrappers=1:nokey=1",
                    path,
                ]
            )
            return float(out.decode().strip())
        except (subprocess.CalledProcessError, OSError, ValueError):
            # Best effort: callers apply a minimum duration anyway.
            return 0.0

    def run_ffmpeg(cmd: list) -> None:
        """Run ffmpeg; on failure print the tail of its stderr, then re-raise."""
        try:
            subprocess.run(cmd, check=True, capture_output=True)
        except subprocess.CalledProcessError as err:
            # capture_output hides ffmpeg's diagnostics, and the exception text
            # only carries the exit code, so put the reason into the logs.
            print((err.stderr or b"").decode(errors="replace")[-2000:])
            raise

    def wrap_text(draw, text, font, max_w):
        """Wrap by pixel width; works for CJK (char-by-char) and spaced text."""
        text = (text or "").strip()
        if not text:
            return []
        # If there are spaces, wrap on words; otherwise wrap per character.
        tokens = text.split(" ") if " " in text else list(text)
        sep = " " if " " in text else ""
        lines, cur = [], ""
        for tok in tokens:
            trial = (cur + sep + tok).strip() if cur else tok
            w = draw.textlength(trial, font=font)
            # "not cur": a single token wider than max_w still gets its own line.
            if w <= max_w or not cur:
                cur = trial
            else:
                lines.append(cur)
                cur = tok
        if cur:
            lines.append(cur)
        return lines[:4]  # cap lines so captions never dominate the frame

    def make_caption_png(caption: str, out_path: str):
        """Transparent WxH overlay with a translucent box + wrapped caption."""
        img = Image.new("RGBA", (width, height), (0, 0, 0, 0))
        draw = ImageDraw.Draw(img)
        if not caption or not caption.strip():
            # Still write a fully transparent overlay so ffmpeg always has input 1.
            img.save(out_path)
            return

        font_size = max(28, height // 24)
        try:
            font = ImageFont.truetype(FONT_PATH, font_size)
        except Exception:
            font = ImageFont.load_default()

        margin = int(width * 0.07)
        max_w = width - 2 * margin
        lines = wrap_text(draw, caption, font, max_w)
        line_h = int(font_size * 1.35)
        block_h = line_h * len(lines)
        pad = int(font_size * 0.6)
        box_top = height - block_h - pad * 2 - int(height * 0.06)

        # translucent rounded box
        draw.rounded_rectangle(
            [margin - pad, box_top - pad, width - margin + pad, box_top + block_h + pad],
            radius=24,
            fill=(0, 0, 0, 150),
        )
        y = box_top
        for line in lines:
            w = draw.textlength(line, font=font)
            x = (width - w) / 2
            # outline for readability
            for dx, dy in ((-2, 0), (2, 0), (0, -2), (0, 2)):
                draw.text((x + dx, y + dy), line, font=font, fill=(0, 0, 0, 220))
            draw.text((x, y), line, font=font, fill=(255, 255, 255, 255))
            y += line_h
        img.save(out_path)

    if not scenes:
        raise ValueError("no scenes provided")

    # The concat demuxer needs every clip to carry the same streams. If any
    # scene is voiced, voiceless scenes therefore get a silent audio track;
    # when no scene has audio the output stays video-only as before.
    any_audio = any(scene.get("audio") for scene in scenes)

    # Scratch files are removed when the block exits, even if ffmpeg fails.
    with tempfile.TemporaryDirectory() as tmp:
        clip_paths = []

        for i, scene in enumerate(scenes):
            img_path = os.path.join(tmp, f"img_{i}.png")
            with open(img_path, "wb") as f:
                f.write(scene["image"])

            # audio (optional)
            audio_bytes = scene.get("audio")
            audio_path = None
            if audio_bytes:
                audio_path = os.path.join(tmp, f"aud_{i}.wav")
                with open(audio_path, "wb") as f:
                    f.write(audio_bytes)
                dur = probe_duration(audio_path)
            else:
                dur = 0.0
            dur = max(dur, 2.0) + 0.4  # floor + small tail

            cap_path = os.path.join(tmp, f"cap_{i}.png")
            make_caption_png(scene.get("caption", ""), cap_path)

            clip_path = os.path.join(tmp, f"clip_{i}.mp4")
            motion_bytes = scene.get("video")

            if motion_bytes:
                # ---- LTX motion clip as the base; freeze last frame to fit audio ----
                base_path = os.path.join(tmp, f"base_{i}.mp4")
                with open(base_path, "wb") as f:
                    f.write(motion_bytes)
                base_dur = probe_duration(base_path)
                pad = max(0.0, dur - base_dur)
                vf = (
                    f"[0:v]scale={width}:{height}:force_original_aspect_ratio=increase,"
                    f"crop={width}:{height},"
                    f"tpad=stop_mode=clone:stop_duration={pad:.3f},"
                    f"fps={fps},format=yuv420p[bg];"
                    f"[bg][1:v]overlay=0:0,format=yuv420p[v]"
                )
                cmd = [
                    "ffmpeg", "-y",
                    "-i", base_path,               # 0: motion clip
                    "-loop", "1", "-i", cap_path,  # 1: caption overlay
                ]
            else:
                # ---- Ken Burns on a still image ----
                frames = max(int(math.ceil(dur * fps)), 1)
                # Alternate zoom-in / zoom-out between consecutive scenes.
                if i % 2 == 0:
                    z = "min(zoom+0.0012,1.18)"
                else:
                    z = "if(eq(on,0),1.18,max(zoom-0.0012,1.0))"
                vf = (
                    f"[0:v]scale={width*2}:{height*2}:force_original_aspect_ratio=increase,"
                    f"crop={width*2}:{height*2},"
                    f"zoompan=z='{z}':d={frames}:"
                    f"x='iw/2-(iw/zoom/2)':y='ih/2-(ih/zoom/2)':s={width}x{height}:fps={fps},"
                    f"format=yuv420p[bg];"
                    f"[bg][1:v]overlay=0:0,format=yuv420p[v]"
                )
                cmd = [
                    "ffmpeg", "-y",
                    "-loop", "1", "-i", img_path,  # 0: scene image
                    "-i", cap_path,                # 1: caption overlay
                ]

            if audio_path:
                cmd += ["-i", audio_path]  # 2: voiceover
            elif any_audio:
                cmd += ["-f", "lavfi", "-i", SILENCE_SRC]  # 2: silent filler

            cmd += ["-filter_complex", vf, "-map", "[v]"]
            if audio_path or any_audio:
                # apad extends a short voiceover with silence up to -t below.
                cmd += ["-map", "2:a", "-af", "apad", "-c:a", "aac", "-b:a", "128k"]
            cmd += [
                "-t", f"{dur:.3f}",
                "-r", str(fps),
                "-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", "veryfast",
                clip_path,
            ]
            run_ffmpeg(cmd)
            clip_paths.append(clip_path)

        # concat all clips (re-encode for safe joins across slightly differing params)
        list_path = os.path.join(tmp, "list.txt")
        with open(list_path, "w") as f:
            for cp in clip_paths:
                f.write(f"file '{cp}'\n")

        out_path = os.path.join(tmp, "final.mp4")
        run_ffmpeg(
            [
                "ffmpeg", "-y",
                "-f", "concat", "-safe", "0", "-i", list_path,
                "-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", "veryfast",
                "-c:a", "aac", "-b:a", "128k",
                "-movflags", "+faststart",
                out_path,
            ]
        )
        with open(out_path, "rb") as f:
            return f.read()


# ---------------------------------------------------------------------------
# Local smoke tests
# ---------------------------------------------------------------------------
@app.local_entrypoint()
def main(mode: str = "image", prompt: str = "a red panda barista making coffee"):
    """
    modal run modal_app.py --mode image
    modal run modal_app.py --mode tts
    modal run modal_app.py --mode video
    modal run modal_app.py --mode ltx
    modal run modal_app.py --mode ltxvideo

    Each mode writes its result to a smoke_* file in the current directory.
    ``prompt`` is only used by the "image" mode.
    """
    if mode == "image":
        data = Model().generate.remote(prompt, width=768, height=1344)
        with open("smoke_image.png", "wb") as f:
            f.write(data)
        print(f"OK image — {len(data)} bytes -> smoke_image.png")

    elif mode == "tts":
        en = TTS().synth.remote("Hello there, this is a tricket voiceover test.", "af_heart", "a")
        with open("smoke_en.wav", "wb") as f:
            f.write(en)
        zh = TTS().synth.remote("你好,这是一段中文配音测试。", "zf_xiaobei", "z")
        with open("smoke_zh.wav", "wb") as f:
            f.write(zh)
        print(f"OK tts — en {len(en)}B, zh {len(zh)}B")

    elif mode == "video":
        # build a tiny 2-scene ad end to end
        img1 = Model().generate.remote(
            "sleek wireless earbuds floating on a gradient studio background, product shot",
            width=768, height=1344,
        )
        img2 = Model().generate.remote(
            "a happy person jogging at sunrise wearing wireless earbuds, lifestyle",
            width=768, height=1344,
        )
        a1 = TTS().synth.remote("Meet Aura buds. Sound that moves with you.", "af_heart", "a")
        a2 = TTS().synth.remote(
            "All-day battery. Crystal-clear calls. Your day, upgraded.", "af_heart", "a"
        )
        scenes = [
            {"image": img1, "audio": a1, "caption": "Sound that moves with you"},
            {"image": img2, "audio": a2, "caption": "Your day, upgraded"},
        ]
        mp4 = assemble_video.remote(scenes)
        with open("smoke_video.mp4", "wb") as f:
            f.write(mp4)
        print(f"OK video — {len(mp4)} bytes -> smoke_video.mp4")

    elif mode == "ltx":
        img = Model().generate.remote(
            "sleek wireless earbuds on a marble pedestal, studio product shot",
            width=704, height=1216,
        )
        clip = Animate().animate.remote(img, prompt="wireless earbuds product shot, slow rotation")
        with open("smoke_ltx.mp4", "wb") as f:
            f.write(clip)
        print(f"OK ltx — {len(clip)} bytes -> smoke_ltx.mp4")

    elif mode == "ltxvideo":
        # full ad with LTX motion clips
        prompts = [
            "sleek wireless earbuds floating on a gradient studio background, product shot",
            "a happy person jogging at sunrise wearing wireless earbuds, lifestyle",
        ]
        imgs = [Model().generate.remote(p, width=704, height=1216) for p in prompts]
        clips = [
            Animate().animate.remote(img, prompt=p) for img, p in zip(imgs, prompts)
        ]
        a1 = TTS().synth.remote("Meet Aura buds. Sound that moves with you.", "af_heart", "a")
        a2 = TTS().synth.remote("All-day battery. Your day, upgraded.", "af_heart", "a")
        scenes = [
            {"image": imgs[0], "video": clips[0], "audio": a1, "caption": "Sound that moves with you"},
            {"image": imgs[1], "video": clips[1], "audio": a2, "caption": "Your day, upgraded"},
        ]
        mp4 = assemble_video.remote(scenes)
        with open("smoke_ltxvideo.mp4", "wb") as f:
            f.write(mp4)
        print(f"OK ltxvideo — {len(mp4)} bytes -> smoke_ltxvideo.mp4")

    else:
        print(f"unknown mode: {mode}")