Spaces:
Running
Running
| """ | |
| Modal backend for tricket — AI product-ad video studio. | |
| Four building blocks, all deployed in one Modal app ("tricket-flux"): | |
| * Model.generate — FLUX.1-schnell text-to-image (A100 GPU) | |
| * TTS.synth — Kokoro text-to-speech, EN + ZH (CPU) | |
| * Animate.animate — LTX-Video image-to-video: still -> short silent MP4 clip (GPU) | |
| * assemble_video — ffmpeg: Ken Burns stills or LTX clips + burned captions + voiceover | |
| concatenated into a 9:16 MP4 (CPU) | |
| Deploy: modal deploy modal_app.py | |
| The Gradio frontend (app.py) orchestrates these via the Modal SDK. | |
| Weights/models are cached in a Modal Volume so cold starts stay fast. | |
| """ | |
| import io | |
| import modal | |
# Hugging Face repo id of the text-to-image checkpoint loaded by Model.load().
MODEL_NAME = "black-forest-labs/FLUX.1-schnell"
# Directory the images below export as HF_HOME, so Hub downloads land here.
CACHE_DIR = "/cache"

# The single Modal app named in the module docstring.
app = modal.App("tricket-flux")

# Persistent cache for model weights (shared across cold starts).
# NOTE(review): not referenced anywhere else in the text under review —
# presumably mounted at CACHE_DIR by decorators that are not visible here.
weights_volume = modal.Volume.from_name("tricket-flux-cache", create_if_missing=True)

# FLUX.1-schnell is GATED. This Modal Secret (named "huggingface") holds HF_TOKEN.
#   modal secret create huggingface HF_TOKEN=hf_xxx
# NOTE(review): also unreferenced in the visible text; confirm it is attached
# to the class that downloads the FLUX weights.
hf_secret = modal.Secret.from_name("huggingface")
# ---------------------------------------------------------------------------
# Images
# ---------------------------------------------------------------------------
# Interpreter version shared by every container image in this file.
_PYTHON_VERSION = "3.12"


def _hf_cache_env() -> dict:
    """Return the env vars shared by the images that download from the HF Hub.

    HF_HOME points at CACHE_DIR and HF_HUB_ENABLE_HF_TRANSFER is set to "0".
    A fresh dict is returned on every call so the images never share one
    mutable object.
    """
    return {"HF_HOME": CACHE_DIR, "HF_HUB_ENABLE_HF_TRANSFER": "0"}


# Text-to-image runtime (diffusers FluxPipeline).
_FLUX_PACKAGES = (
    "torch==2.5.1",
    "diffusers==0.32.1",
    "transformers==4.47.1",
    "accelerate==1.2.1",
    "sentencepiece==0.2.0",
    "protobuf==5.29.2",
    "pillow==11.0.0",
)
flux_image = (
    modal.Image.debian_slim(python_version=_PYTHON_VERSION)
    .pip_install(*_FLUX_PACKAGES)
    .env(_hf_cache_env())
)

# Text-to-speech runtime (Kokoro + espeak-ng system package).
# NOTE(review): numpy is the only unpinned requirement here and in video_image.
_TTS_PACKAGES = ("kokoro==0.9.4", "misaki[zh]==0.9.4", "soundfile==0.13.1", "numpy")
tts_image = (
    modal.Image.debian_slim(python_version=_PYTHON_VERSION)
    .apt_install("espeak-ng")
    .pip_install(*_TTS_PACKAGES)
    .env(_hf_cache_env())
)

# Video assembly runtime: ffmpeg plus the Noto CJK fonts used for captions.
video_image = (
    modal.Image.debian_slim(python_version=_PYTHON_VERSION)
    .apt_install("ffmpeg", "fonts-noto-cjk")
    .pip_install("pillow==11.0.0", "numpy")
)

# LTX-Video (image-to-video) — fast diffusion video model.
_LTX_PACKAGES = (
    "torch==2.5.1",
    "diffusers==0.32.1",
    "transformers==4.47.1",
    "accelerate==1.2.1",
    "sentencepiece==0.2.0",
    "imageio==2.36.1",
    "imageio-ffmpeg==0.5.1",
    "pillow==11.0.0",
)
ltx_image = (
    modal.Image.debian_slim(python_version=_PYTHON_VERSION)
    .apt_install("ffmpeg")
    .pip_install(*_LTX_PACKAGES)
    .env(_hf_cache_env())
)

