tricket / modal_app.py
lerp666
Add LTX-Video image-to-video motion mode (toggle, keeps Ken Burns default)
ae19991
Raw
History Blame Contribute Delete
18.5 kB
"""
Modal backend for tricket — AI product-ad video studio.
Four building blocks, all deployed in one Modal app ("tricket-flux"):
* Model.generate — FLUX.1-schnell text-to-image (A100 GPU)
* TTS.synth — Kokoro text-to-speech, EN + ZH (CPU)
* Animate.animate — LTX-Video image-to-video motion clips (A100 GPU, optional)
* assemble_video — ffmpeg: Ken Burns stills or LTX clips + burned captions + voiceover
concatenated into a 9:16 MP4 (CPU)
Deploy: modal deploy modal_app.py
The Gradio frontend (app.py) orchestrates these via the Modal SDK.
Weights/models are cached in a Modal Volume so cold starts stay fast.
"""
import io
import modal
# Hugging Face repo id of the text-to-image checkpoint loaded by Model.load().
MODEL_NAME = "black-forest-labs/FLUX.1-schnell"
# Path where the weights Volume is mounted inside containers; the GPU/TTS
# images also set HF_HOME to this path.
CACHE_DIR = "/cache"
# The single Modal app that every class/function in this file is registered on.
app = modal.App("tricket-flux")
# Persistent cache for model weights (shared across cold starts).
weights_volume = modal.Volume.from_name("tricket-flux-cache", create_if_missing=True)
# FLUX.1-schnell is GATED. This Modal Secret (named "huggingface") holds HF_TOKEN.
# modal secret create huggingface HF_TOKEN=hf_xxx
hf_secret = modal.Secret.from_name("huggingface")
# ---------------------------------------------------------------------------
# Images
# ---------------------------------------------------------------------------
# Container image for the FLUX text-to-image class (Model).
# All Python packages are pinned to exact versions.
flux_image = (
    modal.Image.debian_slim(python_version="3.12")
    .pip_install(
        "torch==2.5.1",
        "diffusers==0.32.1",
        "transformers==4.47.1",
        "accelerate==1.2.1",
        "sentencepiece==0.2.0",
        "protobuf==5.29.2",
        "pillow==11.0.0",
    )
    # HF_HOME points the Hugging Face cache at CACHE_DIR (where Model mounts the
    # weights Volume). hf_transfer is not installed here, so it is switched off.
    .env({"HF_HOME": CACHE_DIR, "HF_HUB_ENABLE_HF_TRANSFER": "0"})
)
# Container image for the Kokoro text-to-speech class (TTS).
tts_image = (
    modal.Image.debian_slim(python_version="3.12")
    # NOTE(review): espeak-ng is presumably the phonemizer backend kokoro/misaki
    # fall back to — confirm before removing.
    .apt_install("espeak-ng")
    # numpy is the only unpinned package in this image.
    .pip_install("kokoro==0.9.4", "misaki[zh]==0.9.4", "soundfile==0.13.1", "numpy")
    # Same cache settings as the GPU images: HF cache under CACHE_DIR, hf_transfer off.
    .env({"HF_HOME": CACHE_DIR, "HF_HUB_ENABLE_HF_TRANSFER": "0"})
)
# Container image for assemble_video: ffmpeg/ffprobe for encoding, Pillow for
# rendering caption overlays, and Noto CJK fonts (assemble_video loads
# NotoSansCJK-Regular.ttc from /usr/share/fonts/opentype/noto/).
# No HF_HOME here: this image sets no Hugging Face cache variables.
video_image = (
    modal.Image.debian_slim(python_version="3.12")
    .apt_install("ffmpeg", "fonts-noto-cjk")
    .pip_install("pillow==11.0.0", "numpy")
)
# LTX-Video (image-to-video) — fast diffusion video model.
# Container image for the Animate class. Shares the torch/diffusers/transformers
# pins with flux_image and adds ffmpeg + imageio, which Animate.animate needs
# (via diffusers' export_to_video) to write the MP4.
ltx_image = (
    modal.Image.debian_slim(python_version="3.12")
    .apt_install("ffmpeg")
    .pip_install(
        "torch==2.5.1",
        "diffusers==0.32.1",
        "transformers==4.47.1",
        "accelerate==1.2.1",
        "sentencepiece==0.2.0",
        "imageio==2.36.1",
        "imageio-ffmpeg==0.5.1",
        "pillow==11.0.0",
    )
    # HF cache under CACHE_DIR (the weights Volume mount point); hf_transfer off.
    .env({"HF_HOME": CACHE_DIR, "HF_HUB_ENABLE_HF_TRANSFER": "0"})
)
# Hugging Face repo id loaded by Animate.load().
LTX_MODEL = "Lightricks/LTX-Video"
# ---------------------------------------------------------------------------
# 1) Text-to-image (FLUX.1-schnell)
# ---------------------------------------------------------------------------
@app.cls(
    gpu="A100",
    image=flux_image,
    volumes={CACHE_DIR: weights_volume},
    secrets=[hf_secret],
    scaledown_window=300,
    timeout=600,
)
class Model:
    """FLUX.1-schnell text-to-image worker.

    The pipeline is built once per container in ``load`` and reused by every
    ``generate`` call served by that container.
    """

    @modal.enter()
    def load(self):
        """Instantiate the FLUX pipeline in bfloat16 and move it to the GPU."""
        import torch
        from diffusers import FluxPipeline

        self.pipe = FluxPipeline.from_pretrained(MODEL_NAME, torch_dtype=torch.bfloat16)
        self.pipe.to("cuda")

    @modal.method()
    def generate(
        self,
        prompt: str,
        num_inference_steps: int = 4,
        guidance_scale: float = 0.0,
        width: int = 1024,
        height: int = 1024,
        seed: int = -1,
    ) -> bytes:
        """Render a single image for ``prompt`` and return it PNG-encoded.

        Args:
            prompt: Text prompt; surrounding whitespace is stripped.
            num_inference_steps: Number of denoising steps passed to the pipeline.
            guidance_scale: Guidance scale passed to the pipeline.
            width: Output width in pixels.
            height: Output height in pixels.
            seed: Non-negative value seeds a CUDA generator for reproducible
                output; ``None`` or a negative value leaves sampling unseeded.

        Returns:
            PNG bytes of the first generated image.

        Raises:
            ValueError: If ``prompt`` is empty or whitespace-only.
        """
        import torch

        if not prompt or not prompt.strip():
            raise ValueError("prompt is empty")

        # Only build a generator when the caller asked for a fixed seed.
        wants_seed = seed is not None and seed >= 0
        rng = torch.Generator("cuda").manual_seed(int(seed)) if wants_seed else None

        call_kwargs = {
            "prompt": prompt.strip(),
            "num_inference_steps": int(num_inference_steps),
            "guidance_scale": float(guidance_scale),
            "width": int(width),
            "height": int(height),
            "generator": rng,
        }
        picture = self.pipe(**call_kwargs).images[0]

        sink = io.BytesIO()
        picture.save(sink, format="PNG")
        return sink.getvalue()
# ---------------------------------------------------------------------------
# 2) Text-to-speech (Kokoro, EN + ZH)
# ---------------------------------------------------------------------------
@app.cls(
    image=tts_image,
    volumes={CACHE_DIR: weights_volume},
    scaledown_window=240,
    timeout=300,
)
class TTS:
    """Kokoro text-to-speech worker with one lazily created pipeline per language."""

    @modal.enter()
    def setup(self):
        """Start with an empty pipeline cache; pipelines are created on first use."""
        self._pipes = {}

    def _pipe(self, lang_code: str):
        """Return the cached ``KPipeline`` for ``lang_code``, creating it if needed."""
        from kokoro import KPipeline

        if lang_code in self._pipes:
            return self._pipes[lang_code]
        created = KPipeline(lang_code=lang_code)
        self._pipes[lang_code] = created
        return created

    @modal.method()
    def synth(self, text: str, voice: str = "af_heart", lang_code: str = "a") -> bytes:
        """Speak ``text`` and return it as 24kHz mono WAV bytes (PCM_16).

        Args:
            text: Text to speak. Empty/whitespace-only text is replaced by "...".
            voice: Kokoro voice name passed to the pipeline.
            lang_code: Kokoro language code selecting which pipeline to use.

        Returns:
            WAV file bytes. If the pipeline yields no audio at all, one second
            of silence is returned instead.
        """
        import numpy as np
        import soundfile as sf

        spoken = (text or "").strip() or "..."
        engine = self._pipe(lang_code)

        # The pipeline yields 3-tuples; only the audio element is used.
        segments = [
            np.asarray(audio, dtype="float32")
            for _, _, audio in engine(spoken, voice=voice)
            if audio is not None
        ]
        if not segments:
            # produce ~1s of silence rather than failing the whole video
            segments = [np.zeros(24000, dtype="float32")]

        waveform = np.concatenate(segments)
        sink = io.BytesIO()
        sf.write(sink, waveform, 24000, format="WAV", subtype="PCM_16")
        return sink.getvalue()