# Hugging Face repo id loaded by Animate.load().
LTX_MODEL = "Lightricks/LTX-Video"
| # --------------------------------------------------------------------------- | |
| # 1) Text-to-image (FLUX.1-schnell) | |
| # --------------------------------------------------------------------------- | |
class Model:
    """FLUX.1-schnell text-to-image worker.

    ``load`` builds the diffusers pipeline and stores it on ``self.pipe``;
    ``generate`` expects that to have happened already.

    NOTE(review): no Modal decorators are visible on this class in the text
    under review, yet ``main`` calls ``Model().generate.remote(...)`` —
    confirm they exist in the deployed file.
    """

    def load(self):
        """Create the FLUX pipeline in bfloat16 and move it onto CUDA."""
        import torch
        from diffusers import FluxPipeline

        self.pipe = FluxPipeline.from_pretrained(MODEL_NAME, torch_dtype=torch.bfloat16)
        self.pipe.to("cuda")

    def generate(
        self,
        prompt: str,
        num_inference_steps: int = 4,
        guidance_scale: float = 0.0,
        width: int = 1024,
        height: int = 1024,
        seed: int = -1,
    ) -> bytes:
        """Generate one image, return PNG bytes.

        Args:
            prompt: Text prompt; surrounding whitespace is stripped.
            num_inference_steps: Passed to the pipeline after ``int()``.
            guidance_scale: Passed to the pipeline after ``float()``.
            width: Output width, coerced with ``int()``.
            height: Output height, coerced with ``int()``.
            seed: A value >= 0 seeds a CUDA generator; a negative value or
                ``None`` leaves the generator unset.

        Raises:
            ValueError: If ``prompt`` is empty or whitespace only.
        """
        cleaned = prompt.strip() if prompt else ""
        if not cleaned:
            raise ValueError("prompt is empty")

        rng = None
        if seed is not None and seed >= 0:
            # torch is only needed to build the seeded generator.
            import torch

            rng = torch.Generator("cuda").manual_seed(int(seed))

        result = self.pipe(
            prompt=cleaned,
            num_inference_steps=int(num_inference_steps),
            guidance_scale=float(guidance_scale),
            width=int(width),
            height=int(height),
            generator=rng,
        )

        png = io.BytesIO()
        result.images[0].save(png, format="PNG")
        return png.getvalue()
| # --------------------------------------------------------------------------- | |
| # 2) Text-to-speech (Kokoro, EN + ZH) | |
| # --------------------------------------------------------------------------- | |
class TTS:
    """Kokoro text-to-speech worker with one cached pipeline per language code.

    NOTE(review): no Modal decorators are visible on this class in the text
    under review, yet ``main`` calls ``TTS().synth.remote(...)`` — confirm
    they exist in the deployed file.
    """

    def setup(self):
        """Start with an empty ``lang_code -> KPipeline`` cache."""
        self._pipes = {}

    def _pipe(self, lang_code: str):
        """Return the pipeline for ``lang_code``, building it on first use.

        The cache is created here if ``setup`` has not run, so a bare instance
        no longer fails with ``AttributeError``. ``kokoro`` is imported only
        on a cache miss.
        """
        cache = getattr(self, "_pipes", None)
        if cache is None:
            cache = self._pipes = {}
        if lang_code not in cache:
            from kokoro import KPipeline

            cache[lang_code] = KPipeline(lang_code=lang_code)
        return cache[lang_code]

    def synth(self, text: str, voice: str = "af_heart", lang_code: str = "a") -> bytes:
        """Synthesize speech, return 24kHz mono WAV bytes (PCM_16).

        Args:
            text: Text to speak. Empty or whitespace-only text is replaced
                by ``"..."`` so the pipeline always receives some input.
            voice: Voice name forwarded to the pipeline call.
            lang_code: Language code used to select or build the pipeline.

        Returns:
            WAV file bytes. If the pipeline yields no audio at all, one
            second of silence is returned instead.
        """
        import numpy as np
        import soundfile as sf

        sample_rate = 24000

        text = (text or "").strip() or "..."
        pipe = self._pipe(lang_code)

        # Each pipeline result unpacks to three fields; only the audio is used.
        chunks = [
            np.asarray(audio, dtype="float32")
            for _, _, audio in pipe(text, voice=voice)
            if audio is not None
        ]
        if not chunks:
            # produce ~1s of silence rather than failing the whole video
            chunks = [np.zeros(sample_rate, dtype="float32")]

        buf = io.BytesIO()
        sf.write(buf, np.concatenate(chunks), sample_rate, format="WAV", subtype="PCM_16")
        return buf.getvalue()