# ---------------------------------------------------------------------------
# 2b) Image-to-video (LTX-Video) — turns a still scene into a moving clip
# ---------------------------------------------------------------------------
@app.cls(
    gpu="A100",
    image=ltx_image,
    volumes={CACHE_DIR: weights_volume},
    scaledown_window=300,
    timeout=900,
)
class Animate:
    """LTX-Video image-to-video worker: turns one still image into a short clip."""

    @modal.enter()
    def load(self):
        """Instantiate the LTX image-to-video pipeline in bfloat16 on the GPU."""
        import torch
        from diffusers import LTXImageToVideoPipeline

        self.pipe = LTXImageToVideoPipeline.from_pretrained(
            LTX_MODEL, torch_dtype=torch.bfloat16
        )
        self.pipe.to("cuda")

    @modal.method()
    def animate(
        self,
        image_png: bytes,
        prompt: str = "",
        width: int = 704,
        height: int = 1216,
        num_frames: int = 97,
        num_inference_steps: int = 30,
        fps: int = 24,
    ) -> bytes:
        """Image -> short silent MP4 clip (H.264). Returns MP4 bytes.

        Args:
            image_png: Encoded source image (anything Pillow can open).
            prompt: Optional motion/scene description; a fixed style suffix is
                always appended (or used alone when the prompt is empty).
            width: Clip width in pixels; the source image is resized to it.
            height: Clip height in pixels; the source image is resized to it.
            num_frames: Number of frames to generate.
            num_inference_steps: Number of denoising steps.
            fps: Frame rate written into the exported MP4.

        Returns:
            Bytes of the exported MP4 file.
        """
        import os
        import tempfile

        from diffusers.utils import export_to_video
        from PIL import Image

        # Coerce numeric inputs the same way Model.generate does, so float
        # values coming from UI sliders do not break Pillow's resize().
        width = int(width)
        height = int(height)
        fps = int(fps)

        # The image is stretched to (width, height); aspect ratio is not preserved.
        img = Image.open(io.BytesIO(image_png)).convert("RGB").resize((width, height))

        motion = (prompt or "").strip()
        style_suffix = "Subtle natural motion, gentle camera movement, cinematic, high detail"
        full_prompt = f"{motion}. {style_suffix}" if motion else style_suffix

        frames = self.pipe(
            image=img,
            prompt=full_prompt,
            negative_prompt="worst quality, blurry, distorted, jittery, watermark, text",
            width=width,
            height=height,
            num_frames=int(num_frames),
            num_inference_steps=int(num_inference_steps),
        ).frames[0]

        # Use a self-cleaning temp dir: the container stays warm between calls,
        # so a bare mkdtemp() would leave one MP4 behind per invocation.
        with tempfile.TemporaryDirectory() as workdir:
            out = os.path.join(workdir, "clip.mp4")
            export_to_video(frames, out, fps=fps)
            with open(out, "rb") as f:
                return f.read()