| # --------------------------------------------------------------------------- | |
| # 2b) Image-to-video (LTX-Video) — turns a still scene into a moving clip | |
| # --------------------------------------------------------------------------- | |
class Animate:
    """LTX-Video image-to-video worker: turns a still into a short moving clip.

    ``load`` builds the diffusers pipeline and stores it on ``self.pipe``;
    ``animate`` expects that to have happened already.

    NOTE(review): no Modal decorators are visible on this class in the text
    under review, yet ``main`` calls ``Animate().animate.remote(...)`` —
    confirm they exist in the deployed file.
    """

    def load(self):
        """Create the LTX image-to-video pipeline in bfloat16 on CUDA."""
        import torch
        from diffusers import LTXImageToVideoPipeline

        self.pipe = LTXImageToVideoPipeline.from_pretrained(
            LTX_MODEL, torch_dtype=torch.bfloat16
        )
        self.pipe.to("cuda")

    def animate(
        self,
        image_png: bytes,
        prompt: str = "",
        width: int = 704,
        height: int = 1216,
        num_frames: int = 97,
        num_inference_steps: int = 30,
        fps: int = 24,
    ) -> bytes:
        """Image -> short silent MP4 clip (H.264). Returns MP4 bytes.

        Args:
            image_png: Encoded source image; it is decoded, converted to RGB
                and resized to ``width`` x ``height``.
            prompt: Optional motion description, prepended to a fixed style
                suffix. The suffix alone is used when it is blank.
            width: Clip width, coerced with ``int()``.
            height: Clip height, coerced with ``int()``.
            num_frames: Number of frames requested from the pipeline.
            num_inference_steps: Denoising steps requested from the pipeline.
            fps: Frame rate passed to ``export_to_video``.
        """
        import io
        import os
        import tempfile

        from diffusers.utils import export_to_video
        from PIL import Image

        # Coerce sizes the same way Model.generate does.
        width, height = int(width), int(height)
        still = Image.open(io.BytesIO(image_png)).convert("RGB").resize((width, height))

        style = "Subtle natural motion, gentle camera movement, cinematic, high detail"
        motion = (prompt or "").strip()
        full_prompt = f"{motion}. {style}" if motion else style

        frames = self.pipe(
            image=still,
            prompt=full_prompt,
            negative_prompt="worst quality, blurry, distorted, jittery, watermark, text",
            width=width,
            height=height,
            num_frames=int(num_frames),
            num_inference_steps=int(num_inference_steps),
        ).frames[0]

        # The scratch directory is removed on exit, so repeated calls in one
        # container do not pile up files.
        with tempfile.TemporaryDirectory() as workdir:
            clip_path = os.path.join(workdir, "clip.mp4")
            export_to_video(frames, clip_path, fps=fps)
            with open(clip_path, "rb") as fh:
                return fh.read()