# ---------------------------------------------------------------------------
# 3) Video assembly (ffmpeg: Ken Burns or LTX clips + captions + voiceover)
# ---------------------------------------------------------------------------
@app.function(image=video_image, timeout=600)
def assemble_video(
    scenes: list,
    width: int = 720,
    height: int = 1280,
    fps: int = 30,
) -> bytes:
    """
    Render each scene to a clip with ffmpeg and concatenate them into one MP4.

    scenes: list of dicts, each:
        {"image": <png bytes>, "audio": <wav bytes or None>, "caption": <str>,
         "video": <mp4 bytes, optional>}
      * "image" is required and is the Ken Burns source when no "video" is given.
      * "video", when present, is used as the moving base instead of the still;
        its last frame is frozen if it is shorter than the scene duration.
      * Scene duration is max(audio duration, 2.0s) + 0.4s.
      * If at least one scene has audio, scenes without audio get a silent
        track so every clip has the same stream layout for concatenation.
        If no scene has audio, the result has no audio track.
    Returns: MP4 bytes (H.264, plus AAC when any scene has audio), <width>x<height>.
    Raises:
        ValueError: if scenes is empty.
        KeyError: if a scene has no "image".
        subprocess.CalledProcessError: if an ffmpeg invocation fails (the tail
            of ffmpeg's stderr is printed first).
    """
    import math
    import os
    import subprocess
    import tempfile

    from PIL import Image, ImageDraw, ImageFont

    FONT_PATH = "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc"
    # Silent filler matching the 24kHz mono format TTS.synth produces.
    SILENCE_SRC = "anullsrc=channel_layout=mono:sample_rate=24000"

    def run_ffmpeg(cmd: list) -> None:
        """Run ffmpeg; on failure print the end of its stderr, then re-raise."""
        try:
            subprocess.run(cmd, check=True, capture_output=True)
        except subprocess.CalledProcessError as err:
            tail = (err.stderr or b"").decode("utf-8", "replace")[-2000:]
            print(f"ffmpeg failed (exit {err.returncode}):\n{tail}")
            raise

    def probe_duration(path: str) -> float:
        """Return the container duration in seconds, or 0.0 if it cannot be read."""
        try:
            out = subprocess.check_output(
                [
                    "ffprobe", "-v", "error",
                    "-show_entries", "format=duration",
                    "-of", "default=noprint_wrappers=1:nokey=1",
                    path,
                ]
            )
            return float(out.decode().strip())
        except (subprocess.CalledProcessError, OSError, ValueError):
            # Best effort: callers apply a minimum duration floor anyway.
            return 0.0

    def wrap_text(draw, text, font, max_w):
        """Wrap by pixel width; works for CJK (char-by-char) and spaced text."""
        text = (text or "").strip()
        if not text:
            return []
        # If there are spaces, wrap on words; otherwise wrap per character.
        tokens = text.split(" ") if " " in text else list(text)
        sep = " " if " " in text else ""
        lines, cur = [], ""
        for tok in tokens:
            trial = (cur + sep + tok).strip() if cur else tok
            w = draw.textlength(trial, font=font)
            # A token wider than max_w still gets its own line (not cur).
            if w <= max_w or not cur:
                cur = trial
            else:
                lines.append(cur)
                cur = tok
        if cur:
            lines.append(cur)
        return lines[:4]  # cap lines so captions never dominate the frame

    def make_caption_png(caption: str, out_path: str):
        """Transparent WxH overlay with a translucent box + wrapped caption."""
        img = Image.new("RGBA", (width, height), (0, 0, 0, 0))
        draw = ImageDraw.Draw(img)
        if not caption or not caption.strip():
            # Fully transparent overlay keeps the ffmpeg graph identical.
            img.save(out_path)
            return
        font_size = max(28, height // 24)
        try:
            font = ImageFont.truetype(FONT_PATH, font_size)
        except Exception:
            font = ImageFont.load_default()
        margin = int(width * 0.07)
        max_w = width - 2 * margin
        lines = wrap_text(draw, caption, font, max_w)
        line_h = int(font_size * 1.35)
        block_h = line_h * len(lines)
        pad = int(font_size * 0.6)
        box_top = height - block_h - pad * 2 - int(height * 0.06)
        # translucent rounded box
        draw.rounded_rectangle(
            [margin - pad, box_top - pad, width - margin + pad, box_top + block_h + pad],
            radius=24,
            fill=(0, 0, 0, 150),
        )
        y = box_top
        for line in lines:
            w = draw.textlength(line, font=font)
            x = (width - w) / 2
            # outline for readability
            for dx, dy in ((-2, 0), (2, 0), (0, -2), (0, 2)):
                draw.text((x + dx, y + dy), line, font=font, fill=(0, 0, 0, 220))
            draw.text((x, y), line, font=font, fill=(255, 255, 255, 255))
            y += line_h
        img.save(out_path)

    if not scenes:
        raise ValueError("no scenes provided")

    # The concat demuxer needs every clip to carry the same streams. Only add
    # silent filler when some scene has real audio, so an all-silent job still
    # yields a video-only file as before.
    needs_audio_track = any(scene.get("audio") for scene in scenes)

    # Self-cleaning work dir: intermediate clips are removed when we return.
    with tempfile.TemporaryDirectory() as tmp:
        clip_paths = []
        for i, scene in enumerate(scenes):
            img_path = os.path.join(tmp, f"img_{i}.png")
            with open(img_path, "wb") as f:
                f.write(scene["image"])

            # audio (optional)
            audio_bytes = scene.get("audio")
            audio_path = None
            if audio_bytes:
                audio_path = os.path.join(tmp, f"aud_{i}.wav")
                with open(audio_path, "wb") as f:
                    f.write(audio_bytes)
                dur = probe_duration(audio_path)
            else:
                dur = 0.0
            dur = max(dur, 2.0) + 0.4  # floor + small tail

            cap_path = os.path.join(tmp, f"cap_{i}.png")
            make_caption_png(scene.get("caption", ""), cap_path)
            clip_path = os.path.join(tmp, f"clip_{i}.mp4")

            motion_bytes = scene.get("video")
            if motion_bytes:
                # ---- LTX motion clip as the base; freeze last frame to fit audio ----
                base_path = os.path.join(tmp, f"base_{i}.mp4")
                with open(base_path, "wb") as f:
                    f.write(motion_bytes)
                base_dur = probe_duration(base_path)
                freeze = max(0.0, dur - base_dur)
                vf = (
                    f"[0:v]scale={width}:{height}:force_original_aspect_ratio=increase,"
                    f"crop={width}:{height},"
                    f"tpad=stop_mode=clone:stop_duration={freeze:.3f},"
                    f"fps={fps},format=yuv420p[bg];"
                    f"[bg][1:v]overlay=0:0,format=yuv420p[v]"
                )
                cmd = [
                    "ffmpeg", "-y",
                    "-i", base_path,  # 0: motion clip
                    "-loop", "1", "-i", cap_path,  # 1: caption overlay
                ]
            else:
                # ---- Ken Burns on a still image ----
                frames = max(int(math.ceil(dur * fps)), 1)
                # Alternate zoom-in (even scenes) and zoom-out (odd scenes).
                if i % 2 == 0:
                    z = "min(zoom+0.0012,1.18)"
                else:
                    z = "if(eq(on,0),1.18,max(zoom-0.0012,1.0))"
                vf = (
                    f"[0:v]scale={width*2}:{height*2}:force_original_aspect_ratio=increase,"
                    f"crop={width*2}:{height*2},"
                    f"zoompan=z='{z}':d={frames}:"
                    f"x='iw/2-(iw/zoom/2)':y='ih/2-(ih/zoom/2)':s={width}x{height}:fps={fps},"
                    f"format=yuv420p[bg];"
                    f"[bg][1:v]overlay=0:0,format=yuv420p[v]"
                )
                cmd = [
                    "ffmpeg", "-y",
                    "-loop", "1", "-i", img_path,  # 0: scene image
                    "-i", cap_path,  # 1: caption overlay
                ]

            if audio_path:
                cmd += ["-i", audio_path]  # 2: voiceover
            elif needs_audio_track:
                cmd += ["-f", "lavfi", "-i", SILENCE_SRC]  # 2: silent filler
            cmd += ["-filter_complex", vf, "-map", "[v]"]
            if audio_path:
                # apad extends the voiceover with silence up to the -t limit.
                cmd += ["-map", "2:a", "-af", "apad", "-c:a", "aac", "-b:a", "128k"]
            elif needs_audio_track:
                # anullsrc is endless; -t below bounds it to the scene duration.
                cmd += ["-map", "2:a", "-c:a", "aac", "-b:a", "128k"]
            cmd += [
                "-t", f"{dur:.3f}",
                "-r", str(fps),
                "-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", "veryfast",
                clip_path,
            ]
            run_ffmpeg(cmd)
            clip_paths.append(clip_path)

        # concat all clips (re-encode for safe joins across slightly differing params)
        list_path = os.path.join(tmp, "list.txt")
        with open(list_path, "w") as f:
            for cp in clip_paths:
                f.write(f"file '{cp}'\n")
        out_path = os.path.join(tmp, "final.mp4")
        run_ffmpeg(
            [
                "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", list_path,
                "-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", "veryfast",
                "-c:a", "aac", "-b:a", "128k",
                "-movflags", "+faststart",
                out_path,
            ]
        )
        with open(out_path, "rb") as f:
            return f.read()
# ---------------------------------------------------------------------------
# Local smoke tests
# ---------------------------------------------------------------------------
@app.local_entrypoint()
def main(mode: str = "image", prompt: str = "a red panda barista making coffee"):
    """
    Local smoke tests; each mode writes its result(s) into the current directory.

    modal run modal_app.py --mode image      # uses --prompt; writes smoke_image.png
    modal run modal_app.py --mode tts        # writes smoke_en.wav + smoke_zh.wav
    modal run modal_app.py --mode video      # 2-scene Ken Burns ad -> smoke_video.mp4
    modal run modal_app.py --mode ltx        # one LTX motion clip -> smoke_ltx.mp4
    modal run modal_app.py --mode ltxvideo   # 2-scene ad with LTX clips -> smoke_ltxvideo.mp4

    Any other mode prints "unknown mode: <mode>" and does nothing else.
    """

    def save(path, payload):
        # All outputs are binary blobs returned by the remote calls.
        with open(path, "wb") as f:
            f.write(payload)

    if mode == "image":
        png = Model().generate.remote(prompt, width=768, height=1344)
        save("smoke_image.png", png)
        print(f"OK image — {len(png)} bytes -> smoke_image.png")
        return

    if mode == "tts":
        en = TTS().synth.remote("Hello there, this is a tricket voiceover test.", "af_heart", "a")
        save("smoke_en.wav", en)
        zh = TTS().synth.remote("你好,这是一段中文配音测试。", "zf_xiaobei", "z")
        save("smoke_zh.wav", zh)
        print(f"OK tts — en {len(en)}B, zh {len(zh)}B")
        return

    if mode == "video":
        # build a tiny 2-scene ad end to end
        img1 = Model().generate.remote(
            "sleek wireless earbuds floating on a gradient studio background, product shot",
            width=768, height=1344,
        )
        img2 = Model().generate.remote(
            "a happy person jogging at sunrise wearing wireless earbuds, lifestyle",
            width=768, height=1344,
        )
        a1 = TTS().synth.remote("Meet Aura buds. Sound that moves with you.", "af_heart", "a")
        a2 = TTS().synth.remote("All-day battery. Crystal-clear calls. Your day, upgraded.", "af_heart", "a")
        scenes = [
            {"image": img1, "audio": a1, "caption": "Sound that moves with you"},
            {"image": img2, "audio": a2, "caption": "Your day, upgraded"},
        ]
        mp4 = assemble_video.remote(scenes)
        save("smoke_video.mp4", mp4)
        print(f"OK video — {len(mp4)} bytes -> smoke_video.mp4")
        return

    if mode == "ltx":
        still = Model().generate.remote(
            "sleek wireless earbuds on a marble pedestal, studio product shot",
            width=704, height=1216,
        )
        clip = Animate().animate.remote(still, prompt="wireless earbuds product shot, slow rotation")
        save("smoke_ltx.mp4", clip)
        print(f"OK ltx — {len(clip)} bytes -> smoke_ltx.mp4")
        return

    if mode == "ltxvideo":
        # full ad with LTX motion clips
        prompts = [
            "sleek wireless earbuds floating on a gradient studio background, product shot",
            "a happy person jogging at sunrise wearing wireless earbuds, lifestyle",
        ]
        # All stills are generated first, then animated with their own prompt.
        imgs = [Model().generate.remote(p, width=704, height=1216) for p in prompts]
        clips = [Animate().animate.remote(img, prompt=p) for img, p in zip(imgs, prompts)]
        a1 = TTS().synth.remote("Meet Aura buds. Sound that moves with you.", "af_heart", "a")
        a2 = TTS().synth.remote("All-day battery. Your day, upgraded.", "af_heart", "a")
        scenes = [
            {"image": imgs[0], "video": clips[0], "audio": a1, "caption": "Sound that moves with you"},
            {"image": imgs[1], "video": clips[1], "audio": a2, "caption": "Your day, upgraded"},
        ]
        mp4 = assemble_video.remote(scenes)
        save("smoke_ltxvideo.mp4", mp4)
        print(f"OK ltxvideo — {len(mp4)} bytes -> smoke_ltxvideo.mp4")
        return

    print(f"unknown mode: {mode}")