| # --------------------------------------------------------------------------- | |
| # 3) Video assembly (ffmpeg: Ken Burns or LTX clips + captions + voiceover) | |
| # --------------------------------------------------------------------------- | |
def assemble_video(
    scenes: list,
    width: int = 720,
    height: int = 1280,
    fps: int = 30,
) -> bytes:
    """Render scenes into one captioned, narrated MP4 and return its bytes.

    Every scene is encoded to its own clip with ffmpeg, then all clips are
    joined with the concat demuxer and re-encoded.

    Args:
        scenes: Non-empty list of dicts. Keys read here:
            ``"image"``   PNG bytes (required; source of the Ken Burns clip
                          when no ``"video"`` is supplied),
            ``"audio"``   WAV bytes or None (optional voiceover),
            ``"caption"`` str (optional burned-in caption),
            ``"video"``   MP4 bytes (optional motion clip used as the base;
                          its last frame is held if the audio runs longer).
        width: Output frame width in pixels.
        height: Output frame height in pixels.
        fps: Output frame rate.

    Returns:
        MP4 bytes (H.264 + AAC), <width>x<height>. Scenes without a voiceover
        get a silent audio track, so every clip has the same stream layout
        when joined.

    Raises:
        ValueError: If ``scenes`` is empty.
        KeyError: If a scene has no ``"image"`` entry.
        subprocess.CalledProcessError: If an ffmpeg invocation fails.
    """
    # Validate first so bad input fails before the heavier imports run.
    if not scenes:
        raise ValueError("no scenes provided")

    import math
    import os
    import subprocess
    import tempfile

    from PIL import Image, ImageDraw, ImageFont

    FONT_PATH = "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc"
    # Silent stand-in for scenes without a voiceover; 24 kHz mono matches
    # the format TTS.synth documents for its WAV output.
    SILENCE_SRC = "anullsrc=channel_layout=mono:sample_rate=24000"
    VIDEO_CODEC = ["-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", "veryfast"]
    AUDIO_CODEC = ["-c:a", "aac", "-b:a", "128k"]

    def probe_duration(path: str) -> float:
        """Return the media duration in seconds, or 0.0 if it cannot be read."""
        try:
            raw = subprocess.check_output(
                [
                    "ffprobe", "-v", "error",
                    "-show_entries", "format=duration",
                    "-of", "default=noprint_wrappers=1:nokey=1",
                    path,
                ]
            )
            return float(raw.decode().strip())
        except (OSError, subprocess.CalledProcessError, ValueError):
            # Best effort: the caller applies a minimum duration anyway.
            return 0.0

    def wrap_text(draw, text, font, max_w):
        """Wrap by pixel width; works for CJK (char-by-char) and spaced text."""
        text = (text or "").strip()
        if not text:
            return []
        # If there are spaces, wrap on words; otherwise wrap per character.
        spaced = " " in text
        tokens = text.split(" ") if spaced else list(text)
        sep = " " if spaced else ""
        lines, cur = [], ""
        for tok in tokens:
            trial = (cur + sep + tok).strip() if cur else tok
            # A token that is too wide on its own still gets its own line.
            if not cur or draw.textlength(trial, font=font) <= max_w:
                cur = trial
            else:
                lines.append(cur)
                cur = tok
        if cur:
            lines.append(cur)
        return lines[:4]  # cap lines so captions never dominate the frame

    def make_caption_png(caption: str, out_path: str):
        """Transparent WxH overlay with a translucent box + wrapped caption."""
        img = Image.new("RGBA", (width, height), (0, 0, 0, 0))
        if not caption or not caption.strip():
            # Blank captions still get a fully transparent overlay so the
            # ffmpeg filter graph is the same for every scene.
            img.save(out_path)
            return
        draw = ImageDraw.Draw(img)
        font_size = max(28, height // 24)
        try:
            font = ImageFont.truetype(FONT_PATH, font_size)
        except Exception:
            # Font file missing or unreadable: fall back to PIL's built-in font.
            font = ImageFont.load_default()
        margin = int(width * 0.07)
        lines = wrap_text(draw, caption, font, width - 2 * margin)
        line_h = int(font_size * 1.35)
        block_h = line_h * len(lines)
        pad = int(font_size * 0.6)
        box_top = height - block_h - pad * 2 - int(height * 0.06)
        # translucent rounded box
        draw.rounded_rectangle(
            [margin - pad, box_top - pad, width - margin + pad, box_top + block_h + pad],
            radius=24,
            fill=(0, 0, 0, 150),
        )
        y = box_top
        for line in lines:
            x = (width - draw.textlength(line, font=font)) / 2
            # outline for readability
            for dx, dy in ((-2, 0), (2, 0), (0, -2), (0, 2)):
                draw.text((x + dx, y + dy), line, font=font, fill=(0, 0, 0, 220))
            draw.text((x, y), line, font=font, fill=(255, 255, 255, 255))
            y += line_h
        img.save(out_path)

    # All intermediate files live in a scratch directory that is removed on
    # exit, including when ffmpeg fails.
    with tempfile.TemporaryDirectory() as tmp:
        clip_paths = []
        for i, scene in enumerate(scenes):
            img_path = os.path.join(tmp, f"img_{i}.png")
            with open(img_path, "wb") as f:
                f.write(scene["image"])

            # audio (optional) — always ends up as ffmpeg input #2
            audio_bytes = scene.get("audio")
            if audio_bytes:
                audio_path = os.path.join(tmp, f"aud_{i}.wav")
                with open(audio_path, "wb") as f:
                    f.write(audio_bytes)
                audio_input = ["-i", audio_path]
                dur = probe_duration(audio_path)
            else:
                audio_input = ["-f", "lavfi", "-i", SILENCE_SRC]
                dur = 0.0
            dur = max(dur, 2.0) + 0.4  # floor + small tail

            cap_path = os.path.join(tmp, f"cap_{i}.png")
            make_caption_png(scene.get("caption", ""), cap_path)
            clip_path = os.path.join(tmp, f"clip_{i}.mp4")

            motion_bytes = scene.get("video")
            if motion_bytes:
                # ---- LTX motion clip as the base; freeze last frame to fit audio ----
                base_path = os.path.join(tmp, f"base_{i}.mp4")
                with open(base_path, "wb") as f:
                    f.write(motion_bytes)
                freeze = max(0.0, dur - probe_duration(base_path))
                vf = (
                    f"[0:v]scale={width}:{height}:force_original_aspect_ratio=increase,"
                    f"crop={width}:{height},"
                    f"tpad=stop_mode=clone:stop_duration={freeze:.3f},"
                    f"fps={fps},format=yuv420p[bg];"
                    f"[bg][1:v]overlay=0:0,format=yuv420p[v]"
                )
                video_inputs = [
                    "-i", base_path,               # 0: motion clip
                    "-loop", "1", "-i", cap_path,  # 1: caption overlay
                ]
            else:
                # ---- Ken Burns on a still image ----
                frames = max(int(math.ceil(dur * fps)), 1)
                # Even scenes zoom in towards 1.18, odd scenes start at 1.18
                # and zoom back out to 1.0.
                if i % 2 == 0:
                    z = "min(zoom+0.0012,1.18)"
                else:
                    z = "if(eq(on,0),1.18,max(zoom-0.0012,1.0))"
                vf = (
                    f"[0:v]scale={width*2}:{height*2}:force_original_aspect_ratio=increase,"
                    f"crop={width*2}:{height*2},"
                    f"zoompan=z='{z}':d={frames}:"
                    f"x='iw/2-(iw/zoom/2)':y='ih/2-(ih/zoom/2)':s={width}x{height}:fps={fps},"
                    f"format=yuv420p[bg];"
                    f"[bg][1:v]overlay=0:0,format=yuv420p[v]"
                )
                video_inputs = [
                    "-loop", "1", "-i", img_path,  # 0: scene image
                    "-i", cap_path,                # 1: caption overlay
                ]

            cmd = [
                "ffmpeg", "-y",
                *video_inputs,
                *audio_input,                      # 2: voiceover or silence
                "-filter_complex", vf, "-map", "[v]",
                "-map", "2:a", "-af", "apad", *AUDIO_CODEC,
                "-t", f"{dur:.3f}",                # bounds the looped/padded inputs
                "-r", str(fps),
                *VIDEO_CODEC,
                clip_path,
            ]
            subprocess.run(cmd, check=True, capture_output=True)
            clip_paths.append(clip_path)

        # concat all clips (re-encode for safe joins across slightly differing params)
        list_path = os.path.join(tmp, "list.txt")
        with open(list_path, "w") as f:
            f.writelines(f"file '{cp}'\n" for cp in clip_paths)

        out_path = os.path.join(tmp, "final.mp4")
        subprocess.run(
            [
                "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_path,
                *VIDEO_CODEC,
                *AUDIO_CODEC,
                "-movflags", "+faststart",
                out_path,
            ],
            check=True,
            capture_output=True,
        )
        with open(out_path, "rb") as f:
            return f.read()
| # --------------------------------------------------------------------------- | |
| # Local smoke tests | |
| # --------------------------------------------------------------------------- | |
def main(mode: str = "image", prompt: str = "a red panda barista making coffee"):
    """Run one smoke test against the remote functions and save its output.

        modal run modal_app.py --mode image
        modal run modal_app.py --mode tts
        modal run modal_app.py --mode video
        modal run modal_app.py --mode ltx
        modal run modal_app.py --mode ltxvideo

    Args:
        mode: Which smoke test to run (one of the values listed above).
        prompt: Image prompt; only used by ``--mode image``.

    Raises:
        SystemExit: If ``mode`` is not recognised, so a mistyped mode ends
            with a non-zero exit status instead of looking like a pass.

    NOTE(review): no entrypoint decorator is visible on this function in the
    text under review — confirm it exists in the deployed file.
    """

    def save(path: str, data: bytes) -> None:
        """Write one smoke-test artefact into the current directory."""
        with open(path, "wb") as f:
            f.write(data)

    if mode == "image":
        data = Model().generate.remote(prompt, width=768, height=1344)
        save("smoke_image.png", data)
        print(f"OK image — {len(data)} bytes -> smoke_image.png")
    elif mode == "tts":
        en = TTS().synth.remote("Hello there, this is a tricket voiceover test.", "af_heart", "a")
        save("smoke_en.wav", en)
        zh = TTS().synth.remote("你好,这是一段中文配音测试。", "zf_xiaobei", "z")
        save("smoke_zh.wav", zh)
        print(f"OK tts — en {len(en)}B, zh {len(zh)}B")
    elif mode == "video":
        # build a tiny 2-scene ad end to end
        img1 = Model().generate.remote(
            "sleek wireless earbuds floating on a gradient studio background, product shot",
            width=768, height=1344,
        )
        img2 = Model().generate.remote(
            "a happy person jogging at sunrise wearing wireless earbuds, lifestyle",
            width=768, height=1344,
        )
        a1 = TTS().synth.remote("Meet Aura buds. Sound that moves with you.", "af_heart", "a")
        a2 = TTS().synth.remote("All-day battery. Crystal-clear calls. Your day, upgraded.", "af_heart", "a")
        scenes = [
            {"image": img1, "audio": a1, "caption": "Sound that moves with you"},
            {"image": img2, "audio": a2, "caption": "Your day, upgraded"},
        ]
        mp4 = assemble_video.remote(scenes)
        save("smoke_video.mp4", mp4)
        print(f"OK video — {len(mp4)} bytes -> smoke_video.mp4")
    elif mode == "ltx":
        img = Model().generate.remote(
            "sleek wireless earbuds on a marble pedestal, studio product shot",
            width=704, height=1216,
        )
        clip = Animate().animate.remote(img, prompt="wireless earbuds product shot, slow rotation")
        save("smoke_ltx.mp4", clip)
        print(f"OK ltx — {len(clip)} bytes -> smoke_ltx.mp4")
    elif mode == "ltxvideo":
        # full ad with LTX motion clips
        prompts = [
            "sleek wireless earbuds floating on a gradient studio background, product shot",
            "a happy person jogging at sunrise wearing wireless earbuds, lifestyle",
        ]
        imgs = [Model().generate.remote(p, width=704, height=1216) for p in prompts]
        clips = [Animate().animate.remote(img, prompt=p) for img, p in zip(imgs, prompts)]
        a1 = TTS().synth.remote("Meet Aura buds. Sound that moves with you.", "af_heart", "a")
        a2 = TTS().synth.remote("All-day battery. Your day, upgraded.", "af_heart", "a")
        scenes = [
            {"image": imgs[0], "video": clips[0], "audio": a1, "caption": "Sound that moves with you"},
            {"image": imgs[1], "video": clips[1], "audio": a2, "caption": "Your day, upgraded"},
        ]
        mp4 = assemble_video.remote(scenes)
        save("smoke_ltxvideo.mp4", mp4)
        print(f"OK ltxvideo — {len(mp4)} bytes -> smoke_ltxvideo.mp4")
    else:
        # Exits non-zero and prints the message on stderr.
        raise SystemExit(f"unknown mode: {mode}")